一篇文章搞定——JDK8中新增的StampedLock

1、StampedLock類簡介

StampedLock類,在JDK1.8時引入,是對讀寫鎖ReentrantReadWriteLock的加強,該類提供了一些功能,優化了讀鎖、寫鎖的訪問,同時使讀寫鎖之間能夠互相轉換,更細粒度控制併發。java

首先明確下,該類的設計初衷是做爲一個內部工具類,用於輔助開發其它線程安全組件,用得好,該類能夠提高系統性能,用很差,容易產生死鎖和其它莫名其妙的問題。node

1.1 StampedLock的引入

上一篇文章,講解了讀寫鎖——ReentrantReadWriteLock原理詳解 ,那麼爲何有了ReentrantReadWriteLock,還要引入StampedLock?編程

ReentrantReadWriteLock使得多個讀線程同時持有讀鎖(只要寫鎖未被佔用),而寫鎖是獨佔的。segmentfault

可是,讀寫鎖若是使用不當,很容易產生「飢餓」問題:api

好比在讀線程很是多,寫線程不多的狀況下,很容易致使寫線程「飢餓」,雖然使用「公平」策略能夠必定程度上緩解這個問題,可是「公平」策略是以犧牲系統吞吐量爲代價的。安全

1.2 StampedLock的特色

try系列獲取鎖的函數,當獲取鎖失敗後會返回爲0的stamp值。當調用釋放鎖和轉換鎖的方法時候須要傳入獲取鎖時候返回的stamp值。併發

StampedLockd的內部實現是基於CLH鎖的,CLH鎖原理:鎖維護着一個等待線程隊列,全部申請鎖且失敗的線程都記錄在隊列。一個節點表明一個線程,oracle

保存着一個標記位locked,用以判斷當前線程是否已經釋放鎖。當一個線程試圖獲取鎖時,從隊列尾節點做爲前序節點,循環判斷全部的前序節點是否已經成功釋放鎖。框架

2、StampedLock使用示例

先來看一個Oracle官方的例子:函數

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        long stamp = sl.writeLock();    //涉及對共享資源的修改,使用寫鎖-獨佔操做
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            sl.unlockWrite(stamp);
        }
    }

    /**
     * 使用樂觀讀鎖訪問共享資源
     * 注意:樂觀讀鎖在保證數據一致性上須要拷貝一份要操做的變量到方法棧,而且在操做數據時候可能其餘寫線程已經修改了數據,
     * 而咱們操做的是方法棧裏面的數據,也就是一個快照,因此最多返回的不是最新的數據,可是一致性仍是獲得保障的。
     *
     * @return
     */
    double distanceFromOrigin() {
        long stamp = sl.tryOptimisticRead();    // 使用樂觀讀鎖
        double currentX = x, currentY = y;      // 拷貝共享資源到本地方法棧中
        if (!sl.validate(stamp)) {              // 若是有寫鎖被佔用,可能形成數據不一致,因此要切換到普通讀鎖模式
            stamp = sl.readLock();             
            try {
                currentX = x;
                currentY = y;
            } finally {
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) { // upgrade
        // Could instead start with optimistic, not read mode
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                long ws = sl.tryConvertToWriteLock(stamp);  //讀鎖轉換爲寫鎖
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                } else {
                    sl.unlockRead(stamp);
                    stamp = sl.writeLock();
                }
            }
        } finally {
            sl.unlock(stamp);
        }
    }
}

能夠看到,上述示例最特殊的實際上是distanceFromOrigin方法,這個方法中使用了「Optimistic reading」樂觀讀鎖,使得讀寫能夠併發執行,可是「Optimistic reading」的使用必須遵循如下模式:

long stamp = lock.tryOptimisticRead();  // 非阻塞獲取版本信息
copyVaraibale2ThreadMemory();           // 拷貝變量到線程本地堆棧
if(!lock.validate(stamp)){              // 校驗
    long stamp = lock.readLock();       // 獲取讀鎖
    try {
        copyVaraibale2ThreadMemory();   // 拷貝變量到線程本地堆棧
     } finally {
       lock.unlock(stamp);              // 釋放悲觀鎖
    }

}
useThreadMemoryVarables();              // 使用線程本地堆棧裏面的數據進行操做

3、StampedLock原理

