Lock與Condition

Lock與Condition

阿里巴巴2021版JDK源碼筆記(2月第三版).pdfjava

連接:https://pan.baidu.com/s/1XhVcfbGTpU83snOZVu8AXg
提取碼:l3gynode

1. 互斥鎖

1.1 鎖的可重入性

當一個線程調用 object.lock()拿到鎖,進入互斥區後,再次調用object.lock(), 仍然能夠拿到該鎖(不然會死鎖)數組

1.2 類的繼承關係

lock.java安全

public interface Lock {
	void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

1.3 鎖的公平性與非公平性

Sync是一個抽象類,它有兩個子類FairSync與NonfairSync,分別 對應公平鎖和非公平鎖多線程

  • 公平鎖:遵循先到者優先服務,先搶資源的先獲取CPU
  • 非公平鎖:線程來了直接搶鎖,獲取CPU資源不是按照順序獲取(提升效率,減小線程切換)

1.4 鎖實現的基本原理

Sync的父類AbstractQueuedSynchronizer常常被稱做隊列同步器 (AQS),這個類很是關鍵ide

AbstractOwnableSynchronizer具備阻塞線程的做用,爲了實現一把具備阻塞和喚醒功能的鎖,須要一下核心要素:函數

    1. 須要一個state變量,標記該鎖的狀態,state變量至少有兩個值:0,1 對state變量的操做,要確保線程安全,也就是會用到CAS
    1. 須要記錄當前是哪一個線程持有鎖
    1. 須要底層支持對一個線程進行阻塞或喚醒操做
    1. 須要有一個隊列維護全部阻塞的線程。這個隊列也必須是線程安全的無鎖隊列,也須要用到CAS

針對1,2ui

  • state取值不只能夠是0、1,還能夠大於1,就是爲了支持鎖的可 重入性。例如,一樣一個線程,調用5次lock,state會變成5;而後調用5次unlock,state減爲0。
  • 當state=0時,沒有線程持有鎖,exclusiveOwnerThread=null;
  • 當state=1時,有一個線程持有鎖,exclusiveOwnerThread=該線程;
  • 當state > 1時,說明該線程重入了該鎖。

針對3this

  • 在Unsafe類中,提供了阻塞或喚醒線程的一對操做原語,也就是park/unpark線程

  • LockSupport對其作了簡單的封裝

    public class LockSupport {
    	public static void park(Object blocker) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            UNSAFE.park(false, 0L);
            setBlocker(t, null);
        }
        public static void unpark(Thread thread) {
            if (thread != null)
                UNSAFE.unpark(thread);
        }
    }
  • 在當前線程中調用park(),該線程就會被阻塞;在另一個線 程中,調用unpark(Thread t),傳入一個被阻塞的線程,就能夠喚醒阻塞在park()地方的線程

  • 尤爲是 unpark(Thread t),它實現了一個線程對另一個線程 的「精準喚醒」

針對4

  • 在AQS中利用雙向鏈表和CAS實現了一個阻塞隊列。

    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        static final class Node {
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
        }
        private transient volatile Node head;
        private transient volatile Node tail;
    }

1.5 公平與非公平的lock()的實現差別

