EventLoop(netty源碼死磕4)

精進篇:netty源碼  死磕4-EventLoop的鬼斧神工html

目錄

1. EventLoop的鬼斧神工
2. 初識 EventLoop
3. Reactor模式回顧
3.1. Reactor模式的組成元素:
3.2. Reactor模式的三步曲
4. Netty中的Reactor模式應用
5. channel系列類結構
5.1. channel家族成員
5.2. NioSocketChannel 類的層次機構
5.3. netty channel 和本地Channel的關係
6. NioEventLoop
1.1. 和本地Selector的對應關係
1.2. 和Netty Channel的關係
7. Reactor三步曲之註冊
1.1. Netty中Channel註冊流程總覽
1.2. 註冊流程的關鍵代碼
1.3. 第三個參數有機關
8. Reactor三步曲之輪詢
1.1. Netty中EventLoop輪詢流程總覽
1.2. EventLoop線程啓動
1.3. NioEventLoop 事件輪詢
1.4. 取得就緒事件的個數
1.5. 就緒事件的迭代處理
1.6. processSelectedKey
9. Reactor三步曲之分派
1.1. dispatch(分派)結果
10. 總結java


1. 初識 EventLoop

閱讀netty的源碼,首先從最爲核心的、也是最爲基礎的EventLoop系列類入手。EventLoop 系列類,就像netty這座大廈的鋼筋混凝土框架,是很是重要的基礎設施。弄清楚EventLoop 的原理,是研讀和學習netty架構的前提。react

EventLoop 不是Netty中的一個類,而是一系列的類,或者說一組類。這一組類的做用,對應於Reactor模式的Reactor 角色。編程

呵呵,又回到了很是牛逼的反應器模式。promise

Reactor模式,是高性能JAVA編程的必知必會模式。首先熟悉Reactor模式,必定是磨刀不誤砍柴工。服務器

若是對Reactor模式還不太了了解,能夠翻閱《基礎篇:netty源碼  死磕3-傳說中神同樣的Reactor反應器模式》,此文站在巨人的肩膀上,對Reactor模式作了極爲詳盡的介紹。網絡

2. Reactor模式回顧

爲了更好的展開陳述,仍是簡單的總結一下Reactor模式。架構

1.1. Reactor模式的組成元素:

wps560D.tmp


channel和selector屬於 java.nio 包中的類,分別爲網絡通訊中的通道(鏈接)和選擇器。框架

Reactor和 handler 屬於Reactor模型高性能編程中的應用程序角色,分別爲反應器和處理器。異步

1.2. Reactor模式的三步曲

從開發或者執行流程上,Reactor模式能夠被清晰的被分紅三大步:註冊、輪詢、分發。

wps561E.tmp


第一步:註冊

將channel 通道的就緒事件,註冊到選擇器Selector。在文章《基礎篇:netty源碼  死磕3-傳說中神同樣的Reactor反應器模式》的例子中,這塊註冊的代碼,放在Reactor的構造函數中完成。通常來講,一個Reactor 對應一個選擇器Selector,一個Reactor擁有一個Selector成員屬性。

第二步:輪詢

輪詢的代碼,是Reactor重要的一個組成部分,或者說核心的部分。輪詢選擇器是否有就緒事件。

第三步:分發

將就緒事件,分發到事件附件的處理器handler中,由handler完成實際的處理。

整體上,Netty是基於Reactor模式實現的,對於就緒事件的處理總的流程,基本上就是上面的三步。

3. Netty中的Reactor模式應用

下面進行Netty的Reactor模型和經典Reactor的對照說明。

Netty的Reactor模型,和經典Reactor模型的元素對應關係以下圖:

wps562E.tmp

Netty中的Channel系列類型,對應於經典Reactor模型中的client, 封裝了用戶的通信鏈接。

Netty中的EventLoop系列類型,對應於經典Reactor模型中的Reactor,完成Channel的註冊、輪詢、分發。

Netty中的Handler系列類型,對應於經典Reactor模型中的Handler,不過Netty中的Handler設計得更加的高級和巧妙,使用了Pipeline模式。這塊很是精彩,後面專門開文章介紹。

總之,基本上一一對應。因此,若是熟悉經典的Reactor模式,學習Netty,會比較輕鬆。

4. channel系列類結構