3.1 StampedLock的內部常量

StampedLock雖然不像其它鎖同樣定義了內部類來實現AQS框架,可是StampedLock的基本實現思路仍是利用CLH隊列進行線程的管理,經過同步狀態值來表示鎖的狀態和類型。

StampedLock內部定義了不少常量,定義這些常量的根本目的仍是和ReentrantReadWriteLock同樣,對同步狀態值按位切分,以經過位運算對State進行操做:

對於StampedLock來講,寫鎖被佔用的標誌是第8位爲1,讀鎖使用0-7位,正常狀況下讀鎖數目爲1-126,超過126時,使用一個名爲readerOverflow的int整型保存超出數。

部分常量的比特位表示以下:

另外,StampedLock相比ReentrantReadWriteLock,對多核CPU進行了優化,能夠看到,當CPU核數超過1時,會有一些自旋操做:

3.2 示例分析

假設如今有多個線程:ThreadA、ThreadB、ThreadC、ThreadD、ThreadE。操做以下:
ThreadA調用writeLock————獲取寫鎖
ThreadB調用readLock————獲取讀鎖
ThreadC調用readLock————獲取讀鎖
ThreadD調用writeLock————獲取寫鎖
ThreadE調用readLock————獲取讀鎖

1. StampedLock對象的建立

StampedLock的構造器很簡單,構造時設置下同步狀態值:

/**
 * Creates a new lock, initially in unlocked state.
 */
public StampedLock() {
    state = ORIGIN;
}

另外,StamedLock提供了三類視圖:

// views
transient ReadLockView readLockView;
transient WriteLockView writeLockView;
transient ReadWriteLockView readWriteLockView;

這些視圖實際上是對StamedLock方法的封裝,便於習慣了ReentrantReadWriteLock的用戶使用:
例如,ReadLockView其實至關於ReentrantReadWriteLock.readLock()返回的讀鎖;