FairSync 公平鎖

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {//等於0,資源空閒,能夠拿到鎖
        if (!hasQueuedPredecessors() && //判斷是否存在等待隊列或者當前線程是不是隊頭
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {//被鎖了,可是當前線程就是已經獲取鎖了(重入鎖),state+1 
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

NonfairSync 非公平鎖

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

公平鎖和非公平鎖的區別:

公平鎖就多了這塊代碼 !hasQueuedPredecessors(),看源碼

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

這裏其實就是判斷當前線程是否能夠被公平的執行(隊列爲空,或者當前在隊頭的時候表示到當前線程處理了)

1.6 阻塞隊列與喚醒機制

AQS類中,有嘗試拿鎖的方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) && //這裏嘗試去拿鎖,沒有拿到鎖才執行下一個條件
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
        //將當前線程入隊進入等待
        //addWaiter就是將線程加入到隊列中,
        //acquireQueued該線程被阻塞。在該函數返回 的一刻,就是拿到鎖的那一刻,也就是被喚醒的那一刻,此時會刪除隊列的第一個元素(head指針前移1個節點)
        selfInterrupt();
}

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//這裏會阻塞住,直到拿到鎖
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

1.7 unlock()實現分析

unlock不區分公平仍是非公平

public void unlock() {
    sync.release(1);
}

 public final boolean release(int arg) {
     if (tryRelease(arg)) {
         Node h = head;
         if (h != null && h.waitStatus != 0)
             unparkSuccessor(h);
         return true;
     }
     return false;
 }

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //只有鎖的擁有者才能夠釋放鎖
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //這裏須要考慮重入鎖
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

release()裏面作了兩件事:tryRelease(..)函數釋放鎖;unparkSuccessor(..)函數喚醒隊列中的後繼者。

1.8 lockInterruptibly()實現分析

當parkAndCheckInterrupt()返回true的時候,說明有其餘線程發送中斷信號,直接拋出InterruptedException,跳出for循環,整個函數返回。

1.9 tryLock()實現分析

tryLock()實現基於調用非公平鎖的tryAcquire(..),對state進行CAS操做,若是操做成功就拿到鎖;若是操做不成功則直接返回false,也不阻塞

2. 讀寫鎖

和互斥鎖相比,讀寫鎖(ReentrantReadWriteLock)就是讀線程 和讀線程之間能夠不用互斥了。在正式介紹原理以前,先看一下相關類的繼承體系。

public interface ReadWriteLock {
	Lock readLock();
    Lock writeLock();
}

2.1 代碼中使用

當使用 ReadWriteLock 的時候,並非直接使用,而是得到其內部的讀鎖和寫鎖,而後分別調用lock/unlock。

public static void main(String[] args) {
    ReadWriteLock rwlock = new ReentrantReadWriteLock();
    Lock rlock = rwlock.readLock();
    rlock.lock();
    rlock.unlock();
    Lock wlock = rwlock.writeLock();
    wlock.lock();
    wlock.unlock();
}

2.2 讀寫鎖實現的基本原理

從表面來看,ReadLock和WriteLock是兩把鎖,實際上它只是同一 把鎖的兩個視圖而已

  • 兩個視圖: 能夠理解爲是一把鎖,線程分紅兩類:讀線程和寫線程。讀線程和讀線程之間不互斥(能夠同時拿到這把鎖),讀線程和寫線程互斥,寫線程和寫線程也互斥。

  • readerLock和writerLock實際共 用同一個sync對象

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }
  • 同互斥鎖同樣,讀寫鎖也是用state變量來表示鎖狀態的。只是state變量在這裏的含義和互斥鎖徹底不一樣

  • 是把 state 變量拆成兩半,低16位,用來記錄寫鎖,高16位,用來「讀」鎖。但同一 時間既然只能有一個線程寫,爲何還須要16位呢?這是由於一個寫 線程可能屢次重入

    abstract static class Sync extends AbstractQueuedSynchronizer {
         static final int SHARED_SHIFT   = 16;
         static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
         static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
         static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
         
         static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
         static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
     }
  • 爲何要把一個int類型變量拆成兩半, 而不是用兩個int型變量分別表示讀鎖和寫鎖的狀態呢?這是由於沒法 用一次CAS 同時操做兩個int變量,因此用了一個int型的高16位和低16位分別表示讀鎖和寫鎖的狀態。

  • 當state=0時,說明既沒有線程持有讀鎖,也沒有線程持有寫鎖; 當state!=0時,要麼有線程持有讀鎖,要麼有線程持有寫鎖,二者不 能同時成立,由於讀和寫互斥。

2.3 AQS的兩對模板方法

ReentrantReadWriteLock的兩個內部類ReadLock和WriteLock中,是如何使用state變量的

