Netty源碼分析第2章(NioEventLoop)---->第8節: 執行任務隊列

 

Netty源碼分析第二章: NioEventLoophtml

 

第八節: 執行任務隊列oop

繼續回到NioEventLoop的run()方法:源碼分析

protected void run() { for (;;) { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: //輪詢io事件(1)
                    select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; //默認是50
            final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { //記錄下開始時間
                final long ioStartTime = System.nanoTime(); try { //處理輪詢到的key(2)
 processSelectedKeys(); } finally { //計算耗時
                    final long ioTime = System.nanoTime() - ioStartTime; //執行task(3)
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } //代碼省略
 } }

咱們看處處理完輪詢到的key以後, 首先記錄下耗時, 而後經過runAllTasks(ioTime * (100 - ioRatio) / ioRatio)執行taskQueue中的任務學習

咱們知道ioRatio默認是50, 因此執行完ioTime * (100 - ioRatio) / ioRatio, 方法傳入的值爲ioTime, 也就是processSelectedKeys()的執行時間:fetch

 

跟進runAllTasks方法:優化

protected boolean runAllTasks(long timeoutNanos) { //定時任務隊列中聚合任務
 fetchFromScheduledTaskQueue(); //從普通taskQ裏面拿一個任務
    Runnable task = pollTask(); //task爲空, 則直接返回
    if (task == null) { //跑完全部的任務執行收尾的操做
 afterRunningAllTasks(); return false; } //若是隊列不爲空 //首先算一個截止時間(+50毫秒, 由於執行任務, 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執行每個任務
    for (;;) { safeExecute(task); //標記當前跑完的任務
        runTasks ++; //當跑完64個任務的時候, 會計算一下當前時間
        if ((runTasks & 0x3F) == 0) { //定時任務初始化到當前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime(); //若是超過截止時間則不執行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) { break; } } //若是沒有超過這個時間, 則繼續從普通任務隊列拿任務
        task = pollTask(); //直到沒有任務執行
        if (task == null) { //記錄下最後執行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工做
 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }

首先會執行fetchFromScheduledTaskQueue()這個方法, 這個方法的意思是從定時任務隊列中聚合任務, 也就是將定時任務中找到能夠執行的任務添加到taskQueuethis

咱們跟進fetchFromScheduledTaskQueue()方法:spa

private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //從定時任務隊列中抓取第一個定時任務 //尋找截止時間爲nanoTime的任務
    Runnable scheduledTask  = pollScheduledTask(nanoTime); //若是該定時任務隊列不爲空, 則塞到普通任務隊列裏面
    while (scheduledTask != null) { //若是添加到普通任務隊列過程當中失敗
        if (!taskQueue.offer(scheduledTask)) { //則從新添加到定時任務隊列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //繼續從定時任務隊列中拉取任務 //方法執行完成以後, 全部符合運行條件的定時任務隊列, 都添加到了普通任務隊列中
        scheduledTask = pollScheduledTask(nanoTime); } return true; }

 long nanoTime = AbstractScheduledEventExecutor.nanoTime() 表明從定時任務初始化到如今過去了多長時間日誌

 Runnable scheduledTask= pollScheduledTask(nanoTime) 表明從定時任務隊列中拿到小於nanoTime時間的任務, 由於小於初始化到如今的時間, 說明該任務須要執行了netty

 

跟到其父類AbstractScheduledEventExecutorpollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); //拿到定時任務隊列
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; //peek()方法拿到第一個任務
    ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { //從隊列中刪除
 scheduledTaskQueue.remove(); //返回該任務
        return scheduledTask; } return null; }

咱們看到首先得到當前類綁定的定時任務隊列的成員變量

若是不爲空, 則經過scheduledTaskQueue.peek()彈出第一個任務

若是當前任務小於傳來的時間, 說明該任務須要執行, 則從定時任務隊列中刪除

 

咱們繼續回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); //從定時任務隊列中抓取第一個定時任務 //尋找截止時間爲nanoTime的任務
    Runnable scheduledTask  = pollScheduledTask(nanoTime); //若是該定時任務隊列不爲空, 則塞到普通任務隊列裏面
    while (scheduledTask != null) { //若是添加到普通任務隊列過程當中失敗
        if (!taskQueue.offer(scheduledTask)) { //則從新添加到定時任務隊列中
            scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //繼續從定時任務隊列中拉取任務 //方法執行完成以後, 全部符合運行條件的定時任務隊列, 都添加到了普通任務隊列中
        scheduledTask = pollScheduledTask(nanoTime); } return true; }

彈出須要執行的定時任務以後, 咱們經過taskQueue.offer(scheduledTask)添加到taskQueue, 若是添加失敗, 則經過scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask)從新添加到定時任務隊列中

 