final class ReadLockView implements Lock {
    public void lock() { readLock(); }
    public void lockInterruptibly() throws InterruptedException {
        readLockInterruptibly();
    }
    public boolean tryLock() { return tryReadLock() != 0L; }
    public boolean tryLock(long time, TimeUnit unit)
        throws InterruptedException {
        return tryReadLock(time, unit) != 0L;
    }
    public void unlock() { unstampedUnlockRead(); }
    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

2. ThreadA調用writeLock獲取寫鎖

來看下writeLock方法:

public long writeLock() {
    long s, next;  // bypass acquireWrite in fully unlocked case only
    return ((((s = state) & ABITS) == 0L &&//(s=state)&ABITS==0L表示讀鎖和寫鎖都未被使用
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?//CAS操做:將第8位置爲1,表示寫鎖被佔用
            next : acquireWrite(false, 0L));//獲取失敗則調用acquireWrite,加入到等待隊列
}

說明:上述代碼獲取寫鎖,若是獲取失敗,則進入阻塞,注意該方法不響應中斷,返回非0表示獲取成功。

StampedLock中大量運用了位運算,這裏(s = state) & ABITS == 0L 表示讀鎖和寫鎖都未被使用,這裏寫鎖能夠當即獲取成功,而後CAS操做更新同步狀態值State。

操做完成後,等待隊列的結構以下:

注意:StampedLock中,等待隊列的結點要比AQS中簡單些,僅僅三種狀態。
0:初始狀態
-1:等待中
1:取消

另外,結點的定義中有個cowait字段,該字段指向一個棧,用於保存讀線程,這個後續會講到。

3. ThreadB調用readLock獲取讀鎖

來看下readLock方法:
因爲ThreadA此時持有寫鎖,因此ThreadB獲取讀鎖失敗,將調用acquireRead方法,加入等待隊列:

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    return ((whead == wtail && (s & ABITS) < RFULL &&//表示寫鎖未被佔用,且讀鎖數量沒用超限
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}

說明:上述代碼獲取讀鎖,若是寫鎖被佔用,線程會阻塞,注意該方法不響應中斷,返回非0表示獲取成功。

acquireRead方法很是複雜,用到了大量自旋操做:

/**
 * 嘗試自旋的獲取讀鎖, 獲取不到則加入等待隊列, 並阻塞線程
 *
 * @param interruptible true 表示檢測中斷, 若是線程被中斷過, 則最終返回INTERRUPTED
 * @param deadline      若是非0, 則表示限時獲取
 * @return 非0表示獲取成功, INTERRUPTED表示中途被中斷過
 */
private long acquireRead(boolean interruptible, long deadline) {
    WNode node = null, p;   // node指向入隊結點, p指向入隊前的隊尾結點

    /**
     * 自旋入隊操做
     * 若是寫鎖未被佔用, 則當即嘗試獲取讀鎖, 獲取成功則返回.
     * 若是寫鎖被佔用, 則將當前讀線程包裝成結點, 並插入等待隊列(若是隊尾是寫結點,直接連接到隊尾;不然,連接到隊尾讀結點的棧中)
     */
    for (int spins = -1; ; ) {
        WNode h;
        if ((h = whead) == (p = wtail)) {   // 若是隊列爲空或只有頭結點, 則會當即嘗試獲取讀鎖
            for (long m, s, ns; ; ) {
                if ((m = (s = state) & ABITS) < RFULL ?     // 判斷寫鎖是否被佔用
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //寫鎖未佔用,且讀鎖數量未超限, 則更新同步狀態
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))        //寫鎖未佔用,但讀鎖數量超限, 超出部分放到readerOverflow字段中
                    return ns;          // 獲取成功後, 直接返回
                else if (m >= WBIT) {   // 寫鎖被佔用,以隨機方式探測是否要退出自旋
                    if (spins > 0) {
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    } else {
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        spins = SPINS;
                    }
                }
            }
        }
        if (p == null) {                            // p == null表示隊列爲空, 則初始化隊列(構造頭結點)
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null) {                  // 將當前線程包裝成讀結點
            node = new WNode(RMODE, p);
        } else if (h == p || p.mode != RMODE) {     // 若是隊列只有一個頭結點, 或隊尾結點不是讀結點, 則直接將結點連接到隊尾, 連接完成後退出自旋
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        // 隊列不爲空, 且隊尾是讀結點, 則將添加當前結點連接到隊尾結點的cowait鏈中(實際上構成一個棧, p是棧頂指針 )
        else if (!U.compareAndSwapObject(p, WCOWAIT, node.cowait = p.cowait, node)) {    // CAS操做隊尾結點p的cowait字段,實際上就是頭插法插入結點
            node.cowait = null;
        } else {
            for (; ; ) {
                WNode pp, c;
                Thread w;
                // 嘗試喚醒頭結點的cowait中的第一個元素, 假如是讀鎖會經過循環釋放cowait鏈
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                ns = s + RUNIT) :
                            (m < WBIT &&
                                (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT);
                }
                if (whead == h && p.prev == pp) {
                    long time;
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, p, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if ((h != pp || (state & ABITS) == WBIT) && whead == h && p.prev == pp) {
                        // 寫鎖被佔用, 且當前結點不是隊首結點, 則阻塞當前線程
                        U.park(false, time);
                    }
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }

    for (int spins = -1; ; ) {
        WNode h, np, pp;
        int ps;
        if ((h = whead) == p) {     // 若是當前線程是隊首結點, 則嘗試獲取讀鎖
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins; ; ) { // spin at head
                long m, s, ns;
                if ((m = (s = state) & ABITS) < RFULL ?     // 判斷寫鎖是否被佔用
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :  //寫鎖未佔用,且讀鎖數量未超限, 則更新同步狀態
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {      //寫鎖未佔用,但讀鎖數量超限, 超出部分放到readerOverflow字段中
                    // 獲取讀鎖成功, 釋放cowait鏈中的全部讀結點
                    WNode c;
                    Thread w;

                    // 釋放頭結點, 當前隊首結點成爲新的頭結點
                    whead = node;
                    node.prev = null;

                    // 從棧頂開始(node.cowait指向的結點), 依次喚醒全部讀結點, 最終node.cowait==null, node成爲新的頭結點
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                            U.unpark(w);
                    }
                    return ns;
                } else if (m >= WBIT &&
                    LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        } else if (h != null) {     // 若是頭結點存在cowait鏈, 則喚醒鏈中全部讀線程
            WNode c;
            Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            } else if ((ps = p.status) == 0)        // 將前驅結點的等待狀態置爲WAITING, 表示以後將喚醒當前結點
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            } else {        // 阻塞當前讀線程
                long time;
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)   //限時等待超時, 取消等待
                    return cancelWaiter(node, node, false);

                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) == WBIT) && whead == h && node.prev == p) {
                    // 若是前驅的等待狀態爲WAITING, 且寫鎖被佔用, 則阻塞當前調用線程
                    U.park(false, time);
                }
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

咱們來分析下這個方法。
該方法會首先自旋的嘗試獲取讀鎖,獲取成功後,就直接返回;不然,會將當前線程包裝成一個讀結點,插入到等待隊列。
因爲,目前等待隊列仍是空,因此ThreadB會初始化隊列,而後將自身包裝成一個讀結點,插入隊尾,而後在下面這個地方跳出自旋:

if (p == null) { // initialize queue,表示等待隊列爲空,且當前線程未得到讀鎖,則初始化隊列(構造頭結點)
    WNode hd = new WNode(WMODE, null);
    if (U.compareAndSwapObject(this, WHEAD, null, hd))
        wtail = hd;
}
else if (node == null)//將當前線程保證成共享節點
    node = new WNode(RMODE, p);
else if (h == p || p.mode != RMODE) {//若是等待隊列只有一個頭結點或當前入隊的是寫線程,則直接將節點連接到隊尾,連接完成後退出自旋
    if (node.prev != p)
        node.prev = p;
    else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
        p.next = node;
        break;//這裏退出循環
    }
}

此時,等待隊列的結構以下:

跳出自旋後,ThreadB會繼續向下執行,進入下一個自旋,在下一個自旋中,依然會再次嘗試獲取讀鎖,若是此次再獲取不到,就會將前驅的等待狀態置爲WAITING, 表示我(當前線程)要去睡了(阻塞),到時記得叫醒我:

if (whead == h) {
    if ((np = node.prev) != p) {
        if (np != null)
            (p = np).next = node;   // stale
    }
    else if ((ps = p.status) == 0)//將前驅結點的等待狀態置爲WAITING,表示以後將喚醒當前結點
        U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
    else if (ps == CANCELLED) {
        if ((pp = p.prev) != null) {
            node.prev = pp;
            pp.next = node;
        }
    }

最終, ThreadB進入阻塞狀態:

else {//阻塞當前線程
        long time;
        if (deadline == 0L)
            time = 0L;
        else if ((time = deadline - System.nanoTime()) <= 0L)//限時等待超時,取消等待
            return cancelWaiter(node, node, false);
        Thread wt = Thread.currentThread();
        U.putObject(wt, PARKBLOCKER, this);
        node.thread = wt;
        if (p.status < 0 &&
            (p != h || (state & ABITS) == WBIT) &&
            whead == h && node.prev == p)
            U.park(false, time);//若是前驅的等待狀態爲WAITINF,其寫鎖被佔用,則阻塞當前調用線程
        node.thread = null;
        U.putObject(wt, PARKBLOCKER, null);
        if (interruptible && Thread.interrupted())
            return cancelWaiter(node, node, true);
    }
}

最終,等待隊列的結構以下:

4. ThreadC調用readLock獲取讀鎖

這個過程和ThreadB獲取讀鎖同樣,區別在於ThreadC被包裝成結點加入等待隊列後,是連接到ThreadB結點的棧指針中的。調用完下面這段代碼後,ThreadC會連接到以Thread B爲棧頂指針的棧中:

else if (!U.compareAndSwapObject(p, WCOWAIT,
                                 node.cowait = p.cowait, node))//CAS操做隊尾結點,p的cowait字段,實際上就是頭插法插入節點
    node.cowait = null;

說明:上述代碼隊列不爲空,且隊尾是讀結點,則將添加當前結點連接到隊尾結點的cowait鏈中(實際上構成一個棧,p是棧頂指針)

注意:讀結點的cowait字段其實構成了一個棧,入棧的過程實際上是個「頭插法」插入單鏈表的過程。好比,再來個ThreadX讀結點,則cowait鏈表結構爲:ThreadB - > ThreadX -> ThreadC。最終喚醒讀結點時,將從棧頂開始。

而後會在下一次自旋中,阻塞當前讀線程:

if (whead == h && p.prev == pp) {
    long time;
    if (pp == null || h == p || p.status > 0) {
        node = null; // throw away
        break;
    }
    if (deadline == 0L)
        time = 0L;
    else if ((time = deadline - System.nanoTime()) <= 0L)
        return cancelWaiter(node, p, false);
    Thread wt = Thread.currentThread();
    U.putObject(wt, PARKBLOCKER, this);
    node.thread = wt;
    if ((h != pp || (state & ABITS) == WBIT) &&
        whead == h && p.prev == pp)
        U.park(false, time);//寫鎖被佔用,且當前節點不是隊首節點,則阻塞當前線程
    node.thread = null;
    U.putObject(wt, PARKBLOCKER, null);
    if (interruptible && Thread.interrupted())
        return cancelWaiter(node, p, true);
}

最終,等待隊列的結構以下:

能夠看到,此時ThreadC結點並無把它的前驅的等待狀態置爲-1,由於ThreadC是連接到棧中的,當寫鎖釋放的時候,會從棧底元素開始,喚醒棧中全部讀結點。

5. ThreadD調用writeLock獲取寫鎖

ThreadD調用writeLock方法獲取寫鎖失敗後(ThreadA依然佔用着寫鎖),會調用acquireWrite方法,該方法總體邏輯和acquireRead差很少,首先自旋的嘗試獲取寫鎖,獲取成功後,就直接返回;不然,會將當前線程包裝成一個寫結點,插入到等待隊列。

public long writeLock() {
    long s, next;  // bypass acquireWrite in fully unlocked case only
    return ((((s = state) & ABITS) == 0L &&//表示讀鎖和寫鎖都未被使用
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?//CAS操做:將第8位置位1,表示寫鎖被佔用
            next : acquireWrite(false, 0L));//獲取失敗則調用acquireWrite,加入等待隊列
}

說明:上述代碼獲取寫鎖,若是失敗,則進入阻塞,注意該方法不響應中斷,返回非0,表示獲取成功

acquireWrite源碼:

/**
 * 嘗試自旋的獲取寫鎖, 獲取不到則阻塞線程
 *
 * @param interruptible true 表示檢測中斷, 若是線程被中斷過, 則最終返回INTERRUPTED
 * @param deadline      若是非0, 則表示限時獲取
 * @return 非0表示獲取成功, INTERRUPTED表示中途被中斷過
 */
private long acquireWrite(boolean interruptible, long deadline) {
    WNode node = null, p;

    /**
     * 自旋入隊操做
     * 若是沒有任何鎖被佔用, 則當即嘗試獲取寫鎖, 獲取成功則返回.
     * 若是存在鎖被使用, 則將當前線程包裝成獨佔結點, 並插入等待隊列尾部
     */
    for (int spins = -1; ; ) {
        long m, s, ns;
        if ((m = (s = state) & ABITS) == 0L) {      // 沒有任何鎖被佔用
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))    // 嘗試當即獲取寫鎖
                return ns;                                                 // 獲取成功直接返回
        } else if (spins < 0)
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        } else if ((p = wtail) == null) {       // 隊列爲空, 則初始化隊列, 構造隊列的頭結點
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        } else if (node == null)               // 將當前線程包裝成寫結點
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {    // 連接結點至隊尾
            p.next = node;
            break;
        }
    }

    for (int spins = -1; ; ) {
        WNode h, np, pp;
        int ps;
        if ((h = whead) == p) {     // 若是當前結點是隊首結點, 則當即嘗試獲取寫鎖
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
            for (int k = spins; ; ) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {      // 寫鎖未被佔用
                    if (U.compareAndSwapLong(this, STATE, s,
                        ns = s + WBIT)) {               // CAS修改State: 佔用寫鎖
                        // 將隊首結點從隊列移除
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                } else if (LockSupport.nextSecondarySeed() >= 0 &&
                    --k <= 0)
                    break;
            }
        } else if (h != null) {  // 喚醒頭結點的棧中的全部讀線程
            WNode c;
            Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) && (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        if (whead == h) {
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            } else if ((ps = p.status) == 0)        // 將當前結點的前驅置爲WAITING, 表示當前結點會進入阻塞, 前驅未來須要喚醒我
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            } else {        // 阻塞當前調用線程
                long time;  // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) && whead == h && node.prev == p)
                    U.park(false, time);    // emulate LockSupport.park
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}

acquireWrite中的下面這個自旋操做,用於將線程包裝成寫結點,插入隊尾:

for (int spins = -1;;) { // spin while enqueuing
    long m, s, ns;
    if ((m = (s = state) & ABITS) == 0L) {//沒用任何鎖被佔用
        if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))//嘗試當即獲取寫鎖
            return ns;//獲取成功直接返回
    }
    else if (spins < 0)
        spins = (m == WBIT && wtail == whead) ? SPINS : 0;
    else if (spins > 0) {
        if (LockSupport.nextSecondarySeed() >= 0)
            --spins;
    }
    else if ((p = wtail) == null) { // initialize queue,隊列爲空,則初始化隊列,構造隊列的頭結點
        WNode hd = new WNode(WMODE, null);
        if (U.compareAndSwapObject(this, WHEAD, null, hd))
            wtail = hd;
    }
    else if (node == null)
        node = new WNode(WMODE, p);//將當前線程包裝成寫節點
    else if (node.prev != p)
        node.prev = p;
    else if (U.compareAndSwapObject(this, WTAIL, p, node)) {//連接節點至隊尾
        p.next = node;
        break;
    }
}

說明:上述代碼自旋入隊操做,若是沒用任何鎖被佔用,則當即嘗試獲取寫鎖,獲取成功則返回,若是存在鎖被使用,則將當前線程包裝成獨佔節點,並插入等待隊列尾部

插入完成後,隊列結構以下:

而後,進入下一個自旋,並在下一個自旋中阻塞ThreadD,最終隊列結構以下:

6. ThreadE調用readLock獲取讀鎖

一樣,因爲寫鎖被ThreadA佔用着,因此最終會調用acquireRead方法,在該方法的第一個自旋中,會將ThreadE加入等待隊列:

注意,因爲隊尾結點是寫結點,因此當前讀結點會直接連接到隊尾;若是隊尾是讀結點,則會連接到隊尾讀結點的cowait鏈中。

而後進入第二個自旋,阻塞ThreadE,最終隊列結構以下:

7. ThreadA調用unlockWrite釋放寫鎖

經過CAS操做,修改State成功後,會調用release方法喚醒等待隊列的隊首結點:

//釋放寫鎖
public void unlockWrite(long stamp) {
    WNode h;
    if (state != stamp || (stamp & WBIT) == 0L)//stamp不匹配,或者寫鎖未被佔用,拋出異常
        throw new IllegalMonitorStateException();
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;//正常狀況下,stamp+=WBIT後,第8位位0,表示寫鎖被釋放;可是溢出,則置爲ORIGIN
    if ((h = whead) != null && h.status != 0)
        release(h);//喚醒等待隊列中的隊首節點
}

release方法很是簡單,先將頭結點的等待狀態置爲0,表示即將喚醒後繼結點,而後當即喚醒隊首結點:

//喚醒等待隊列的隊首節點(即頭結點whead的後繼節點)
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//將頭結點的等待狀態從-1置爲0,表示將要喚醒後繼節點
        if ((q = h.next) == null || q.status == CANCELLED) {//從隊尾開始查找距離頭結點最近的WAITING節點
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
            U.unpark(w);//喚醒隊首節點
    }
}

