Netty與Reactor模式

1. 引言

1.1 Netty、NIO、多線程

關於Netty與NIO、多線程之間的關係,能夠參考@李林鋒發佈的一個Netty5.0架構剖析和源碼解讀的文章,在這篇文章中詳細的介紹了Java I/O的演進過程和Linux I/O網絡模型以及針對Netty實現IO多路複用的源碼分析和邏輯架構。相信看完後會收穫很多。前面的文章咱們分析了Netty的結構,本章來分析最錯綜複雜的部分(Netty中多線程以及NIO的應用)。nginx

理清楚NIO與Netty的關係以前,咱們必需要先來看看Reactor模式,Netty是一個典型的多線程與Reactor模式的使用,理解了這部分在宏觀上理解Netty的NIO及多線程部分就不會有什麼困難了。git

本章依然是針對Netty3.7源碼進行分析。github

2. 什麼是Reactor?

2.1 Reactor的由來

Reactor是一種普遍應用在服務器端開發的設計模式。Reactor中文大多譯爲「反應堆」,我當初接觸這個概念的時候,就感受很厲害,是否是它的原理就跟「核反應」差很少?後來才知道其實沒有什麼關係,從Reactor的兄弟「Proactor」(多譯爲前攝器)就能看得出來,這兩個詞的中文翻譯其實都不是太好,不夠形象。實際上,Reactor模式又有別名「Dispatcher」或者「Notifier」,我以爲這兩個都更加能代表它的本質。編程

那麼,Reactor模式到底是個什麼東西呢?這要從事件驅動的開發方式提及。咱們知道,對於應用服務器,一個主要規律就是,CPU的處理速度是要遠遠快於IO速度的,若是CPU爲了IO操做(例如從Socket讀取一段數據)而阻塞顯然是不划算的。好一點的方法是分爲多進程或者線程去進行處理,可是這樣會帶來一些進程切換的開銷,試想一個進程一個數據讀了500ms,期間進程切換到它3次,可是CPU卻什麼都不能幹,就這麼切換走了,是否是也不划算?設計模式

這時先驅們找到了事件驅動,或者叫回調的方式,來完成這件事情。這種方式就是,應用業務向一箇中間人註冊一個回調(event handler),當IO就緒後,就這個中間人產生一個事件,並通知此handler進行處理。這種回調的方式,也體現了「好萊塢原則」(Hollywood principle)-「Don’t call us, we’ll call you」,在咱們熟悉的IoC中也有用到。看來軟件開發真是互通的!服務器

好了,咱們如今來看Reactor模式。在前面事件驅動的例子裏有個問題:咱們如何知道IO就緒這個事件,誰來充當這個中間人?Reactor模式的答案是:由一個不斷等待和循環的單獨進程(線程)來作這件事,它接受全部handler的註冊,並負責先操做系統查詢IO是否就緒,在就緒後就調用指定handler進行處理,這個角色的名字就叫作Reactor。網絡

2.2 Reactor與NIO

Java中的NIO能夠很好的和Reactor模式結合。關於NIO中的Reactor模式,我想沒有什麼資料能比Doug Lea大神(不知道Doug Lea?看看JDK集合包和併發包的做者吧)在《Scalable IO in Java》解釋的更簡潔和全面了。NIO中Reactor的核心是Selector,我寫了一個簡單的Reactor示例,這裏我貼一個核心的Reactor的循環(這種循環結構又叫作EventLoop),剩餘代碼在這裏。多線程

public void run() {
	try {
		while (!Thread.interrupted()) {
			selector.select();
			Set selected = selector.selectedKeys();
			Iterator it = selected.iterator();
			while (it.hasNext())
				dispatch((SelectionKey) (it.next()));
			selected.clear();
		}
	} catch (IOException ex) { /* ... */
	}
}

2.3 Reactor相關的概念

前面提到了Proactor模式,這又是什麼呢?簡單來講,Reactor模式裏,操做系統只負責通知IO就緒,具體的IO操做(例如讀寫)仍然是要在業務進程裏阻塞的去作的,而Proactor模式則更進一步,由操做系統將IO操做執行好(例如讀取,會將數據直接讀到內存buffer中),而handler只負責處理本身的邏輯,真正作到了IO與程序處理異步執行。因此咱們通常又說Reactor是同步IO,Proactor是異步IO。架構

關於阻塞和非阻塞、異步和非異步,以及UNIX底層的機制,你們能夠看看這篇文章IO – 同步,異步,阻塞,非阻塞 (亡羊補牢篇),以及陶輝(《深刻理解nginx》的做者)《高性能網絡編程》的系列。併發

