在上一博文中 跟你們講了RPC的實現思路 思路畢竟只是思路 那麼這篇就帶着源碼給你們講解下實現過程當中的各個具體問題git
/** *調用端代碼及spring配置 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"file:src/test/java/rpcTest/ClientContext.xml"}) public class Client { @Test public void start(){ Service service= (Service) RPC.call(Service.class); System.out.println("測試Integer,Double類型傳參與返回String對象:"+service.stringMethodIntegerArgsTest(233,666.66)); //輸出string233666.66 } } /** *Service抽象及其實現 *調用與實現端共同依賴Service */ public interface Service { String stringMethodIntegerArgsTest(Integer a,Double b); } /** * ServiceImpl實現端對接口的具體實現 */ public class ServiceImpl implements Service { @Override public String stringMethodIntegerArgsTest(Integer a, Double b) { return "String"+a+b; } }
1.0版本分3個包github
調用端只需如此調用 定義接口 傳入接口類類型 後面調用的接口內的方法 所有是由實現端實現spring
Service service= (Service) RPC.call(Service.class);
這句的做用其實就是生成調用端的動態代理json
/** * 暴露調用端使用的靜態方法 爲抽象接口生成動態代理對象 * TODO 考慮後面優化不在使用時仍需強轉 * @param cls 抽象接口的類類型 * @return 接口生成的動態代理對象 */ public static Object call(Class cls){ RPCProxyHandler handler=new RPCProxyHandler(); Object proxyObj=Proxy.newProxyInstance(cls.getClassLoader(),new Class<?>[]{cls},handler); return proxyObj; }
RPCProxyHandler爲動態代理的方法被調用後的回調方法 每一個方法被調用時都會執行這個invoke數組
/** * 代理抽象接口調用的方法 * 發送方法信息給服務端 加鎖等待服務端返回 * @param proxy * @param method * @param args * @return * @throws Throwable */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RPCRequest request=new RPCRequest(); request.setRequestID(buildRequestID(method.getName())); request.setClassName(method.getDeclaringClass().getName());//返回表示聲明由此 Method 對象表示的方法的類或接口的Class對象 request.setMethodName(method.getName()); // request.setParameterTypes(method.getParameterTypes());//返回形參類型 request.setParameters(args);//輸入的實參 RPCRequestNet.requestLockMap.put(request.getRequestID(),request); RPCRequestNet.connect().send(request); //調用用結束後移除對應的condition映射關係 RPCRequestNet.requestLockMap.remove(request.getRequestID()); return request.getResult();//目標方法的返回結果 }
也就是收集對應調用的接口的信息 而後send給實現端 那麼這個requestLockMap又是做何做用的呢服務器
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String responseJson= (String) msg; RPCResponse response= (RPCResponse) RPC.responseDecode(responseJson); synchronized (RPCRequestNet.requestLockMap.get(response.getRequestID())) { //喚醒在該對象鎖上wait的線程 RPCRequest request= (RPCRequest) RPCRequestNet.requestLockMap.get(response.getRequestID()); request.setResult(response.getResult()); request.notifyAll(); } }
接下來是RPCRequestNet.connect().send(request);方法 connect方法實際上是單例模式返回RPCRequestNet實例 RPCRequestNet構造方法是使用netty對實現端進行TCP連接 send方法以下網絡
try { //判斷鏈接是否已完成 只在鏈接啓動時會產生阻塞 if (RPCRequestHandler.channelCtx==null){ connectlock.lock(); //掛起等待鏈接成功 System.out.println("正在等待鏈接實現端"); connectCondition.await(); connectlock.unlock(); } //編解碼對象爲json 發送請求 String requestJson= null; try { requestJson = RPC.requestEncode(request); } catch (JsonProcessingException e) { e.printStackTrace(); } ByteBuf requestBuf= Unpooled.copiedBuffer(requestJson.getBytes()); RPCRequestHandler.channelCtx.writeAndFlush(requestBuf); System.out.println("調用"+request.getRequestID()+"已發送"); //掛起等待實現端處理完畢返回 TODO 後續配置超時時間 synchronized (request) { //放棄對象鎖 並阻塞等待notify request.wait(); } System.out.println("調用"+request.getRequestID()+"接收完畢"); } catch (InterruptedException e) { e.printStackTrace(); }
condition和lock一樣是爲了同步等待異步IO返回用的 send方法基本是編解碼json後發送給實現端app
/** *實現端代碼及spring配置 */ @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations={"file:src/test/java/rpcTest/ServerContext.xml"}) public class Server { @Test public void start(){ //啓動spring後纔可啓動 防止容器還沒有加載完畢 RPC.start(); } }
出了配置spring以外 實現端就一句 RPC.start() 其實就是啓動netty服務器 服務端的處理客戶端信息回調以下負載均衡
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { String requestJson= (String) msg; System.out.println("receive request:"+requestJson); RPCRequest request= RPC.requestDeocde(requestJson); Object result=InvokeServiceUtil.invoke(request); //netty的write方法並無直接寫入通道(爲避免屢次喚醒多路複用選擇器) //而是把待發送的消息放到緩衝數組中,flush方法再所有寫到通道中 // ctx.write(resp); //記得加分隔符 否則客戶端一直不會處理 RPCResponse response=new RPCResponse(); response.setRequestID(request.getRequestID()); response.setResult(result); String respStr=RPC.responseEncode(response); ByteBuf responseBuf= Unpooled.copiedBuffer(respStr.getBytes()); ctx.writeAndFlush(responseBuf); }
主要是編解碼json 反射對應的方法 咱們看看反射的工具類
/** * 反射調用相應實現類並結果 * @param request * @return */ public static Object invoke(RPCRequest request){ Object result=null;//內部變量必須賦值 全局變量纔不用 //實現類名 String implClassName= RPC.getServerConfig().getServerImplMap().get(request.getClassName()); try { Class implClass=Class.forName(implClassName); Object[] parameters=request.getParameters(); int parameterNums=request.getParameters().length; Class[] parameterTypes=new Class[parameterNums]; for (int i = 0; i <parameterNums ; i++) { parameterTypes[i]=parameters[i].getClass(); } Method method=implClass.getDeclaredMethod(request.getMethodName(),parameterTypes); Object implObj=implClass.newInstance(); result=method.invoke(implObj,parameters); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } return result; }
解析Parameters getClass獲取他們的類類型 反射調用對應的方法
最後是藉助spring配置基礎配置 我寫了兩個類 ServerConfig ClientConfig 做爲調用端和服務端的配置 只需在spring中配置這兩個bean 並啓動IOC容器便可
調用端
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean class="org.meizhuo.rpc.client.ClientConfig"> <property name="host" value="127.0.0.1"></property> <property name="port" value="9999"></property> </bean> </beans>
實現端
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean class="org.meizhuo.rpc.server.ServerConfig"> <property name="port" value="9999"></property> <property name="serverImplMap"> <map> <!--配置對應的抽象接口及其實現--> <entry key="rpcTest.Service" value="rpcTest.ServiceImpl"></entry> </map> </property> </bean> </beans>
咱們的框架是做爲一個依賴包引入的 咱們不可能在咱們的框架中讀取對應的spring xml 這樣徹底是去了框架的靈活性 那咱們怎麼在運行過程當中得到咱們所處於的IOC容器 已得到咱們的正確配置信息呢 答案是spring提供的ApplicationContextAware接口
/** * Created by wephone on 17-12-26. */ public class ClientConfig implements ApplicationContextAware { private String host; private int port; //調用超時時間 private long overtime; public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } public long getOvertime() { return overtime; } public void setOvertime(long overtime) { this.overtime = overtime; } /** * 加載Spring配置文件時,若是Spring配置文件中所定義的Bean類 * 若是該類實現了ApplicationContextAware接口 * 那麼在加載Spring配置文件時,會自動調用ApplicationContextAware接口中的 * @param applicationContext * @throws BeansException */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RPC.clientContext=applicationContext; } }
這樣咱們在RPC類內部就維護了一個靜態IOC容器的context 只需如此獲取配置 RPC.getServerConfig().getPort()
public static ServerConfig getServerConfig(){ return serverContext.getBean(ServerConfig.class); }
本例程僅爲1.0版本 後續博客中 會加入異常處理 zookeeper支持 負載均衡策略等 博客:zookeeper支持 歡迎持續關注 歡迎star 提issue