此時,等待隊列的結構以下:

8. ThreadB被喚醒後繼續向下執行

ThreadB被喚醒後,會從原阻塞處繼續向下執行,而後開始下一次自旋:

if (whead == h) {
    if ((np = node.prev) != p) {
        if (np != null)
            (p = np).next = node;   // stale
    }
    else if ((ps = p.status) == 0)前驅結點的等待狀態設置爲WAITING,表示以後喚醒當前結點
        U.compareAndSwapInt(p, WSTATUS, 0, WAITING);//將
    else if (ps == CANCELLED) {
        if ((pp = p.prev) != null) {
            node.prev = pp;
            pp.next = node;
        }
    }
    else {//阻塞當前讀線程
        long time; // 0 argument to park means no timeout
        if (deadline == 0L)
            time = 0L;
        else if ((time = deadline - System.nanoTime()) <= 0L)//限時等待超時,取消等待
            return cancelWaiter(node, node, false);
        Thread wt = Thread.currentThread();
        U.putObject(wt, PARKBLOCKER, this);
        node.thread = wt;
        if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
            whead == h && node.prev == p)
            U.park(false, time);  // emulate LockSupport.park,若是前驅的等待狀態爲WAITING,且寫鎖被佔用,則阻塞當前調用線程,注意,ThreadB今後處被喚醒,並繼續向下執行
        node.thread = null;
        U.putObject(wt, PARKBLOCKER, null);
        if (interruptible && Thread.interrupted())
            return cancelWaiter(node, node, true);
    }
}

