Netty源碼分析之NioEventLoop執行流程

NioEventLoop啓動觸發條件:java

1.服務端綁定本地端口bootstrap

2.新鏈接接入經過chooser綁定一個NioEventLoop數組

服務端綁定本地端口promise

  綁定本地端口,使用下面方法;安全

ChannelFuture future = bootstrap.bind(host, port).sync();

  最終會調用doBind0()方法:異步

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if(regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }

        }
    });
}

  這個時候就會調用channel對應NioEventLoop的execute方法,會判斷是否在當前的eventloop對應的thread中,若是在,直接向任務隊列中添加綁定端口的任務,若是不在,首先要start當前eventLoop對應的thread,再將任務放到任務隊列中。這裏的excute(task)方法,並非讓線程直接執行它,而是將它放到線程的任務隊列中,等待線程去執行它。ide

public void execute(Runnable task) {
    if(task == null) {
        throw new NullPointerException("task");
    } else {
        boolean inEventLoop = this.inEventLoop();
        if(inEventLoop) {
            this.addTask(task);
        } else {
            this.startThread();
            this.addTask(task);
            if(this.isShutdown() && this.removeTask(task)) {
                reject();
            }
        }

        if(!this.addTaskWakesUp && this.wakesUpForTask(task)) {
            this.wakeup(inEventLoop);
        }

    }
}

  這裏會調用startThread去啓動一個線程,首先會根據狀態判斷線程是否建立成功,不然使用CAS去建立線程,並調用一個doStartThread去建立一個FastThreadLocalThread,而且這個函數會將一個NioEventLoop與一個thread進行綁定。函數

private void startThread() {
    if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) {
        this.doStartThread();
    }
}

NioEventLoop線程執行邏輯oop

  NioEventLoop對應線程的run方法,run()方法裏面是一個死循環,主要的邏輯是首先採用select檢查是否有IO事件,若是有IO事件,就採用processSelectedKey()對IO事件進行處理,最後調用runAllTasks()處理任務隊列中的任務。fetch

protected void run() {
    while(true) {
        boolean oldWakenUp = this.wakenUp.getAndSet(false);

        try {
            if(this.hasTasks()) {
                this.selectNow();
            } else {
                this.select(oldWakenUp);
                if(this.wakenUp.get()) {
                    this.selector.wakeup();
                }
            }

            this.cancelledKeys = 0;
            this.needsToSelectAgain = false;
            int t = this.ioRatio;
            if(t == 100) {
                this.processSelectedKeys();
                this.runAllTasks();
            } else {
                long e = System.nanoTime();
                this.processSelectedKeys();
                long ioTime = System.nanoTime() - e;
                this.runAllTasks(ioTime * (long)(100 - t) / (long)t);
            }

            if(this.isShuttingDown()) {
                this.closeAll();
                if(this.confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable var8) {
            logger.warn("Unexpected exception in the selector loop.", var8);

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException var7) {
                ;
            }
        }
    }
}

  這段代碼中的ioRadio是控制執行IO事件和執行任務隊列中的任務的一個事件比,默認是50,表明執行IO事件處理和執行任務隊列的任務事件比是1:1。

1)使用select檢測IO事件

  經過Selector的select()方法能夠選擇已經準備就緒的通道 (這些通道包含你感興趣的的事件)。好比你對讀就緒的通道感興趣,那麼select()方法就會返回讀事件已經就緒的那些通道。Java中的Selector幾個重載的select()方法:

  • int select():阻塞到至少有一個通道在你註冊的事件上就緒了。
  • int select(long timeout):和select()同樣,但最長阻塞時間爲timeout毫秒。
  • int selectNow():非阻塞,只要有通道就緒就馬上返回。

  select()方法返回的int值表示有多少通道已經就緒,是自上次調用select()方法後有多少通道變成就緒狀態。以前在select()調用時進入就緒的通道不會在本次調用中被記入,而在前一次select()調用進入就緒但如今已經不在處於就緒的通道也不會被記入。例如:首次調用select()方法,若是有一個通道變成就緒狀態,返回了1,若再次調用select()方法,若是另外一個通道就緒了,它會再次返回1。若是對第一個就緒的channel沒有作任何操做,如今就有兩個就緒的通道,但在每次select()方法調用之間,只有一個通道就緒了。

  一旦調用select()方法,而且返回值不爲0時,則能夠經過調用Selector的selectedKeys()方法來訪問已選擇鍵集合 。以下: 

