Editor's note: Netty is a well-known open-source networking library in the Java world, prized for its high performance and high extensibility, which is why many popular frameworks are built on top of it, such as Dubbo, RocketMQ, and Hadoop. High-performance RPC frameworks in particular are generally built on Netty, sofa-bolt being one example. In short: Java developers need to, and should, learn how to use Netty and understand how it works.
For an introductory guide to Netty, see "Netty 入門,這一篇文章就夠了" ("Getting Started with Netty: This One Article Is Enough").
Netty's connection handling is IO-event handling. IO events include read events, ACCEPT events, write events, and OP_CONNECT events.
IO events are processed in conjunction with the ChannelPipeline: when an IO event arrives, the data is first read or written, then handed to the ChannelPipeline for further processing. The ChannelPipeline contains the chain of ChannelHandlers (head + user-defined ChannelHandlers + tail).
The channelPipeline/channelHandler mechanism provides decoupling and extensibility. Handling one IO event involves multiple processing steps, and those steps map exactly onto the channelHandlers in the channelPipeline. When a new data-processing requirement comes up, you simply add a new channelHandler to the channelPipeline. This is an elegant design worth borrowing in your own code.
To meet such extensibility requirements, two mechanisms are commonly used together: Netty's channelHandler and its channelPipeline. Combined, they can be understood as an implementation of the chain-of-responsibility pattern: by dynamically adding channelHandlers, you achieve reuse and high extensibility.
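To make the idea concrete, here is a minimal chain-of-responsibility pipeline in plain Java, loosely modeled on ChannelPipeline/ChannelHandler. All class and method names are simplified stand-ins, not Netty's real API:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal chain-of-responsibility sketch modeled on Netty's
// ChannelPipeline/ChannelHandler. All names are simplified stand-ins.
public class PipelineSketch {

    public interface InboundHandler {
        void channelRead(Context ctx, Object msg);
    }

    // Lets a handler pass the (possibly transformed) message to the next
    // handler, like ChannelHandlerContext.fireChannelRead in Netty.
    public static final class Context {
        private final List<InboundHandler> handlers;
        private final int next;

        Context(List<InboundHandler> handlers, int next) {
            this.handlers = handlers;
            this.next = next;
        }

        public void fireChannelRead(Object msg) {
            if (next < handlers.size()) {
                handlers.get(next).channelRead(new Context(handlers, next + 1), msg);
            }
        }
    }

    public static final class Pipeline {
        private final List<InboundHandler> handlers = new ArrayList<>();

        public Pipeline addLast(InboundHandler h) {
            handlers.add(h);
            return this;
        }

        public void fireChannelRead(Object msg) {
            new Context(handlers, 0).fireChannelRead(msg);
        }
    }

    public static final StringBuilder log = new StringBuilder();

    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        // Each processing step is its own handler; new steps can be added
        // without touching existing ones.
        pipeline.addLast((ctx, msg) -> ctx.fireChannelRead(((String) msg).trim()))
                .addLast((ctx, msg) -> ctx.fireChannelRead(((String) msg).toUpperCase()))
                .addLast((ctx, msg) -> log.append(msg));
        pipeline.fireChannelRead("  hello  ");
        System.out.println(log); // HELLO
    }
}
```

Adding a new processing requirement means adding one more `addLast` call; the existing handlers are untouched, which is exactly the extensibility property described above.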
Before diving into Netty's connection handling, it helps to understand the NioEventLoop model. The core of its connection-event handling is the following dispatch logic in the source:
```java
// Handle the various IO events
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    try {
        int readyOps = k.readyOps();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // OP_CONNECT: fires on the client side once the connection
            // to the server has been established
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // Note: read events and ACCEPT events use different unsafe implementations:
            // read event -> NioByteUnsafe, ACCEPT event -> NioMessageUnsafe
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
```
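To see the shape of this dispatch in isolation, here is a plain-Java sketch of the readyOps bitmask logic. The `classify` helper is hypothetical, added only to make the branching testable; the `OP_*` constants are the real java.nio ones:

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Mirrors the branch structure of processSelectedKey: one readyOps bitmask
// can carry several events at once, and read/ACCEPT share a single branch.
// The classify helper is hypothetical, used only to demonstrate the logic.
public class ReadyOpsDispatch {

    public static List<String> classify(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect");   // OP_CONNECT branch
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");      // write branch
        }
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                || readyOps == 0) {
            // NioByteUnsafe.read for a read event,
            // NioMessageUnsafe.read for an ACCEPT event
            actions.add("read");
        }
        return actions;
    }

    public static void main(String[] args) {
        System.out.println(classify(SelectionKey.OP_READ | SelectionKey.OP_WRITE));
        // [forceFlush, read]
    }
}
```

Note that the branches are independent `if` statements, not `else if`: a single selection key can be ready for several operations at once, and all of them are serviced in one pass.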
From the code above, events fall into three groups: OP_CONNECT events, write events, and read events (which also cover ACCEPT events). The three are covered in turn below, starting with the ACCEPT event:
```java
// NioMessageUnsafe
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        do {
            // Call the underlying Java socket's accept method to take the connection
            int localRead = doReadMessages(readBuf);
            // Update the read-message counter
            allocHandle.incMessagesRead(localRead);
        } while (allocHandle.continueReading());
    } catch (Throwable t) {
        exception = t;
    }

    // readBuf holds the accepted NioChannels
    int size = readBuf.size();
    for (int i = 0; i < size; i ++) {
        readPending = false;
        // Trigger fireChannelRead
        pipeline.fireChannelRead(readBuf.get(i));
    }
    readBuf.clear();
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
}
```
Once the connection is established, its channel is registered with the selector of one of the NioEventLoops in the workerGroup. This registration happens inside fireChannelRead, and the logic lives in ServerBootstrapAcceptor.channelRead.
```java
// ServerBootstrapAcceptor
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    // Set the child channel's pipeline handler, options and attributes
    child.pipeline().addLast(childHandler);
    setChannelOptions(child, childOptions, logger);
    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        // Register the channel with a Selector in the childGroup
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
```
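The boss/worker hand-off that ServerBootstrapAcceptor performs can be sketched with plain java.nio, without any Netty classes: the boss selector accepts the connection, and the accepted channel is then registered with a worker selector, which owns its IO events from then on. Everything runs single-threaded here for brevity; Netty runs boss and worker groups on separate event-loop threads:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain-NIO sketch of the boss/worker hand-off described above.
public class AcceptHandoff {

    // Returns how many channels ended up registered with the worker selector.
    public static int demo() {
        try (Selector boss = Selector.open();
             Selector worker = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {

            server.bind(new InetSocketAddress("127.0.0.1", 0));
            server.configureBlocking(false);
            server.register(boss, SelectionKey.OP_ACCEPT);

            // Simulate a client connecting.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                boss.select(1000);
                for (SelectionKey key : boss.selectedKeys()) {
                    if (key.isAcceptable()) {
                        // Equivalent of doReadMessages: accept the connection.
                        SocketChannel child =
                                ((ServerSocketChannel) key.channel()).accept();
                        child.configureBlocking(false);
                        // Equivalent of childGroup.register(child): from now on
                        // the worker selector owns this channel's IO events.
                        child.register(worker, SelectionKey.OP_READ);
                    }
                }
                return worker.keys().size();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("channels on worker selector: " + demo());
    }
}
```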
Next, read events, which are handled by NioByteUnsafe.read:

```java
// NioByteUnsafe
public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            // Read data from the channel into byteBuf
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            allocHandle.incMessagesRead(1);
            readPending = false;
            // Trigger fireChannelRead
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        // Trigger fireChannelReadComplete; if a handler calls
        // ChannelHandlerContext.flush here, the response is written back to the client
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
```
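The read loop itself can be illustrated with plain java.nio: keep reading into freshly allocated buffers and hand each chunk onward until no more data is available. A `Pipe` stands in for the `SocketChannel` so the sketch is self-contained; the method names in comments map each step back to the Netty code above:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;

// Plain-NIO sketch of the NioByteUnsafe.read loop described above.
public class ReadLoopSketch {

    public static String readAll(Pipe.SourceChannel source) {
        StringBuilder received = new StringBuilder();
        try {
            while (true) {
                ByteBuffer buf = ByteBuffer.allocate(8); // allocHandle.allocate
                int n = source.read(buf);                // doReadBytes
                if (n <= 0) {                            // continueReading() == false
                    break;
                }
                buf.flip();
                // Stand-in for pipeline.fireChannelRead(byteBuf).
                received.append(StandardCharsets.UTF_8.decode(buf));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return received.toString();
    }

    public static String demo() {
        try {
            Pipe pipe = Pipe.open();
            pipe.sink().write(ByteBuffer.wrap(
                    "hello netty".getBytes(StandardCharsets.UTF_8)));
            // Non-blocking, so read() returns 0 once the pipe is drained.
            pipe.source().configureBlocking(false);
            return readAll(pipe.source());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // hello netty
    }
}
```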
Write events are different: normally OP_WRITE is not registered at all. When the socket send buffer has no free space, further writes would block; at that point OP_WRITE can be registered, and once space frees up (or the number of writable bytes rises to at least the low water mark), the write event fires and the corresponding callback is triggered.
```java
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Write event: even though nothing has been written to the socket buffer
    // yet, the data has already been written into the channel's outboundBuffer;
    // flush moves it from the outboundBuffer into the socket buffer
    ch.unsafe().forceFlush();
}
```
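The interplay between the outbound buffer and the water marks can be sketched with a toy buffer class. All names here are simplified stand-ins for Netty's ChannelOutboundBuffer and WriteBufferWaterMark, not the real API:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the write path: write() only queues data in an in-memory
// outbound buffer, and flush() is what actually drains it toward the socket.
public class OutboundBufferSketch {

    private final Deque<byte[]> pending = new ArrayDeque<>();
    private int pendingBytes;
    private final int highWaterMark;

    public OutboundBufferSketch(int highWaterMark) {
        this.highWaterMark = highWaterMark;
    }

    // write(): queue only; nothing touches the socket yet.
    public void write(byte[] data) {
        pending.add(data);
        pendingBytes += data.length;
    }

    // Mirrors Channel.isWritable(): false once buffered bytes reach the high
    // water mark, the signal to stop writing and wait for the write event.
    public boolean isWritable() {
        return pendingBytes < highWaterMark;
    }

    // What forceFlush leads to: drain the queued data toward the socket
    // (simulated here by simply discarding it). Returns bytes flushed.
    public int flush() {
        int flushed = pendingBytes;
        pending.clear();
        pendingBytes = 0;
        return flushed;
    }

    public static void main(String[] args) {
        OutboundBufferSketch buf = new OutboundBufferSketch(10);
        buf.write(new byte[6]);
        System.out.println(buf.isWritable()); // true
        buf.write(new byte[6]);
        System.out.println(buf.isWritable()); // false: above high water mark
        System.out.println(buf.flush());      // 12
    }
}
```

Checking `isWritable()` before writing, instead of writing blindly, is what keeps a fast producer from buffering unbounded amounts of data in memory.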
Finally, the OP_CONNECT event. It is triggered on the client, that is, on the side that actively initiates the connection.
```java
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
    // OP_CONNECT: fires on the client once the connection to the server is established
    int ops = k.interestOps();
    ops &= ~SelectionKey.OP_CONNECT;
    k.interestOps(ops);
    // Trigger finishConnect, which includes firing channelActive; any custom
    // handler that implements channelActive will be invoked
    unsafe.finishConnect();
}
```
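The client side of this can be sketched with plain java.nio: a non-blocking connect, then `finishConnect()` once the selector reports OP_CONNECT, which is the point where Netty deregisters the interest and channelActive is fired:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain-NIO sketch of the OP_CONNECT branch described above.
public class ConnectSketch {

    public static boolean connectTo(InetSocketAddress addr) {
        try (Selector selector = Selector.open();
             SocketChannel ch = SocketChannel.open()) {
            ch.configureBlocking(false);
            if (ch.connect(addr)) {
                return true; // connected immediately (possible on loopback)
            }
            ch.register(selector, SelectionKey.OP_CONNECT);
            boolean active = false;
            if (selector.select(1000) > 0) {
                for (SelectionKey k : selector.selectedKeys()) {
                    if (k.isConnectable()) {
                        // Deregister OP_CONNECT, as processSelectedKey does.
                        k.interestOps(k.interestOps() & ~SelectionKey.OP_CONNECT);
                        active = ch.finishConnect(); // channelActive fires here
                    }
                }
            }
            return active;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Self-contained demo: connect to a local listener.
    public static boolean demo() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            return connectTo((InetSocketAddress) server.getLocalAddress());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + demo());
    }
}
```

Deregistering OP_CONNECT after the connection completes matters: a connected channel stays permanently "connectable", so leaving the interest set would make the selector spin.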
Follow the 【TopCoder】 account for more articles like this one.