Netty工具類HashedWheelTimer源碼走讀(三)

接上一篇( http://my.oschina.net/haogrgr/blog/490266 )java


8. Worker代碼走讀. linux

//主要負責累加tick, 執行到期任務等.
private final class Worker implements Runnable {
    private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

    private long tick;

    @Override
    public void run() {
        //初始化startTime, startTime只是一個起始時間的標記, 任務的deadline是相對這個時間點來的.
        startTime = System.nanoTime();

        //由於nanoTime返回值可能爲0, 甚至負數, 因此這時賦值爲1, Timer中start方法會判斷該值, 直到不爲0才跳出循環.
        if (startTime == 0) { 
            startTime = 1;
        }

        //喚醒阻塞在Timer.start()方法上的線程, 表示已經啓動完成.
        startTimeInitialized.countDown();

        //只要仍是啓動狀態, 就一直循環
        do {
            //waitForNextTick方法主要是計算下次tick的時間, 而後sleep到下次tick
            //返回值就是System.nanoTime() - startTime, 也就是Timer啓動後到此次tick, 所過去的時間
            final long deadline = waitForNextTick();
            if (deadline > 0) {//可能溢出, 因此小於等於0無論
                
                //獲取index, 原理見Timer的構造方法註釋, 等價於 tick % wheel.length
                int idx = (int) (tick & mask);  
                
                //移除cancel了的task, 具體能夠見HashedWheelTimeout.cancel()方法註釋
                processCancelledTasks();

                //當前tick對應的wheel
                HashedWheelBucket bucket = wheel[idx];

                //由於添加任務是先加入到timeouts隊列中, 而這裏就是將任務從隊列中取出, 放到對應的bucket中
                transferTimeoutsToBuckets();

                //見上篇HashedWheelBucket.expireTimeouts()方法的註釋
                //具體是根據當前的deadline, 判斷bucket中的人物是否到期, 到期的任務就執行, 沒到期的, 就將人物輪數減一.
                //正常狀況下, 一個bucket在一輪中, 只會執行一次expireTimeouts方法.
                bucket.expireTimeouts(deadline);

                //累加tick
                tick++;
            }
        } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

        //返回調用stop()時, 還未處理的任務.
        for (HashedWheelBucket bucket: wheel) {
            bucket.clearTimeouts(unprocessedTimeouts);
        }

        //加上還沒來得及放入bucket中的任務
        for (;;) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                break;
            }
            if (!timeout.isCancelled()) {
                unprocessedTimeouts.add(timeout);
            }
        }

        //最好移除下cancel了的task
        processCancelledTasks();
    }

    //將Timer.newTimeout()調用放入到timeouts時的任務放入到對應的bucket中
    private void transferTimeoutsToBuckets() {
        //一次tick, 最多放入10w任務, 防止太多了, 形成worker線程在這裏停留過久.
        for (int i = 0; i < 100000; i++) {
            HashedWheelTimeout timeout = timeouts.poll();
            if (timeout == null) {
                //所有處理完了, 退出循環
                break;
            }
            if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                //還沒加入到bucket中, 就取消了, 繼續.
                continue;
            }

            //calculated 表示任務要通過多少個tick
            long calculated = timeout.deadline / tickDuration;

            //設置任務要通過的輪數
            timeout.remainingRounds = (calculated - tick) / wheel.length;

            //若是任務在timeouts隊列裏面放久了, 以致於已通過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 因而方法調用完後就會執行.
            final long ticks = Math.max(calculated, tick);
            int stopIndex = (int) (ticks & mask);//一樣, 相似於ticks % wheel.length
            
            //這時任務所在的bucket在wheel中的位置就表示, 通過n輪後, 還須要多少次tick才執行.
            HashedWheelBucket bucket = wheel[stopIndex];
            bucket.addTimeout(timeout);//將timeout加入到鏈表
        }
    }

    //將cancel任務從隊列中取出, 並執行cancel操做, 具體能夠見HashedWheelTimeout.cancel()方法註釋.
    private void processCancelledTasks() {
        for (;;) {
            Runnable task = cancelledTimeouts.poll();
            if (task == null) {
                // all processed
                break;
            }
            try {
                task.run();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while process a cancellation task", t);
                }
            }
        }
    }

    /**
     * calculate goal nanoTime from startTime and current tick number,
     * then wait until that goal has been reached.
     * @return Long.MIN_VALUE if received a shutdown request,
     * current time otherwise (with Long.MIN_VALUE changed by +1)
     */
    //sleep, 直到下次tick到來, 而後返回該次tick和啓動時間之間的時長
    private long waitForNextTick() {

        //下次tick的時間點, 用於計算須要sleep的時間
        long deadline = tickDuration * (tick + 1);

        //循環, 直到HashedWheelTimer被stop, 或者到了下個tick
        for (;;) {

            //計算須要sleep的時間, 之因此加9999999後再除10000000, 是由於保證爲10毫秒的倍數.
            final long currentTime = System.nanoTime() - startTime;
            long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

            if (sleepTimeMs <= 0) {//小於等於0, 表示本次tick已經到了, 返回.
                if (currentTime == Long.MIN_VALUE) {
                    return -Long.MAX_VALUE; //不懂不懂, 我不懂...估計又是nanoTime的問題.
                } else {
                    return currentTime; //返回過去的時間.
                }
            }

            // Check if we run on windows, as if thats the case we will need
            // to round the sleepTime as workaround for a bug that only affect
            // the JVM if it runs on windows.
            //
            // See https://github.com/netty/netty/issues/356
            if (PlatformDependent.isWindows()) {//很少說, 一個字, 屌
                sleepTimeMs = sleepTimeMs / 10 * 10;
            }

            try {
                Thread.sleep(sleepTimeMs);//睡吧
            } catch (InterruptedException ignored) {
                //當調用Timer.stop時, 退出
                if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                    return Long.MIN_VALUE;
                }
            }
        }
    }

    public Set<Timeout> unprocessedTimeouts() {
        return Collections.unmodifiableSet(unprocessedTimeouts);
    }
}


    終於搞完了, 具體看註釋吧, 很詳細的註釋.git