acquire/release、acquireShared/releaseShared 是AQS裏面的 兩對模板方法。互斥鎖和讀寫鎖的寫鎖都是基於acquire/release模板 方法來實現的。讀寫鎖的讀鎖是基於acquireShared/releaseShared這對模板方法來實現的

將讀/寫、公平/非公平進行排列組合,就有4種組合

  • 讀鎖的公平實現:Sync.tryAccquireShared()+FairSync中的兩個覆寫的子函數。
  • 讀鎖的非公平實現:Sync.tryAccquireShared()+NonfairSync中的兩個覆寫的子函數
  • 寫鎖的公平實現:Sync.tryAccquire()+FairSync中的兩個覆寫的子函數
  • 寫鎖的非公平實現:Sync.tryAccquire()+NonfairSync中的兩個覆寫的子函數。

對於公平,比較容易理解,不管是讀鎖,仍是寫鎖,只要隊列中 有其餘線程在排隊(排隊等讀鎖,或者排隊等寫鎖),就不能直接去搶鎖,要排在隊列尾部。

對於非公平,讀鎖和寫鎖的實現策略略有差別。先說寫鎖,寫線 程能搶鎖,前提是state=0,只有在沒有其餘線程持有讀鎖或寫鎖的情 況下,它纔有機會去搶鎖。或者state!=0,但那個持有寫鎖的線程是 它本身,再次重入。寫線程是非公平的,就是無論三七二十一就去搶,即一直返回false。

由於讀線程和讀線程是不互斥的,假設當前線程被讀線程持有,而後其餘讀線程還非公平地一直去搶,可能致使寫線程永遠拿不到鎖,所 以對於讀線程的非公平,要作一些「約束」

當發現隊列的第1個元素 是寫線程的時候,讀線程也要阻塞一下,不能「肆無忌憚」地直接去搶

2.4 WriteLock公平與非公平實現

寫鎖是排他鎖,實現策略相似於互斥鎖,重寫了tryAcquire/tryRelease方法。

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
  • if (c!=0) and w==0,說明當前必定是讀線程拿着鎖,寫鎖必定拿不到,返回false。
  • if (c!=0) and w!=0,說明當前必定是寫線程拿着鎖, 執行current!=getExclusive-OwnerThread()的判斷,發現ownerThread不是本身,返回false。
  • c ! =0 , w ! =0 , 且 current=getExclusiveOwnerThread(),纔會走到if (w+exclusive-Count(acquires)> MAX_COUNT)。判斷重入次數,重入次數超過最大值,拋出異常。
  • if(c=0),說明當前既沒有讀線程,也沒有寫線程持有該鎖。能夠經過CAS操做開搶了。
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null); 
    setState(nextc);//寫鎖是排他的
    return free;
}

2.5 ReadLock公平與非公平實現

讀鎖是共享鎖,重寫了 tryAcquireShared/tryReleaseShared 方法,其實現策略和排他鎖有很大的差別。

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 && //寫鎖被某線程持有,且不是本身,讀鎖確定拿不到,直接返回
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&//公平和非公平的差別
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {//高位讀鎖+1
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
  • 低16位不等於0,說明有寫線程持有鎖,而且只有當ownerThread!=本身時,才返回-1。這裏面有一個潛臺詞:若是current=ownerThread,則這段代碼不會返回。這是由於一個寫線程能夠再次去拿讀 鎖!也就是說,一個線程在持有了WriteLock後,再去調用ReadLock.lock也是能夠的。
  • 上面的compareAndSetState(c,c+SHARED_UNIT),實際上是 把state的高16位加1(讀鎖的狀態),但由於是在高16位,必須把1左移16位再加1。
  • firstReader,cachedHoldConunter 之類的變量,只是一些 統計變量,在 ReentrantRead-WriteLock對外的一些查詢函數中會用 到,例如,查詢持有讀鎖的線程列表,但對整個讀寫互斥機制沒有影響,此處再也不展開解釋
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}