第二次自旋時,ThreadB發現寫鎖未被佔用,則成功獲取到讀鎖,而後從棧頂(ThreadB的cowait指針指向的結點)開始喚醒棧中全部線程,
最後返回:

for (int k = spins;;) { // spin at head
    long m, s, ns;
    if ((m = (s = state) & ABITS) < RFULL ?//判斷寫鎖是否被佔用
        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) ://寫鎖未被佔用,且讀鎖數量未超限制,則更新同步狀態
        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {//寫鎖未被佔用,但讀鎖數量限制,超出部分放到readerOverflow字段中
        WNode c; Thread w;//獲取讀鎖成功,釋放cowrite鏈中的全部讀結點
        whead = node;
        node.prev = null;//釋放頭節點,當前隊首節點成爲新的頭結點
        //從棧頂開始(node.cowait指向的節點),依次喚醒全部讀結點,最終node.cowait==null,node成爲新的頭結點
        while ((c = node.cowait) != null) {
            if (U.compareAndSwapObject(node, WCOWAIT,
                                       c, c.cowait) &&
                (w = c.thread) != null)
                U.unpark(w);
        }
        return ns;
    }
    else if (m >= WBIT &&
             LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
        break;
}

最終,等待隊列的結構以下:

9. ThreadC被喚醒後繼續向下執行

ThreadC被喚醒後,繼續執行,並進入下一次自旋,下一次自旋時,會成功獲取到讀鎖。

for (;;) {
    WNode pp, c; Thread w;
    //嘗試喚醒頭節點whead的cowait中的第一個元素,假如是讀鎖會經過循環釋放cowait鏈
    if ((h = whead) != null && (c = h.cowait) != null &&
        U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
        (w = c.thread) != null) // help release
        U.unpark(w);
    if (h == (pp = p.prev) || h == p || pp == null) {
        long m, s, ns;
        do {
            if ((m = (s = state) & ABITS) < RFULL ?
                U.compareAndSwapLong(this, STATE, s,
                                     ns = s + RUNIT) :
                (m < WBIT &&
                 (ns = tryIncReaderOverflow(s)) != 0L))
                return ns;
        } while (m < WBIT);
    }
    if (whead == h && p.prev == pp) {
        long time;
        if (pp == null || h == p || p.status > 0) {
            node = null; // throw away
            break;
        }
        if (deadline == 0L)
            time = 0L;
        else if ((time = deadline - System.nanoTime()) <= 0L)
            return cancelWaiter(node, p, false);
        Thread wt = Thread.currentThread();
        U.putObject(wt, PARKBLOCKER, this);
        node.thread = wt;
        if ((h != pp || (state & ABITS) == WBIT) &&
            whead == h && p.prev == pp)
            U.park(false, time);//寫鎖被釋放,且當前節點不是隊首節點,則阻塞當前線程
        node.thread = null;
        U.putObject(wt, PARKBLOCKER, null);
        if (interruptible && Thread.interrupted())
            return cancelWaiter(node, p, true);
    }
}

注意,此時ThreadB和ThreadC已經拿到了讀鎖,ThreadD(寫線程)和ThreadE(讀線程)依然阻塞中,原來ThreadC對應的結點是個孤立結點,會被GC回收。

最終,等待隊列的結構以下:

10. ThreadB和ThreadC釋放讀鎖

ThreadB和ThreadC調用unlockRead方法釋放讀鎖,CAS操做State將讀鎖數量減1:

//釋放讀鎖
public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        if (((s = state) & SBITS) != (stamp & SBITS) ||//stamp不匹配,或沒用任何鎖被佔用,都會拋出異常
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        if (m < RFULL) {//讀鎖數量未超限
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {//讀鎖數量-1
                if (m == RUNIT && (h = whead) != null && h.status != 0)//若是當前讀鎖數量爲1,喚醒等待隊列中的隊首節點
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)//讀鎖數量超限,則溢出字段要-1
            break;
    }
}

注意,當讀鎖的數量變爲0時纔會調用release方法,喚醒隊首結點:

//喚醒等待隊列中的隊首節點(即頭結點whead的後繼節點)
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);//將頭結點的等待狀態從-1置爲0,表示將要喚醒後繼節點
        if ((q = h.next) == null || q.status == CANCELLED) {//從隊尾開始查找距離頭結點最近的WAITING節點
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        if (q != null && (w = q.thread) != null)
            U.unpark(w);//喚醒隊首節點
    }
}

