Netty源碼 reactor 模型

 翻閱源碼時,咱們會發現netty中不少方法的調用都是經過線程池的方式進行異步的調用,html

這種  eventLoop.execute 方式的調用,實際上即是reactor線程。對應項目中使用普遍的NioEventLoop。還記得咱們建立的兩個reactor線程池麼,具體代碼能夠參考 Netty源碼 服務端的啓動java

 

首先來解釋下什麼事 reactor 線程react

Reactor模式是處理併發I/O比較常見的一種模式,用於同步I/O,中心思想是將全部要處理的I/O事件註冊到一箇中心I/O多路複用器上,同時主線程/進程阻塞在多路複用器上;
一旦有I/O事件到來或是準備就緒(文件描述符或socket可讀、寫),多路複用器返回並將事先註冊的相應I/O事件分發到對應的處理器中。

 

 

reactor線程的啓動


 io.netty.util.concurrent.SingleThreadEventExecutor#executepromise

reactor模型在第一次接受任務的時候,會啓動線程安全

 

外部線程在提交任務時,netty會判斷是不是當前前程是不是 SingleThreadEventExecutor中的Thread一致,若是不一致,說明須要啓動一個新的線程接受任務。而後就會調用內部線程池執行reactor模型的run方法併發

 

 

而後就會執行addTask將線程封裝成一個任務放到Queue中。Queue中的任務就是經過reactor線程來消費的。異步

 

reactor線程的執行


 reactor線程作了三件事socket

1.不斷的輪訓註冊到selector上channel的IO事件ide

2.處理IO事件,讀取channel中的事件,選擇一個work線程,準備執行任務oop

3.執行任務

 

 

上述三件事不斷的輪訓,下面咱們依次進行分析。

1.select輪訓

select輪訓很簡單,分爲如下幾步

1.延遲任務隊列0.5秒之內有任務則中斷

2.普通任務隊列有任務添加則中斷

3.阻塞select操做結束以後,netty又作了一系列的狀態判斷來決定是否中斷本次輪詢,中斷本次輪詢的條件有

  • 輪詢到IO事件 (selectedKeys != 0
  • oldWakenUp 參數爲true
  • 任務隊列裏面有任務(hasTasks
  • 第一個定時任務即將要被執行 (hasScheduledTasks()
  • 用戶主動喚醒(wakenUp.get()

4.解決jdk空輪訓bug,具體的bug咱們能夠看 https://www.jianshu.com/p/3ec120ca46b2

netty這邊會記錄每次輪訓的時間,若是輪訓的時間有效,累加器會加1,累加器到256以後,開始rebuildSelector,rebuildSelector的操做其實很簡單:new一個新的selector,將以前註冊到老的selector上的的channel從新轉移到新的selector上

 

Select步驟結束,表示輪訓到了io事件,那麼接下來咱們就要去處理這些事件

2.處理IO事件 

這裏出現了一個selectedKeys,selectedKeys的類型是SelectedSelectionKeySet,其實也就是一個Set集合

private SelectedSelectionKeySet selectedKeys;

這裏的SelectionKey又是什麼呢,咱們能夠看下類註釋

A selection key is created each time a channel is registered with a selector

每個channel在向selector註冊時都會建立一個SelectionKey。

 

暫且咱們先認爲 SelectionKey裏面包含了通道註冊時的一些信息。

如今咱們開始處理io事件

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
     for (int i = 0;; i ++) {
         // 1.取出IO事件以及對應的channel
         final SelectionKey k = selectedKeys[i];
         if (k == null) {
             break;
         }
         selectedKeys[i] = null;
         final Object a = k.attachment();
         // 2.處理該channel
         if (a instanceof AbstractNioChannel) {
             processSelectedKey(k, (AbstractNioChannel) a);
         } else {
             NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
             processSelectedKey(k, task);
         }
         // 3.判斷是否該再來次輪詢
         if (needsToSelectAgain) {
             for (;;) {
                 i++;
                 if (selectedKeys[i] == null) {
                     break;
                 }
                 selectedKeys[i] = null;
             }
             selectAgain();
             selectedKeys = this.selectedKeys.flip();
             i = -1;
         }
     }
 }

 

這裏的k.attachment()能轉換成AbstractNioChannel。

搜一下k.attach的調用關係,在io.netty.channel.nio.AbstractNioChannel#doRegister發現了以下代碼,這能解釋了selectionKey的建立

selectionKey = javaChannel().register(eventLoop().selector, 0, this);
public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        throws ClosedChannelException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            if ((ops & ~validOps()) != 0)
                throw new IllegalArgumentException();
            if (blocking)
                throw new IllegalBlockingModeException();
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }

 咱們如今再來看processSelectedKey方法,代碼精簡後實際上就是調用unsafe對不一樣的事件進行處理

 

3.處理任務隊列

netty中總共有三種任務類型

1.普通的eventLoop任務

channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                //task....
            }
        });

 

execute並無真正去執行,而是將任務進行了封裝。

public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

 

最終咱們的任務添加到了一個任務隊列中

protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }

 

這裏 taskQueue並非普通的任務隊列,而是Mpsc隊列,即多生產者單消費者隊列,netty使用mpsc,方便的將外部線程的task彙集,在reactor線程內部用單線程來串行執行

protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        // This event loop never calls takeTask()
        return PlatformDependent.newMpscQueue(maxPendingTasks);
    }

 

2.外部任務

服務端在接收到客戶端請求時,須要選擇相應的channel寫數據到客戶端

ctx.channel().writeAndFlush(responsePacket);

 

這種在用戶線程中的任務最終一樣會被封裝到任務隊列,channel的write最終代碼在io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)

private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) { if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }

 

先判斷是否在eventloop線程,這裏是false,最終封裝成一個task,執行safeExecutor

private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
        try {
            executor.execute(runnable);
        } catch (Throwable cause) {
            try {
                promise.setFailure(cause);
            } finally {
                if (msg != null) {
                    ReferenceCountUtil.release(msg);
                }
            }
        }
    }

 

接下來就和第一種狀況同樣,添加到隊列當作。

3.定時任務

第三種場景就是定時任務邏輯,相似以下

eventLoop().schedule(new Runnable() {
                            @Override
                            public void run() {
                                
                        }, connectTimeoutMillis, TimeUnit.MILLISECONDS);

 

schedule的實際邏輯也是一個添加任務隊列的過程

<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }

 這裏的scheduledTaskQueue是一個優先級隊列,注意這裏的線程安全問題,若是不是在eventloop線程提交的,那麼就會把添加操做封裝成一個task,這個task的任務是添加[添加定時任務]的任務,而不是添加定時任務,其實也就是第二種場景,這樣,對 PriorityQueue的訪問就變成單線程,即只有reactor線程

Queue<ScheduledFutureTask<?>> scheduledTaskQueue() {
    if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>();
    }
    return scheduledTaskQueue;
}

 

 

如今再看runAllTasks方法

 

 分爲如下3步

  • 從scheduledTaskQueue轉移定時任務到taskQueue
  • 計算本次任務循環的截止時間並執行
  • 執行完成任務後的任務

代碼仍是至關清晰的。這裏再也不深刻。

 

最後咱們再來總結下,reactor模型實質上就幹了三件事情,首先他會不停的檢測是否有io事件發生或者 是否有任務快要發生,若是檢測到了,說明他要去幹活了。首先先去處理io事件,全部的io事件都是經過unsafe去處理。處理完io事件後便開始處理任務隊列裏面的隊列。

以上關於reactor模型的研究。

相關文章
相關標籤/搜索