翻閱源碼時,咱們會發現netty中不少方法的調用都是經過線程池的方式進行異步的調用,html
這種 eventLoop.execute
方式的調用,實際上即是reactor線程。對應項目中使用普遍的NioEventLoop。還記得咱們建立的兩個reactor線程池麼,具體代碼能夠參考 Netty源碼 服務端的啓動java
首先來解釋下什麼事 reactor 線程react
Reactor模式是處理併發I/O比較常見的一種模式,用於同步I/O,中心思想是將全部要處理的I/O事件註冊到一箇中心I/O多路複用器上,同時主線程/進程阻塞在多路複用器上;
一旦有I/O事件到來或是準備就緒(文件描述符或socket可讀、寫),多路複用器返回並將事先註冊的相應I/O事件分發到對應的處理器中。
io.netty.util.concurrent.SingleThreadEventExecutor#execute
promise
reactor模型在第一次接受任務的時候,會啓動線程安全
外部線程在提交任務時,netty會判斷是不是當前前程是不是 SingleThreadEventExecutor中的Thread一致,若是不一致,說明須要啓動一個新的線程接受任務。而後就會調用內部線程池執行reactor模型的run方法併發
而後就會執行addTask將線程封裝成一個任務放到Queue中。Queue中的任務就是經過reactor線程來消費的。異步
reactor線程作了三件事socket
1.不斷的輪訓註冊到selector上channel的IO事件ide
2.處理IO事件,讀取channel中的事件,選擇一個work線程,準備執行任務oop
3.執行任務
上述三件事不斷的輪訓,下面咱們依次進行分析。
select輪訓很簡單,分爲如下幾步
1.延遲任務隊列0.5秒之內有任務則中斷
2.普通任務隊列有任務添加則中斷
3.阻塞select操做結束以後,netty又作了一系列的狀態判斷來決定是否中斷本次輪詢,中斷本次輪詢的條件有
selectedKeys != 0
)hasTasks
)hasScheduledTasks()
)wakenUp.get()
)4.解決jdk空輪訓bug,具體的bug咱們能夠看 https://www.jianshu.com/p/3ec120ca46b2
netty這邊會記錄每次輪訓的時間,若是輪訓的時間有效,累加器會加1,累加器到256以後,開始rebuildSelector,rebuildSelector
的操做其實很簡單:new一個新的selector,將以前註冊到老的selector上的的channel從新轉移到新的selector上
Select步驟結束,表示輪訓到了io事件,那麼接下來咱們就要去處理這些事件
這裏出現了一個selectedKeys,selectedKeys的類型是SelectedSelectionKeySet,其實也就是一個Set集合
private SelectedSelectionKeySet selectedKeys;
這裏的SelectionKey又是什麼呢,咱們能夠看下類註釋
A selection key is created each time a channel is registered with a selector
每個channel在向selector註冊時都會建立一個SelectionKey。
暫且咱們先認爲 SelectionKey裏面包含了通道註冊時的一些信息。
如今咱們開始處理io事件
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { // 1.取出IO事件以及對應的channel final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); // 2.處理該channel if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } // 3.判斷是否該再來次輪詢 if (needsToSelectAgain) { for (;;) { i++; if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; } selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } } }
這裏的k.attachment()能轉換成
AbstractNioChannel。
搜一下k.attach的調用關係,在io.netty.channel.nio.AbstractNioChannel#doRegister發現了以下代碼,這能解釋了selectionKey的建立
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; }
咱們如今再來看processSelectedKey方法,代碼精簡後實際上就是調用unsafe對不一樣的事件進行處理
netty中總共有三種任務類型
channel.eventLoop().execute(new Runnable() { @Override public void run() { //task.... } });
execute並無真正去執行,而是將任務進行了封裝。
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
最終咱們的任務添加到了一個任務隊列中
protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (!offerTask(task)) { reject(task); } } final boolean offerTask(Runnable task) { if (isShutdown()) { reject(); } return taskQueue.offer(task); }
這裏 taskQueue並非普通的任務隊列,而是Mpsc隊列,即多生產者單消費者隊列,netty使用mpsc,方便的將外部線程的task彙集,在reactor線程內部用單線程來串行執行
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return PlatformDependent.newMpscQueue(maxPendingTasks); }
服務端在接收到客戶端請求時,須要選擇相應的channel寫數據到客戶端
ctx.channel().writeAndFlush(responsePacket);
這種在用戶線程中的任務最終一樣會被封裝到任務隊列,channel的write最終代碼在io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } safeExecute(executor, task, promise, m); } }
先判斷是否在eventloop線程,這裏是false,最終封裝成一個task,執行safeExecutor
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) { try { executor.execute(runnable); } catch (Throwable cause) { try { promise.setFailure(cause); } finally { if (msg != null) { ReferenceCountUtil.release(msg); } } } }
接下來就和第一種狀況同樣,添加到隊列當作。
第三種場景就是定時任務邏輯,相似以下
eventLoop().schedule(new Runnable() { @Override public void run() { }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
schedule的實際邏輯也是一個添加任務隊列的過程
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
這裏的scheduledTaskQueue是一個優先級隊列,注意這裏的線程安全問題,若是不是在eventloop線程提交的,那麼就會把添加操做封裝成一個task,這個task的任務是添加[添加定時任務]的任務,而不是添加定時任務,其實也就是第二種場景,這樣,對 PriorityQueue
的訪問就變成單線程,即只有reactor線程
Queue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) { scheduledTaskQueue = new PriorityQueue<ScheduledFutureTask<?>>(); } return scheduledTaskQueue; }
如今再看runAllTasks方法
分爲如下3步
代碼仍是至關清晰的。這裏再也不深刻。
最後咱們再來總結下,reactor模型實質上就幹了三件事情,首先他會不停的檢測是否有io事件發生或者 是否有任務快要發生,若是檢測到了,說明他要去幹活了。首先先去處理io事件,全部的io事件都是經過unsafe去處理。處理完io事件後便開始處理任務隊列裏面的隊列。
以上關於reactor模型的研究。