1.3. channel家族成員

Netty 還支持很是多的通信鏈接協議,每種協議還有 NIO(異步 IO) 和 OIO(Old-IO, 即傳統的阻塞 IO) 版本的區別。對應於不一樣協議,都有不一樣的 Channel 類型與之對應。

下面是一些經常使用的 Channel 類型:

NioSocketChannel: 表明異步的客戶端 TCP Socket 鏈接.

NioServerSocketChannel: 異步的服務器端 TCP Socket 鏈接.

NioDatagramChannel: 異步的 UDP 鏈接

NioSctpChannel: 異步的客戶端 Sctp 鏈接.

NioSctpServerChannel: 異步的 Sctp 服務器端鏈接.

OioSocketChannel: 同步的客戶端 TCP Socket 鏈接.

OioServerSocketChannel: 同步的服務器端 TCP Socket 鏈接.

OioDatagramChannel: 同步的 UDP 鏈接

OioSctpChannel: 同步的 Sctp 服務器端鏈接.

OioSctpServerChannel: 同步的客戶端 TCP Socket 鏈接.

以當下的編程來講,通常來講,用到最多的通信協議,仍是 TCP 協議。因此,本文選取NioSocketChannel 類,做爲channel 鏈接家族類的表明,進行講解。

瞭解了NioSocketChannel 類,在使用的方法上,其餘的通道類型,基本上是同樣的。


1.4. NioSocketChannel 類的層次機構

wps564F.tmp

1.5. 和本地Channel的關係

閱讀NioSocketChannel 源碼,在其父類AbstractNioChannel中找到了一個特殊的成員屬性ch,這個成員的類型是 java本地類型SelectableChannel 。在《Java NIO Channel (netty源碼死磕1.3)》一文中已經講到過,SelectableChannel 類型這個是全部java本地非阻塞NIO 通道類型的父類。

private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey;

也就是說,一個Netty Channel 類型,封裝了一個java非阻塞NIO 通道類型成員。這個被封裝的本地Java 通道成員ch,在AbstractNioChannel的構造函數中,被初始化。

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
    ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

由於是非阻塞IO, ch.configureBlocking(false) 方法被調用,通道被設置爲非阻塞。


5. NioEventLoop

終於到了重點。

Netty中的NioEventLoop類,就是對應於非阻塞IO channel的Reactor 反應器。

1.6. NioEventLoop 和本地Selector的對應關係

NioEventLoop類型綁定了兩個重要的java本地類型:一個線程類型,一個Selector類型。

wps565F.tmp

本地Selector屬性的做用,用於註冊Java本地channel。本地線程屬性的做用,主要是用於輪詢。

NioEventLoop 源碼以下

public final class NioEventLoop extends SingleThreadEventLoop {

………………….

/**
 * The NIO {@link Selector}.
 */
Selector selector; private SelectedSelectionKeySet selectedKeys; private final SelectorProvider provider;

private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey;

…………………..

}

在父類SingleThreadEventExecutor 中,定義了一個線程屬性thread,源碼以下(省略了不相干的內容):

public abstract class SingleThreadEventExecutor ….{

………………….

private final Thread thread;
private final ThreadProperties threadProperties;

…………………..

}

線程何時啓動呢?

在reactor模式中,線程是輪詢用的。因此,Reactor線程的啓動,通常在channel的註冊以後。

1.7. EventLoop 和Netty Channel的關係

Netty中,一個EventLoop,能夠註冊不少不一樣的Netty Channel。至關因而一對多的關係。

這一點,和Java NIO中Selector和channel的關係,也是一致的。

wps567F.tmp


經過上面的分析,咱們已經知道了Netty 的非阻塞IO,是創建在Java 的NIO基礎之上的。

若是對Java 的NIO不瞭解,請閱讀下面的四文:

JAVA NIO 簡介 (netty源碼死磕1.1)

Java NIO Buffer(netty源碼死磕1.2)

Java NIO Channel (netty源碼死磕1.3)

Java NIO Selector (netty死磕1.4)

6. Reactor三步曲之註冊

wps5690.tmp


對於Java NIO而言,第一步首先是channel到 seletor的事件就緒狀態的註冊。對於Netty而言,也是相似的。