Set selectedKeys=selector.selectedKeys(); 

  Netty中首先判斷任務隊列是否爲空,若是爲空的話,就採用select(ltimeout)有超時設置的阻塞方法,若是不爲空的話,就調用非阻塞的selectNow()方法,由於即便沒有IO事件處理,也能夠對任務隊列中的任務進行處理。Netty中NioEventLoop的select和selectNow方法其實底層仍是依靠selector的select方法。

void selectNow() throws IOException {
    try {
        this.selector.selectNow();
    } finally {
        if(this.wakenUp.get()) {
            this.selector.wakeup();
        }

    }

}

private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;

    try {
        int e = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos);

        while(true) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if(timeoutMillis <= 0L) {
                if(e == 0) {
                    selector.selectNow();
                    e = 1;
                }
                break;
            }

            int selectedKeys = selector.select(timeoutMillis);
            ++e;
            if(selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) {
                break;
            }

            if(Thread.interrupted()) {
                if(logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }

                e = 1;
                break;
            }

            long time = System.nanoTime();
            if(time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                e = 1;
            } else if(SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && e >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding selector.", Integer.valueOf(e));
                this.rebuildSelector();
                selector = this.selector;
                selector.selectNow();
                e = 1;
                break;
            }

            currentTimeNanos = time;
        }

        if(e > 3 && logger.isDebugEnabled()) {
            logger.debug("Selector.select() returned prematurely {} times in a row.", Integer.valueOf(e - 1));
        }
    } catch (CancelledKeyException var13) {
        if(logger.isDebugEnabled()) {
            logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", var13);
        }
    }

}

  能夠看到調用selectNow方法是直接調用java nio的select.selectNow方法,而Netty的select方法中有一個參數oldWakeUp記錄當前操做是不是喚醒狀態(不太清楚這個喚醒狀態的做用),每次進行select操做以前,會將其標誌位false,表示要進行select操做,並且是未喚醒狀態。

  Netty中的select方法首先是根據當前時間時間去計算截止時間,這裏使用到了超時隊列(超時隊列的做用也不太清楚),而後根據截止時間去計算超時時間,若是超時時間小於0,就執行selectNow操做,並退出這次select操做,不然執行帶有超時時間的select方法,若是返回的selectKey不等於0,也就是有channel在select上註冊了,或者該select操做被喚醒了(?),或者任務隊列中有了任務,定時任務隊列中有了任務,都會break出來。

  接下來的代碼邏輯是避免JDK空輪詢的,當JDK發生了空輪訓,select會直接返回,這時並無IO事件到達,也沒有超過超時時間,這樣會致使線程進入死循環,CPU利用率飆升至100%,JDK到如今也並無解決這個問題。

  而Netty是經過記錄空輪詢的次數,若是這個次數達到了一個上限,上限默認是512,那麼就新建一個selector,將註冊在老selector上的channel註冊到新的selector上,而且關閉老的selector,將新的selector替代老的selector。Netty經過rebuildSelector方法重建selector。

