版權聲明:本文爲博主原創文章,未經博主容許不得轉載。java
步驟1 打開ServerSocketChannel,用於監聽客戶端鏈接,它是全部客戶端鏈接的父管道,代碼示例以下:api
ServerSocketChannel acceptorSvr = ServerSocketChannel.open();
步驟2 綁定監聽端口,並設置非阻塞模式,示例代碼以下:數組
acceptorSvr.socket().bind(InetAddress.getByName("IP"),port); acceptorSvr.configureBlocking(false);
步驟3 建立Reactor線程,建立多路複用器並啓動線程,代碼以下:
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();服務器
步驟4 將ServerSocketChannel註冊到Reactor線程的多路複用器Selector上,監聽ACCEPT事件,代碼以下:網絡
SelectionKey key = accptorSvr.register(selector,SelectionKey.OP_ACCEPT,ioHandle);
步驟5 Selector在線程run方法內輪詢準備就緒的key,代碼以下:數據結構
int num = selector.select(); Set selectedKey = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while(it.hasNext()){ SelectionKey key = it.next(); //handle IO operation }
步驟6 Selector 監聽到有新的客戶端接入請求,處理新的處理請求,完成TCP三次握手,創建物理鏈路,代碼以下:併發
SocketChannel channel = svrChannel.accept();
步驟7 設置客戶端鏈路爲非阻塞式,示例代碼以下:異步
channel.configBlocking(false); channel.socket().setReuseAddress(true);
步驟8 將新接入的客戶端鏈接註冊到Reactor線程的Selector上,監聽讀寫操做。用來讀取客戶端發送的網絡信息,代碼以下:socket
SelectionKey key = socketChannel.register(selector,SelectionKey.OP_ACCEPT,ioHandle);
步驟9 異步讀取客戶端請求消息到緩衝區,代碼以下:
int num = channel.read(buffer);
步驟10 對ByteBuffer進行編解碼。若是有半包消息指針reset,繼續讀取後續的報文,將讀取的信息封裝成Task交給線程池處理。代碼以下:
//僞代碼 while(buffer.hasRemaining()){ buffer.mark(); Object message = decode(buffer); if(message==null){ buffer.reset(); break; } messageList.add(message); } if(!buffer.hasRemaining()) buffer.clear(); else buffer.compact(); if(messageList!=null&& !messageList.isEmpty()){ for (Object object:messageList) { handleTask(object); } }
步驟11 將POJO編碼成ButeBuffer.調用SocketChannel的異步write()方法將消息異步發送給客戶端,代碼以下:
socketChannel.write(buffer);
若是緩衝區滿,會致使寫半包。此時,須要註冊監聽寫操做位,循環寫,知道整包消息寫入TCP緩衝區。
步驟1 打開SocketChannel,綁定客戶端本地地址(可選,默認系統會隨機分配一個可用的本地地址),代碼示例以下:
SocketChannel clientChannel = SocketChannel.open();
步驟2 設置channel爲非阻塞的,同時設置TCP鏈接參數,代碼示例以下:
clientChannel.configBlocking(false);
socket.setReuseAddress(true);
步驟3 異步鏈接服務端,代碼示例以下:
boolean connected = clientChannel.connect(new InetSocketAddress("ip",port));
步驟4 判斷是否鏈接成功,若是成功則註冊到Selector,不然從新鏈接,代碼示例以下:
if(connected){ clientChannel.register(selector,SelectionKey.OP_READ,ioHandle); }else{ clientChannel.register(selector,SlectionKey.OP_CONNECT,ioHandle); }
步驟5 向Reactor線程的Selector註冊OP_CONNECT狀態位,監聽服務端的TCP ACK應答,代碼示例以下:
步驟6 建立ReActor線程,建立Selector並啓動線程,代碼示例以下:
Selector selector = Selector.open(); new Thread(new ReactorTask()).start();
步驟7 Selector在線程run方法中無限循環輪詢準備就緒的key,代碼示例以下:
int num = selector.selector(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator it = selectionKeys.iterator(); while(it.hasNext()){ SelectionKey selectionKey = it.next(); //處理IO操做 }
步驟8 接收connect事件並處理,代碼示例以下:
if(key.isConnectable()){ //處理鏈接請求 }
步驟9 判斷鏈接結果,若是鏈接成功,則註冊讀事件到selector中,代碼示例以下:
if(channel.finishConnect()){ //registerRead() }
步驟10 註冊讀事件到selector,代碼示例以下:
clientChannel.register(selector,SelectionKey.OP_READ,ioHandle);
步驟11 異步讀客戶端請求消息到緩衝區,代碼示例以下:
int readNumber = channel.read(readBuffer);
步驟12 對讀取到的消息進行編解碼,若是有半包消息接收,緩衝區reset,繼續讀取後續的報文,將解碼成功的消息封裝成Task,丟到線程池中處理。代碼示例以下:
//僞代碼 while(buffer.hasRemaining()){ buffer.mark(); Object message = decode(buffer); if(message==null){ buffer.reset(); break; } messageList.add(message); } if(!buffer.hasRemaining()) buffer.clear(); else buffer.compact(); if(messageList!=null&& !messageList.isEmpty()){ for (Object object:messageList) { handleTask(object); } }
步驟13 將POJO對象編碼成ByteBuffer,調用SocketChannel的異步write接口將消息異步發送到客戶端,代碼示例以下:
socketChannel.write(buffer);
同步阻塞IO(BIO) | 僞異步 | 非阻塞IO(NIO) | 異步IO(AIO) | |
客戶端個數:IO線程數 | 1:1 | M:N(通常M大於N) | M:1 | M:0(不須要額外啓動線程,被動回調) |
IO類型(阻塞) | 阻塞IO | 阻塞 | 非阻塞IO | 非阻塞IO |
IO類型(同步) | 同步 | 同步 | 同步 | 異步 |
API使用難度 | 簡單 | 簡單 | 複雜 | 複雜 |
調試難度 | 簡單 | 簡單 | 難 | 難 |
可靠性 | 很是差 | 差 | 高 | 高 |
吞吐量 | 低 | 中 | 高 | 高 |