由於讀鎖是共享鎖,多個線程會同時持有讀鎖,因此對讀鎖的釋 放不能直接減1,而是須要經過一個for循環+CAS操做不斷重試。這是tryReleaseShared和tryRelease的根本差別所在。

3. Condition

Condition自己也是一個接口,其功能和wait/notify相似

public interface Condition {
	void await() throws InterruptedException;
    void signal();
    void signalAll();
}

3.1 Condition與Lock的關係

在講多線程基礎的時候,強調wait()/notify()必須和synchronized一塊兒使用,Condition也是如此,必須和Lock一塊兒使用。所以,在Lock的接口中,有一個與Condition相關的接口:

public interface Lock {
    Condition newCondition();
}

3.2 Condition的使用場景

爲一個用數組實現的阻塞 隊列,執行put(..)操做的時候,隊列滿了,生成者線程被阻塞;執行take()操做的時候,隊列爲空,消費者線程被阻塞。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    //核心就是一把鎖,兩個條件
	final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;
    
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
}

3.3 Condition實現原理

使用很簡潔,避免了 wait/notify 的生成者通知生 成者、消費者通知消費者的問題。

由於Condition必須和Lock一塊兒使用,因此Condition的實現也是Lock的一部分

3.4 await()實現分析

public final void await() throws InterruptedException {
    if (Thread.interrupted())//正要執行await操做,收到了中斷信號,拋出異常
        throw new InterruptedException();
    Node node = addConditionWaiter();//加入condition等待隊列
    long savedState = fullyRelease(node);//阻塞在condition以前必須釋放鎖,不然會釋放鎖
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);//本身阻塞本身
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//從新拿鎖
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • 線程調用 await()的時候,確定已經先拿到了鎖。因此, 在 addConditionWaiter()內部,對這個雙向鏈表的操做不須要執行CAS操做,線程天生是安全的
  • 在線程執行wait操做以前,必須先釋放鎖。也就是fullyRelease(node),不然會發生死鎖。這個和wait/notify與synchronized的配合機制同樣。
  • 線程從wait中被喚醒後,必須用acquireQueued(node,savedState)函數從新拿鎖。
  • checkInterruptWhileWaiting(node)代碼在park(this) 代碼以後,是爲了檢測在park期間是否收到過中斷信號。當線程從park中醒來時,有兩種可能:一種是其餘線程調用了unpark,另外一種是收 到中斷信號。這裏的await()函數是能夠響應中斷的,因此當發現自 己是被中斷喚醒的,而不是被unpark喚醒的時,會直接退出while循環,await()函數也會返回。
  • isOnSyncQueue(node)用於判斷該Node是否在AQS的同步隊 列裏面。初始的時候,Node只在Condition的隊列裏,而不在AQS的隊列裏。但執行notity操做的時候,會放進AQS的同步隊列。

3.5 awaitUninterruptibly()實現分析

與await()不一樣,awaitUninterruptibly()不會響應中斷,其 函數的定義中不會有中斷異常拋出,下面分析其實現和await()的區別

public final void awaitUninterruptibly() {
     Node node = addConditionWaiter();
     long savedState = fullyRelease(node);
     boolean interrupted = false;
     while (!isOnSyncQueue(node)) {
         LockSupport.park(this);
         if (Thread.interrupted())//從park中醒來,收到中斷,不退出,繼續執行循環
             interrupted = true;
     }
     if (acquireQueued(node, savedState) || interrupted)
         selfInterrupt();
 }

能夠看出,總體代碼和 await()相似,區別在於收到異常後,不會拋出異常,而是繼續執行while循環。

3.6 signal()實現分析

