Hand-Writing a Pluggable RPC Framework Based on Spring (Part 3): The Communication Protocol Module

Before writing any code, we need to think through a few questions.

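  1. What exactly should our framework do?
    We want to implement an RPC protocol for remote invocation.
  2. What should the end result look like?
    We should be able to call a remote service the same way we call a local one.
  3. How do we achieve that?
    As covered in the previous chapters, we use dynamic proxies. The client generates a proxy class for the interface, and the proxy's invoke method packs the method name, parameters, and related information into a request that is sent to the server. The server runs continuously, waiting for such messages; when one arrives, it uses reflection to call the matching implementation of the interface.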
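The proxy-then-reflect flow described above can be sketched in a single process. This is only an illustration under stated assumptions: `ProxySketch`, `Request`, `GreetingService`, and `GreetingServiceImpl` are hypothetical names (the real framework uses `RpcRequest` and sends it over the network), and `dispatch` is called directly where a real client would transmit the request.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// All names in this sketch are hypothetical and exist only for illustration.
class ProxySketch {

    /** Illustrative stand-in for the article's RpcRequest. */
    static final class Request {
        final String implClassName;
        final String methodName;
        final Class<?>[] paramTypes;
        final Object[] params;

        Request(String implClassName, String methodName, Class<?>[] paramTypes, Object[] params) {
            this.implClassName = implClassName;
            this.methodName = methodName;
            this.paramTypes = paramTypes;
            this.params = params;
        }
    }

    interface GreetingService {
        String greet(String name);
    }

    static class GreetingServiceImpl implements GreetingService {
        public GreetingServiceImpl() {}

        @Override
        public String greet(String name) {
            return "hello, " + name;
        }
    }

    /** "Server side": resolve the implementation class and invoke the method via reflection. */
    static Object dispatch(Request request) throws Exception {
        Class<?> implClass = Class.forName(request.implClassName);
        Method method = implClass.getMethod(request.methodName, request.paramTypes);
        Object target = implClass.getDeclaredConstructor().newInstance();
        return method.invoke(target, request.params);
    }

    /** "Client side": a JDK dynamic proxy whose invoke() packs the call into a Request. */
    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> iface, String implClassName) {
        InvocationHandler handler = (proxy, method, methodArgs) -> {
            Request request = new Request(
                    implClassName, method.getName(), method.getParameterTypes(), methodArgs);
            // A real framework would send the request over the network here.
            return dispatch(request);
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }

    public static void main(String[] args) {
        GreetingService service =
                createProxy(GreetingService.class, GreetingServiceImpl.class.getName());
        System.out.println(service.greet("rpc"));
    }
}
```

The caller only ever sees the `GreetingService` interface; everything about how the call travels is hidden inside the invocation handler, which is what makes the transport replaceable.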

First we need to implement the underlying communication server and client. There are several possible implementations:

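  1. A Socket-based client and server (synchronous and blocking, not recommended). Treat this one as a programming exercise: it is not integrated with the rest of the system and exists purely for practice.
    The Socket-based server.
    It starts a blocking socket server and adds a thread pool to achieve pseudo-asynchronous handling.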

    public class SocketServer {
    
         private static SocketServer INSTANCE = new SocketServer();
    
         private SocketServer(){};
    
         public static SocketServer getInstance() {
             return INSTANCE;
         }
    
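         //Cached thread pool: no core threads are kept and the maximum is Integer.MAX_VALUE; one thread per connection gives pseudo-asynchronous handling
         ExecutorService executorService = Executors.newCachedThreadPool();
    
         /**
          * Publishes the service using the blocking I/O (BIO) model.
          * @param port the port to listen on
          */
         public void publiser(int port){
             try (ServerSocket serverSocket  = new ServerSocket(port);)
             {
                 while (true){
                     Socket socket = serverSocket.accept(); //blocks until a request arrives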
                     executorService.execute(new SocketHandler(socket));
                 }
             } catch (IOException e) {
                 e.printStackTrace();
             }
         }
     }
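The server's pool is unbounded, so a burst of connections can create a very large number of threads. As a hedged sketch of one alternative (not what the article's code does), a `ThreadPoolExecutor` with a bounded queue and a caller-runs rejection policy caps the thread count and slows the accepting thread down under load. `BoundedPoolSketch`, `newBoundedPool`, `runAll`, and the sizes used are all hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a bounded replacement for Executors.newCachedThreadPool().
class BoundedPoolSketch {

    /** Builds a pool with a hard thread limit and a bounded work queue. */
    static ThreadPoolExecutor newBoundedPool(int coreThreads, int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                coreThreads,
                maxThreads,
                60L, TimeUnit.SECONDS,                      // idle time before extra threads exit
                new ArrayBlockingQueue<>(queueCapacity),    // bounded queue instead of unlimited threads
                new ThreadPoolExecutor.CallerRunsPolicy()); // when saturated, the submitter runs the task
    }

    /** Submits the given number of trivial tasks and returns how many actually ran. */
    static int runAll(int tasks) {
        ThreadPoolExecutor pool = newBoundedPool(2, 4, 8);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks completed: " + runAll(50));
    }
}
```

With the caller-runs policy no task is dropped: when both the queue and the pool are full, the thread that called `execute` runs the task itself, which naturally throttles how fast new connections are accepted.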

    The corresponding handler invokes the target service via reflection and writes the result back through the socket.

    public class SocketHandler implements Runnable{
    
         private Socket socket;
    
         public SocketHandler(Socket socket) {
             this.socket = socket;
         }
    
         @Override
         public void run() {
             try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                  ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());)
             {
                 Object o = inputStream.readObject(); //readObject performs Java deserialization
                 System.out.println(o);
                 Object result = invoke((RpcRequest) o);
                 //write the result back
                 outputStream.writeObject(result);
                 outputStream.flush();
             } catch (IOException e) {
                 e.printStackTrace();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
         }
    
         private Object invoke(RpcRequest invocation){
    
             //Look up the method on the implementation class by name and parameter types
             try {
                 String interFaceName = invocation.getInterfaceName();
                 Class impClass = Class.forName(invocation.getImpl());
    
                 Method method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes());
                 //Keep the result as Object: casting to String would fail for any other return type
                 Object result = method.invoke(impClass.newInstance(),invocation.getParams());
                 return result;
             } catch (NoSuchMethodException e) {
                 e.printStackTrace();
             } catch (IllegalAccessException e) {
                 e.printStackTrace();
             } catch (InvocationTargetException e) {
                 e.printStackTrace();
             } catch (InstantiationException e) {
                 e.printStackTrace();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
    
             return null;
         }
     }

    Now for the client, which assembles the parameters and sends them to the socket server.

    public class SocketClient {
    
         private static SocketClient INSTANCE = new SocketClient();
    
         private SocketClient(){};
    
         public static SocketClient getInstance() {
             return INSTANCE;
         }
    
         private Socket newSocket(String host, Integer port) {
             System.out.println("Creating a new socket connection");
             try {
                 return new Socket(host, port);
             } catch (IOException e) {
                 System.out.println("Failed to establish the connection");
                 e.printStackTrace();
             }
             return null;
         }
    
         public Object sendRequest(String host, Integer port,RpcRequest rpcRequest) {
             Socket socket = newSocket(host,port);
             if (socket == null) {
                 return null; //the connection could not be established
             }
             //try-with-resources closes the streams and the socket, so no explicit close() calls are needed
             try (
                 Socket s = socket;
                 ObjectOutputStream outputStream = new ObjectOutputStream(s.getOutputStream());
                 ObjectInputStream inputStream = new ObjectInputStream(s.getInputStream());)
             {
                 outputStream.writeObject(rpcRequest);
                 outputStream.flush();
    
                 //block until the server writes the result back
                 return inputStream.readObject();
    
             } catch (Exception e) {
                 e.printStackTrace();
             }
             return null;
         }
     }
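    With the code above the flow should be clear: the client and server talk to each other, the client passes the parameters of the method it wants to call to the server, the server completes the call through reflection, and the result is returned to the client.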
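To see that round trip end to end without the rest of the framework, here is a minimal loopback sketch. It makes several assumptions: the "service" is just upper-casing a string, the server handles exactly one request, and `SocketRoundTripSketch`, `serveOnce`, `call`, and `roundTrip` are hypothetical names. Both sides create the `ObjectOutputStream` first so that each stream header is sent before either side blocks reading the other's.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Locale;

// Hypothetical sketch of one request/response exchange over a loopback socket.
class SocketRoundTripSketch {

    /** Serves exactly one request: reads a String and replies with its upper-case form. */
    static void serveOnce(ServerSocket serverSocket) {
        try (Socket socket = serverSocket.accept();
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
             ObjectInputStream in = new ObjectInputStream(socket.getInputStream())) {
            String payload = (String) in.readObject();
            out.writeObject(payload.toUpperCase(Locale.ROOT));
            out.flush();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    /** Client side: send one serialized request and block until the reply arrives. */
    static Object call(String host, int port, Object request) {
        try (Socket socket = new Socket(host, port);
             ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
             ObjectInputStream in = new ObjectInputStream(socket.getInputStream())) {
            out.writeObject(request);
            out.flush();
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    /** Starts a one-shot server on an ephemeral loopback port and sends it a single request. */
    static Object roundTrip(String message) {
        try (ServerSocket serverSocket = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread server = new Thread(() -> serveOnce(serverSocket));
            server.start();
            Object result = call(
                    serverSocket.getInetAddress().getHostAddress(),
                    serverSocket.getLocalPort(),
                    message);
            server.join();
            return result;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

Port 0 asks the operating system for any free port, which keeps the sketch from colliding with whatever else is running on the machine.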
    Now let's begin in earnest.
  2. An HTTP-request-based client and a Tomcat-based server.
    The Tomcat-based server is a singleton with a single start method that launches the service; incoming requests are handled by DispatcherServlet.

    public class HttpServer {
    
         private static HttpServer INSTANCE = new HttpServer();
    
         private HttpServer(){}
    
         public static HttpServer getInstance(){
             return INSTANCE;
         }
    
    
         /**
          * Starts Tomcat, the servlet container.
          * @param hostname the host name to serve
          * @param port the port to listen on
          */
    
         public void start(String hostname,Integer port){
    
             Tomcat tomcat = new Tomcat();
             Server server = tomcat.getServer();
             Service service = server.findService("Tomcat");
    
             Connector connector = new Connector();
             connector.setPort(port);
    
             Engine engine = new StandardEngine();
             engine.setDefaultHost(hostname);
    
             Host host = new StandardHost();
             host.setName(hostname);
    
             String contextPath = "";
             Context context = new StandardContext();
             context.setPath(contextPath);
             context.addLifecycleListener(new Tomcat.FixContextListener()); //lifecycle listener
    
             host.addChild(context);
             engine.addChild(host);
    
             service.setContainer(engine);
             service.addConnector(connector);
    
             tomcat.addServlet(contextPath,"dispatcher", new DispatcherServlet());
             context.addServletMappingDecoded("/*","dispatcher");
    
             try {
                 tomcat.start();
                 tomcat.getServer().await();
             } catch (LifecycleException e) {
                 e.printStackTrace();
             }
         }
     }

    Next is the request dispatcher, DispatcherServlet, which hands each request to HttpServletHandler.

    /**
      * Tomcat is a servlet container, so we write a servlet for it.
      *
      */
     public class DispatcherServlet extends HttpServlet {
    
         @Override
         protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
             new HttpServletHandler().handler(req,resp);
         }
     }

    HttpServletHandler simply parses the request, performs the call through reflection, and returns the result.

    public class HttpServletHandler{
    
         public void handler(HttpServletRequest req, HttpServletResponse resp) {
    
             try(InputStream inputStream = req.getInputStream();
                 OutputStream outputStream =resp.getOutputStream();){
                 ObjectInputStream ois = new ObjectInputStream(inputStream);
                 RpcRequest invocation = (RpcRequest) ois.readObject();
    
                 // Find the implementation class for the interface; its name is carried in the request
                 String interFaceName = invocation.getInterfaceName();
                 Class impClass = Class.forName(invocation.getImpl());
    
    
                 Method method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes());
                 Object result = method.invoke(impClass.newInstance(),invocation.getParams());
    
                 RpcResponse rpcResponse = new RpcResponse();
                 rpcResponse.setResponseId(invocation.getRequestId());
                 rpcResponse.setData(result);
                 IOUtils.write(toByteArray(rpcResponse),outputStream);
             }catch (IOException e){
                 e.printStackTrace();
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             } catch (NoSuchMethodException e) {
                 e.printStackTrace();
             } catch (IllegalAccessException e) {
                 e.printStackTrace();
             } catch (InvocationTargetException e) {
                 e.printStackTrace();
             } catch (InstantiationException e) {
                 e.printStackTrace();
             }
         }
    
         public byte[] toByteArray (Object obj) {
             byte[] bytes = null;
             ByteArrayOutputStream bos = new ByteArrayOutputStream();
             try {
                 ObjectOutputStream oos = new ObjectOutputStream(bos);
                 oos.writeObject(obj);
                 oos.flush();
                 bytes = bos.toByteArray ();
                 oos.close();
                 bos.close();
             } catch (IOException ex) {
                 ex.printStackTrace();
             }
             return bytes;
         }
    
     }

    Finally, the client sends the data with a POST request and then parses the result returned by the server.

    public class HttpClient {
    
         private static HttpClient INSTANCE = new HttpClient();
    
         private HttpClient(){}
    
         public static HttpClient getInstance(){
             return INSTANCE;
         }
    
         public Object post(String hostname, Integer port, RpcRequest invocation){
    
             try{
                 URL url = new URL("http",hostname,port,"/");
                 HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
    
                 httpURLConnection.setRequestMethod("POST");
                 httpURLConnection.setDoOutput(true);
    
                 OutputStream outputStream = httpURLConnection.getOutputStream();
                 ObjectOutputStream oos = new ObjectOutputStream(outputStream);
                 oos.writeObject(invocation);
                 oos.flush();
                 oos.close();
    
                 InputStream inputStream = httpURLConnection.getInputStream();
                 RpcResponse rpcResponse =  (RpcResponse)toObject(IOUtils.toByteArray(inputStream));
                 return rpcResponse.getData();
    
    
             } catch (MalformedURLException e) {
                 e.printStackTrace();
             } catch (IOException e) {
                 e.printStackTrace();
             }
             return null;
    
         }
    
         public Object toObject (byte[] bytes) {
             Object obj = null;
             try {
                 ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
                 ObjectInputStream ois = new ObjectInputStream (bis);
                 obj = ois.readObject();
                 ois.close();
                 bis.close();
             } catch (IOException ex) {
                 ex.printStackTrace();
             } catch (ClassNotFoundException ex) {
                 ex.printStackTrace();
             }
             return obj;
         }
     }
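Both HttpServletHandler and HttpClient rely on small helpers that turn an object into bytes and back with Java serialization. A tidier variant of that pair, using try-with-resources, might look like the sketch below. It differs from the code above in one labeled way: failures are rethrown instead of returning null. `SerializationSketch` and `Response` are hypothetical names standing in for the article's classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch of the toByteArray/toObject helper pair.
class SerializationSketch {

    /** Illustrative stand-in for the article's RpcResponse. */
    static final class Response implements Serializable {
        private static final long serialVersionUID = 1L;
        final String responseId;
        final Object data;

        Response(String responseId, Object data) {
            this.responseId = responseId;
            this.data = data;
        }
    }

    /** Serializes an object; the stream is closed (and flushed) before the bytes are read out. */
    static byte[] toByteArray(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Deserializes an object previously produced by toByteArray. */
    static Object toObject(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = toByteArray(new Response("42", "ok"));
        Response back = (Response) toObject(wire);
        System.out.println(back.responseId + " -> " + back.data);
    }
}
```

Everything placed in the response, including the payload in `data`, has to implement `Serializable`, otherwise `writeObject` fails at runtime.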
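  3. A Netty-based client and server.
    The Netty-based server. Its encoder and decoder are our own implementations; for now you can use the part I commented out, and swap in ours once we get to writing the codec.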

    public class NettyServer {
    
         private static NettyServer INSTANCE = new NettyServer();
    
         private static Executor executor = Executors.newCachedThreadPool();
    
         private final static int MESSAGE_LENGTH = 4;
    
         private NettyServer(){};
    
         public static NettyServer getInstance(){
             return INSTANCE;
         }
    
    
         private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
    
         public static void submit(Runnable t){
             executor.execute(t);
         }
    
         public void start(String host, Integer port){
             EventLoopGroup bossGroup = new NioEventLoopGroup(1);
             EventLoopGroup workerGroup = new NioEventLoopGroup();
    
             try{
                 final ServerBootstrap bootstrap = new ServerBootstrap();
                 bootstrap.group(bossGroup,workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 128)
                 .childOption(ChannelOption.SO_KEEPALIVE, true)
                 .childHandler(new ChannelInitializer<SocketChannel>(){
    
                     @Override
                     protected void initChannel(SocketChannel arg0) throws Exception {
                         ChannelPipeline pipeline = arg0.pipeline();
                         //Stay compatible with the frame format of LengthFieldBasedFrameDecoder, the half-packet decoder that ObjectDecoder extends,
                         //because the parent class is initialized with super(maxObjectSize, 0, 4, 0, 4);
     //                  pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, NettyServer.MESSAGE_LENGTH, 0, NettyServer.MESSAGE_LENGTH));
                         //Use LengthFieldPrepender to prepend the message header that ObjectDecoder expects
     //                  pipeline.addLast(new LengthFieldPrepender(NettyServer.MESSAGE_LENGTH));
     //                  pipeline.addLast(new ObjectEncoder());
                         //For concurrent performance, use the weakCachingConcurrentResolver caching strategy; in ordinary cases cacheDisabled is sufficient
     //                  pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                         //Register the decoder NettyDecoderHandler
                         pipeline.addLast(new NettyDecoderHandler(RpcRequest.class, serializeType));
                         //Register the encoder NettyEncoderHandler
                         pipeline.addLast(new NettyEncoderHandler(serializeType));
                         pipeline.addLast("handler", new NettyServerHandler());
    
                     }
    
                 });
                 Channel channel = bootstrap.bind(host, port).sync().channel();
                 System.out.println("Server start listen at " + port);
             }catch(Exception e){
                 bossGroup.shutdownGracefully();
                 workerGroup.shutdownGracefully();
             }
         }
    
    
     }
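The commented-out pipeline relies on a 4-byte length prefix to split the byte stream into messages. The idea behind that framing can be sketched in plain Java with `ByteBuffer`. This is an illustration of the technique only, not Netty's implementation and not the article's own codec; `LengthFramingSketch`, `encode`, and `decode` are hypothetical names, and the sketch does not guard against negative or oversized length values.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of length-prefixed framing (4-byte big-endian length, then payload).
class LengthFramingSketch {

    static final int LENGTH_FIELD_BYTES = 4;

    /** Prepends the payload length so the receiver knows where the message ends. */
    static byte[] encode(byte[] payload) {
        return ByteBuffer.allocate(LENGTH_FIELD_BYTES + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /**
     * Extracts every complete frame from a buffer that is in read mode and leaves
     * any trailing partial frame unread, so it can be completed by later bytes.
     */
    static List<byte[]> decode(ByteBuffer in) {
        List<byte[]> frames = new ArrayList<>();
        while (in.remaining() >= LENGTH_FIELD_BYTES) {
            in.mark();
            int length = in.getInt();
            if (in.remaining() < length) {
                in.reset(); // half packet: rewind to the length field and wait for more bytes
                break;
            }
            byte[] frame = new byte[length];
            in.get(frame);
            frames.add(frame);
        }
        return frames;
    }

    public static void main(String[] args) {
        byte[] first = encode("hello".getBytes(StandardCharsets.UTF_8));
        byte[] second = encode("rpc".getBytes(StandardCharsets.UTF_8));
        // Simulate TCP delivering the first frame plus only part of the second one.
        ByteBuffer buffer = ByteBuffer.allocate(first.length + 5);
        buffer.put(first).put(second, 0, 5);
        buffer.flip();
        List<byte[]> frames = decode(buffer);
        System.out.println(frames.size() + " complete frame(s), "
                + buffer.remaining() + " byte(s) left for the next read");
    }
}
```

Because TCP is a byte stream, one read may contain several messages or only part of one; the length prefix is what lets the decoder tell those cases apart.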

    The server-side handler. Netty is built around this handler pattern; the handler in turn submits each received request to the thread pool for processing.

    public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
    
         private ChannelHandlerContext context;
    
    
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception {
             System.out.println("server channelRead...");
             System.out.println(ctx.channel().remoteAddress() + "->server:" + rpcRequest.toString());
             InvokeTask it = new InvokeTask(rpcRequest,ctx);
             NettyServer.submit(it);
         }
    
    
         @Override
         public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
             this.context = ctx;     
         }
    
     }

    Here is the corresponding InvokeTask implementation.

    public class InvokeTask implements Runnable{
    
         private RpcRequest invocation;
         private ChannelHandlerContext ctx;
    
         public InvokeTask(RpcRequest invocation,ChannelHandlerContext ctx) {
             super();
             this.invocation = invocation;
             this.ctx = ctx;
         }
    
    
         @Override
         public void run() {
    
             // Find the implementation class for the interface; its name is carried in the request
             String interFaceName = invocation.getInterfaceName();
             Class impClass = null;
             try {
                 impClass = Class.forName(invocation.getImpl());
             } catch (ClassNotFoundException e) {
                 e.printStackTrace();
             }
    
             Method method;
             Object result = null;
             try {
                 method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes());
                 //Open question: should the implementation instance be obtained from Spring instead of created here?
                 result = method.invoke(impClass.newInstance(),invocation.getParams());
             } catch (Exception e) {
                 e.printStackTrace();
             }
             RpcResponse rpcResponse = new RpcResponse();
             rpcResponse.setResponseId(invocation.getRequestId());
             rpcResponse.setData(result);
             ctx.writeAndFlush(rpcResponse).addListener(new ChannelFutureListener() {
                 public void operationComplete(ChannelFuture channelFuture) throws Exception {
                     System.out.println("RPC server sent response for message id: " + invocation.getRequestId());
                 }
             });
    
         }
    
     }
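InvokeTask creates a fresh implementation instance on every call, and its comment wonders whether the instance should come from Spring instead. Short of wiring in a Spring context, one hedged intermediate step is to cache a single instance per implementation class. The sketch below is a plain map-based cache, not Spring's container, and `ServiceInstanceCacheSketch` and `getInstance` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one cached instance per implementation class name.
class ServiceInstanceCacheSketch {

    private static final Map<String, Object> INSTANCES = new ConcurrentHashMap<>();

    /** Returns the cached instance for the class, creating it on first use. */
    static Object getInstance(String implClassName) {
        return INSTANCES.computeIfAbsent(implClassName, name -> {
            try {
                // Requires an accessible no-argument constructor.
                return Class.forName(name).getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + name, e);
            }
        });
    }

    public static void main(String[] args) {
        Object first = getInstance("java.util.ArrayList");
        Object second = getInstance("java.util.ArrayList");
        System.out.println("same instance reused: " + (first == second));
    }
}
```

Sharing one instance across requests means the service implementation has to be thread-safe, which is the same expectation Spring places on singleton beans.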

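    Next, the client. There are two implementations: one that cannot reuse handlers (a handler can be thought of as a connection), which does not cope well with high concurrency, and a handlerPool mode that can reuse them.

    The non-reusable mode: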

    public class NettyClient {
         private static NettyClient INSTANCE = new NettyClient();
    
         private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;
    
         private NettyClient(){};
    
         public static NettyClient getInstance(){
             return INSTANCE;
         }
    
         private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
    
         public void start(String host,Integer port){
    
             Bootstrap bootstrap = new Bootstrap();
             EventLoopGroup group = new NioEventLoopGroup(parallel);
    
             try{
                 bootstrap.group(group)
                         .channel(NioSocketChannel.class)
                         .handler(new ChannelInitializer<SocketChannel>(){
    
                             @Override
                             protected void initChannel(SocketChannel arg0) throws Exception {
                                 ChannelPipeline pipeline = arg0.pipeline();
                                 //Stay compatible with the frame format of LengthFieldBasedFrameDecoder, the half-packet decoder that ObjectDecoder extends,
                                 //because the parent class is initialized with super(maxObjectSize, 0, 4, 0, 4);
     //                          pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                 //Use LengthFieldPrepender to prepend the message header that ObjectDecoder expects
     //                          pipeline.addLast(new LengthFieldPrepender(4));
     //                          pipeline.addLast(new ObjectEncoder());
                                 //For concurrent performance, use the weakCachingConcurrentResolver caching strategy; in ordinary cases cacheDisabled is sufficient
     //                          pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                                 //Register the Netty encoder
                                 pipeline.addLast(new NettyEncoderHandler(serializeType));
                                 //Register the Netty decoder
                                 pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType));
                                 pipeline.addLast("handler", new NettyClientHandler());
    
                             }
    
                         });
                 ChannelFuture future = bootstrap.connect(host,port).sync();
             }catch(Exception e){
                 group.shutdownGracefully();
             }
    
    
         }
     }

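    Now the reusable mode, which keeps a fixed number of handlers. The framework currently uses this mode; the non-reusable client above is not used and has been kept only to aid understanding.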

    public class NettyChannelPoolFactory {
    
         //Size of each Netty Channel blocking queue; this value should be configurable
         private static final int channelConnectSize = 10;
    
         //Key: service provider address; value: blocking queue of Netty Channels
         private static final Map<URL, ArrayBlockingQueue<Channel>> channelPoolMap = new ConcurrentHashMap<>();
    
         private static NettyChannelPoolFactory INSTANCE = new NettyChannelPoolFactory();
    
         private NettyChannelPoolFactory(){};
    
         public static NettyChannelPoolFactory getInstance(){
             return INSTANCE;
         }
    
         private List<ServiceProvider> serviceMetaDataList = new ArrayList<>();
    
         //Initialize the channels from the interface information in the configuration file
         public void initNettyChannelPoolFactory(Map<String, List<ServiceProvider>> providerMap){
    
             //Collect the service provider information into serviceMetaDataList
             Collection<List<ServiceProvider>> collectionServiceMetaDataList = providerMap.values();
             for (List<ServiceProvider> serviceMetaDataModels : collectionServiceMetaDataList) {
                 if (CollectionUtils.isEmpty(serviceMetaDataModels)) {
                     continue;
                 }
                 serviceMetaDataList.addAll(serviceMetaDataModels);
             }
    
             //Collect the distinct service provider addresses
             Set<URL> set = new HashSet<>();
             for (ServiceProvider serviceMetaData : serviceMetaDataList) {
                 String serviceIp = serviceMetaData.getIp();
                 int servicePort = serviceMetaData.getPort();
                 URL url = new URL(serviceIp,servicePort);
                 set.add(url);
             }
    
             for(URL url:set){
                 //Create several channels for each ip:port and put them into the blocking queue
                 int channelSize = 0;
                 while(channelSize < channelConnectSize){
                     Channel channel = null;
                     while(channel == null){
                         channel = registerChannel(url);
                     }
    
                     channelSize ++;
    
                     ArrayBlockingQueue<Channel> queue = channelPoolMap.get(url);
                     if(queue == null){
                         queue = new ArrayBlockingQueue<Channel>(channelConnectSize);
                         channelPoolMap.put(url, queue);
                     }
                     queue.offer(channel);
    
                 }
             }
    
         }
    
         public Channel registerChannel(URL url) {
             final SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize());
             Bootstrap bootstrap = new Bootstrap();
             EventLoopGroup group = new NioEventLoopGroup(10);
    
             try{
                 bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>(){
    
                     @Override
                     protected void initChannel(SocketChannel arg0) throws Exception {
                         ChannelPipeline pipeline = arg0.pipeline();
                         //Stay compatible with the frame format of LengthFieldBasedFrameDecoder, the half-packet decoder that ObjectDecoder extends,
                         //because the parent class is initialized with super(maxObjectSize, 0, 4, 0, 4);
     //                  pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                         //Use LengthFieldPrepender to prepend the message header that ObjectDecoder expects
     //                  pipeline.addLast(new LengthFieldPrepender(4));
     //                  pipeline.addLast(new ObjectEncoder());
                         //For concurrent performance, use the weakCachingConcurrentResolver caching strategy; in ordinary cases cacheDisabled is sufficient
     //                  pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                         //Register the Netty encoder
                         pipeline.addLast(new NettyEncoderHandler(serializeType));
                         //Register the Netty decoder
                         pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType));
                         pipeline.addLast("handler", new NettyClientHandler());
    
                     }
    
                 });
                 ChannelFuture future = bootstrap.connect(url.getHost(),url.getPort()).sync();
                 Channel channel = future.channel();
                 //Wait for the notification that the link to the Netty server has been established
                 final CountDownLatch connectedLatch = new CountDownLatch(1);
    
                 final List<Boolean> isSuccess = new ArrayList<>(1);
                 future.addListener(new ChannelFutureListener(){
    
                     @Override
                     public void operationComplete(ChannelFuture future)
                             throws Exception {
                         if(future.isSuccess()){
                             isSuccess.add(true);
                         }else{
                             isSuccess.add(false);
                         }
                         connectedLatch.countDown();
                     }
    
                 });
                 connectedLatch.await();
                 if(isSuccess.get(0)){
                     return channel;
                 }
             }catch(Exception e){
                 group.shutdownGracefully();
                 e.printStackTrace();
             }
             return null;
         }
         //Look up the blocking queue for the given url
         public ArrayBlockingQueue<Channel> acqiure(URL url){
             System.out.println(channelPoolMap.toString());
             return channelPoolMap.get(url);
         }
    
         //Return a channel to the pool once the caller is done with it
         public void release(ArrayBlockingQueue<Channel> queue, Channel channel, URL url){
             if(queue == null){
                 return;
             }
             //Check whether the channel is still usable; if not, close it, register a new one, and put that into the blocking queue
             if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()){
                 if (channel != null) {
                     //close() initiates the shutdown; waiting on closeFuture() alone could block forever on a channel that is still open
                     channel.close().syncUninterruptibly();
                 }
                 Channel c = null;
                 while(c == null){
                     c = registerChannel(url);
                 }
                 queue.offer(c);
                 return;
             }
             queue.offer(channel);
         }
    
     }
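Stripped of the Netty details, the pool above is a fixed number of resources parked in a blocking queue, handed out with a timeout and replaced on return if they have gone bad. A generic sketch of that acquire/release cycle is shown below. It is a simplification under stated assumptions: `BlockingPoolSketch` and its methods are hypothetical, strings stand in for channels, and there is a single queue rather than one per provider address.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch of a fixed-size pool backed by a blocking queue.
class BlockingPoolSketch<T> {

    private final BlockingQueue<T> idle;
    private final Supplier<T> factory;
    private final Predicate<T> healthy;

    BlockingPoolSketch(int size, Supplier<T> factory, Predicate<T> healthy) {
        this.idle = new ArrayBlockingQueue<>(size);
        this.factory = factory;
        this.healthy = healthy;
        for (int i = 0; i < size; i++) {
            idle.offer(factory.get()); // pre-create every resource up front
        }
    }

    /** Waits up to timeoutMillis for an idle resource; returns null on timeout or interruption. */
    T acquire(long timeoutMillis) {
        try {
            return idle.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Returns a resource to the pool, swapping in a fresh one if it is no longer healthy. */
    void release(T resource) {
        T toReturn = (resource != null && healthy.test(resource)) ? resource : factory.get();
        idle.offer(toReturn);
    }

    int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        // "conn-1" is treated as broken so that release() has something to replace.
        BlockingPoolSketch<String> pool = new BlockingPoolSketch<>(
                2, () -> "conn-" + counter.incrementAndGet(), c -> !c.equals("conn-1"));
        String first = pool.acquire(100);
        System.out.println("acquired " + first + ", idle now " + pool.idleCount());
        pool.release(first);
        System.out.println("after release, idle now " + pool.idleCount());
    }
}
```

The timeout on `acquire` is what keeps a caller from hanging forever when every resource is in use; the caller then has to decide whether to fail or to create a resource outside the pool.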

    Here is the corresponding handler. channelRead0 reads the response returned by the server. Because Netty is asynchronous, we need MessageCallBack to turn the exchange into a synchronous call.

    public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
    
         private ChannelHandlerContext context;
    
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                 throws Exception {
             cause.printStackTrace();
             ctx.close();
         }
    
         @Override
         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
             System.out.println("Stopped at: "+new Date());
             System.out.println("HeartBeatClientHandler channelInactive");
         }
    
         @Override
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
             this.context = ctx;
             System.out.println("Activated channel: "+ctx.channel().id());
         }
    
    
         @Override
         protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception {
             // String res = (String)msg;
             //RpcResponse rpcResponse = (RpcResponse)msg;
             String responseId = rpcResponse.getResponseId();
             MessageCallBack callBack = ResponseHolder.getInstance().mapCallBack.get(responseId);
             if(callBack != null){
                 ResponseHolder.getInstance().mapCallBack.remove(responseId);
                 callBack.over(rpcResponse);
             }
         }
     }

    The MessageCallBack implementation.

    public class MessageCallBack {
    
         private RpcRequest rpcRequest;
    
         private RpcResponse rpcResponse;
    
         private Lock lock = new ReentrantLock();
    
         private Condition finish = lock.newCondition();
    
         public MessageCallBack(RpcRequest request) {
             this.rpcRequest = request;
         }
    
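         public Object start() throws InterruptedException {
             lock.lock();
             try {
                 //Wait at most 10 seconds for the RPC server; if it stays silent that long, return null.
                 //The loop guards against spurious wakeups and against over() having run before we started waiting.
                 long nanos = TimeUnit.SECONDS.toNanos(10);
                 while (this.rpcResponse == null && nanos > 0) {
                     nanos = finish.awaitNanos(nanos);
                 }
                 if (this.rpcResponse != null) {
                     return this.rpcResponse.getData();
                 } else {
                     return null;
                 }
             } finally {
                 lock.unlock();
             }
         }
    
         public void over(RpcResponse reponse) {
             lock.lock();
             try {
                 this.rpcResponse = reponse;
                 finish.signal();
             } finally {
                 lock.unlock();
             }
         }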
    
     }
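The same "block the caller until the asynchronous response arrives" idea can also be expressed with `CompletableFuture` from the JDK. The sketch below swaps that in for the lock-and-condition approach, so it is an alternative technique rather than the article's implementation. `PendingCallsSketch` and its methods are hypothetical; the map plays the role that ResponseHolder appears to play in the framework, whose source is not shown in this part.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: matching asynchronous responses to waiting callers by request id.
class PendingCallsSketch {

    private final Map<String, CompletableFuture<Object>> pending = new ConcurrentHashMap<>();

    /** Registers the call before the request is written, so a fast response cannot be missed. */
    CompletableFuture<Object> register(String requestId) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    /** Called from the I/O thread when a response arrives; unknown ids are ignored. */
    void complete(String responseId, Object data) {
        CompletableFuture<Object> future = pending.remove(responseId);
        if (future != null) {
            future.complete(data);
        }
    }

    /** Blocks until the response arrives or the timeout elapses; returns null on timeout. */
    Object await(String requestId, CompletableFuture<Object> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(requestId); // do not leak entries for calls that timed out
        }
    }

    public static void main(String[] args) {
        PendingCallsSketch calls = new PendingCallsSketch();
        CompletableFuture<Object> future = calls.register("req-1");
        // Simulate the network thread delivering the response a little later.
        new Thread(() -> calls.complete("req-1", "pong")).start();
        System.out.println(calls.await("req-1", future, 1000));
    }
}
```

A future remembers its result, so it does not matter whether the response arrives before or after the caller starts waiting.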

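    Since this is a pluggable framework, the underlying protocol must be selectable, so we define a top-level interface that lets us choose the protocol.
    The start method starts the server, and the send method is how the client sends data.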

    public interface Procotol {
    
         void start(URL url);
         Object send(URL url, RpcRequest invocation);
     }

    The three protocol implementations of this interface follow.
    The Netty implementation:

    public class DubboProcotol implements Procotol {
         @Override
         public void start(URL url) {
             NettyServer nettyServer = NettyServer.getInstance();
             nettyServer.start(url.getHost(),url.getPort());
         }
    
         @Override
         public Object send(URL url, RpcRequest invocation) {
             ArrayBlockingQueue<Channel> queue = NettyChannelPoolFactory.getInstance().acqiure(url);
             Channel channel = null;
             try {
                 channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS);
                 if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()){
                     channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS);
                     if(channel == null){
                         channel = NettyChannelPoolFactory.getInstance().registerChannel(url);
                     }
                 }
                 //Register the callback before writing, so a response that arrives quickly still finds it
                 MessageCallBack callback = new MessageCallBack(invocation);
                 ResponseHolder.getInstance().mapCallBack.put(invocation.getRequestId(), callback);
                 //Write this invocation into the Netty channel to start the asynchronous call
                 ChannelFuture channelFuture = channel.writeAndFlush(invocation);
                 channelFuture.syncUninterruptibly();
                 try {
                     return callback.start();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
                 return null;
             } catch (InterruptedException e1) {
                 e1.printStackTrace();
             }finally{
                 if (channel != null) { //channel is still null if poll() was interrupted
                     System.out.println("release:"+channel.id());
                 }
                 NettyChannelPoolFactory.getInstance().release(queue, channel, url);
             }
             return null;
         }
     }

    The HTTP implementation:

    public class HttpProcotol implements Procotol {
         @Override
         public void start(URL url) {
             HttpServer httpServer = HttpServer.getInstance();
             httpServer.start(url.getHost(),url.getPort());
         }
    
         @Override
         public Object send(URL url, RpcRequest invocation) {
             HttpClient httpClient = HttpClient.getInstance();
             return httpClient.post(url.getHost(),url.getPort(),invocation);
         }
     }

    The Socket implementation:

    public class SocketProcotol implements Procotol {
         @Override
         public void start(URL url) {
             SocketServer socketServer = SocketServer.getInstance();
             socketServer.publiser(url.getPort());
         }
    
         @Override
         public Object send(URL url, RpcRequest invocation) {
             SocketClient socketClient = SocketClient.getInstance();
             return socketClient.sendRequest(url.getHost(),url.getPort(),invocation);
         }
     }

    That completes a model with a selectable protocol: through this module we can choose a protocol and use it to communicate with the server.
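How the concrete protocol gets chosen is not shown in this part. One possible way, sketched here purely as an assumption, is a small registry that maps a configured name to a factory. `ProtocolRegistrySketch`, `register`, and `select` are hypothetical, and the nested `Protocol` interface is a simplified stand-in for the article's Procotol (plain host and port instead of URL and RpcRequest) so that the example is self-contained.

```java
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch: choosing a protocol implementation by its configured name.
class ProtocolRegistrySketch {

    /** Simplified stand-in for the article's Procotol interface. */
    interface Protocol {
        Object send(String host, int port, Object request);
    }

    private static final Map<String, Supplier<Protocol>> FACTORIES = new ConcurrentHashMap<>();

    /** Makes a protocol available under a case-insensitive name. */
    static void register(String name, Supplier<Protocol> factory) {
        FACTORIES.put(name.toLowerCase(Locale.ROOT), factory);
    }

    /** Looks up the protocol named in the configuration and creates an instance. */
    static Protocol select(String name) {
        Supplier<Protocol> factory = FACTORIES.get(name.toLowerCase(Locale.ROOT));
        if (factory == null) {
            throw new IllegalArgumentException("Unknown protocol: " + name);
        }
        return factory.get();
    }

    public static void main(String[] args) {
        // Stub implementations that only report which transport would have been used.
        register("socket", () -> (host, port, request) -> "socket -> " + host + ":" + port);
        register("http", () -> (host, port, request) -> "http -> " + host + ":" + port);
        register("netty", () -> (host, port, request) -> "netty -> " + host + ":" + port);

        Protocol protocol = select("Netty");
        System.out.println(protocol.send("127.0.0.1", 8080, "ping"));
    }
}
```

Callers depend only on the interface and a name from configuration, so adding a fourth transport would mean one more `register` call rather than changes at every call site.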