3. Reactor與Netty之間的關係

3.1 多線程下的Reactor

講了一堆Reactor,咱們回到Netty。在《Scalable IO in Java》中講到了一種多線程下的Reactor模式。在這個模式裏,mainReactor只有一個,負責響應client的鏈接請求,並創建鏈接,它使用一個NIO Selector;subReactor能夠有一個或者多個,每一個subReactor都會在一個獨立線程中執行,而且維護一個獨立的NIO Selector。

這樣的好處很明顯,由於subReactor也會執行一些比較耗時的IO操做,例如消息的讀寫,使用多個線程去執行,則更加有利於發揮CPU的運算能力,減小IO等待時間。

輸入圖片說明

3.2 Netty中的Reactor與NIO

好了,瞭解了多線程下的Reactor模式,咱們來看看Netty吧(如下部分主要針對NIO,OIO部分更加簡單一點,不重複介紹了)。Netty裏對應mainReactor的角色叫作「Boss」,而對應subReactor的角色叫作」Worker」。Boss負責分配請求,Worker負責執行,好像也很貼切!以TCP的Server端爲例,這兩個對應的實現類分別爲NioServerBoss和NioWorker(Server和Client的Worker沒有區別,由於創建鏈接以後,雙方就是對等的進行傳輸了)。

Netty 3.7中Reactor的EventLoop在AbstractNioSelector.run()中,它實現了Runnable接口。這個類是Netty NIO部分的核心。它的邏輯很是複雜,其中還包括一些對JDK Bug的處理(例如rebuildSelector),剛開始讀的時候不須要深刻那麼細節。我精簡了大部分代碼,保留主幹以下:

abstract class AbstractNioSelector implements NioSelector {

    //NIO Selector
    protected volatile Selector selector;

    //內部任務隊列
    private final Queue taskQueue = new ConcurrentLinkedQueue();

    //selector循環
    public void run() {
        for (;;) {
            try {
                //處理內部任務隊列
                processTaskQueue();
                //處理selector事件對應邏輯
                process(selector);
            } catch (Throwable t) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Ignore.
                }
            }
        }
    }

其中process是主要的處理事件的邏輯,例如在AbstractNioWorker中,處理邏輯以下:

protected void process(Selector selector) throws IOException {
    Set selectedKeys = selector.selectedKeys();
    if (selectedKeys.isEmpty()) {
        return;
    }
    for (Iterator i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                if (!read(k)) {
                    // Connection already closed - no need to handle write.
                    continue;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                writeFromSelectorLoop(k);
            }
        } catch (CancelledKeyException e) {
            close(k);
        }

        if (cleanUpCancelledKeys()) {
            break; // break the loop to avoid ConcurrentModificationException
        }
    }
}

這不就是第二部分提到的selector經典用法了麼?

在4.0以後,做者以爲NioSelector這個叫法,以及區分NioBoss和NioWorker的作法稍微繁瑣了點,乾脆就將這些合併成了NioEventLoop,今後這兩個角色就不作區分了。我卻是以爲新版本的會更優雅一點。

3.3 Netty中的多線程

下面咱們來看Netty的多線程部分。一旦對應的Boss或者Worker啓動,就會分配給它們一個線程去一直執行。對應的概念爲BossPool和WorkerPool。對於每一個NioServerSocketChannel,Boss的Reactor有一個線程,而Worker的線程數由Worker線程池大小決定,可是默認最大不會超過CPU核數*2,固然,這個參數能夠經過NioServerSocketChannelFactory構造函數的參數來設置。

public NioServerSocketChannelFactory(
        Executor bossExecutor, Executor workerExecutor,
        int workerCount) {
    this(bossExecutor, 1, workerExecutor, workerCount);
}

最後咱們比較關心一個問題,咱們以前ChannlePipeline中的ChannleHandler是在哪一個線程執行的呢?答案是在Worker線程裏執行的,而且會阻塞Worker的EventLoop。例如,在NioWorker中,讀取消息完畢以後,會觸發MessageReceived事件,這會使得Pipeline中的handler都獲得執行。

protected boolean read(SelectionKey k) {
    ....

    if (readBytes > 0) {
        // Fire the event.
        fireMessageReceived(channel, buffer);
    }

    return true;
}

能夠看到,對於處理事件較長的業務,並不太適合直接放到ChannelHandler中執行。那麼怎麼處理呢?咱們在Handler部分會進行介紹。

最後附上項目github地址,歡迎交流:https://github.com/code4craft/netty-learning

相關文章
相關標籤/搜索