public final void signal() {
    if (!isHeldExclusively())//只有持有鎖的隊列才能夠調用signal
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {//喚醒隊列的第一個線程
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node);//先把Node放入互斥鎖的同步隊列中,再調用下面的unpark方法
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
  • 同 await()同樣,在調用 notify()的時候,必須先拿到鎖 (不然就會拋出上面的異常),是由於前面執行await()的時候,把鎖釋放了。
  • 從隊列中取出firstWait,喚醒它。在經過調用unpark喚醒 它以前,先用enq(node)函數把這個Node放入AQS的鎖對應的阻塞隊 列中

4. StampedLock

JDK8引入

4.1 爲何要引入?

  • ReentrantLock: 讀與讀互斥,寫與寫互斥,讀與寫互斥
  • ReentrantReadWriteLock:讀與讀不互斥,寫與寫互斥,讀與寫互斥
  • StampedLock:讀與讀不互斥,寫與寫不互斥,讀與寫互斥

StampedLock引入了「樂觀讀」策略,讀的時候不加讀鎖,讀出來發現數據被修改 了,再升級爲「悲觀讀」,至關於下降了「讀」的地位,把搶鎖的天平往「寫」的一方傾斜了一下,避免寫線程被餓死。

4.2 使用場景

public class Point {
    private double x, y;
    private final StampedLock s1 = new StampedLock();

    void move(double deltaX, double deltaY) {
        //多個線程調用,修改x,y的值
        long stamp = s1.writeLock();
        try {
            x = deltaX;
            y = deltaY;
        } finally {
            s1.unlock(stamp);
        }
    }

    double distanceFromOrigin() {

        long stamp = s1.tryOptimisticRead();//使用樂觀鎖
        double currentX = x, currentY = y;
        if (!s1.validate(stamp)) {
            /**
             * 上面這三行關鍵代碼對順序很是敏感,不能有重排序。 因
             * 爲 state 變量已是volatile,因此能夠禁止重排序,但stamp並 不是volatile的。
             * 爲此,在validate(stamp)函數裏面插入內存屏 障。
             */
            stamp = s1.readLock();//升級悲觀鎖
            try {
                currentX = x;
                currentY = y;
            } finally {
                s1.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}

4.3 「樂觀讀」的實現原理

StampedLock是一個讀寫鎖,所以也會像讀寫鎖那樣,把一 個state變量分紅兩半,分別表示讀鎖和寫鎖的狀態。同時,它還須要 一個數據的version。但正如前面所說,一次CAS沒有辦法操做兩個變 量,因此這個state變量自己同時也表示了數據的version。下面先分析state變量。

  • 用最低的8位表示讀和寫的狀態,其中第8位表 示寫鎖的狀態,最低的7位表示讀鎖的狀態。由於寫鎖只有一個bit位,因此寫鎖是不可重入的。

4.4 悲觀讀/寫:「阻塞」與「自旋」策略實現差別

同ReadWriteLock同樣,StampedLock也要進行悲觀的讀鎖和寫鎖 操做。不過,它不是基於AQS實現的,而是內部從新實現了一個阻塞隊列

public class StampedLock implements java.io.Serializable {
	static final class WNode {
        volatile WNode prev;
        volatile WNode next;
        volatile WNode cowait;    // list of linked readers
        volatile Thread thread;   // non-null while possibly parked
        volatile int status;      // 0, WAITING, or CANCELLED
        final int mode;           // RMODE or WMODE
        WNode(int m, WNode p) { mode = m; prev = p; }
    }
}

這個阻塞隊列和 AQS 裏面的很像。剛開始的時候,whead=wtail=NULL,而後初始化,建一個空節點,whead和wtail都指向這個空節 點,以後往裏面加入一個個讀線程或寫線程節點。但基於這個阻塞隊 列實現的鎖的調度策略和AQS很不同,也就是「自旋」。在AQS裏 面,當一個線程CAS state失敗以後,會當即加入阻塞隊列,而且進入 阻塞狀態。但在StampedLock中,CAS state失敗以後,會不斷自旋, 自旋足夠多的次數以後,若是還拿不到鎖,才進入阻塞狀態。爲此, 根據CPU的核數,定義了自旋次數的常量值。若是是單核的CPU,確定不能自旋,在多核狀況下,才採用自旋策略。

相關文章
相關標籤/搜索