SocketChannel clientChannel = SocketChannel.open();
ClientChannel.configureBlocking(false); socket.setReuseAddress(true); socket.setReceiveBufferSize(BUFFER_SIZE); socket.setSendBufferSize(BUFFER_SIZE);
boolean connected = clientChannel.connect(new InetSocketAddress(IP,port));
if(connected){ clientChannel.register(seletor,SelectionKey.OP_READ,ioHandler); }else{ clientChannel.register(selector,Selection.OP_CONNECT,ioHandler); }
步驟五:向Reactor線程的多路複用器註冊OP_CONNET狀態位,監聽服務端的TCP ACK應答,示例代碼以下:java
clientChannel.register(select, SelectionKey.OP_CONNECT,ioHandler);
Selector selector = Selector.open(); New Thread(new ReactorTask()).start();
步驟七:多路複用器在線程run方法的無限循環體內輪詢準備就緒的Key,示例代碼以下: 異步
int num = selector.select(); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); while(it.hasNext){ SelectionKey key = (SelectionKey)it.next(); //...deal with I/O event ... }
if(key.isConnectable()){ //handlerConnect(); }
步驟九:判斷鏈接成功,若是鏈接成功,註冊讀事件到多路複用器,示例代碼以下: socket
if(channel.finishConnect()){ registerRead(); }
clientChannel.register(selector,SelectionKey.OP_READ,ioHandler);
步驟十一:異步讀客戶端請求消息到緩衝區,示例代碼以下: 線程
int readNumber = channel.read(receivedBuffer);
步驟十二:對ByteBuffer進行編解碼,若是有半包消息接收緩衝區Reset,繼續讀取後續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排,示例代碼以下: code
Object message = null; whiel(buffer.hasRemain()){ byteBuffer.mark(); Object message = decode(byteBuffer); if(message == null){ byteBuffer.reset(); break; } messageList.add(message); } if( !byteBuffer.hasRemain()){ byteBuffer.clear(); }else{ byteBuffer.compact(); } if(messageList != null & !messageList.isEmpty()){ for(Object messageE : messageList){ handlerTask(messageE); } }
步驟十三:將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端。示例代碼以下: 對象
socketChannel.write(buffer);