【如何實現一個簡單的RPC框架】系列文章:java
【遠程調用框架】如何實現一個簡單的RPC框架(一)想法與設計
【遠程調用框架】如何實現一個簡單的RPC框架(二)實現與使用
【遠程調用框架】如何實現一個簡單的RPC框架(三)優化一:利用動態代理改變用戶服務調用方式
【遠程調用框架】如何實現一個簡單的RPC框架(四)優化二:改變底層通訊框架
【遠程調用框架】如何實現一個簡單的RPC框架(五)優化三:軟負載中心設計與實現
第一個優化以及第二個優化修改後的工程代碼可下載資源 如何實現一個簡單的RPC框架編程
簡單socket通訊BIO方式-》-》NIO方式-》使用netty服務框架
關於這部分,能夠提早閱讀下博客《Java NIO BIO AIO總結》bootstrap
問題描述:在目前的服務框架版本中,服務發佈端和服務調用端採用的IO通訊模式爲BIO,即便用最基礎的Java Socket編程的方式。看過咱們以前實現介紹部分的讀者都知道,服務端一直在監聽請求,每當有一個請求發來,則會建立一個新的線程去處理該請求,以下代碼:api
while (true){ Socket socket = serverSocket.accept(); new Thread(new ServerProcessThread(socket)).start();//開啓新的線程進行鏈接請求的處理 }
而ServerProcessThread線程完成了服務的調用及結果的返回工做,這樣的方法,有如下兩個弊端:數組
(2)這種方式爲阻塞式IO,即數據的讀寫是阻塞的,在沒有有效可讀/可寫數據的狀況下,線程會一直阻塞,形成資源的浪費。
所以爲了解決上述兩個弊端,咱們改變這種IO模式。服務器
step1.使用selector+channel+buffer實現NIO模式(參考博客《Java NIO BIO AIO總結》)
NIO的模式有兩個特色:
(1)不用對全部鏈接都建立新的線程去維護,selector線程能夠管理多個數據通道;
(2)IO數據讀寫是非阻塞的,只有當出現有效讀寫數據時纔會出發相應的事件進行讀寫,節約資源。網絡
關於NIO模式的基本客戶端與服務端的實現代碼在博客《Java NIO BIO AIO總結》中已經進行了介紹。這裏我對LCRPC框架代碼的改造即利用該博客中的代碼。僅做爲NIO通訊模式的使用示例,由於還有好多能夠修改的地方。併發
@Override public boolean startLisetenByNIO() { new Thread(new NIOServerThread()).start(); return true; }
該方法開啓新的線程,採用NIO的模式進行服務的監聽。線程類NIOServerThread的代碼與博客《Java NIO BIO AIO總結》介紹的一致,只是read事件的觸發方法代碼有所改動。該類的代碼以下:app
public class NIOServerThread extends NIOBase implements Runnable{ @Override public void run() { try { initSelector();//初始化通道管理器Selector initServer(Constant.IP,Constant.PORT);//初始化ServerSocketChannel,開啓監聽 listen();//輪詢處理Selector選中的事件 } catch (IOException e) { e.printStackTrace(); } catch (InstantiationException e) { e.printStackTrace(); } catch (InvocationTargetException e) { e.printStackTrace(); } catch (NoSuchMethodException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } catch (ClassNotFoundException e) { e.printStackTrace(); } } /** * 初始化 該線程中的通道管理器Selector */ public void initSelector() throws IOException { this.selector = Selector.open(); } /** * 採用輪詢的方式監聽selector上是否有須要處理的事件,若是有,則循環處理 * 這裏主要監聽鏈接事件以及讀事件 */ public void listen() throws IOException, ClassNotFoundException, NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException { System.out.println("監聽成功,可開始進行服務註冊!"); //輪詢訪問select while(true){ //當註冊的事件到達時,方法返回;不然將一直阻塞 selector.select(); //得到selector中選中的項的迭代器,選中的項爲註冊的事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); //循環處理註冊事件 /** * 一共有四種事件: * 1. 服務端接收客戶端鏈接事件: SelectionKey.OP_ACCEPT * 2. 客戶端鏈接服務端事件: SelectionKey.OP_CONNECT * 3. 讀事件: SelectionKey.OP_READ * 4. 寫事件: SelectionKey.OP_WRITE */ while(iterator.hasNext()){ SelectionKey key = iterator.next(); //手動刪除已選的key,以防重複處理 iterator.remove(); //判斷事件性質 if (key.isAcceptable()){//服務端接收客戶端鏈接事件 accept(key); }else if (key.isReadable()){//讀事件 read(key); } } } } /** * 得到一個ServerSocket通道,並經過port對其進行初始化 * @param port 監聽的端口號 */ private void initServer(String ip,int port) throws IOException { //step1. 得到一個ServerSocketChannel ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //step2. 初始化工做 serverSocketChannel.configureBlocking(false);//設置通道爲非阻塞 serverSocketChannel.socket().bind(new InetSocketAddress(ip,port)); //step3. 將該channel註冊到Selector上,併爲該通道註冊SelectionKey.OP_ACCEPT事件 //這樣一來,當有"服務端接收客戶端鏈接"事件到達時,selector.select()方法會返回,不然將一直阻塞 serverSocketChannel.register(this.selector,SelectionKey.OP_ACCEPT); } /** * 當監聽到服務端接收客戶端鏈接事件後的處理函數 * @param key 事件key,能夠從key中獲取channel,完成事件的處理 */ public void accept(SelectionKey key) throws IOException { //step1. 獲取serverSocketChannel ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); //step2. 得到和客戶端鏈接的socketChannel SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false);//設置爲非阻塞 //step3. 註冊該socketChannel socketChannel.register(selector,SelectionKey.OP_READ);//爲了接收客戶端的消息,註冊讀事件 } public void read(SelectionKey key) throws IOException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { byte[] result = getReadData(key); if (result == null) return; SocketChannel socketChannel = (SocketChannel) key.channel(); LCRPCRequestDO requestDO = (LCRPCRequestDO) ObjectAndByteUtil.toObject(result); IProviderService providerService = new ProviderServiceImpl(); //將結果寫回 socketChannel.write(ByteBuffer.wrap(ObjectAndByteUtil.toByteArray(providerService.getFuncCalldata(requestDO)))); // socketChannel.close();//關閉 } }
類NIOBase爲基類。代碼以下:框架
public class NIOBase { // 線程中的通道管理器 public Selector selector; /** * 初始化 該線程中的通道管理器Selector */ public void initSelector() throws IOException { this.selector = Selector.open(); } public byte[] getReadData(SelectionKey key) throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(10); int len = socketChannel.read(byteBuffer); if (len == -1){ socketChannel.close(); return null;//說明鏈接已經斷開 } int lenth = 0; List<byte[]> list = new ArrayList<>(); while (len > 0){ lenth += len; byteBuffer.flip(); byte[] arr = new byte[len]; byteBuffer.get(arr,0,len); list.add(arr); byteBuffer.clear(); len = socketChannel.read(byteBuffer); } byte[] result = new byte[lenth]; int l = 0; for (int i = 0;i<list.size();i++){ for (int j = 0;j<list.get(i).length;j++){ result[l + j] = list.get(i)[j]; } l += list.get(i).length; } return result; } }
getReadData方法讀取客戶端發送的所有數據。利用幫助類ObjectAndByteUtil對客戶端發送的數據進行序列化爲reqeust對象。同時爲接口IProviderService添加方法getFuncCallData,利用request對象調用相應服務方法,獲得方法的返回值,反序列化後發送給客戶端,該方法的代碼與初版本一致。
幫助類ObjectAndByteUtil負責利用反/序列化技術進行字節數組與對象之間的轉化,代碼以下:
package whu.edu.lcrpc.util; import java.io.*; /** * Created by apple on 17/3/30. */ public class ObjectAndByteUtil { /** * 對象轉數組 * @param obj * @return */ public static byte[] toByteArray (Object obj) { byte[] bytes = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); oos.flush(); bytes = bos.toByteArray (); oos.close(); bos.close(); } catch (IOException ex) { ex.printStackTrace(); } return bytes; } /** * 數組轉對象 * @param bytes * @return */ public static Object toObject (byte[] bytes) { Object obj = null; try { ByteArrayInputStream bis = new ByteArrayInputStream (bytes); ObjectInputStream ois = new ObjectInputStream (bis); obj = ois.readObject(); ois.close(); bis.close(); } catch (IOException ex) { ex.printStackTrace(); } catch (ClassNotFoundException ex) { ex.printStackTrace(); } return obj; } }
爲了採用NIO的方法開啓服務發佈端的服務監聽,咱們修改LCRPCProviderImpl類中對startListen函數的調用改成方法startListenByNIO,使得服務端採用NIO的方式發佈服務。
public Object read(SelectionKey key) throws IOException { //step1. 獲得事件發生的通道 byte[] result = getReadData(key); if (result == null) return null; Object object = ObjectAndByteUtil.toObject(result); return object; }
咱們爲接口IConsumerService添加方法sendDataByNIO,採用NIO的方式將服務調用端的請求信息序列化後發送給服務端,該函數代碼以下:
public Object sendDataByNIO(String ip, LCRPCRequestDO requestDO) throws IOException, ClassNotFoundException { NIOClient nioClient = new NIOClient(requestDO,ip); return nioClient.run(); }
類NIOClient代碼以下,其中run函數開啓輪詢,當所註冊事件發生時,觸發相應的方法。並在read事件觸發後結束輪訓。
public class NIOClient extends NIOBase{ private LCRPCRequestDO requestDO;//客戶端對應的請求DO,發送給服務端 private String ip; public NIOClient(LCRPCRequestDO requestDO,String ip){ this.requestDO = requestDO; this.ip = ip; } public Object run() { try { initSelector();//初始化通道管理器 initClient(ip,Constant.PORT);//初始化客戶端鏈接scoketChannel return listen();//開始輪詢處理事件 } catch (Exception e) { e.printStackTrace(); return null; } } public Object listen() throws IOException { //輪詢訪問select boolean flag = true; Object result = null; while(flag){ //當註冊的事件到達時,方法返回;不然將一直阻塞 selector.select(); //得到selector中選中的項的迭代器,選中的項爲註冊的事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); //循環處理註冊事件 /** * 一共有四種事件: * 1. 服務端接收客戶端鏈接事件: SelectionKey.OP_ACCEPT * 2. 客戶端鏈接服務端事件: SelectionKey.OP_CONNECT * 3. 讀事件: SelectionKey.OP_READ * 4. 寫事件: SelectionKey.OP_WRITE */ while(iterator.hasNext()){ SelectionKey key = iterator.next(); //手動刪除已選的key,以防重複處理 iterator.remove(); //判斷事件性質 if (key.isReadable()){//讀事件 result = read(key); flag = false; break; }else if (key.isConnectable()) {//客戶端鏈接事件 connect(key); } } } return result; } /** * 得到一個SocketChannel,並對該channel作一些初始化工做,並註冊到 * @param ip * @param port */ public void initClient(String ip,int port) throws IOException { //step1. 得到一個SocketChannel SocketChannel socketChannel = SocketChannel.open(); //step2. 初始化該channel socketChannel.configureBlocking(false);//設置通道爲非阻塞 //step3. 客戶端鏈接服務器,其實方法執行並無實現鏈接,須要再listen()方法中調用channel.finishConnect()方法才能完成鏈接 socketChannel.connect(new InetSocketAddress(ip,port)); //step4. 註冊該channel到selector中,併爲該通道註冊SelectionKey.OP_CONNECT事件和SelectionKey.OP_READ事件 socketChannel.register(this.selector,SelectionKey.OP_CONNECT|SelectionKey.OP_READ); } /** * 當監聽到客戶端鏈接事件後的處理函數 * @param key 事件key,能夠從key中獲取channel,完成事件的處理 */ public void connect(SelectionKey key) throws IOException { //step1. 獲取事件中的channel SocketChannel socketChannel = (SocketChannel) key.channel(); //step2. 若是正在鏈接,則完成鏈接 if (socketChannel.isConnectionPending()){ socketChannel.finishConnect(); } socketChannel.configureBlocking(false);//將鏈接設置爲非阻塞 //step3. 鏈接後,能夠給服務端發送消息 socketChannel.write(ByteBuffer.wrap(ObjectAndByteUtil.toByteArray(requestDO))); } public Object read(SelectionKey key) throws IOException { //step1. 獲得事件發生的通道 byte[] result = getReadData(key); if (result == null) return null; Object object = ObjectAndByteUtil.toObject(result); return object; } }
爲了使得客戶端採用NIO方式進行通信,咱們修改MyInvokeHandler類:
// result = consumerService.sendData(serviceAddress,requestDO);//採用BIO的方式 result = consumerService.sendDataByNIO(serviceAddress,requestDO);//採用NIO的方式
至此,NIO通訊模式代碼修改完畢。在測試的過程當中,遇到了一個問題,就是在服務調用端發出一個服務調用請求後,服務發佈端一直在觸發read事件,查閱資料後,瞭解到這種NIO的實現方式中,客戶端或者服務端其中一方將鏈接關閉後,會一直觸發另外一方的read事件,這時read會回傳-1,若沒有即便正確處理斷線(關閉channel),read事件會一直觸發,所以在getData函數讀取數據時,添加以下代碼:
if (len == -1){ socketChannel.close(); return null;//說明鏈接已經斷開 }
至此,問題得以解決。
服務註冊查找中心以及服務端客戶端的代碼都不須要改變,分別運行後,獲得與初版相同的結果。(因爲服務端咱們採用一個selector管理全部channel,而且沒有開啓新的線程去處理數據,所以客戶端會以同步的方式獲得四次服務調用結果)
目前爲止咱們的代碼中,通訊部分採用了NIO和BIO兩種模式。BIO模式採用socket編程實現,NIO部分採用selector channel buffer編程實現。可是不管哪種,都只是簡單的幫助咱們瞭解兩種通訊模式的基本概念,以及如何用最簡單得編程方式實現。咱們在代碼中,也有很是多的異常,網絡等狀況沒有考慮,在實際生產中,也毫不會使用這種最基本最底層的編程方式來完成遠程得通訊。所以,咱們這裏引入Netty開源框架來實現通訊。他幫助咱們考慮了多種情況,使得咱們以簡單的代碼完成高質量的遠程通訊,專一於其餘業務邏輯等的實現。
在分佈式應用系統開發中,服務化的應用之間進行遠程通訊時使用。Netty是在Java NIO的基礎上封裝的用於客戶端服務端網絡應用程序開發的框架,幫助用戶考慮在分佈式、高併發、高性能開發中遇到的多種情況,使得用戶使用更容易的網絡編程接口完成網絡通訊,專一於其餘業務邏輯的開發。
(1)關於Netty
(如下內容摘自知乎的帖子《通俗地講,Netty 能作什麼?》)
netty是一套在java NIO的基礎上封裝的便於用戶開發網絡應用程序的api.
Netty是什麼?
1)本質:JBoss作的一個Jar包
2)目的:快速開發高性能、高可靠性的網絡服務器和客戶端程序
3)優勢:提供異步的、事件驅動的網絡應用程序框架和工具
通俗的說:一個好使的處理Socket的東東
(2)爲何選擇netty
如下內容摘抄自《Netty權威指南》
在上述優化中,咱們使用JDK爲咱們提供的NIO的類庫來修改LCRPC框架的遠程通訊方式。如下總結了不選擇Java原聲NIO編程的緣由:
因爲上述緣由,在大多數場景下,不建議你們直接食用JDK的NIO類庫,除非精通NIO編程或者有特殊的需求。在絕大多數的業務場景中,咱們可使用NIO框架Netty來進行NIO編程,他既能夠做爲客戶端也能夠做爲服務端,同時也支持UDP和異步文件傳輸,功能很是強大。
如下總結了爲何選擇Netty做爲基礎通訊框架:
(3)LCRPC服務框架優化:使用netty替換底層網絡通信
與NIO的修改方式大體相同
增長四個netty服務端與客戶端的類;
netty服務端開啓監聽的類NettyServer:
package whu.edu.lcrpc.io.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import whu.edu.lcrpc.util.Constant; /** * Created by apple on 17/4/10. */ public class NettyServer { public void bind() throws InterruptedException { //配置服務端的NIO的線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup wokerGroup = new NioEventLoopGroup(); //建立服務啓動的輔助類 try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,wokerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChildChannelHandler()); //綁定端口,同步等待成功 ChannelFuture f = b.bind(Constant.PORT).sync(); System.out.println("已經開始監聽,能夠註冊服務了"); //等待服務端監聽端口關閉 f.channel().closeFuture().sync(); }finally { //優雅退出,釋放線程池資源 bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } } private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new NettyServerHandler()); } } }
netty服務端hanlder類NettyServerhandler:
package whu.edu.lcrpc.io.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import whu.edu.lcrpc.entity.LCRPCRequestDO; import whu.edu.lcrpc.service.IProviderService; import whu.edu.lcrpc.service.impl.ProviderServiceImpl; import whu.edu.lcrpc.util.ObjectAndByteUtil; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Date; /** * Created by apple on 17/4/10. */ public class NettyServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); if (req == null) return; LCRPCRequestDO requestDO = (LCRPCRequestDO) ObjectAndByteUtil.toObject(req); IProviderService providerService = new ProviderServiceImpl(); ByteBuf resp = Unpooled.copiedBuffer(ObjectAndByteUtil.toByteArray(providerService.getFuncCalldata(requestDO))); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
netty客戶端鏈接類NettyClient:
package whu.edu.lcrpc.io.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import whu.edu.lcrpc.util.Constant; import whu.edu.lcrpc.util.ObjectAndByteUtil; import java.io.UnsupportedEncodingException; /** * Created by apple on 17/4/10. */ public class NettyClient { private Object reqObj; private String ip; public NettyClient(Object reqObj, String ip){ this.reqObj = reqObj; this.ip = ip; } public Object connect() throws InterruptedException, UnsupportedEncodingException { //配置客戶端NIO線程組 EventLoopGroup group = new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); byte[] req = ObjectAndByteUtil.toByteArray(reqObj); NettyClientHandler nettyClientHandler = new NettyClientHandler(req); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY,true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(nettyClientHandler); } }); //發起異步鏈接操做 ChannelFuture f = b.connect(ip, Constant.PORT).sync(); //等待客戶端鏈路關閉 f.channel().closeFuture().sync(); //拿到異步請求結果,返回 Object responseObj = ObjectAndByteUtil.toObject(nettyClientHandler.response); return responseObj; }finally { //優雅退出,釋放NIO線程組 group.shutdownGracefully(); } } }
netty客戶端handler類NettyClientHandler:
package whu.edu.lcrpc.io.netty; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * Created by apple on 17/4/11. */ public class NettyClientHandler extends ChannelHandlerAdapter { private ByteBuf firstMessage; public byte[] response; public NettyClientHandler(byte[] req){ //將請求寫入緩衝區 firstMessage = Unpooled.buffer(req.length); firstMessage.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(firstMessage); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; response = new byte[buf.readableBytes()]; buf.readBytes(response); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
然後修改LCRPC中本來的代碼,採用netty來進行遠程通訊。
首先在接口IConsumerService中增長函數sendDataByNetty,該函數採用netty的方式向服務發佈端發送數據。函數實現以下:
@Override public Object sendDataByNetty(String ip, LCRPCRequestDO requestDO) throws IOException, ClassNotFoundException, InterruptedException { NettyClient nettyClient = new NettyClient(requestDO,ip); return nettyClient.connect(); }
然後在接口IProviderService增長函數startListenByNetty,該函數採用netty的方式開啓服務監聽。
@Override public boolean startListenByNetty() { new Thread(()->{ NettyServer nettyServer = new NettyServer(); try { nettyServer.bind(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); return true; }
而後在代理handler類MyInvocationHandler中,
修改
result = consumerService.sendDataByNIO(serviceAddress,requestDO);//採用NIO的方式
爲
result = consumerService.sendDataByNetty(serviceAddress,requestDO); //採用netty的方式
採用netty的方式調用服務。
而且在類LCRPCProviderImpl中使用方法startListenByNetty開啓服務的監聽。
客戶端以及服務端的測試工程代碼均不須要改變,進行測試後,輸出結果不變。
須要注意的是:上述關於Netty的使用沒有考慮到TCL粘包/拆包的問題!
這個優化未完待續~