在寫代碼以前咱們先要想清楚幾個問題。java
首先咱們須要實現底層的通訊的服務端和客戶端,能夠有一下幾種實現:spring
基於 Socket 的客戶端和服務端(同步阻塞式,不推薦),你們能夠看成一個編程練習,整個和系統沒有進行整合,純粹練習使用。
基於 Socket 的服務端。
啓動一個阻塞式的 socket server,加入一個線程池實現僞異步。編程
public class SocketServer { private static SocketServer INSTANCE = new SocketServer(); private SocketServer(){}; public static SocketServer getInstance() { return INSTANCE; } //沒有核心線程數量控制的線程池,最大線程數是 Integer 的最大值,多線程實現僞異步 ExecutorService executorService = Executors.newCachedThreadPool(); /** * 發佈服務,bio 模型 * @param service * @param port */ public void publiser(int port){ try (ServerSocket serverSocket = new ServerSocket(port);) { while (true){ Socket socket = serverSocket.accept();//接收請求 executorService.execute(new SocketHandler(socket)); } } catch (IOException e) { e.printStackTrace(); } } }
對應的 hanlder,使用反射調用對應的服務,並經過 sokcet 寫回結果。bootstrap
public class SocketHandler implements Runnable{ private Socket socket; public SocketHandler(Socket socket) { this.socket = socket; } @Override public void run() { try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());) { Object o = inputStream.readObject(); //readObject 是 java 反序列化的過程 System.out.println(o); Object result = invoke((RpcRequest) o); //寫回結果 outputStream.writeObject(result); outputStream.flush(); } catch (IOException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } private Object invoke(RpcRequest invocation){ //根據方法名和參數類型在 service 裏獲取方法 try { String interFaceName = invocation.getInterfaceName(); Class impClass = Class.forName(invocation.getImpl()); Method method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes()); String result = (String)method.invoke(impClass.newInstance(),invocation.getParams()); return result; } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } return null; } }
在看客戶端,拼裝參數,發送給 socket 服務端。緩存
public class SocketClient { private static SocketClient INSTANCE = new SocketClient(); private SocketClient(){}; public static SocketClient getInstance() { return INSTANCE; } private Socket newSocket(String host, Integer port) { System.out.println("建立一個新的 socket 鏈接"); try { Socket socket = new Socket(host, port); return socket; } catch (IOException e) { System.out.println("創建鏈接失敗"); e.printStackTrace(); } return null; } public Object sendRequest(String host, Integer port,RpcRequest rpcRequest) { Socket socket = newSocket(host,port); try ( ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());) { outputStream.writeObject(rpcRequest); outputStream.flush(); Object result = inputStream.readObject(); inputStream.close(); outputStream.close(); return result; } catch (Exception e) { e.printStackTrace(); } return null; } }經過上面的代碼相信你們已經明白了這個流程了,就是一個客戶端與服務端通訊的過程,將須要調用的方法的參數傳到服務端,服務端經過反射完成調用,最後返回結果給客戶端。
基於 Http 請求的客戶端和基於 Tomcat 的服務端。
基於 Tomcat 的服務端,單例模式,只有一個啓動服務的 start 方法,監聽到的請求經過 DispatcherServlet 處理。tomcat
public class HttpServer { private static HttpServer INSTANCE = new HttpServer(); private HttpServer(){} public static HttpServer getInstance(){ return INSTANCE; } /** * * servlet 容器,tomcat * @param hostname * @param port */ public void start(String hostname,Integer port){ Tomcat tomcat = new Tomcat(); Server server = tomcat.getServer(); Service service = server.findService("Tomcat"); Connector connector = new Connector(); connector.setPort(port); Engine engine = new StandardEngine(); engine.setDefaultHost(hostname); Host host = new StandardHost(); host.setName(hostname); String contextPath = ""; Context context = new StandardContext(); context.setPath(contextPath); context.addLifecycleListener(new Tomcat.FixContextListener()); //聲明週期監聽器 host.addChild(context); engine.addChild(host); service.setContainer(engine); service.addConnector(connector); tomcat.addServlet(contextPath,"dispatcher", new DispatcherServlet()); context.addServletMappingDecoded("/*","dispatcher"); try { tomcat.start(); tomcat.getServer().await(); } catch (LifecycleException e) { e.printStackTrace(); } } }
下面來看請求分發器 DispatcherServlet 的實現,將請求派發給 HttpServletHandler 實現。服務器
/** * tomcat 是 servlet 容器,寫一個 servlet * */ public class DispatcherServlet extends HttpServlet { @Override protected void service(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { new HttpServletHandler().handler(req,resp); } }
HttpServletHandler 的實現其實就是解析 request,經過反射調用最後返回結果。多線程
public class HttpServletHandler{ public void handler(HttpServletRequest req, HttpServletResponse resp) { try(InputStream inputStream = req.getInputStream(); OutputStream outputStream =resp.getOutputStream();){ ObjectInputStream ois = new ObjectInputStream(inputStream); RpcRequest invocation = (RpcRequest) ois.readObject(); // 從註冊中心根據接口,找接口的實現類 String interFaceName = invocation.getInterfaceName(); Class impClass = Class.forName(invocation.getImpl()); Method method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes()); Object result = method.invoke(impClass.newInstance(),invocation.getParams()); RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setResponseId(invocation.getRequestId()); rpcResponse.setData(result); IOUtils.write(toByteArray(rpcResponse),outputStream); }catch (IOException e){ e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } } public byte[] toByteArray (Object obj) { byte[] bytes = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); oos.flush(); bytes = bos.toByteArray (); oos.close(); bos.close(); } catch (IOException ex) { ex.printStackTrace(); } return bytes; } }
最後來看客戶端的實現,經過 post 方法發送數據,最後解析服務端返回的結果。併發
public class HttpClient { private static HttpClient INSTANCE = new HttpClient(); private HttpClient(){} public static HttpClient getInstance(){ return INSTANCE; } public Object post(String hostname, Integer port, RpcRequest invocation){ try{ URL url = new URL("http",hostname,port,"/"); HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection(); httpURLConnection.setRequestMethod("POST"); httpURLConnection.setDoOutput(true); OutputStream outputStream = httpURLConnection.getOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(outputStream); oos.writeObject(invocation); oos.flush(); oos.close(); InputStream inputStream = httpURLConnection.getInputStream(); RpcResponse rpcResponse = (RpcResponse)toObject(IOUtils.toByteArray(inputStream)); return rpcResponse.getData(); } catch (MalformedURLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return null; } public Object toObject (byte[] bytes) { Object obj = null; try { ByteArrayInputStream bis = new ByteArrayInputStream(bytes); ObjectInputStream ois = new ObjectInputStream (bis); obj = ois.readObject(); ois.close(); bis.close(); } catch (IOException ex) { ex.printStackTrace(); } catch (ClassNotFoundException ex) { ex.printStackTrace(); } return obj; } }
Netty 模型的客戶端和服務端。
基於 Netty 的服務端,裏面的編碼器和解碼器是咱們本身實現的,你們能夠先用我註釋掉的那部分,等咱們寫到編碼解碼器的時候再替換。app
public class NettyServer { private static NettyServer INSTANCE = new NettyServer(); private static Executor executor = Executors.newCachedThreadPool(); private final static int MESSAGE_LENGTH = 4; private NettyServer(){}; public static NettyServer getInstance(){ return INSTANCE; } private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize()); public static void submit(Runnable t){ executor.execute(t); } public void start(String host, Integer port){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ final ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel arg0) throws Exception { ChannelPipeline pipeline = arg0.pipeline(); //ObjectDecoder的基類半包解碼器LengthFieldBasedFrameDecoder的報文格式保持兼容。由於底層的父類LengthFieldBasedFrameDecoder //的初始化參數即爲super(maxObjectSize, 0, 4, 0, 4); // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, NettyServer.MESSAGE_LENGTH, 0, NettyServer.MESSAGE_LENGTH)); //利用LengthFieldPrepender回填補充ObjectDecoder消息報文頭 // pipeline.addLast(new LengthFieldPrepender(NettyServer.MESSAGE_LENGTH)); // pipeline.addLast(new ObjectEncoder()); //考慮到併發性能,採用weakCachingConcurrentResolver緩存策略。通常狀況使用:cacheDisabled便可 // pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); //註冊解碼器NettyDecoderHandler pipeline.addLast(new NettyDecoderHandler(RpcRequest.class, serializeType)); //註冊編碼器NettyEncoderHandler pipeline.addLast(new NettyEncoderHandler(serializeType)); pipeline.addLast("handler", new NettyServerHandler()); } }); Channel channel = bootstrap.bind(host, port).sync().channel(); System.out.println("Server start listen at " + port); }catch(Exception e){ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
服務端對應的 handler,netty 都是這種 handler 模式,handler 裏面也是將這個接收的 request 放入線程池中處理。
public class NettyServerHandler extends SimpleChannelInboundHandler<RpcRequest> { private ChannelHandlerContext context; @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest rpcRequest) throws Exception { System.out.println("server channelRead..."); System.out.println(ctx.channel().remoteAddress() + "->server:" + rpcRequest.toString()); InvokeTask it = new InvokeTask(rpcRequest,ctx); NettyServer.submit(it); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception{ this.context = ctx; } }
給出 InvokeTask 的對應實現。
public class InvokeTask implements Runnable{ private RpcRequest invocation; private ChannelHandlerContext ctx; public InvokeTask(RpcRequest invocation,ChannelHandlerContext ctx) { super(); this.invocation = invocation; this.ctx = ctx; } @Override public void run() { // 從註冊中心根據接口,找接口的實現類 String interFaceName = invocation.getInterfaceName(); Class impClass = null; try { impClass = Class.forName(invocation.getImpl()); } catch (ClassNotFoundException e) { e.printStackTrace(); } Method method; Object result = null; try { method = impClass.getMethod(invocation.getMethodName(),invocation.getParamTypes()); //這塊考慮實現類,是否是應該在 spring 裏面拿 result = method.invoke(impClass.newInstance(),invocation.getParams()); } catch (Exception e) { e.printStackTrace(); } RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setResponseId(invocation.getRequestId()); rpcResponse.setData(result); ctx.writeAndFlush(rpcResponse).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { System.out.println("RPC Server Send message-id respone:" + invocation.getRequestId()); } }); } }
再來看客戶端,客戶端有兩種實現,一種是不能複用 handler(能夠當即爲 connection)的模式,這種模式併發不過高,另外一種是可以複用 handler 的 handlerPool 模式。
不能複用的模式。
public class NettyClient { private static NettyClient INSTANCE = new NettyClient(); private final static int parallel = Runtime.getRuntime().availableProcessors() * 2; private NettyClient(){}; public static NettyClient getInstance(){ return INSTANCE; } private SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize()); public void start(String host,Integer port){ Bootstrap bootstrap = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(parallel); try{ bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel arg0) throws Exception { ChannelPipeline pipeline = arg0.pipeline(); //ObjectDecoder的基類半包解碼器LengthFieldBasedFrameDecoder的報文格式保持兼容。由於底層的父類LengthFieldBasedFrameDecoder //的初始化參數即爲super(maxObjectSize, 0, 4, 0, 4); // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); //利用LengthFieldPrepender回填補充ObjectDecoder消息報文頭 // pipeline.addLast(new LengthFieldPrepender(4)); // pipeline.addLast(new ObjectEncoder()); //考慮到併發性能,採用weakCachingConcurrentResolver緩存策略。通常狀況使用:cacheDisabled便可 // pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); //註冊Netty編碼器 System.out.println("11111111:"+serializeType.getSerializeType()); pipeline.addLast(new NettyEncoderHandler(serializeType)); //註冊Netty解碼器 pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType)); pipeline.addLast("handler", new NettyClientHandler()); } }); ChannelFuture future = bootstrap.connect(host,port).sync(); }catch(Exception e){ group.shutdownGracefully(); } } }
在看可複用的模式,固定 handler 數量,目前框架中使用的是可複用模式,上面的不可複用的沒用上,爲了給你們理解,沒有刪除。
public class NettyChannelPoolFactory { //初始化Netty Channel阻塞隊列的長度,該值爲可配置信息 private static final int channelConnectSize = 10; //Key爲服務提供者地址,value爲Netty Channel阻塞隊列 private static final Map<URL, ArrayBlockingQueue<Channel>> channelPoolMap = new ConcurrentHashMap<>(); private static NettyChannelPoolFactory INSTANCE = new NettyChannelPoolFactory(); private NettyChannelPoolFactory(){}; public static NettyChannelPoolFactory getInstance(){ return INSTANCE; } private List<ServiceProvider> serviceMetaDataList = new ArrayList<>(); //根據配置文件裏面須要調用的接口信息來初始化 channel public void initNettyChannelPoolFactory(Map<String, List<ServiceProvider>> providerMap){ //將服務提供者信息存入serviceMetaDataList列表 Collection<List<ServiceProvider>> collectionServiceMetaDataList = providerMap.values(); for (List<ServiceProvider> serviceMetaDataModels : collectionServiceMetaDataList) { if (CollectionUtils.isEmpty(serviceMetaDataModels)) { continue; } serviceMetaDataList.addAll(serviceMetaDataModels); } //獲取服務提供者地址列表 Set<URL> set = new HashSet<>(); for (ServiceProvider serviceMetaData : serviceMetaDataList) { String serviceIp = serviceMetaData.getIp(); int servicePort = serviceMetaData.getPort(); URL url = new URL(serviceIp,servicePort); set.add(url); } for(URL url:set){ //爲每一個 ip端口 創建多個 channel,而且放入阻塞隊列中 int channelSize = 0; while(channelSize < channelConnectSize){ Channel channel = null; while(channel == null){ channel = registerChannel(url); } channelSize ++; ArrayBlockingQueue<Channel> queue = channelPoolMap.get(url); if(queue == null){ queue = new ArrayBlockingQueue<Channel>(channelConnectSize); channelPoolMap.put(url, queue); } queue.offer(channel); } } } public Channel registerChannel(URL url) { final SerializeType serializeType = SerializeType.queryByType(Configuration.getInstance().getSerialize()); Bootstrap bootstrap = new Bootstrap(); EventLoopGroup group = new NioEventLoopGroup(10); try{ bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel arg0) throws Exception { ChannelPipeline pipeline = arg0.pipeline(); //ObjectDecoder的基類半包解碼器LengthFieldBasedFrameDecoder的報文格式保持兼容。由於底層的父類LengthFieldBasedFrameDecoder //的初始化參數即爲super(maxObjectSize, 0, 4, 0, 4); // pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); //利用LengthFieldPrepender回填補充ObjectDecoder消息報文頭 // pipeline.addLast(new LengthFieldPrepender(4)); // pipeline.addLast(new ObjectEncoder()); //考慮到併發性能,採用weakCachingConcurrentResolver緩存策略。通常狀況使用:cacheDisabled便可 // pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); pipeline.addLast(new NettyEncoderHandler(serializeType)); //註冊Netty解碼器 pipeline.addLast(new NettyDecoderHandler(RpcResponse.class, serializeType)); pipeline.addLast("handler", new NettyClientHandler()); } }); ChannelFuture future = bootstrap.connect(url.getHost(),url.getPort()).sync(); Channel channel = future.channel(); //等待Netty服務端鏈路創建通知信號 final CountDownLatch connectedLatch = new CountDownLatch(1); final List<Boolean> isSuccess = new ArrayList<>(1); future.addListener(new ChannelFutureListener(){ @Override public void operationComplete(ChannelFuture future) throws Exception { if(future.isSuccess()){ isSuccess.add(true); }else{ isSuccess.add(false); } connectedLatch.countDown(); } }); connectedLatch.await(); if(isSuccess.get(0)){ return channel; } }catch(Exception e){ group.shutdownGracefully(); e.printStackTrace(); } return null; } //根據 url 獲取阻塞隊列 public ArrayBlockingQueue<Channel> acqiure(URL url){ System.out.println(channelPoolMap.toString()); return channelPoolMap.get(url); } //channel 使用完畢後進行回收 public void release(ArrayBlockingQueue<Channel> queue, Channel channel, URL url){ if(queue == null){ return; } //須要檢查 channel 是否可用,若是不可用,從新註冊一個放入阻塞隊列中 if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()){ if (channel != null) { channel.deregister().syncUninterruptibly().awaitUninterruptibly(); channel.closeFuture().syncUninterruptibly().awaitUninterruptibly(); } Channel c = null; while(c == null){ c = registerChannel(url); } queue.offer(c); return; } queue.offer(channel); } }
給出對應的 handler 實現,在 channelread0 裏面讀取 server 端返回的信息,由於 netty 是異步的,因此須要 MessageCallBack 來實現咱們的同步調用。
public class NettyClientHandler extends SimpleChannelInboundHandler<RpcResponse> { private ChannelHandlerContext context; @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("中止時間是:"+new Date()); System.out.println("HeartBeatClientHandler channelInactive"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.context = ctx; System.out.println("激活時間是:"+ctx.channel().id()); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcResponse rpcResponse) throws Exception { // String res = (String)msg; //RpcResponse rpcResponse = (RpcResponse)msg; String responseId = rpcResponse.getResponseId(); MessageCallBack callBack = ResponseHolder.getInstance().mapCallBack.get(responseId); if(callBack != null){ ResponseHolder.getInstance().mapCallBack.remove(responseId); callBack.over(rpcResponse); } } }
MessageCallBack 的實現。
public class MessageCallBack { private RpcRequest rpcRequest; private RpcResponse rpcResponse; private Lock lock = new ReentrantLock(); private Condition finish = lock.newCondition(); public MessageCallBack(RpcRequest request) { this.rpcRequest = request; } public Object start() throws InterruptedException { try { lock.lock(); //設定一下超時時間,rpc服務器過久沒有相應的話,就默認返回空吧。 finish.await(10*1000, TimeUnit.MILLISECONDS); if (this.rpcResponse != null) { return this.rpcResponse.getData(); } else { return null; } } finally { lock.unlock(); } } public void over(RpcResponse reponse) { try { lock.lock(); this.rpcResponse = reponse; finish.signal(); } finally { lock.unlock(); } } }
既然是可插拔式框架,那麼底層協議必定要是可選擇的,因此咱們定義一個頂層接口來支持咱們選擇協議。
start 方法是啓動服務端,send 方法是客戶端發送數據。
public interface Procotol { void start(URL url); Object send(URL url, RpcRequest invocation); }
對應的三個協議的接口實現。
Netty 的實現
public class DubboProcotol implements Procotol { @Override public void start(URL url) { NettyServer nettyServer = NettyServer.getInstance(); nettyServer.start(url.getHost(),url.getPort()); } @Override public Object send(URL url, RpcRequest invocation) { ArrayBlockingQueue<Channel> queue = NettyChannelPoolFactory.getInstance().acqiure(url); Channel channel = null; try { channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS); if(channel == null || !channel.isActive() || !channel.isOpen()|| !channel.isWritable()){ channel = queue.poll(invocation.getTimeout(), TimeUnit.MILLISECONDS); if(channel == null){ channel = NettyChannelPoolFactory.getInstance().registerChannel(url); } } //將本次調用的信息寫入Netty通道,發起異步調用 ChannelFuture channelFuture = channel.writeAndFlush(invocation); channelFuture.syncUninterruptibly(); MessageCallBack callback = new MessageCallBack(invocation); ResponseHolder.getInstance().mapCallBack.put(invocation.getRequestId(), callback); try { return callback.start(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } catch (InterruptedException e1) { e1.printStackTrace(); }finally{ System.out.println("release:"+channel.id()); NettyChannelPoolFactory.getInstance().release(queue, channel, url); } return null; } }
http 的實現
public class HttpProcotol implements Procotol { @Override public void start(URL url) { HttpServer httpServer = HttpServer.getInstance(); httpServer.start(url.getHost(),url.getPort()); } @Override public Object send(URL url, RpcRequest invocation) { HttpClient httpClient = HttpClient.getInstance(); return httpClient.post(url.getHost(),url.getPort(),invocation); } }
Socket 的實現
public class SocketProcotol implements Procotol { @Override public void start(URL url) { SocketServer socketServer = SocketServer.getInstance(); socketServer.publiser(url.getPort()); } @Override public Object send(URL url, RpcRequest invocation) { SocketClient socketClient = SocketClient.getInstance(); return socketClient.sendRequest(url.getHost(),url.getPort(),invocation); } }
這樣一個可選擇協議的模型就實現了,咱們可已經過這個模塊選擇協議,而且與服務端通訊。