Netty工具類HashedWheelTimer源碼走讀(二)

接上一篇( http://my.oschina.net/haogrgr/blog/489320 )
java


6. HashedWheelTimeout源碼走讀.git

//任務的包裝類, 鏈表結構, 負責保存deadline, 輪數, 等
//繼承MpscLinkedQueueNode, 是由於timeous隊列是MpscLinkedQueue, 裏面對MpscLinkedQueueNode有特殊處理(併發優化)
private static final class HashedWheelTimeout extends MpscLinkedQueueNode<Timeout>
        implements Timeout {

    private static final int ST_INIT = 0;
    private static final int ST_CANCELLED = 1;
    private static final int ST_EXPIRED = 2;
    private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER;

    static {
        AtomicIntegerFieldUpdater<HashedWheelTimeout> updater =
                PlatformDependent.newAtomicIntegerFieldUpdater(HashedWheelTimeout.class, "state");
        if (updater == null) {
            updater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");
        }
        STATE_UPDATER = updater;
    }

    private final HashedWheelTimer timer; //timer引用
    private final TimerTask task; //要執行的任務引用
    private final long deadline; //Timer啓動時間 - 任務執行時間(任務加入時間+任務延遲時間)

    @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" })
    private volatile int state = ST_INIT;

    //離任務執行還要等待的輪數, 當任務加入到wheel中時計算該值, 並在Worker中, 每過一輪, 該值減一.
    long remainingRounds;

    //雙鏈表, 由於只有Worker這一個線程訪問, 因此不須要synchronization / volatile.
    HashedWheelTimeout next;
    HashedWheelTimeout prev;

    //HashedWheelTimeout 所在的 wheel
    HashedWheelBucket bucket;

    HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
        this.timer = timer;
        this.task = task;
        this.deadline = deadline;
    }

    @Override
    public Timer timer() {
        return timer;
    }

    @Override
    public TimerTask task() {
        return task;
    }

    @Override
    public boolean cancel() {
        // only update the state it will be removed from HashedWheelBucket on next tick.
        //這裏只修改狀態從ST_INIT到ST_CANCELLED
        if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
            return false;
        }

        //若是狀態修改爲功, 則表示第一次調用cancel方法, 將HashedWheelTimeout從bucked中移除的操做封裝,
        //加入到cancelled隊列, 等待下一次tick再移除, 跟蹤下了源碼歷史發現之因此這麼作, 是爲了對GC友好, 之前取消任務要等到下一輪纔會被處理,
        //因而, 改爲將cancel的任務放在timeous隊列裏, 而後統一處理, timeous隊列是MpscLinkedQueue, 裏面對MpscLinkedQueueNode有特殊處理,
        //然而, 後面又發現有鎖的問題, 由於timeous這個隊列可能被多個線程操做(HashedWheelTimer.newTimeout()), 開始是加鎖的, 
        //因而, 將cancel任務另外存一個隊列, 這樣, 就不須要使用鎖了, 具體見:
        //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1
        //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1
        timer.cancelledTimeouts.add(new Runnable() {
            @Override
            public void run() {
                HashedWheelBucket bucket = HashedWheelTimeout.this.bucket;
                if (bucket != null) {
                    bucket.remove(HashedWheelTimeout.this);
                }
            }
        });
        return true;
    }

    public boolean compareAndSetState(int expected, int state) {
        return STATE_UPDATER.compareAndSet(this, expected, state);
    }

    public int state() {
        return state;
    }

    @Override
    public boolean isCancelled() {
        return state() == ST_CANCELLED;
    }

    @Override
    public boolean isExpired() {
        return state() == ST_EXPIRED;
    }

    @Override
    public HashedWheelTimeout value() {
        return this;
    }

    //到期, 執行任務
    public void expire() {
        if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
            return;
        }

        try {
            task.run(this);
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
            }
        }
    }
}


    果真是大牛, 各類優化, 看了下源碼的提交記錄, 截取幾段:github

    1) https://github.com/netty/netty/commit/1f68479e3cd94deb3172edd3c01aa74f35032b9b   (之前wheel用的HashSet, 改爲了數組)shell

Motivation:
At the moment there are two issues with HashedWheelTimer:
* the memory footprint of it is pretty heavy (250kb fon an empty instance)
* the way how added Timeouts are handled is inefficient in terms of how locks etc are used and so a lot of context-switching / condition can happen.

Modification:
Rewrite HashedWheelTimer to use an optimized bucket implementation to store the submitted Timeouts and a MPSC queue to handover the timeouts.  So volatile writes are reduced to a minimum and also the memory foot-print of the buckets itself is reduced a lot as the bucket uses a double-linked-list. Beside this we use Atomic*FieldUpdater where-ever possible to improve the memory foot-print and performance.

Result:
Lower memory-footprint and better performance

   

    2) https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1 數組

Motivation:
At the moment the HashedWheelTimer will only remove the cancelled Timeouts once the HashedWheelBucket is processed again. Until this the instance will not be able to be GC'ed as there are still strong referenced to it even if the user not reference it by himself/herself. This can cause to waste a lot of memory even if the Timeout was cancelled before.

