全文圍繞下圖,Netty-Channel的簡化版架構體系圖展開,從頂層Channel接口開始入手,往下遞進,閒言少敘,直接開擼java
概述: 從圖中能夠看到,從頂級接口Channel開始,在接口中定義了一套方法看成規範,緊接着的是來兩個抽象的接口實現類,在這個抽象類中對接口中的方法,進行了部分實現,而後開始根據不一樣的功能分支,分紅服務端的Channel和客戶端的Channelgit
根據服務端和客戶端,Channel能夠分紅兩類(這兩大類的分支見上圖):github
NioServerSocketChannel
NioSocketChannel
channel是一個管道,用於鏈接字節緩衝區Buf和另外一端的實體,這個實例能夠是Socket,也能夠是File, 在Nio網絡編程模型中, 服務端和客戶端進行IO數據交互(獲得彼此推送的信息)的媒介就是Channel編程
Netty對Jdk原生的ServerSocketChannel
進行了封裝和加強封裝成了NioXXXChannel
, 相對於原生的JdkChannel, Netty的Channel增長了以下的組件promise
本篇博客,會追溯上圖中的體系關係,找出NioXXXChannel
的相對於jdk原生channel在哪裏添加的上面的新組件服務器
如今來到上圖的Channel
部分, 他是一個接口, netty用它規定了一個Channel
應該具備的功能,在它的文檔對Channel的
是什麼,以及對各個組件進行了描述網絡
Channel
經過ChannelPipeline
中的多個Handler
處理器,Channel
使用它處理IO數據Channel
中的全部Io操做都是異步的,一經調用就立刻返回,因而Netty基於Jdk原生的Future
進行了封裝, ChannelFuture
, 讀寫操做會返回這個對象,實現自動通知IO操做已完成Channel
是能夠有parent的, 以下// 建立客戶端channel時,會把服務端的Channel設置成本身的parent // 因而就像下面: 服務端的channel = 客戶端的channel.parent(); 服務的channel.parent()==null;
此外,Channel還定義了大量的抽象方法, 以下:多線程
/** * todo 返回一個僅供內部使用的unsafe對象, Chanel上 IO數據的讀寫都是藉助這個類完成的 */ Unsafe unsafe(); // 返回Channel的管道 ChannelPipeline pipeline(); ByteBufAllocator alloc(); @Override // todo 進入第一個實現 , 讀取Channel中的 IO數據 Channel read(); // 返回Channel id ChannelId id(); // todo 返回channel所註冊的 eventLoop EventLoop eventLoop(); // 返回當前Channel的父channel Channel parent(); // todo 描述了 關於channel的 一些列配置信息 ChannelConfig config(); // 檢查channel是否開啓 boolean isOpen(); // 檢查channel是否註冊 boolean isRegistered(); // todo 什麼是active 他說的是channel狀態, 什麼狀態呢? 當前channel 若和Selector正常的通訊就說明 active boolean isActive(); // 返回channel的元數據 ChannelMetadata metadata(); // 服務器的ip地址 SocketAddress localAddress(); // remoteAddress 客戶端的ip地址 SocketAddress remoteAddress(); ChannelFuture closeFuture(); boolean isWritable(); long bytesBeforeUnwritable(); long bytesBeforeWritable(); @Override Channel flush();
unsafe
Netty中,真正幫助Channel完成IO讀寫操做的是它的內部類unsafe
, 源碼以下, 不少重要的功能在這個接口中定義, 下面列舉的經常使用的方法架構
interface Unsafe { // 把channel註冊進EventLoop void register(EventLoop eventLoop, ChannelPromise promise); // todo 給channel綁定一個 adress, void bind(SocketAddress localAddress, ChannelPromise promise); // 把channel註冊進Selector void deregister(ChannelPromise promise); // 從channel中讀取IO數據 void beginRead(); // 往channe寫入數據 void write(Object msg, ChannelPromise promise); ... ...
AbstractChanel
接着往下看,下面來到Channel
接口的直接實現類,AbstractChannel
他是個抽象類, AbstractChannel
重寫部分Channel
接口預約義的方法, 它的抽象內部類AbstractUnsafe
實現了Channel
的內部接口unsafe
併發
咱們如今是從上往下看,可是當咱們建立對象使用的時候實際上是使用的特化的對象,建立特化的對象就不免會調層層往上調用父類的構造方法, 因此咱們看看AbstractChannel
的構造方法幹了什麼活? 源碼以下:
protected AbstractChannel(Channel parent) { this.parent = parent; // todo channelId 表明Chanel惟一的身份標誌 id = newId(); // todo 建立一個unsafe對象 unsafe = newUnsafe(); // todo 在這裏初始化了每個channel都會有的pipeline組件 pipeline = newChannelPipeline(); }
咱們看,AbstractChannel
構造函數, 接受的子類傳遞進來的參數只有一個parent CHannel,並且,還不有可能爲空, 因此在AbstractChannel
是沒有維護jdk底層的Channel的, 相反他會維護着Channel關聯的EventLoop,我是怎麼知道的呢? 首先,它的屬性中存在這個字段,並且,將channel註冊進selector的Register()方法是AbastractChannel
重寫的,Selector在哪呢? 在EventLoop裏面,它怎麼獲得的呢? 它的子類傳遞了給了它
終於看出來點眉目,構造方法作了四件事
unsafe
channelPipeline
AbstractChannel中維護着EventLoop
AbstractChanel
的重要抽象內部類AbstractUnsafe
繼承了Channel
的內部接口Unsafe
他的源碼以下,我貼出來了兩個重要的方法, 關於這兩個方法的解析,我寫在代碼的下面
protected abstract class AbstractUnsafe implements Unsafe { @Override // todo 入參 eventLoop == SingleThreadEventLoop promise == NioServerSocketChannel + Executor public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } // todo 賦值給本身的 事件循環, 把當前的eventLoop賦值給當前的Channel上 做用是標記後續的全部註冊的操做都得交給我這個eventLoop處理, 正好對應着下面的判斷 // todo 保證了 即使是在多線程的環境下一條channel 也只能註冊關聯上惟一的eventLoop,惟一的線程 AbstractChannel.this.eventLoop = eventLoop; // todo 下面的分支判斷裏面執行的代碼是同樣的!!, 爲何? 這是netty的重點, 它大量的使用線程, 線程之間就會產生同步和併發的問題 // todo 下面的分支,目的就是把線程可能帶來的問題降到最低限度 // todo 進入inEventLoop() --> 判斷當前執行這行代碼的線程是否就是 SingleThreadEventExecutor裏面維護的那條惟一的線程 // todo 解釋下面分支的必要性, 一個eventLoop能夠註冊多個channel, 可是channel的整個生命週期中全部的IO事件,僅僅和它關聯上的thread有關係 // todo 並且,一個eventLoop在他的整個生命週期中,只和惟一的線程進行綁定, // // todo 當咱們註冊channel的時候就得確保給他專屬它的thread, // todo 若是是新的鏈接到了, if (eventLoop.inEventLoop()) { // todo 進入regist0() register0(promise); } else { try { // todo 若是不是,它以一個任務的形式提交 事件循環 , 新的任務在新的線程開始, 規避了多線程的併發 // todo 他是SimpleThreadEventExucutor中execute()實現的,把任務添加到執行隊列執行 eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // todo 進入這個方法doRegister() // todo 它把系統建立的ServerSocketChannel 註冊進了選擇器 doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. // todo 確保在 notify the promise前調用 handlerAdded(...) // todo 這是必需的,由於用戶可能已經經過ChannelFutureListener中的管道觸發了事件。 // todo 若是須要的話,執行HandlerAdded()方法 // todo 正是這個方法, 回調了前面咱們添加 Initializer 中添加 Accpter的重要方法 pipeline.invokeHandlerAddedIfNeeded(); // todo !!!!!!! 觀察者模式!!!!!! 通知觀察者,誰是觀察者? 暫時理解ChannelHandler 是觀察者 safeSetSuccess(promise); // todo 傳播行爲, 傳播什麼行爲呢? 在head---> ServerBootStraptAccptor ---> tail傳播事件ChannelRegistered , 也就是挨個調用它們的ChannelRegisted函數 pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. // todo 對於服務端: javaChannel().socket().isBound(); 即 當Channel綁定上了端口 isActive()纔會返回true // todo 對於客戶端的鏈接 ch.isOpen() && ch.isConnected(); 返回true , 就是說, Channel是open的 打開狀態的就是true if (isActive()) { if (firstRegistration) { // todo 在pipeline中傳播ChannelActive的行爲,跟進去 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 // todo 能夠接受客戶端的數據了 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); // todo 因爲端口的綁定未完成,因此 wasActive是 false try { // todo 綁定端口, 進去就是NIO原生JDK綁定端口的代碼 doBind(localAddress); // todo 端口綁定完成 isActive()是true } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } // todo 根據上面的邏輯判斷, 結果爲 true if (!wasActive && isActive()) { invokeLater(new Runnable() { // todo 來到這裏很重要, 向下傳遞事件行爲, 傳播行爲的時候, 從管道的第一個節點開始傳播, 第一個節點被封裝成 HeadContext的對象 // todo 進入方法, 去 HeadContext裏面查看作了哪些事情 // todo 她會觸發channel的read, 最終從新爲 已經註冊進selector 的 chanel, 二次註冊添加上感性趣的accept事件 @Override public void run() { pipeline.fireChannelActive(); } }); } // todo 觀察者模式, 設置改變狀態, 通知觀察者的方法回調 safeSetSuccess(promise); }
AbstractChannel
抽象內部類的register(EventLoop,channelPromise)方法這個方法,是將channel註冊進EventLoop的Selector, 它的調用順序以下:
本類方法 regist()--> 本類方法 register0() --> 本類抽象方法doRegister()
doRegister() 在這裏設計成抽象方法,等着子類去具體的實現, 爲啥這樣作呢?
剛纔說了,AbstractChannel
自己就是個模板,並且它僅僅維護了EventLoop,沒有拿到channel引用的它根本不可能進行註冊的邏輯,那誰有jdk原生channel的引用呢? 它的直接子類AbstractNioChannel
下面是AbstractNioChannel
的構造方法, 它本身維護jdk原生的Channel,因此由他重寫doRegister()
,
*/ // todo 不管是服務端的channel 仍是客戶端的channel都會使用這個方法進行初始化 // // TODO: 2019/6/23 null ServerSocketChannel accept // todo 若是是在建立NioSocketChannel parent==NioServerSocketChannel ch == SocketChanel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent);// todo 繼續向上跟,建立基本的組件 // todo 若是是建立NioSocketChannel 這就是在保存原生的jdkchannel // todo 若是是建立NioServerSocketChannel 這就是在保存ServerSocketChannel this.ch = ch; // todo 設置上感興趣的事件 this.readInterestOp = readInterestOp; try { // todo 做爲服務端, ServerSocketChannel 設置爲非阻塞的 // todo 做爲客戶端 SocketChannel 設置爲非阻塞的 ch.configureBlocking(false); } catch (IOException e) {
AbstractChannel
抽象內部類的bind()方法bind()方法的調用順序, 本類方法 bind()--> 本類的抽象方法 dobind()
方法的目的是給Channel綁定上屬於它的端口,一樣有一個抽象方法,等着子類去實現,由於咱們已經知道了AbstractChannel
不維護channel的引用,因而我就去找dobind()
這個抽象函數的實現, 結果發現,AbstractChannel
的直接子類AbstractNioChannel
中根本不沒有他的實現,這是被容許的,由於AbstractNioChannel
自己也是抽象類, 究竟是誰實現呢? 以下圖:在NioServerSocketChannel中獲取出 Jdk原生的channel, 客戶端和服務端的channel又不一樣,因此綁定端口這中特化的任務,交給他們本身實現
AbstractChannel
的beginRead()()方法上面完成註冊以後,就去綁定端口,當端口綁定完成,就會channel處於active
狀態,下一步就是執行beginRead()
,執行的流程以下
本類抽象方法 beginRead()
--> 本類抽象方法doBeginRead()
這個read()
就是從已經綁定好端口的channel中讀取IO數據,和上面的方法同樣,對於沒有channel
引用的AbstractChannel
來講,netty把它設計成抽象方法,交給擁有jdk 原生channel
引用的AbstractNioChannel
實現
小結:
AbstractChannel
做爲Channel的直接實現類,自己又是抽象類,因而它實現了Channel的預留的一些抽象方法, 初始化了channel的四個組件 id pipeline unsafe parent, 更爲重要的是它的抽象內部類 實現了 關於nettyChannel的註冊,綁定,讀取數據的邏輯,並且以抽象類的方法,挖好了填空題等待子類的特化實現
AbstractNioChannel
依然是來到AbstractNioChannel
的構造方法,發現它作了以下的構造工做:
AbstractChannel
// todo 不管是服務端的channel 仍是客戶端的channel都會使用這個方法進行初始化 // // TODO: 2019/6/23 null ServerSocketChannel accept // todo 若是是在建立NioSocketChannel parent==NioServerSocketChannel ch == SocketChanel protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent);// todo 繼續向上跟,建立基本的組件 // todo 若是是建立NioSocketChannel 這就是在保存原生的jdkchannel // todo 若是是建立NioServerSocketChannel 這就是在保存ServerSocketChannel this.ch = ch; // todo 設置上感興趣的事件 this.readInterestOp = readInterestOp; try { // todo 做爲服務端, ServerSocketChannel 設置爲非阻塞的 // todo 做爲客戶端 SocketChannel 設置爲非阻塞的 ch.configureBlocking(false); } catch (IOException e) {
doRegister()
AbstractNioChannel
維護channel的引用,真正的實現把 jdk 原生的 channel註冊進 Selector中
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { // todo javaChannel() -- 返回SelectableChanel 可選擇的Channel,換句話說,能夠和Selector搭配使用,他是channel體系的頂級抽象類, 實際的類型是 ServerSocketChannel // todo eventLoop().unwrappedSelector(), -- > 獲取選擇器, 如今在AbstractNioChannel中 獲取到的eventLoop是BossGroup裏面的 // todo 到目前看, 他是把ServerSocketChannel(系統建立的) 註冊進了 EventLoop的選擇器 // todo 這裏的 最後一個參數是 this是當前的channel , 意思是把當前的Channel當成是一個 attachment(附件) 綁定到selector上 做用??? // todo 如今知道了attachment的做用了 // todo 1. 當channel在這裏註冊進 selector中返回一個selectionKey, 這個key告訴selector 這個channel是本身的 // todo 2. 當selector輪詢到 有channel出現了本身的感興趣的事件時, 須要從成百上千的channel精確的匹配出 出現Io事件的channel, // todo 因而seleor就在這裏提早把channel存放入 attachment中, 後來使用 // todo 最後一個 this 參數, 若是是服務啓動時, 他就是NioServerSocketChannel 若是是客戶端他就是 NioSocketChannel // todo 到目前爲止, 雖然註冊上了,可是它不關心任何事件 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) {
AbstractNioChannel
新添加了一個內部接口,做爲原Channel的擴展,源碼以下, 咱們着重關心的就是這個新接口的 read()
方法, 它的做用是從channel去讀取IO數據,做爲接口的抽象方法,它規範服務端和客戶端根據本身需求去不一樣的實現這個read()
怎麼特化實現這個read方法呢? 如果服務端,它read的結果就是一個新的客戶端的鏈接, 若是是客戶端,它read的結果就是 客戶端發送過來的數據,因此這個read()
頗有必要去特化
/** * Read from underlying {@link SelectableChannel} */ // todo 兩個實現類, NioByteUnsafe , 處理關於客戶端發來的信息 // todo NioMessageUnsafe 處理客戶端新進來的鏈接 void read(); /** * Special {@link Unsafe} sub-type which allows to access the underlying {@link SelectableChannel} */ public interface NioUnsafe extends Unsafe { /** * Return underlying {@link SelectableChannel} */ SelectableChannel ch(); /** * Finish connect */ void finishConnect(); void forceFlush(); }
AbstractNioChannel
的抽象內部內同類時繼承了它父類的AbstractUnsafe
實現了當前的NioUnsafe
, 再日後看, 問題來了, 服務端和客戶端在的針對read的特化實如今哪裏呢? 想一想看確定在它子類的unsafe內部類中,以下圖,紫框框
一會再具體看這兩個 內部類是如何特化read的 注意啊,再也不是抽象的了
AbstractNioMessageChannel
它的構造函數以下, 只是調用父類的構造函數,傳遞參數
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { // todo 在進去 // todo null ServerSocketChannel accept super(parent, ch, readInterestOp); }
AbstractNioMessageChannel
的MessageNioUnsafe
對read()
特化實現在read方法中,咱們能夠看到,他調用是本類的抽象方法doReadMessages(List<Object> buf)
, 方法的實現類是繼承體系的最底層的NioServerSocketChannel
, 由於他就是那個特化的服務端channel
固然若是咱們一開始跟進read()時,來到的客戶端的AbstractNioByteChannel
,如今咱們找到的doReadMessage()
就是由 客戶端的channelNioSocketChannel
完成的doReadBytes()
// todo 用於處理新連接進來的內部類 private final class NioMessageUnsafe extends AbstractNioUnsafe { // todo 這個容器用於存放臨時讀到的鏈接 private final List<Object> readBuf = new ArrayList<Object>(); // todo 接受新連接的 read來到這裏 @Override public void read() { ... doBeginRead(buf); ... } // todo 處理新的鏈接 是在 NioServerSocketChannel中實現的, 進入查看 protected abstract int doReadMessages(List<Object> buf) throws Exception;
如今咱們就來到了最底層,整張繼承圖就所有展示在眼前了,下面就去看看,特化的服務端Channel NioServerSocketChannel
和NioSocketChannel
對 doReadMessages()
和doReadBytes()
的各自實現
服務端, 咱們看到了,它的特化read()
是在建立新的 Jdk遠程channel, 由於它在建立新的鏈接chanel
@Override protected int doReadMessages(List<Object> buf) throws Exception { // todo java Nio底層在這裏 建立jdk底層的 原生channel SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { // todo 把java原生的channel, 封裝成 Netty自定義的封裝的channel , 這裏的buf是list集合對象,由上一層傳遞過來的 // todo this -- NioServerSocketChannel // todo ch -- SocketChnnel buf.add(new NioSocketChannel(this, ch)); return 1; } ...
客戶端, 讀取客戶端發送過來的IO數據
@Override protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); }
Netty的channel繼承體系,到如今就完成了, 相信,當咱們如今再從 NioServerEventLoop
入手,看他的初始化過程應該很簡單了, 其中我但願本身能夠牢記幾個點
AbstractChannel
維護NioChannel
的EventLoop
AbstractNioChannel
維護jdk原生 channel
AbstractChannel
中的AbstractUnsafe
主要是定義了一套模板,給子類提供了填空題,下面的三個填空
NioMessageUnsafe
NioByteUnsafe