在此以前,Netty有一些啓動的工做須要完成。這些啓動的工做,包含了EventLoop、Channel的建立。 這塊BootStrap 的啓動類和系列啓動工做,後面有文章專門介紹。下面假定啓動工做已經完成和就緒,開始進行管道的註冊。

1.8. Netty中Channel註冊流程總覽

Channel向EventLoop註冊的過程,是在啓動時進行的。註冊的入口代碼,在啓動類AbstractBootstrap.initAndRegister 方法中。

註冊入口代碼以下:

final ChannelFuture initAndRegister() {

    // .........

    final Channel channel = channelFactory().newChannel();

    init(channel);

    ChannelFuture regFuture = group().register(channel);

}

完整的註冊調用流程以下:

wps56A1.tmp

1.9. 註冊流程的關鍵代碼

關鍵代碼以下,主要在AbstractChannel類中:

public abstract class AbstractChannel  ….{

// 內部類AbstractUnsafe

protected abstract class AbstractUnsafe implements Unsafe {

//倒數第三步

public final void register(EventLoop eventLoop, final ChannelPromise promise) {

    ………………….

  AbstractChannel.this.eventLoop = eventLoop;

if (eventLoop.inEventLoop()) {

   register0(promise);

}else {

try {

    eventLoop.execute(new Runnable() {

@Override

public void run() {

    register0(promise);

}

    });

} catch (Throwable t) {

    closeForcibly();

    closeFuture.setClosed();

    safeSetFailure(promise, t);

}

}

………………….

//倒數第二步:實際的註冊方法

private void register0(ChannelPromise promise) {

    boolean firstRegistration = neverRegistered;

    doRegister();

    neverRegistered = false;

    registered = true;

    safeSetSuccess(promise);

    pipeline.fireChannelRegistered();

    // Only fire a channelActive if the channel has never been registered. This prevents firing

    // multiple channel actives if the channel is deregistered and re-registered.

    if (firstRegistration && isActive()) {

        pipeline.fireChannelActive();

    }

}

…………………..

//倒數第一步,執行註冊

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
 selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

在倒數的第一步,也就是最後一步中,Netty的Channel經過javaChannel()方法,取得了Java本地Channel。

這個javaChannel()方法,它返回的是一個 Java NIO SocketChannel。

前面講到,AbstractNioChannel通道類有一個本地Java 通道成員ch,在AbstractNioChannel的構造函數中,被初始化。 javaChannel()取到的,就是這個ch成員屬性。經過最後一步,Netty終於將這個 SocketChannel 註冊到與 eventLoop 關聯的 selector 上了。


在註冊的倒數第三步:

AbstractChannel.this.eventLoop = eventLoop;

在這個 AbstractChannel#AbstractUnsafe.register 中,會將一個 EventLoop 賦值給 AbstractChannel 內部的 eventLoop 字段, 到這裏就完成了 EventLoop 與 Channel 的關聯過程.

反過來講:這一句,將 Channel 與對應的 EventLoop 關聯和綁定,也就是說, 每一個 Channel 都會關聯一個特定的 EventLoop。


在關聯好 Channel 和 EventLoop 後, 會繼續調用底層的 Java NIO SocketChannel 的 register 方法, 將底層的 Java NIO SocketChannel 註冊到指定的 selector 中。


慶祝下, 經過這兩步, 就完成了 Netty Channel 的註冊過程。 從上到下,所有關聯了哈。



1.10. 第三個參數有機關


到了這裏,先別太捉急高興。

再回到最後一步,看一下AbstractChannel  的doRegister() 代碼。

//倒數第一步,執行註冊

@Override
protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
 selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return;
        } catch (CancelledKeyException e) {
.....

   }
        }
    }
}

特別注意一下 register 的第三個參數,這個參數是設置 selectionKey 的附加對象的, 和調用 selectionKey.attach(object) 的效果同樣。

下面是經典Reactor模式的註冊代碼:

Reactor(int port) throws IOException
{ //Reactor初始化
    selector = Selector.open();
    serverSocket = ServerSocketChannel.open();
    serverSocket.socket().bind(new InetSocketAddress(port));
    //非阻塞
    serverSocket.configureBlocking(false);
    //分步處理,第一步,接收accept事件
 SelectionKey sk =
            serverSocket.register(selector, SelectionKey.OP_ACCEPT);
    //attach callback object, Acceptor
    sk.attach(new Acceptor());
}