Modification:
Add a new queue which holds CancelTasks that will be processed on each tick to remove cancelled Timeouts. Because all of this is done only by the WorkerThread there is no need for synchronization and only one extra object creation is needed when cancel() is executed. For addTimeout(...) no new overhead is introduced.

Result:
Less memory usage for cancelled Timeouts.


    3) https://github.com/netty/netty/commit/44ea769f537bf16b833d03db844b1f3067b3acd7 併發

Motivation:
Due some race-condition while handling canellation of TimerTasks it was possibleto corrupt the linked-list structure that is represent by HashedWheelBucket and so produce a NPE.

Modification:
Fix the problem by adding another MpscLinkedQueue which holds the cancellation tasks and process them on each tick. This allows to use no synchronization / locking at all while introduce a latency of max 1 tick before the TimerTask can be GC'ed.

Result:
No more NPE


    回到主題, 代碼並不複雜, 開始看的時候, 發現繼承了MpscLinkedQueueNode, 可是又沒有地方用到, 後面看了下, 發現MpscLinkedQueue對其有特殊處理.app

    能夠看到HashedWheelTimeout就是對Timeout任務的包裝, 鏈表結構方便加入wheel, 記錄deadline, remainingRounds, state等信息, ide



7. HashedWheelBucket 源碼走讀. 優化

//用來存放HashedWheelTimeout, 結構有點像linked-list, 方便移除操做.
private static final class HashedWheelBucket {

    //鏈表結構
    private HashedWheelTimeout head;
    private HashedWheelTimeout tail;

    //添加HashedWheelTimeout, 鏈表操做, 很少說~~~
    public void addTimeout(HashedWheelTimeout timeout) {
        assert timeout.bucket == null;
        timeout.bucket = this;
        if (head == null) {
            head = tail = timeout;
        } else {
            tail.next = timeout;
            timeout.prev = tail;
            tail = timeout;
        }
    }

    //當tick到該wheel的時候, Worker會調用這個方法, 根據deadline來判斷任務是否過時(remainingRounds是否爲0), 
    //任務到期就執行, 沒到期, 就timeout.remainingRounds--, 由於走到這裏, 表示改wheel裏的任務又過了一輪了.
    public void expireTimeouts(long deadline) {
        HashedWheelTimeout timeout = head;

        //遍歷鏈表
        while (timeout != null) {
            boolean remove = false;
            if (timeout.remainingRounds <= 0) {//任務已到執行點
                if (timeout.deadline <= deadline) {
                    timeout.expire();
                } else {
                    // The timeout was placed into a wrong slot. This should never happen.
                    throw new IllegalStateException(String.format(
                            "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                }
                remove = true;
            } else if (timeout.isCancelled()) {
                remove = true;
            } else {//沒到期, 剩餘輪數減一
                timeout.remainingRounds --;
            }

            //先保存next, 由於移除後, 再獲取timeout.next會爲空.
            HashedWheelTimeout next = timeout.next;
            if (remove) {//當以到期, 或者被取消, 就將timeou從鏈表中移除
                remove(timeout);
            }
            timeout = next;
        }
    }

    //鏈表移除, 很少說
    public void remove(HashedWheelTimeout timeout) {
        HashedWheelTimeout next = timeout.next;
        if (timeout.prev != null) {
            timeout.prev.next = next;
        }
        if (timeout.next != null) {
            timeout.next.prev = timeout.prev;
        }

        if (timeout == head) {
            if (timeout == tail) {
                tail = null;
                head = null;
            } else {
                head = next;
            }
        } else if (timeout == tail) {
            tail = timeout.prev;
        }
        timeout.prev = null;
        timeout.next = null;
        timeout.bucket = null;
    }

    //Clear this bucket and return all not expired / cancelled {@link Timeout}s.
    public void clearTimeouts(Set<Timeout> set) {
        for (;;) {
            HashedWheelTimeout timeout = pollTimeout();
            if (timeout == null) {
                return;
            }
            if (timeout.isExpired() || timeout.isCancelled()) {
                continue;
            }
            set.add(timeout);
        }
    }

    //鏈表的poll
    private HashedWheelTimeout pollTimeout() {
        HashedWheelTimeout head = this.head;
        if (head == null) {
            return null;
        }
        HashedWheelTimeout next = head.next;
        if (next == null) {
            tail = this.head =  null;
        } else {
            this.head = next;
            next.prev = null;
        }

        head.next = null;
        head.prev = null;
        head.bucket = null;
        return head;
    }
}


    能夠看到, 代碼也不復雜, 主要是提供一個相似於LinkedList的容器, 用來存放HashedWheelTimeout, 並提供expireTimeouts(long deadline) 方法來處理該wheel中的任務. 具體處理看註釋.this


    字數限制... 接第二篇..., 還剩最後的Worker的代碼.

相關文章
相關標籤/搜索