public void rebuildSelector() {
    if(!this.inEventLoop()) {
        this.execute(new Runnable() {
            public void run() {
                NioEventLoop.this.rebuildSelector();
            }
        });
    } else {
        Selector oldSelector = this.selector;
        if(oldSelector != null) {
            Selector newSelector;
            try {
                newSelector = this.openSelector();
            } catch (Exception var9) {
                logger.warn("Failed to create a new Selector.", var9);
                return;
            }

            int nChannels = 0;

            label69:
            while(true) {
                try {
                    Iterator t = oldSelector.keys().iterator();

                    while(true) {
                        if(!t.hasNext()) {
                            break label69;
                        }

                        SelectionKey key = (SelectionKey)t.next();
                        Object a = key.attachment();

                        try {
                            if(key.isValid() && key.channel().keyFor(newSelector) == null) {
                                int e = key.interestOps();
                                key.cancel();
                                SelectionKey var14 = key.channel().register(newSelector, e, a);
                                if(a instanceof AbstractNioChannel) {
                                    ((AbstractNioChannel)a).selectionKey = var14;
                                }

                                ++nChannels;
                            }
                        } catch (Exception var11) {
                            logger.warn("Failed to re-register a Channel to the new Selector.", var11);
                            if(a instanceof AbstractNioChannel) {
                                AbstractNioChannel var13 = (AbstractNioChannel)a;
                                var13.unsafe().close(var13.unsafe().voidPromise());
                            } else {
                                NioTask task = (NioTask)a;
                                invokeChannelUnregistered(task, key, var11);
                            }
                        }
                    }
                } catch (ConcurrentModificationException var12) {
                    ;
                }
            }

            this.selector = newSelector;

            try {
                oldSelector.close();
            } catch (Throwable var10) {
                if(logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", var10);
                }
            }

            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    }
}

2)processSelectedKey()

netty中selectedKey的優化

  經過調用Selector的selectedKeys()方法來訪問已選擇鍵集合,此時返回的是HashSet。可是netty是經過反射的方式,將HashSet替換成數組pssSelectedKeysOptimized去處理IO事件。

 

 

private Selector openSelector() {
    AbstractSelector selector;
    try {
        selector = this.provider.openSelector();
    } catch (IOException var7) {
        throw new ChannelException("failed to open a new selector", var7);
    }

    if(DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    } else {
        try {
            SelectedSelectionKeySet t = new SelectedSelectionKeySet();
            Class selectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader());
            if(!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);
            selectedKeysField.set(selector, t);
            publicSelectedKeysField.set(selector, t);
            this.selectedKeys = t;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
        } catch (Throwable var6) {
            this.selectedKeys = null;
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, var6);
        }

        return selector;
    }
}

 

  首先會調用JDK的openSelector方法返回建立的selector,而後會判斷是否要對keySet進行優化,經過判斷DISABLE_KEYSET_OPTIMIZATION,是否要對keyset進行優化,默認是要對keyset進行優化的。這裏的SelectedSelectionKeySet是優化事後的keyset,底層是經過兩個數組加上兩個數組的大小進行實現的,這樣可使得add操做達到o(1)的時間複雜度(可是是HashSet的add操做時間複雜度不也是o(1))嘛,

 

processSelectedKey調用processSelectedKeysOptimized

  該方法的流程就是遍歷數組中全部的selectedKey,一旦遍歷完,就將該引用指向爲空。獲取每個selectorKey對應的channel,而後經過調用processSelectedKey去處理該channel上感興趣的事件。

private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    int i = 0;
  
  //遍歷SelectedKsys while(true) { SelectionKey k = selectedKeys[i]; if(k == null) { return; } selectedKeys[i] = null;
     //獲取selectKey對應的channel Object a = k.attachment(); if(a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel)((AbstractNioChannel)a)); } else { NioTask task = (NioTask)a; processSelectedKey(k, (NioTask)task); } if(this.needsToSelectAgain) { while(selectedKeys[i] != null) { selectedKeys[i] = null; ++i; } this.selectAgain(); selectedKeys = this.selectedKeys.flip(); i = -1; } ++i; } }

  這裏處理selector上面的IO事件,底層其實都是經過channel的unsafe類進行操做的,這裏read和accept事件對應的都是channel的read方法。

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if(!k.isValid()) {
        unsafe.close(unsafe.voidPromise());
    } else {
        try {
            int ignored = k.readyOps();
       //若是是read或者accept事件就對channel進行讀操做 if((ignored & 17) != 0 || ignored == 0) { unsafe.read(); if(!ch.isOpen()) { return; } }
       //write事件 if((ignored & 4) != 0) { ch.unsafe().forceFlush(); }
       //connect事件 if((ignored & 8) != 0) { int ops = k.interestOps(); ops &= -9; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException var5) { unsafe.close(unsafe.voidPromise()); } } }

 

3)使用runAllTasks()執行任務隊列中的事件

  定時任務隊列是一個PriorityQueue(優先級隊列),定時的任務的排序是按照任務的截止時間排序的,也是非線程安全的隊列。

private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    if(task == null) {
        throw new NullPointerException("task");
    } else {
        if(this.inEventLoop()) {
            this.delayedTaskQueue.add(task);
        } else {
            this.execute(new Runnable() {
                public void run() {
                    SingleThreadEventExecutor.this.delayedTaskQueue.add(task);
                }
            });
        }

        return task;
    }
}

  runAllTask首先從定時任務隊列中拉取定時任務,將須要執行的定時任務加入到普通任務隊列中,並計算截止時間,而後循環的從普通任務隊列中拉取任務,並執行任務,這裏判斷是否到達超時時間,是每相隔64個任務,就判斷是否到達最大任務執行時間。爲啥要每隔64個任務判斷是否超時呢?由於nanoTime也是比較費時的。