若是添加成功, 則經過pollScheduledTask(nanoTime)方法繼續添加, 直到沒有須要執行的任務

這樣就將定時任務隊列須要執行的任務添加到了taskQueue

 

回到runAllTasks(long timeoutNanos)方法中:

protected boolean runAllTasks(long timeoutNanos) { //定時任務隊列中聚合任務
 fetchFromScheduledTaskQueue(); //從普通taskQ裏面拿一個任務
    Runnable task = pollTask(); //task爲空, 則直接返回
    if (task == null) { //跑完全部的任務執行收尾的操做
 afterRunningAllTasks(); return false; } //若是隊列不爲空 //首先算一個截止時間(+50毫秒, 由於執行任務, 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執行每個任務
    for (;;) { safeExecute(task); //標記當前跑完的任務
        runTasks ++; //當跑完64個任務的時候, 會計算一下當前時間
        if ((runTasks & 0x3F) == 0) { //定時任務初始化到當前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime(); //若是超過截止時間則不執行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) { break; } } //若是沒有超過這個時間, 則繼續從普通任務隊列拿任務
        task = pollTask(); //直到沒有任務執行
        if (task == null) { //記錄下最後執行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工做
 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }

首先經過 Runnable task = pollTask() 從taskQueue中拿一個任務

任務不爲空, 則經過 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos 計算一個截止時間, 任務的執行時間不能超過這個時間

而後在for循環中經過safeExecute(task)執行task

 

咱們跟到safeExecute(task):

protected static void safeExecute(Runnable task) { try { //直接調用run()方法執行
 task.run(); } catch (Throwable t) { //發生異常不終止
        logger.warn("A task raised an exception. Task: {}", task, t); } }

這裏直接調用taskrun()方法進行執行, 其中發生異常, 只打印一條日誌, 表明發生異常不終止, 繼續往下執行

 

回到runAllTasks(long timeoutNanos)方法:

protected boolean runAllTasks(long timeoutNanos) { //定時任務隊列中聚合任務
 fetchFromScheduledTaskQueue(); //從普通taskQ裏面拿一個任務
    Runnable task = pollTask(); //task爲空, 則直接返回
    if (task == null) { //跑完全部的任務執行收尾的操做
 afterRunningAllTasks(); return false; } //若是隊列不爲空 //首先算一個截止時間(+50毫秒, 由於執行任務, 不要超過這個時間)
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; //執行每個任務
    for (;;) { safeExecute(task); //標記當前跑完的任務
        runTasks ++; //當跑完64個任務的時候, 會計算一下當前時間
        if ((runTasks & 0x3F) == 0) { //定時任務初始化到當前的時間
            lastExecutionTime = ScheduledFutureTask.nanoTime(); //若是超過截止時間則不執行(nanoTime()是耗時的)
            if (lastExecutionTime >= deadline) { break; } } //若是沒有超過這個時間, 則繼續從普通任務隊列拿任務
        task = pollTask(); //直到沒有任務執行
        if (task == null) { //記錄下最後執行時間
            lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } //收尾工做
 afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; }

每次執行完task, runTasks自增

 

這裏 if ((runTasks & 0x3F) == 0) 表明是否執行了64個任務, 若是執行了64個任務, 則會經過 lastExecutionTime = ScheduledFutureTask.nanoTime() 記錄定時任務初始化到如今的時間, 若是這個時間超過了截止時間, 則退出循環

 

若是沒有超過截止時間, 則經過 task = pollTask() 繼續彈出任務執行

這裏執行64個任務統計一次時間, 而不是每次執行任務都統計, 主要緣由是由於獲取系統時間是個比較耗時的操做, 這裏是netty的一種優化方式

若是沒有task須要執行, 則經過afterRunningAllTasks()作收尾工做, 最後記錄下最後的執行時間

以上就是有關執行任務隊列的相關邏輯

 

 

第二章總結

 

        本章學習了有關NioEventLoopGroup的建立, NioEventLoop的建立和啓動, 以及多路複用器的輪詢處理和task執行的相關邏輯, 經過本章學習, 咱們應該掌握以下內容:

        1.  NioEventLoopGroup如何選擇分配NioEventLoop

        2.  NioEventLoop如何開啓

        3.  NioEventLoop如何進行select操做

        4.  NioEventLoop如何執行task

 

 

上一節: 處理IO事件

下一節: 初始化NioSocketChannelConfig

相關文章
相關標籤/搜索