Netty源碼解析 -- FastThreadLocal與HashedWheelTimer

Netty源碼分析系列文章已接近尾聲,本文再來分析Netty中兩個常見組件:FastThreadLoca與HashedWheelTimer。
源碼分析基於Netty 4.1.52算法

FastThreadLocal

FastThreadLocal比較簡單。
FastThreadLocal和FastThreadLocalThread是配套使用的。
FastThreadLocalThread繼承了Thread,FastThreadLocalThread#threadLocalMap 是一個InternalThreadLocalMap,該InternalThreadLocalMap對象只能用於當前線程。
InternalThreadLocalMap#indexedVariables是一個數組,存放了當前線程全部FastThreadLocal對應的值。
而每一個FastThreadLocal都有一個index,用於定位InternalThreadLocalMap#indexedVariables。
數組

FastThreadLocal#get微信

public final V get() {
    // #1
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
    // #2
    Object v = threadLocalMap.indexedVariable(index);
    if (v != InternalThreadLocalMap.UNSET) {
        return (V) v;
    }
    // #3
    return initialize(threadLocalMap);
}

#1 獲取該線程的InternalThreadLocalMap
若是是FastThreadLocalThread,直接獲取FastThreadLocalThread#threadLocalMap。
不然,從UnpaddedInternalThreadLocalMap.slowThreadLocalMap獲取該線程InternalThreadLocalMap。
注意,UnpaddedInternalThreadLocalMap.slowThreadLocalMap是一個ThreadLocal,這裏實際回退到使用ThreadLocal了。
#2 每一個FastThreadLocal都有一個index。
經過該index,獲取InternalThreadLocalMap#indexedVariables中存放的值
#3 找不到值,經過initialize方法構建新對象。源碼分析

能夠看到,FastThreadLocal中連hash算法都不用,經過下標獲取對應的值,複雜度爲log(1),天然很快啦。性能

HashedWheelTimer

HashedWheelTimer是Netty提供的時間輪調度器。
時間輪是一種充分利用線程資源進行批量化任務調度的調度模型,可以高效的管理各類延時任務。
簡單說,就是將延時任務存放到一個環形隊列中,並經過執行線程定時執行該隊列的任務。this

例如,
環形隊列上有60個格子,
執行線程每秒移動一個格子,則環形隊列每輪可存放1分鐘內的任務。
如今有兩個定時任務
task1,32秒後執行
task2,2分25秒後執行
而執行線程當前位於第6格子
則task1放到32+6=38格,輪數爲0
task2放到25+6=31個,輪數爲2
執行線程將執行當前格子輪數爲0的任務,並將其餘任務輪數減1。
imagespa

缺點,時間輪調度器的時間精度不高。
由於時間輪算法的精度取決於執行線程移動速度。
例如上面例子中執行線程每秒移動一個格子,則調度精度小於一秒的任務就沒法準時調用。線程

HashedWheelTimer關鍵字段code

// 任務執行器,負責執行任務
Worker worker = new Worker();
// 任務執行線程
Thread workerThread;
//  HashedWheelTimer狀態, 0 - init, 1 - started, 2 - shut down
int workerState;
// 時間輪隊列,使用數組實現
HashedWheelBucket[] wheel;
// 暫存新增的任務
Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
// 已取消任務
Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();

添加延遲任務 HashedWheelTimer#newTimeoutorm

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    ...

    // #1
    start();

    // #2
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    ...
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    timeouts.add(timeout);
    return timeout;
}

#1 若是HashedWheelTimer未啓動,則啓動該HashedWheelTimer
HashedWheelTimer#start方法負責是啓動workerThread線程
#2 startTime是HashedWheelTimer啓動時間
deadline是相對HashedWheelTimer啓動的延遲時間
構建HashedWheelTimeout,添加到HashedWheelTimer#timeouts

時間輪運行 Worker#run

public void run() {
    ...

    // #1
    startTimeInitialized.countDown();

    do {
        // #2
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            // #3
            int idx = (int) (tick & mask);
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // #4
            transferTimeoutsToBuckets();
            // #5
            bucket.expireTimeouts(deadline);
            // #6
            tick++;
        }
    } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // #7
    ...
}

#1 HashedWheelTimer#start方法阻塞HashedWheelTimer線程直到Worker啓動完成,這裏解除HashedWheelTimer線程阻塞。
#2 計算下一格子開始執行的時間,而後sleep到下次格子開始執行時間
#2 tick是從HashedWheelTimer啓動後移動的總格子數,這裏獲取tick對應的格子索引。
因爲Long類型足夠大,這裏並不考慮溢出問題。
#4 將HashedWheelTimer#timeouts的任務遷移到對應的格子中
#5 處理已到期任務
#6 移動到下一個格子
#7 這裏是HashedWheelTimer#stop後的邏輯處理,取消任務,中止時間輪

遷移任務 Worker#transferTimeoutsToBuckets

private void transferTimeoutsToBuckets() {
    // #1
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            continue;
        }
        // #2
        long calculated = timeout.deadline / tickDuration;
        // #3
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        // #4
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        // #5
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        bucket.addTimeout(timeout);
    }
}

#1 注意,每次只遷移100000個任務,以避免阻塞線程
#2 任務延遲時間/每格時間數, 獲得該任務需延遲的總格子移動數
#3 (總格子移動數 - 已移動格子數)/每輪格子數,獲得輪數
#4 若是任務在timeouts隊列放得過久致使已通過了執行時間,則使用當前tick, 也就是放到當前bucket,以便儘快執行該任務
#5 計算tick對應格子索引,放到對應的格子位置

執行到期任務 HashedWheelBucket#expireTimeouts

public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    while (timeout != null) {
        HashedWheelTimeout next = timeout.next;
        // #1
        if (timeout.remainingRounds <= 0) {
            // #2
            next = remove(timeout);
            if (timeout.deadline <= deadline) {
                // #3
                timeout.expire();
            } else {
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            // #4
            timeout.remainingRounds --;
        }
        timeout = next;
    }
}

#1 選擇輪數小於等於0的任務
#2 移除任務
#3 修改狀態爲過時,並執行任務
#4 其餘任務輪數減1

ScheduledExecutorService使用堆(DelayedWorkQueue)維護任務,新增任務複雜度爲O(logN)。
而 HashedWheelTimer 新增任務複雜度爲O(1),因此在任務很是多時, HashedWheelTimer 能夠表現出它的優點。
可是任務較少甚至沒有任務時,HashedWheelTimer的執行線程都須要不斷移動,也會形成性能消耗。
注意,HashedWheelTimer使用同一個線程調用和執行任務,若是某些任務執行時間太久,則影響後續定時任務執行。固然,咱們也能夠考慮在任務中另起線程執行邏輯。
另外,若是任務過多,也會致使任務長期滯留在HashedWheelTimer#timeouts中而不能及時執行。

若是您以爲本文不錯,歡迎關注個人微信公衆號,系列文章持續更新中。您的關注是我堅持的動力!

相關文章
相關標籤/搜索