《java.util.concurrent 包源碼閱讀》27 Phaser 第一部分

Phaser是JDK7新添加的線程同步輔助類,做用同CyclicBarrier,CountDownLatch相似,可是使用起來更加靈活:函數

1. Parties是動態的。this

2. Phaser支持樹狀結構,即Phaser能夠有一個父Phaser。spa

 

Phaser的構造函數涉及到兩個參數:父Phaser和初始的parties,所以提供了4個構造函數:線程

public Phaser();
public Phaser(int parties);

public Phaser(Phaser parent);
public Phaser(Phaser parent, int parties);

 

由於Phaser的特點在在於動態的parties,所以首先來看動態更新parties是如何實現的。code

Phaser提供了兩個方法:register和bulkRegister,前者會添加一個須要同步的線程,後者會添加parties個須要同步的線程。對象

    public int register() {
        return doRegister(1);
    }

    // 增長了參數的檢查
    public int bulkRegister(int parties) {
        if (parties < 0)
            throw new IllegalArgumentException();
        if (parties == 0)
            return getPhase();
        return doRegister(parties);
    }

兩個方法都調用了doRegister方法,所以接下來就來看看doRegister方法blog

在分析doRegister以前先來講說Phaser的成員變量:state,它存儲了Phaser的狀態信息:ci

private volatile long state;

1. state的最高位是一個標誌位,1表示Phaser的線程同步已經結束,0表示線程同步正在進行get

2. state的低32位中,低16位表示沒有到達的線程數量,高16位表示Parties值同步

3. state的高32位除了最高位以外的其餘31位表示的Phaser的phase,能夠理解爲第多少次同步(從0開始計算)。

 

介紹完了state,來看方法doRegister

    private int doRegister(int registrations) {
        // 把registrations值同時加到parties值和還未達到的線程數量中去
        long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        int phase;
        for (;;) {
            long s = state;
            int counts = (int)s;
            int parties = counts >>> PARTIES_SHIFT;
            int unarrived = counts & UNARRIVED_MASK;
            // 超過了容許的最大parties
            if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            // 最高位爲1,表示Phaser的線程同步已經結束
            else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
                break;
            // Phaser中的parties不是0
            else if (counts != EMPTY) {
                // 若是當前Phaser沒有父Phaser,或者若是有父Phaser,
                // 刷新本身的state值,若是新後的state沒有變化。
                // 這裏新子Phaser的緣由在於,會出現父Phaser已經進入下一個phase
                // 而子Phaser卻沒有及時進入下一個phase的延遲現象
                if (parent == null || reconcileState() == s) {
                    // 若是全部線程都到達了,等待Phaser進入下一次同步開始
                    if (unarrived == 0)
                        root.internalAwaitAdvance(phase, null);
                    // 更新state成功,跳出循環完成註冊
                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                       s, s + adj))
                        break;
                }
            }
            // 第一次註冊,且不是子Phaser
            else if (parent == null) {
                // 更新當前Phaser的state值成功則完成註冊
                long next = ((long)phase << PHASE_SHIFT) | adj;
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                    break;
            }
            // 第一次註冊到子Phaser
            else {
                // 鎖定當前Phaser對象
                synchronized (this) {
                    // 再次檢查state值,確保沒有被更新
                    if (state == s) {
                        // 註冊到父Phaser中去
                        parent.doRegister(1);
                        do { // 獲取當前phase值
                            phase = (int)(root.state >>> PHASE_SHIFT);
                        } while (!UNSAFE.compareAndSwapLong
                                 (this, stateOffset, state,
                                  ((long)phase << PHASE_SHIFT) | adj));// 更新當前Phaser的state值
                        break;
                    }
                }
            }
        }
        return phase;
    }

看完了註冊,那麼來看同步操做的arrive,這裏也涉及到兩個方法:arrive和arriveAndDeregister,前者會等待其餘線程的到達,後者則會馬上返回:

    public int arrive() {
        return doArrive(false);
    }

    public int arriveAndDeregister() {
        return doArrive(true);
    }

兩個方法都調用了doArrive方法,區別在於參數一個是false,一個是true。那麼來看doArrive:

    private int doArrive(boolean deregister) {
        // arrive須要把未到達的線程數減去1,
        // deregister爲true,須要把parties值也減去1
        int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
        final Phaser root = this.root;
        for (;;) {
            // 若是是有父Phaser,首先刷新本身的state
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            int counts = (int)s;
            int unarrived = (counts & UNARRIVED_MASK) - 1;
            // 最高位爲1,表示同步已經結束,返回phase值
            if (phase < 0)
                return phase;
            // 若是parties爲0或者在這次arrive以前全部線程到達
            else if (counts == EMPTY || unarrived < 0) {
                // 對於非子Phaser來講,上述狀況的arrive確定是非法的
                // 對於子Phaser首先刷新一下狀態再作檢查
                if (root == this || reconcileState() == s)
                    throw new IllegalStateException(badArrive(s));
            }
            // 正常狀況下,首先更新state
            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
                // 全部線程都已經到達
                if (unarrived == 0) {
                    // 計算parties做爲下一個phase的未到達的parties
                    long n = s & PARTIES_MASK;
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    // 調用父Phaser的doArrive
                    if (root != this)
                        // 若是下一個phase的未到達的parties爲0,則須要向
                        // 父Phaser取消註冊
                        return parent.doArrive(nextUnarrived == 0);
                    // 正在進入下一個Phase,默認的實現是nextUnarrived爲0
                    // 表示正在進入下一個Phase,由於下一個phase的parties
                    // 爲0,須要等待parties不爲0
                    if (onAdvance(phase, nextUnarrived))
                        // 正在等待下一個phase,設置狀態爲終止
                        n |= TERMINATION_BIT;
                    else if (nextUnarrived == 0)
                        // 下一個phase的parties爲0,更新未到達的parties的值
                        n |= EMPTY;
                    else
                        // 更新下一個phase的未到達的parties的值
                        n |= nextUnarrived;
                    // phase值加1
                    n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;

                    // 更新state值
                    UNSAFE.compareAndSwapLong(this, stateOffset, s, n);

                    // 喚醒等待的線程
                    releaseWaiters(phase);
                }
                return phase;
            }
        }
    }

關於arrive還有一個方法:arriveAndAwaitAdvance。這個方法會等到下一個phase開始再返回,相等於doArrive方法添加了awaitAdvance方法的功能。基本邏輯和上面說的doArrive方法相似:

    public int arriveAndAwaitAdvance() {
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            int counts = (int)s;
            int unarrived = (counts & UNARRIVED_MASK) - 1;
            if (phase < 0)
                return phase;
            else if (counts == EMPTY || unarrived < 0) {
                // 對於非子Phaser來講,由於能夠等待下一個phase,
                // 因此不是非法arrive
                if (reconcileState() == s)
                    throw new IllegalStateException(badArrive(s));
            }
            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                               s -= ONE_ARRIVAL)) {
                // 還有其餘線程沒有達到,就會等待直到下一個phase開始
                if (unarrived != 0)
                    return root.internalAwaitAdvance(phase, null);
                if (root != this)
                    return parent.arriveAndAwaitAdvance();
                long n = s & PARTIES_MASK;  // base of next state
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived;
                int nextPhase = (phase + 1) & MAX_PHASE;
                n |= (long)nextPhase << PHASE_SHIFT;
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT);
                releaseWaiters(phase);
                return nextPhase;
            }
        }
    }

 

這一部分主要講了Phaser的動態更新parties以及線程的arrive,下一部分將會分析線程等待的實現。

相關文章
相關標籤/搜索