接上一篇( http://my.oschina.net/haogrgr/blog/490266 )java
8. Worker代碼走讀. linux
//主要負責累加tick, 執行到期任務等. private final class Worker implements Runnable { private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>(); private long tick; @Override public void run() { //初始化startTime, startTime只是一個起始時間的標記, 任務的deadline是相對這個時間點來的. startTime = System.nanoTime(); //由於nanoTime返回值可能爲0, 甚至負數, 因此這時賦值爲1, Timer中start方法會判斷該值, 直到不爲0才跳出循環. if (startTime == 0) { startTime = 1; } //喚醒阻塞在Timer.start()方法上的線程, 表示已經啓動完成. startTimeInitialized.countDown(); //只要仍是啓動狀態, 就一直循環 do { //waitForNextTick方法主要是計算下次tick的時間, 而後sleep到下次tick //返回值就是System.nanoTime() - startTime, 也就是Timer啓動後到此次tick, 所過去的時間 final long deadline = waitForNextTick(); if (deadline > 0) {//可能溢出, 因此小於等於0無論 //獲取index, 原理見Timer的構造方法註釋, 等價於 tick % wheel.length int idx = (int) (tick & mask); //移除cancel了的task, 具體能夠見HashedWheelTimeout.cancel()方法註釋 processCancelledTasks(); //當前tick對應的wheel HashedWheelBucket bucket = wheel[idx]; //由於添加任務是先加入到timeouts隊列中, 而這裏就是將任務從隊列中取出, 放到對應的bucket中 transferTimeoutsToBuckets(); //見上篇HashedWheelBucket.expireTimeouts()方法的註釋 //具體是根據當前的deadline, 判斷bucket中的人物是否到期, 到期的任務就執行, 沒到期的, 就將人物輪數減一. //正常狀況下, 一個bucket在一輪中, 只會執行一次expireTimeouts方法. bucket.expireTimeouts(deadline); //累加tick tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); //返回調用stop()時, 還未處理的任務. for (HashedWheelBucket bucket: wheel) { bucket.clearTimeouts(unprocessedTimeouts); } //加上還沒來得及放入bucket中的任務 for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } //最好移除下cancel了的task processCancelledTasks(); } //將Timer.newTimeout()調用放入到timeouts時的任務放入到對應的bucket中 private void transferTimeoutsToBuckets() { //一次tick, 最多放入10w任務, 防止太多了, 形成worker線程在這裏停留過久. for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { //所有處理完了, 退出循環 break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { //還沒加入到bucket中, 就取消了, 繼續. continue; } //calculated 表示任務要通過多少個tick long calculated = timeout.deadline / tickDuration; //設置任務要通過的輪數 timeout.remainingRounds = (calculated - tick) / wheel.length; //若是任務在timeouts隊列裏面放久了, 以致於已通過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 因而方法調用完後就會執行. final long ticks = Math.max(calculated, tick); int stopIndex = (int) (ticks & mask);//一樣, 相似於ticks % wheel.length //這時任務所在的bucket在wheel中的位置就表示, 通過n輪後, 還須要多少次tick才執行. HashedWheelBucket bucket = wheel[stopIndex]; bucket.addTimeout(timeout);//將timeout加入到鏈表 } } //將cancel任務從隊列中取出, 並執行cancel操做, 具體能夠見HashedWheelTimeout.cancel()方法註釋. private void processCancelledTasks() { for (;;) { Runnable task = cancelledTimeouts.poll(); if (task == null) { // all processed break; } try { task.run(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown while process a cancellation task", t); } } } } /** * calculate goal nanoTime from startTime and current tick number, * then wait until that goal has been reached. * @return Long.MIN_VALUE if received a shutdown request, * current time otherwise (with Long.MIN_VALUE changed by +1) */ //sleep, 直到下次tick到來, 而後返回該次tick和啓動時間之間的時長 private long waitForNextTick() { //下次tick的時間點, 用於計算須要sleep的時間 long deadline = tickDuration * (tick + 1); //循環, 直到HashedWheelTimer被stop, 或者到了下個tick for (;;) { //計算須要sleep的時間, 之因此加9999999後再除10000000, 是由於保證爲10毫秒的倍數. final long currentTime = System.nanoTime() - startTime; long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; if (sleepTimeMs <= 0) {//小於等於0, 表示本次tick已經到了, 返回. if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; //不懂不懂, 我不懂...估計又是nanoTime的問題. } else { return currentTime; //返回過去的時間. } } // Check if we run on windows, as if thats the case we will need // to round the sleepTime as workaround for a bug that only affect // the JVM if it runs on windows. // // See https://github.com/netty/netty/issues/356 if (PlatformDependent.isWindows()) {//很少說, 一個字, 屌 sleepTimeMs = sleepTimeMs / 10 * 10; } try { Thread.sleep(sleepTimeMs);//睡吧 } catch (InterruptedException ignored) { //當調用Timer.stop時, 退出 if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } } public Set<Timeout> unprocessedTimeouts() { return Collections.unmodifiableSet(unprocessedTimeouts); } }
終於搞完了, 具體看註釋吧, 很詳細的註釋.git
9. 備註. github
在看代碼的時候, 大部分都好, 可是有些代碼看的很困惑, 好比說
算法
startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } //startTime 是volatile的, 而後沒有其餘地方修改startTime, 爲何這裏還要判斷下是否爲0...
而後我就去羣裏問了問, 最後定位到是nanoTime的問題, API文檔說, 它連負數均可能返回~~~~windows
大神(北京-菜鳥多年)貼的nanoTime實現.ide
jlong os::javaTimeNanos() { if (Linux::supports_monotonic_clock()) { struct timespec tp; int status = Linux::clock_gettime(CLOCK_MONOTONIC, &tp); assert(status == 0, "gettime error"); jlong result = jlong(tp.tv_sec) * (1000 * 1000 * 1000) + jlong(tp.tv_nsec); return result; } else { timeval time; int status = gettimeofday(&time, NULL); assert(status != -1, "linux error"); jlong usecs = jlong(time.tv_sec) * (1000 * 1000) + jlong(time.tv_usec); return 1000 * usecs; } }
北京-菜鳥多年 : 看了下 代碼 原來這個 clock_gettime函數 可能會發生時間迴繞 函數
北京-菜鳥多年 : 而後 得到的納秒 就變成0了性能
北京-菜鳥多年 : 不過, 須要很長時間優化
北京-菜鳥多年 : 時間是遞增的 遞增到必定地步就溢出了 而後就從0開始
北京-菜鳥多年 : timespec 這裏面 秒和納秒分開存儲的
北京-菜鳥多年 : 就是爲了延長迴繞出現的概率
北京-菜鳥多年 : else那個分支 就是 currentTimeMillis() 的實現
北京-菜鳥多年 : 不過 是 納秒級別
北京-菜鳥多年 : 和currentTimeMillis算法同樣的, 性能要慢些
10. 總結.
任務裏不要有太耗時的操做, 不然會阻塞Worker線程, 致使tick不許.
Wheel Timer, 確實是很精巧的算法, Netty實現的HashedWheelTimer也是通過大神們極致的優化而來的.