隊首結點(ThreadD寫結點被喚醒),最終等待隊列的結構以下:

11. ThreadD被喚醒後繼續向下執行

ThreadD會從原阻塞處繼續向下執行,並在下一次自旋中獲取到寫鎖,而後返回:

for (int spins = -1;;) {
    WNode h, np, pp; int ps;
    if ((h = whead) == p) {
        if (spins < 0)
            spins = HEAD_SPINS;
        else if (spins < MAX_HEAD_SPINS)
            spins <<= 1;
        for (int k = spins;;) { // spin at head
            long m, s, ns;
            if ((m = (s = state) & ABITS) < RFULL ?
                U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                WNode c; Thread w;
                whead = node;
                node.prev = null;
                while ((c = node.cowait) != null) {
                    if (U.compareAndSwapObject(node, WCOWAIT,
                                               c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
                return ns;
            }
            else if (m >= WBIT &&
                     LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                break;
        }
    }
    else if (h != null) {
        WNode c; Thread w;
        while ((c = h.cowait) != null) {
            if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                (w = c.thread) != null)
                U.unpark(w);
        }
    }
    if (whead == h) {
        if ((np = node.prev) != p) {
            if (np != null)
                (p = np).next = node;   // stale
        }
        else if ((ps = p.status) == 0)
            U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
        else if (ps == CANCELLED) {
            if ((pp = p.prev) != null) {
                node.prev = pp;
                pp.next = node;
            }
        }
        else {
            long time;
            if (deadline == 0L)
                time = 0L;
            else if ((time = deadline - System.nanoTime()) <= 0L)
                return cancelWaiter(node, node, false);
            Thread wt = Thread.currentThread();
            U.putObject(wt, PARKBLOCKER, this);
            node.thread = wt;
            if (p.status < 0 &&
                (p != h || (state & ABITS) == WBIT) &&
                whead == h && node.prev == p)
                U.park(false, time);
            node.thread = null;
            U.putObject(wt, PARKBLOCKER, null);
            if (interruptible && Thread.interrupted())
                return cancelWaiter(node, node, true);
        }
    }

最終,等待隊列的結構以下:

12. ThreadD調用unlockWrite釋放寫鎖

ThreadD釋放寫鎖的過程和步驟7徹底相同,會調用unlockWrite喚醒隊首結點(ThreadE)。

ThreadE被喚醒後會從原阻塞處繼續向下執行,但因爲ThreadE是個讀結點,因此同時會喚醒cowait棧中的全部讀結點,過程和步驟8徹底同樣。最終,等待隊列的結構以下:

至此,所有執行完成。

4、StampedLock總結

StampedLock的等待隊列與RRW的CLH隊列相比,有如下特色:

  1. 當入隊一個線程時,若是隊尾是讀結點,不會直接連接到隊尾,而是連接到該讀結點的cowait鏈中,cowait鏈本質是一個棧;
  2. 當入隊一個線程時,若是隊尾是寫結點,則直接連接到隊尾;
  3. QS相似喚醒線程的規則和A,都是首先喚醒隊首結點。區別是StampedLock中,當喚醒的結點是讀結點時,會喚醒該讀結點的cowait鏈中的全部讀結點(順序和入棧順序相反,也就是後進先出)。

另外,StampedLock使用時要特別當心,避免鎖重入的操做,在使用樂觀讀鎖時也須要遵循相應的調用模板,防止出現數據不一致的問題。

 

參考書籍

Java併發編程之美

參考連接

https://docs.oracle.com/javase/8/docs/api/

https://segmentfault.com/a/1190000015808032?utm_source=tag-newest

相關文章
相關標籤/搜索