protected boolean runAllTasks(long timeoutNanos) {
    this.fetchFromDelayedQueue();
    Runnable task = this.pollTask();
    if(task == null) {
        return false;
    } else {
        long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0L;

        long lastExecutionTime;
        while(true) {
            try {
                task.run();
            } catch (Throwable var11) {
                logger.warn("A task raised an exception.", var11);
            }

            ++runTasks;
            if((runTasks & 63L) == 0L) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if(lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = this.pollTask();
            if(task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
}

  從定時隊列中拉取任務,這裏拉取的任務是拉取截止時間不超過nanoTime的任務,將任務從定時任務隊列中刪除,將任務加入到普通任務隊列中。這個while循環執行完成以後,全部須要執行的定時任務所有都加入到普通任務隊列中。

private void fetchFromDelayedQueue() {
    long nanoTime = 0L;

    while (true) {
        ScheduledFutureTask delayedTask = (ScheduledFutureTask) this.delayedTaskQueue.peek();
        if (delayedTask == null) {
            break;
        }

        if (nanoTime == 0L) {
            nanoTime = ScheduledFutureTask.nanoTime();
        }

        if (delayedTask.deadlineNanos() > nanoTime) {
            break;
        }

        this.delayedTaskQueue.remove();
        this.taskQueue.add(delayedTask);
    }
}

   定時任務隊列是一個優先級隊列,隊列按照優先級進行排序,這裏的優先級是每一個任務的截止時間,隊列是按照截止時間的遲早對任務進行排序的。

public int compareTo(Delayed o) {
    if(this == o) {
        return 0;
    } else {
        ScheduledFutureTask that = (ScheduledFutureTask)o;
        long d = this.deadlineNanos() - that.deadlineNanos();
        if(d < 0L) {
            return -1;
        } else if(d > 0L) {
            return 1;
        } else if(this.id < that.id) {
            return -1;
        } else if(this.id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
    }
}

總結:

1.默認狀況下,NioEventLoopGroup會建立2*cpu個數的線程池,在調用NioEventLoop.execute(task)的時候,若是當前的NioEventLoop沒有建立本身的線程,就會建立線程。

2.Netty如何解決JDK空輪訓bug?經過計算空輪訓操做的個數,這裏的空輪訓的判斷是既沒有IO事件的到達,也沒有達到超時時間,若是空輪訓的個數超過閾值(512),就會新建一個selector,將舊selector的selectorKey註冊到新的selector上,將舊的selector關閉,用新的selector替代舊的selector。

3.Netty在全部外部線程調用NioEventLoop的操做時,若是經過InEventLoop判斷是否在NioEventLoop所屬的線程,若是不在經過startThread啓動NioEventLoop的線程,而且將任務添加到NioEventLoop的任務隊列中,全部NioEventLoop對應一個線程,其中的操做只會被一個線程所執行,實現了異步串行無鎖化。

 4.當NioEventLoop第一次調用execute()方法,會新建一個FastThreadLocalThread與NioEventLoop綁定。

相關文章
相關標籤/搜索