9. 備註. github

   在看代碼的時候, 大部分都好, 可是有些代碼看的很困惑, 好比說
算法

startTime = System.nanoTime();
if (startTime == 0) {
    // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
    startTime = 1;
}

//startTime 是volatile的, 而後沒有其餘地方修改startTime, 爲何這裏還要判斷下是否爲0...

    而後我就去羣裏問了問, 最後定位到是nanoTime的問題, API文檔說, 它連負數均可能返回~~~~windows


    大神(北京-菜鳥多年)貼的nanoTime實現.ide

jlong os::javaTimeNanos() {
  if (Linux::supports_monotonic_clock()) {
    struct timespec tp;
    int status = Linux::clock_gettime(CLOCK_MONOTONIC, &tp);
    assert(status == 0, "gettime error");
    jlong result = jlong(tp.tv_sec) * (1000 * 1000 * 1000) + jlong(tp.tv_nsec);
    return result;
  } else {
    timeval time;
    int status = gettimeofday(&time, NULL);
    assert(status != -1, "linux error");
    jlong usecs = jlong(time.tv_sec) * (1000 * 1000) + jlong(time.tv_usec);
    return 1000 * usecs;
  }
}

    北京-菜鳥多年 : 看了下 代碼  原來這個 clock_gettime函數 可能會發生時間迴繞 函數

    北京-菜鳥多年 : 而後  得到的納秒 就變成0了性能

    北京-菜鳥多年 : 不過, 須要很長時間優化

    北京-菜鳥多年 : 時間是遞增的  遞增到必定地步就溢出了  而後就從0開始

    北京-菜鳥多年 : timespec 這裏面  秒和納秒分開存儲的

    北京-菜鳥多年 : 就是爲了延長迴繞出現的概率

    北京-菜鳥多年 : else那個分支  就是  currentTimeMillis() 的實現

   北京-菜鳥多年 : 不過 是  納秒級別

   北京-菜鳥多年 : 和currentTimeMillis算法同樣的, 性能要慢些



10. 總結.

    任務裏不要有太耗時的操做, 不然會阻塞Worker線程, 致使tick不許.

    Wheel Timer, 確實是很精巧的算法, Netty實現的HashedWheelTimer也是通過大神們極致的優化而來的.

相關文章
相關標籤/搜索