這段經典Reactor模式代碼中,調用二個參數的register 方法,而後再附加對象。這種分離附加對象的方式,和前面調用三個參數的register 方法,結果是同樣的。

再回到最後一步,看一下AbstractChannel  的doRegister() 代碼。

doRegister()所傳遞的第三個參數是 this,,它就是一個 NioSocketChannel 的實例。簡單的說——Netty將 SocketChannel 對象自身,以附加字段的方式添加到了selectionKey 中,供事件就緒後使用。



後面會怎麼樣使用這個附加字段呢?

且看Reactor三步曲之二——輪詢。



7. Reactor三步曲之輪詢

1.11. Netty中EventLoop輪詢流程總覽

EventLoop 做爲Reactor反應器的角色,是Reactor模式的核心。在Channel註冊完成以後,EventLoop 就會開啓輪詢模式。

整個輪詢的過程,和經典的Reactor模式的流程大體相同。在Netty中分爲如下四步。

wps56B1.tmp

在講解輪詢的流程前,首先介紹一下輪詢線程的啓動。

1.12. EventLoop線程啓動

前面講到,Netty中,一個 NioEventLoop 本質上是和一個特定的線程綁定, 這個線程保存在EvnentLoop的父類屬性中。

在EvnentLoop的父類SingleThreadEventExecutor 中,有一個 Thread thread 屬性, 存儲了一個本地 Java 線程。

線程在哪裏啓動的呢?

細心的你,有可能在前面已經發現了。

在前面的倒數第三步的註冊中,函數 AbstractChannel.AbstractUnsafe.register中,有一個eventLoop.execute()方法調用,這個調用,就是啓動EvnentLoop的本地線程的的入口。

重複貼一次,代碼以下:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
}
if (!isCompatible(eventLoop)) {
        promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
}
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
} else {
try {
            eventLoop.execute(new Runnable() {
@Override
public void run() {
                    register0(promise);
}
            });
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
    }
}

在execute的方法中,去調用 startThread(),啓動線程。

代碼以下:

@Override

