死磕 java同步系列之StampedLock源碼解析

問題

(1)StampedLock是什麼?java

(2)StampedLock具備什麼特性?node

(3)StampedLock是否支持可重入?git

(4)StampedLock與ReentrantReadWriteLock的對比?源碼分析

簡介

StampedLock是java8中新增的類,它是一個更加高效的讀寫鎖的實現,並且它不是基於AQS來實現的,它的內部自成一片邏輯,讓咱們一塊兒來學習吧。學習

StampedLock具備三種模式:寫模式、讀模式、樂觀讀模式。ui

ReentrantReadWriteLock中的讀和寫都是一種悲觀鎖的體現,StampedLock加入了一種新的模式——樂觀讀,它是指當樂觀讀時假定沒有其它線程修改數據,讀取完成後再檢查下版本號有沒有變化,沒有變化就讀取成功了,這種模式更適用於讀多寫少的場景。this

使用方法

讓咱們經過下面的例子瞭解一下StampedLock三種模式的使用方法:spa

class Point {
    private double x, y;
    private final StampedLock sl = new StampedLock();

    void move(double deltaX, double deltaY) {
        // 獲取寫鎖,返回一個版本號(戳)
        long stamp = sl.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            // 釋放寫鎖,須要傳入上面獲取的版本號
            sl.unlockWrite(stamp);
        }
    }

    double distanceFromOrigin() {
        // 樂觀讀
        long stamp = sl.tryOptimisticRead();
        double currentX = x, currentY = y;
        // 驗證版本號是否有變化
        if (!sl.validate(stamp)) {
            // 版本號變了,樂觀讀轉悲觀讀
            stamp = sl.readLock();
            try {
                // 從新讀取x、y的值
                currentX = x;
                currentY = y;
            } finally {
                // 釋放讀鎖,須要傳入上面獲取的版本號
                sl.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    void moveIfAtOrigin(double newX, double newY) {
        // 獲取悲觀讀鎖
        long stamp = sl.readLock();
        try {
            while (x == 0.0 && y == 0.0) {
                // 轉爲寫鎖
                long ws = sl.tryConvertToWriteLock(stamp);
                // 轉換成功
                if (ws != 0L) {
                    stamp = ws;
                    x = newX;
                    y = newY;
                    break;
                }
                else {
                    // 轉換失敗
                    sl.unlockRead(stamp);
                    // 獲取寫鎖
                    stamp = sl.writeLock();
                }
            }
        } finally {
            // 釋放鎖
            sl.unlock(stamp);
        }
    }
}
複製代碼

從上面的例子咱們能夠與ReentrantReadWriteLock進行對比:線程

(1)寫鎖的使用方式基本一對待;調試

(2)讀鎖(悲觀)的使用方式能夠進行升級,經過tryConvertToWriteLock()方式能夠升級爲寫鎖;

(3)樂觀讀鎖是一種全新的方式,它假定數據沒有改變,樂觀讀以後處理完業務邏輯再判斷版本號是否有改變,若是沒改變則樂觀讀成功,若是有改變則轉化爲悲觀讀鎖重試;

下面咱們一塊兒來學習它的源碼是怎麼實現的。

源碼分析

主要內部類

static final class WNode {
    // 前一個節點
    volatile WNode prev;
    // 後一個節點
    volatile WNode next;
    // 讀線程所用的鏈表(實際是一個棧結果)
    volatile WNode cowait;    // list of linked readers
    // 阻塞的線程
    volatile Thread thread;   // non-null while possibly parked
    // 狀態
    volatile int status;      // 0, WAITING, or CANCELLED
    // 讀模式仍是寫模式
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}
複製代碼

隊列中的節點,相似於AQS隊列中的節點,能夠看到它組成了一個雙向鏈表,內部維護着阻塞的線程。

主要屬性

// 一堆常量
// 讀線程的個數佔有低7位
private static final int LG_READERS = 7;
// 讀線程個數每次增長的單位
private static final long RUNIT = 1L;
// 寫線程個數所在的位置
private static final long WBIT  = 1L << LG_READERS;  // 128 = 1000 0000
// 讀線程個數所在的位置
private static final long RBITS = WBIT - 1L;  // 127 = 111 1111
// 最大讀線程個數
private static final long RFULL = RBITS - 1L;  // 126 = 111 1110
// 讀線程個數和寫線程個數的掩碼
private static final long ABITS = RBITS | WBIT;  // 255 = 1111 1111
// 讀線程個數的反數,高25位所有爲1
private static final long SBITS = ~RBITS;  // -128 = 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1111 1000 0000

// state的初始值
private static final long ORIGIN = WBIT << 1;  // 256 = 1 0000 0000
// 隊列的頭節點
private transient volatile WNode whead;
// 隊列的尾節點
private transient volatile WNode wtail;
// 存儲着當前的版本號,相似於AQS的狀態變量state
private transient volatile long state;
複製代碼

經過屬性能夠看到,這是一個相似於AQS的結構,內部一樣維護着一個狀態變量state和一個CLH隊列。

構造方法

public StampedLock() {
    state = ORIGIN;
}
複製代碼

state的初始值爲ORIGIN(256),它的二進制是 1 0000 0000,也就是初始版本號。

writeLock()方法

獲取寫鎖。

public long writeLock() {
    long s, next;
    // ABITS = 255 = 1111 1111
    // WBITS = 128 = 1000 0000
    // state與ABITS若是等於0,嘗試原子更新state的值加WBITS
    // 若是成功則返回更新的值,若是失敗調用acquireWrite()方法
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}
複製代碼

咱們以state等於初始值爲例,則state & ABITS的結果爲:

StampedLock

此時state爲初始狀態,與ABITS與運算後的值爲0,因此執行後面的CAS方法,s + WBITS的值爲384 = 1 1000 0000。

到這裏咱們大膽猜想:state的高24位存儲的是版本號,低8位存儲的是是否有加鎖,第8位存儲的是寫鎖,低7位存儲的是讀鎖被獲取的次數,並且若是隻有第8位存儲寫鎖的話,那麼寫鎖只能被獲取一次,也就不可能重入了。

到底咱們猜想的對不對呢,走着瞧^^

咱們接着來分析acquireWrite()方法:

(手機橫屏看源碼更方便)

private long acquireWrite(boolean interruptible, long deadline) {
    // node爲新增節點,p爲尾節點(即將成爲node的前置節點)
    WNode node = null, p;
    
    // 第一次自旋——入隊
    for (int spins = -1;;) { // spin while enqueuing
        long m, s, ns;
        // 再次嘗試獲取寫鎖
        if ((m = (s = state) & ABITS) == 0L) {
            if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                return ns;
        }
        else if (spins < 0)
            // 若是自旋次數小於0,則計算自旋的次數
            // 若是當前有寫鎖獨佔且隊列無元素,說明快輪到本身了
            // 就自旋就好了,若是自旋完了還沒輪到本身才入隊
            // 則自旋次數爲SPINS常量
            // 不然自旋次數爲0
            spins = (m == WBIT && wtail == whead) ? SPINS : 0;
        else if (spins > 0) {
            // 當自旋次數大於0時,當前此次自旋隨機減一次自旋次數
            if (LockSupport.nextSecondarySeed() >= 0)
                --spins;
        }
        else if ((p = wtail) == null) {
            // 若是隊列未初始化,新建一個空節點並初始化頭節點和尾節點
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            // 若是新增節點還未初始化,則新建之,並賦值其前置節點爲尾節點
            node = new WNode(WMODE, p);
        else if (node.prev != p)
            // 若是尾節點有變化,則更新新增節點的前置節點爲新的尾節點
            node.prev = p;
        else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
            // 嘗試更新新增節點爲新的尾節點成功,則退出循環
            p.next = node;
            break;
        }
    }

    // 第二次自旋——阻塞並等待喚醒
    for (int spins = -1;;) {
        // h爲頭節點,np爲新增節點的前置節點,pp爲前前置節點,ps爲前置節點的狀態
        WNode h, np, pp; int ps;
        // 若是頭節點等於前置節點,說明快輪到本身了
        if ((h = whead) == p) {
            if (spins < 0)
                // 初始化自旋次數
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                // 增長自旋次數
                spins <<= 1;
            
            // 第三次自旋,不斷嘗試獲取寫鎖
            for (int k = spins;;) { // spin at head
                long s, ns;
                if (((s = state) & ABITS) == 0L) {
                    if (U.compareAndSwapLong(this, STATE, s,
                                             ns = s + WBIT)) {
                        // 嘗試獲取寫鎖成功,將node設置爲新頭節點並清除其前置節點(gc)
                        whead = node;
                        node.prev = null;
                        return ns;
                    }
                }
                // 隨機立減自旋次數,當自旋次數減爲0時跳出循環再重試
                else if (LockSupport.nextSecondarySeed() >= 0 &&
                         --k <= 0)
                    break;
            }
        }
        else if (h != null) { // help release stale waiters
            // 這段代碼很難進來,是用於協助喚醒讀節點的
            // 我是這麼調試進來的:
            // 起三個寫線程,兩個讀線程
            // 寫線程1獲取鎖不要釋放
            // 讀線程1獲取鎖,讀線程2獲取鎖(會阻塞)
            // 寫線程2獲取鎖(會阻塞)
            // 寫線程1釋放鎖,此時會喚醒讀線程1
            // 在讀線程1裏面先不要喚醒讀線程2
            // 寫線程3獲取鎖,此時就會走到這裏來了
            WNode c; Thread w;
            // 若是頭節點的cowait鏈表(棧)不爲空,喚醒裏面的全部節點
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        
        // 若是頭節點沒有變化
        if (whead == h) {
            // 若是尾節點有變化,則更新
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                // 若是尾節點狀態爲0,則更新成WAITING
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                // 若是尾節點狀態爲取消,則把它從鏈表中刪除
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 有超時時間的處理
                long time; // 0 argument to park means no timeout
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 已超時,剔除當前節點
                    return cancelWaiter(node, node, false);
                // 當前線程
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                // 把node的線程指向當前線程
                node.thread = wt;
                if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                    whead == h && node.prev == p)
                    // 阻塞當前線程
                    U.park(false, time);  // 等同於LockSupport.park()
                    
                // 當前節點被喚醒後,清除線程
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                // 若是中斷了,取消當前節點
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}
複製代碼

這裏對acquireWrite()方法作一個總結,這個方法裏面有三段自旋邏輯:

第一段自旋——入隊:

(1)若是頭節點等於尾節點,說明沒有其它線程排隊,那就多自旋一會,看能不能嘗試獲取到寫鎖;

(2)不然,自旋次數爲0,直接讓其入隊;

第二段自旋——阻塞並等待被喚醒 + 第三段自旋——不斷嘗試獲取寫鎖:

(1)第三段自旋在第二段自旋內部;

(2)若是頭節點等於前置節點,那就進入第三段自旋,不斷嘗試獲取寫鎖;

(3)不然,嘗試喚醒頭節點中等待着的讀線程;

(4)最後,若是當前線程一直都沒有獲取到寫鎖,就阻塞當前線程並等待被喚醒;

這麼一大段邏輯看着比較鬧心,其實真正分解下來仍是比較簡單的,無非就是自旋,把不少狀態的處理都糅合到一個for循環裏面處理了。

unlockWrite()方法

釋放寫鎖。

public void unlockWrite(long stamp) {
    WNode h;
    // 檢查版本號對不對
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    // 這行代碼實際有兩個做用:
    // 1. 更新版本號加1
    // 2. 釋放寫鎖
    // stamp + WBIT實際會把state的第8位置爲0,也就至關於釋放了寫鎖
    // 同時會進1,也就是高24位總體加1了
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    // 若是頭節點不爲空,而且狀態不爲0,調用release方法喚醒它的下一個節點
    if ((h = whead) != null && h.status != 0)
        release(h);
}
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 將其狀態改成0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 若是頭節點的下一個節點爲空或者其狀態爲已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 從尾節點向前遍歷找到一個可用的節點
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 喚醒q節點所在的線程
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}
複製代碼

寫鎖的釋放過程比較簡單:

(1)更改state的值,釋放寫鎖;

(2)版本號加1;

(3)喚醒下一個等待着的節點;

readLock()方法

獲取讀鎖。

public long readLock() {
    long s = state, next;  // bypass acquireRead on common uncontended case
    // 沒有寫鎖佔用,而且讀鎖被獲取的次數未達到最大值
    // 嘗試原子更新讀鎖被獲取的次數加1
    // 若是成功直接返回,若是失敗調用acquireRead()方法
    return ((whead == wtail && (s & ABITS) < RFULL &&
             U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
            next : acquireRead(false, 0L));
}
複製代碼

獲取讀鎖的時候先看看如今有沒有其它線程佔用着寫鎖,若是沒有的話再檢測讀鎖被獲取的次數有沒有達到最大,若是沒有的話直接嘗試獲取一次讀鎖,若是成功了直接返回版本號,若是沒成功就調用acquireRead()排隊。

下面咱們一塊兒來看看acquireRead()方法,這又是一個巨長無比的方法,請保持耐心,咱們一步步來分解:

(手機橫屏看源碼更方便)

private long acquireRead(boolean interruptible, long deadline) {
    // node爲新增節點,p爲尾節點
    WNode node = null, p;
    // 第一段自旋——入隊
    for (int spins = -1;;) {
        // 頭節點
        WNode h;
        // 若是頭節點等於尾節點
        // 說明沒有排隊的線程了,快輪到本身了,直接自旋不斷嘗試獲取讀鎖
        if ((h = whead) == (p = wtail)) {
            // 第二段自旋——不斷嘗試獲取讀鎖
            for (long m, s, ns;;) {
                // 嘗試獲取讀鎖,若是成功了直接返回版本號
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                    // 若是讀線程個數達到了最大值,會溢出,返回的是0
                    return ns;
                else if (m >= WBIT) {
                    // m >= WBIT表示有其它線程先一步獲取了寫鎖
                    if (spins > 0) {
                        // 隨機立減自旋次數
                        if (LockSupport.nextSecondarySeed() >= 0)
                            --spins;
                    }
                    else {
                        // 若是自旋次數爲0了,看看是否要跳出循環
                        if (spins == 0) {
                            WNode nh = whead, np = wtail;
                            if ((nh == h && np == p) || (h = nh) != (p = np))
                                break;
                        }
                        // 設置自旋次數
                        spins = SPINS;
                    }
                }
            }
        }
        // 若是尾節點爲空,初始化頭節點和尾節點
        if (p == null) { // initialize queue
            WNode hd = new WNode(WMODE, null);
            if (U.compareAndSwapObject(this, WHEAD, null, hd))
                wtail = hd;
        }
        else if (node == null)
            // 若是新增節點爲空,初始化之
            node = new WNode(RMODE, p);
        else if (h == p || p.mode != RMODE) {
            // 若是頭節點等於尾節點或者尾節點不是讀模式
            // 當前節點入隊
            if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }
        else if (!U.compareAndSwapObject(p, WCOWAIT,
                                         node.cowait = p.cowait, node))
            // 接着上一個elseif,這裏確定是尾節點爲讀模式了
            // 將當前節點加入到尾節點的cowait中,這是一個棧
            // 上面的CAS成功了是不會進入到這裏來的
            node.cowait = null;
        else {
            // 第三段自旋——阻塞當前線程並等待被喚醒
            for (;;) {
                WNode pp, c; Thread w;
                // 若是頭節點不爲空且其cowait不爲空,協助喚醒其中等待的讀線程
                if ((h = whead) != null && (c = h.cowait) != null &&
                    U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null) // help release
                    U.unpark(w);
                // 若是頭節點等待前前置節點或者等於前置節點或者前前置節點爲空
                // 這一樣說明快輪到本身了
                if (h == (pp = p.prev) || h == p || pp == null) {
                    long m, s, ns;
                    // 第四段自旋——又是不斷嘗試獲取鎖
                    do {
                        if ((m = (s = state) & ABITS) < RFULL ?
                            U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + RUNIT) :
                            (m < WBIT &&
                             (ns = tryIncReaderOverflow(s)) != 0L))
                            return ns;
                    } while (m < WBIT); // 只有當前時刻沒有其它線程佔有寫鎖就不斷嘗試
                }
                // 若是頭節點不曾改變且前前置節點也不曾改
                // 阻塞當前線程
                if (whead == h && p.prev == pp) {
                    long time;
                    // 若是前前置節點爲空,或者頭節點等於前置節點,或者前置節點已取消
                    // 從第一個for自旋開始重試
                    if (pp == null || h == p || p.status > 0) {
                        node = null; // throw away
                        break;
                    }
                    // 超時檢測
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        // 若是超時了,取消當前節點
                        return cancelWaiter(node, p, false);
                    
                    // 當前線程
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    // 設置進node中
                    node.thread = wt;
                    // 檢測以前的條件不曾改變
                    if ((h != pp || (state & ABITS) == WBIT) &&
                        whead == h && p.prev == pp)
                        // 阻塞當前線程並等待被喚醒
                        U.park(false, time);
                    
                    // 喚醒以後清除線程
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    // 若是中斷了,取消當前節點
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, p, true);
                }
            }
        }
    }
    
    // 只有第一個讀線程會走到下面的for循環處,參考上面第一段自旋中有一個break,當第一個讀線程入隊的時候break出來的
    
    // 第五段自旋——跟上面的邏輯差很少,只不過這裏單獨搞一個自旋針對第一個讀線程
    for (int spins = -1;;) {
        WNode h, np, pp; int ps;
        // 若是頭節點等於尾節點,說明快輪到本身了
        // 不斷嘗試獲取讀鎖
        if ((h = whead) == p) {
            // 設置自旋次數
            if (spins < 0)
                spins = HEAD_SPINS;
            else if (spins < MAX_HEAD_SPINS)
                spins <<= 1;
                
            // 第六段自旋——不斷嘗試獲取讀鎖
            for (int k = spins;;) { // spin at head
                long m, s, ns;
                // 不斷嘗試獲取讀鎖
                if ((m = (s = state) & ABITS) < RFULL ?
                    U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                    (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                    // 獲取到了讀鎖
                    WNode c; Thread w;
                    whead = node;
                    node.prev = null;
                    // 喚醒當前節點中全部等待着的讀線程
                    // 由於當前節點是第一個讀節點,因此它是在隊列中的,其它讀節點都是掛這個節點的cowait棧中的
                    while ((c = node.cowait) != null) {
                        if (U.compareAndSwapObject(node, WCOWAIT,
                                                   c, c.cowait) &&
                            (w = c.thread) != null)
                            U.unpark(w);
                    }
                    // 返回版本號
                    return ns;
                }
                // 若是當前有其它線程佔有着寫鎖,而且沒有自旋次數了,跳出當前循環
                else if (m >= WBIT &&
                         LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                    break;
            }
        }
        else if (h != null) {
            // 若是頭節點不等待尾節點且不爲空且其爲讀模式,協助喚醒裏面的讀線程
            WNode c; Thread w;
            while ((c = h.cowait) != null) {
                if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                    (w = c.thread) != null)
                    U.unpark(w);
            }
        }
        
        // 若是頭節點不曾變化
        if (whead == h) {
            // 更新前置節點及其狀態等
            if ((np = node.prev) != p) {
                if (np != null)
                    (p = np).next = node;   // stale
            }
            else if ((ps = p.status) == 0)
                U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
            else if (ps == CANCELLED) {
                if ((pp = p.prev) != null) {
                    node.prev = pp;
                    pp.next = node;
                }
            }
            else {
                // 第一個讀節點即將進入阻塞
                long time;
                // 超時設置
                if (deadline == 0L)
                    time = 0L;
                else if ((time = deadline - System.nanoTime()) <= 0L)
                    // 若是超時了取消當前節點
                    return cancelWaiter(node, node, false);
                Thread wt = Thread.currentThread();
                U.putObject(wt, PARKBLOCKER, this);
                node.thread = wt;
                if (p.status < 0 &&
                    (p != h || (state & ABITS) == WBIT) &&
                    whead == h && node.prev == p)
                    // 阻塞第一個讀節點並等待被喚醒
                    U.park(false, time);
                node.thread = null;
                U.putObject(wt, PARKBLOCKER, null);
                if (interruptible && Thread.interrupted())
                    return cancelWaiter(node, node, true);
            }
        }
    }
}
複製代碼

讀鎖的獲取過程比較艱辛,一共有六段自旋,Oh my god,讓咱們來大體地分解一下:

(1)讀節點進來都是先判斷是頭節點若是等於尾節點,說明快輪到本身了,就不斷地嘗試獲取讀鎖,若是成功了就返回;

(2)若是頭節點不等於尾節點,這裏就會讓當前節點入隊,這裏入隊又分紅了兩種;

(3)一種是首個讀節點入隊,它是會排隊到整個隊列的尾部,而後跳出第一段自旋;

(4)另外一種是非第一個讀節點入隊,它是進入到首個讀節點的cowait棧中,因此更確切地說應該是入棧;

(5)不論是入隊還入棧後,都會再次檢測頭節點是否是等於尾節點了,若是相等,則會再次不斷嘗試獲取讀鎖;

(6)若是頭節點不等於尾節點,那麼纔會真正地阻塞當前線程並等待被喚醒;

(7)上面說的首個讀節點實際上是連續的讀線程中的首個,若是是兩個讀線程中間夾了一個寫線程,仍是老老實實的排隊。

自旋,自旋,自旋,旋轉的木馬,讓我忘了傷^^

unlockRead()方法

釋放讀鎖。

public void unlockRead(long stamp) {
    long s, m; WNode h;
    for (;;) {
        // 檢查版本號
        if (((s = state) & SBITS) != (stamp & SBITS) ||
            (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
            throw new IllegalMonitorStateException();
        // 讀線程個數正常
        if (m < RFULL) {
            // 釋放一次讀鎖
            if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                // 若是讀鎖所有都釋放了,且頭節點不爲空且狀態不爲0,喚醒它的下一個節點
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                break;
            }
        }
        else if (tryDecReaderOverflow(s) != 0L)
            // 讀線程個數溢出檢測
            break;
    }
}

private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        // 將其狀態改成0
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
        // 若是頭節點的下一個節點爲空或者其狀態爲已取消
        if ((q = h.next) == null || q.status == CANCELLED) {
            // 從尾節點向前遍歷找到一個可用的節點
            for (WNode t = wtail; t != null && t != h; t = t.prev)
                if (t.status <= 0)
                    q = t;
        }
        // 喚醒q節點所在的線程
        if (q != null && (w = q.thread) != null)
            U.unpark(w);
    }
}
複製代碼

讀鎖釋放的過程就比較簡單了,將state的低7位減1,當減爲0的時候說明徹底釋放了讀鎖,就喚醒下一個排隊的線程。

tryOptimisticRead()方法

樂觀讀。

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
複製代碼

若是沒有寫鎖,就返回state的高25位,這裏把寫所在位置一塊兒返回了,是爲了後面檢測數據有沒有被寫過。

validate()方法

檢測樂觀讀版本號是否變化。

public boolean validate(long stamp) {
    // 強制加入內存屏障,刷新數據
    U.loadFence();
    return (stamp & SBITS) == (state & SBITS);
}
複製代碼

檢測二者的版本號是否一致,與SBITS與操做保證不受讀操做的影響。

變異的CLH隊列

StampedLock中的隊列是一種變異的CLH隊列,圖解以下:

StampedLock

總結

StampedLock的源碼解析到這裏就差很少了,讓咱們來總結一下:

(1)StampedLock也是一種讀寫鎖,它不是基於AQS實現的;

(2)StampedLock相較於ReentrantReadWriteLock多了一種樂觀讀的模式,以及讀鎖轉化爲寫鎖的方法;

(3)StampedLock的state存儲的是版本號,確切地說是高24位存儲的是版本號,寫鎖的釋放會增長其版本號,讀鎖不會;

(4)StampedLock的低7位存儲的讀鎖被獲取的次數,第8位存儲的是寫鎖被獲取的次數;

(5)StampedLock不是可重入鎖,由於只有第8位標識寫鎖被獲取了,並不能重複獲取;

(6)StampedLock中獲取鎖的過程使用了大量的自旋操做,對於短任務的執行會比較高效,長任務的執行會浪費大量CPU;

(7)StampedLock不能實現條件鎖;

彩蛋

StampedLock與ReentrantReadWriteLock的對比?

答:StampedLock與ReentrantReadWriteLock做爲兩種不一樣的讀寫鎖方式,彤哥大體概括了它們的異同點:

(1)二者都有獲取讀鎖、獲取寫鎖、釋放讀鎖、釋放寫鎖的方法,這是相同點;

(2)二者的結構基本相似,都是使用state + CLH隊列;

(3)前者的state分紅三段,高24位存儲版本號、低7位存儲讀鎖被獲取的次數、第8位存儲寫鎖被獲取的次數;

(4)後者的state分紅兩段,高16位存儲讀鎖被獲取的次數,低16位存儲寫鎖被獲取的次數;

(5)前者的CLH隊列能夠當作是變異的CLH隊列,連續的讀線程只有首個節點存儲在隊列中,其它的節點存儲的首個節點的cowait棧中;

(6)後者的CLH隊列是正常的CLH隊列,全部的節點都在這個隊列中;

(7)前者獲取鎖的過程當中有判斷首尾節點是否相同,也就是是否是快輪到本身了,若是是則不斷自旋,因此適合執行短任務;

(8)後者獲取鎖的過程當中非公平模式下會作有限次嘗試;

(9)前者只有非公平模式,一上來就嘗試獲取鎖;

(10)前者喚醒讀鎖是一次性喚醒連續的讀鎖的,並且其它線程還會協助喚醒;

(11)後者是一個接着一個地喚醒的;

(12)前者有樂觀讀的模式,樂觀讀的實現是經過判斷state的高25位是否有變化來實現的;

(13)前者各類模式能夠互轉,相似tryConvertToXxx()方法;

(14)前者寫鎖不可重入,後者寫鎖可重入;

(15)前者沒法實現條件鎖,後者能夠實現條件鎖;

差很少就這麼多吧,若是你還能想到,也歡迎補充哦^^

推薦閱讀

一、死磕 java同步系列之開篇

二、死磕 java魔法類之Unsafe解析

三、死磕 java同步系列之JMM(Java Memory Model)

四、死磕 java同步系列之volatile解析

五、死磕 java同步系列之synchronized解析

六、死磕 java同步系列之本身動手寫一個鎖Lock

七、死磕 java同步系列之AQS起篇

八、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖

九、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖

十、死磕 java同步系列之ReentrantLock VS synchronized

十一、死磕 java同步系列之ReentrantReadWriteLock源碼解析

十二、死磕 java同步系列之Semaphore源碼解析

1三、死磕 java同步系列之CountDownLatch源碼解析

1四、死磕 java同步系列之AQS終篇


歡迎關注個人公衆號「彤哥讀源碼」,查看更多源碼系列文章, 與彤哥一塊兒暢遊源碼的海洋。

qrcode
相關文章
相關標籤/搜索