public void execute(Runnable task) {

    if (task == null) {

        throw new NullPointerException("task");

    }

    boolean inEventLoop = inEventLoop();

    if (inEventLoop) {

        addTask(task);

    } else {

// 調用 startThread 方法, 啓動EventLoop 線程.

        startThread();

        addTask(task);

        if (isShutdown() && removeTask(task)) {

            reject();

        }

    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {

        wakeup(inEventLoop);

    }

}

 SingleThreadEventExecutor.startThread() 方法中了:

private void startThread() {

    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {

        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {

            thread.start();

        }

    }

}

終於看到牽腸掛肚的線程啓動方法了。

它既是:thread.start()

STATE_UPDATER 是 SingleThreadEventExecutor 內部維護的一個屬性, 它的做用是標識當前的 thread 的狀態。在初始的時候, STATE_UPDATER == ST_NOT_STARTED, 所以第一次調用 startThread() 方法時, 就會進入到 if 語句內, 進而調用到 thread.start().

1.13. NioEventLoop 事件輪詢

事件的輪詢,在NioEventLoop.run() 方法, 其源碼以下:

@Override

protected void run() {

    for (;;) {

        boolean oldWakenUp = wakenUp.getAndSet(false);

        try {

//第一步,查詢 IO 就緒

            if (hasTasks()) {

                selectNow();

            } else {

                select(oldWakenUp);

                if (wakenUp.get()) {

                    selector.wakeup();

                }

            }

//第二步,處理這些 IO 就緒

            cancelledKeys = 0;

            needsToSelectAgain = false;

            final int ioRatio = this.ioRatio;

            if (ioRatio == 100) {

 processSelectedKeys();

                runAllTasks();

            } else {

                final long ioStartTime = System.nanoTime();

  processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;

                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

            }

            if (isShuttingDown()) {

                closeAll();

                if (confirmShutdown()) {

                    break;

                }

            }

        } catch (Throwable t) {

            ...

        }

    }

}

完成第二步IO就緒事件處理的調用是processSelectedKeys() ,這個調用很是關鍵。

這個方法是查詢就緒的 IO 事件, 而後處理它;第二個調用是 runAllTasks(), 這個方法的功能就是運行 taskQueue 中的任務。

關於EventLoop中如何處理任務,後面用專門的文章來說解。

1.14. 取得就緒事件的個數

void selectNow() throws IOException {

    try {

        selector.selectNow();

    } finally {

        // restore wakup state if needed

        if (wakenUp.get()) {

            selector.wakeup();

        }

    }

}

首先調用了 selector.selectNow() 方法,這個 selector 屬性正是 Java NIO 中的多路複用器 Selector。selector.selectNow() 方法會檢查當前是否有就緒的 IO 事件。若是有, 則返回就緒 IO 事件的個數;若是沒有, 則返回0。

注意, selectNow() 是當即返回的,不會阻塞當前線程。 當 selectNow() 調用後, finally 語句塊中會檢查 wakenUp 變量是否爲 true,當爲 true 時, 調用 selector.wakeup() 喚醒 select() 的阻塞調用。

1.15. 就緒事件的迭代處理

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {

    for (int i = 0;; i ++) {

        final SelectionKey k = selectedKeys[i];

        if (k == null) {

            break;

        }

        selectedKeys[i] = null;

 final Object a = k.attachment();

        if (a instanceof AbstractNioChannel) {

            processSelectedKey(k, (AbstractNioChannel) a);

        } else {

            @SuppressWarnings("unchecked")

            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

            processSelectedKey(k, task);

        }

        ...

    }

}

迭代 selectedKeys 獲取就緒的 IO 事件, 而後爲每一個事件都調用 processSelectedKey 來處理它.

在前面的channel註冊時,將 SocketChannel 所對應的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中。

在這裏, 經過k.attachment()取得這個通道對象,而後就調用 processSelectedKey 來處理這個 IO 事件和通道。

1.16. processSelectedKey

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {

    final NioUnsafe unsafe = ch.unsafe();

    ...

    try {

        int readyOps = k.readyOps();

        // 讀就緒

        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {

 unsafe.read();

            if (!ch.isOpen()) {

                // Connection already closed - no need to handle write.

                return;

            }

        }

        // 寫就緒

        if ((readyOps & SelectionKey.OP_WRITE) != 0) {

            ch.unsafe().forceFlush();

        }

        // 鏈接創建就緒事件

        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {

            ........

            int ops = k.interestOps();

            ops &= ~SelectionKey.OP_CONNECT;

            k.interestOps(ops);

            unsafe.finishConnect();

        }

    } catch (CancelledKeyException ignored) {

        unsafe.close(unsafe.voidPromise());

    }

}

上面的代碼,已經回到經典Reactor模式了。

processSelectedKey 中處理了三個事件, 分別是:

OP_READ, 可讀事件, 即 Channel 中收到了新數據可供上層讀取.

OP_WRITE, 可寫事件, 即上層能夠向 Channel 寫入數據.

OP_CONNECT, 鏈接創建事件, 即 TCP 鏈接已經創建, Channel 處於 active 狀態.

8. Reactor三步曲之分派

1.17. dispatch(分派)結果

在AbstractNioByteChannel 中,能夠找到 unsafe.read( ) 調用的實現代碼。 unsafe.read( )負責的是 Channel 的底層數據的 IO 讀取,而且將讀取的結果,dispatch(分派)給最終的Handler。

AbstractNioByteChannel.read()的關鍵源碼節選以下:

@Override

public final void read() {

    ...

    ByteBuf byteBuf = null;

    int messages = 0;

    boolean close = false;

    try {

        int totalReadAmount = 0;

        boolean readPendingReset = false;

        do {

             // 讀取結果.

            byteBuf = allocHandle.allocate(allocator);

            int writable = byteBuf.writableBytes();

            int localReadAmount = doReadBytes(byteBuf);

             ...

 // dispatch結果到Handler

            pipeline.fireChannelRead(byteBuf);

            byteBuf = null;

            ...

            totalReadAmount += localReadAmount;

            ...

    }

}

9. 總結

到此爲止,EventLoop的整個流程,已經分析完了

下一篇文章,將解讀Netty的Handler。



無編程不創客,無案例不學習。瘋狂創客圈,一大波高手正在交流、學習中!

瘋狂創客圈 Netty 死磕系列 10多篇深度文章博客園 總入口】  QQ羣:104131248

相關文章
相關標籤/搜索