AQS系列二:源碼分析「公平」ReentrantLock和Condition

上篇文章 AQS系列一:源碼分析非公平ReentrantLock 中,咱們分析了ReentrantLock的非公平實現,本篇會承接上文,繼續分析ReentrantLock的公平鎖實現(以及Condition的實現)。java

在此以前咱們要先弄明白,「不公平」體如今哪裏。node

爲什麼「不公」

好吧,我也不清楚。
因而我對比了ReentrantLock的非公平和公平實現,即NonfairSync vs FairSync,發現差異主要體現在加鎖,更確切的說是獲取鎖環節segmentfault

## 非公平獲取鎖
final boolean nonfairTryAcquire(int acquires) {
    ...
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    ...
}
## 公平獲取鎖
protected final boolean tryAcquire(int acquires) {
    ...
    if (!hasQueuedPredecessors() //### 差異體如今此處
        && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
    ...
}

顯然,公平鎖多執行了!hasQueuedPredecessors(),看看此方法的邏輯。併發

public final boolean hasQueuedPredecessors() {
    Node h, s;
    if ((h = head) != null) { ## h頭結點
        if ((s = h.next) == null ... ) { ## s二號節點
            ...
        }
        if (s != null && s.thread != Thread.currentThread()) ##檢查2號節點綁定線程,是否當前線程
            return true;
    }
    return false;
}

hasQueuedPredecessors方法只有在 2號節點不爲空,且綁定線程非當前線程的前提下,會返回true
返回ture意味着!hasQueuedPredecessors() = false,沒有資格獲取鎖(就是沒機會執行compareAndSetState——嘗試修改state)函數

反過來說,沒有隊列(無線程正在執行),或者沒有2號節點(取消或者臨時狀態),再或者2號節點的綁定線程就是當前線程時,才會嘗試獲取鎖工具

clipboard.png

咱們分析下最後這種狀況,2號節點綁定的線程是第1個等待的線程(第1個獲取鎖失敗的線程),第1個等待的線程在hasQueuedPredecessors()的運做下,成爲了第1個有資格嘗試獲取鎖的線程。而這,就是公平源碼分析

那麼沒有hasQueuedPredecessors方法的非公平鎖,到底「不公平」在哪兒呢?
咱們回想一下,在加 / 解鎖的過程當中,nonfairTryAcquire方法被調用的位置就能獲得答案了。ui

public final void acquire(int arg) {
    if (!tryAcquire(arg) ### 位置1,嘗試獲取
        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    ...
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { ### 位置2,嘗試獲取
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    ...
}

在上述代碼中,tryAcquire(非公平實現會調用nonfairTryAcquire)會在位置一、2兩處觸發。試想以下場景:this

  • 線程T-3執行完畢,調用了unlock;隨着線程T-2被喚醒,位置2處代碼可能會被執行。
  • 與此同時,隨着新的線程T-1的介入,位置1處的代碼也有可能被執行。

所以線程T-2T-1誰能在併發中搶到鎖,存在不肯定性spa

  • 若是線程T-2先執行了,T-1失敗於位置1處,後續會阻塞於隊列尾部;
  • 若是線程T-1先執行了,T-2失敗於位置2處,面臨又一輪阻塞,這種狀況就不怎麼「公平」——新來的線程T-1搶先了!

原理說完了,那具體怎麼構建公平的ReentrantLock呢?構造函數傳參便可:

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();  ## 入參fair傳入true,構建公平鎖
}

Condition解析

使用

ReentrantLock的加解鎖過程已詳細分析了一遍,若是你常用這個工具,確定對衍生出另外一個大咖condition有所瞭解。
二話不說,先甩出demo:

static Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();;

public void doSomething(){
    lock.lock();
    System.out.println(String.format("%s線程,獲取到鎖了",Thread.currentThread().getName()));
    try {
        System.out.println(String.format("%s線程,await",Thread.currentThread().getName()));
        TimeUnit.SECONDS.sleep(2L); //模擬耗時業務邏輯執行
        condition.await();  //await
        System.out.println(String.format("%s線程,await被喚醒",Thread.currentThread().getName()));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println(String.format("%s線程,業務執行完畢",Thread.currentThread().getName()));
    lock.unlock();
}

public static void main(String[] args) throws InterruptedException {
    ReentrantLockTest test = new ReentrantLockTest();
    int total = 1;
    while (total>0){
        Thread t = new Thread(()->{
            test.doSomething();
        },"T-"+total);
        t.start();

        TimeUnit.MILLISECONDS.sleep(200L);  //讓子線程T-1率先獲取到鎖
        lock.lock();
        System.out.println(String.format("%s線程,獲取到鎖了",Thread.currentThread().getName()));
        test.condition.signal();
        System.out.println(String.format("%s線程,signal",Thread.currentThread().getName()));
        lock.unlock();
        total--;
    }
}

結合已掌握的加解鎖原理,分析demo執行過程:

  • 人爲控制讓子線程T-1先獲取到鎖,200ms後main線程也會嘗試獲取鎖,固然main線程獲取不到——因爲耗時達2s的業務邏輯瘋狂執行中。(sleep處,此時main線程應該構建了同步隊列,main線程做爲2號節點的綁定線程被無情阻塞,下圖)

clipboard.png

  • 2s後,線程T-1搞定了難纏的業務邏輯,卻又遭遇condition.await()的伏擊
  • 此時,線程main發現本身神奇的不被阻塞了,又神奇的獲取到了鎖。因而投桃報李,condition.signal()unlock二連招喚醒了線程T-1
  • 線程T-1覺醒於await處,執行完剩餘邏輯

demo的執行結果,能初步證實上述分析:

T-1線程,獲取到鎖了
T-1線程,await
main線程,獲取到鎖了
main線程,signal
T-1線程,await被喚醒
T-1線程,業務執行完畢

原理

構造器

從構造函數出發:

public Condition newCondition() {
    return sync.newCondition();
}

## Sync類建立ConditionObject
final ConditionObject newCondition() {
    return new ConditionObject();
}

ConditionObject是AQS中的另外一內部類,看看它的屬性:

## ConditionObject類
private transient Node firstWaiter;
private transient Node lastWaiter;

感受上和AQS的設定上有些像?

## AQS類
private transient volatile Node head;
private transient volatile Node tail;

先大膽猜想一下,condition中極可能會再次構建同步隊列。

await()

接下來就是驗證咱們的猜想的過程:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();    ## 建立等待隊列,node是尾節點。    詳情參看[addConditionWaiter詳情]
    int savedState = fullyRelease(node);    ## 重置state,返回重置前的state值。    詳情參看[fullyRelease詳情]
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {    ## 是否在AQS同步隊列中
        LockSupport.park(this);    ## 不在AQS同步隊列的節點,阻塞當前線程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • addConditionWaiter詳情
private Node addConditionWaiter() {
    if (!isHeldExclusively())    ## 當前線程是否owner線程,若是不是,拋異常——這兒決定了await必須用在lock()方法以後
        throw new IllegalMonitorStateException();
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    Node node = new Node(Node.CONDITION); ## 建立新節點,原子形賦值waitStatus=CONDITION=-2,並綁定當前線程到node節點

    ## node會做爲尾節點,置於隊列最後
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node; 
}
  • fullyRelease詳情
final int fullyRelease(Node node) {
    try {
        int savedState = getState();    ## 獲取當前state
        if (release(savedState))
            return savedState;
        throw new IllegalMonitorStateException();
    } catch (Throwable t) {
        node.waitStatus = Node.CANCELLED;
        throw t;
    }
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {    ## 嘗試「清0」state
        Node h = head;    ## 此處head不爲空,unpark線程main,return true
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())    ## 當前線程驗證,若是當前線程!=owner,拋異常
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {    ## 若是state清0,同時清空owner線程,return true
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

有了以前分析ReentrantLock的經驗,與之很是相像的condition代碼應該不難拿下。
這裏畫出await方法中fullyRelease(node)執行先後的節點和關鍵屬性的變化:

clipboard.png

圖右側(await方法執行到了LockSupport.park(this)時),線程T-1已經阻塞,線程main則解除阻塞狀態。

經過上圖很容易看出,咱們以前的猜想是正確的:await方法又構建了一個同步隊列,不過此次的頭、尾指針在ConditionObject類中

signal()

再來看看signal方法做了什麼:

public final void signal() {
    if (!isHeldExclusively())    ## 和await()方法中的同樣,先驗證owner線程
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first)    ## condition頭結點傳遞
             && (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
    Node p = enq(node);    ## 將ConditionObject頭結點移動到AQS隊列尾部。    詳情參看[enq詳情]
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))    ## 取消或修改waitStatus失敗才做unpark操做,此處unpark不會觸發
        LockSupport.unpark(node.thread);
    return true;
}
  • enq詳情
private Node enq(Node node) {
    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            node.setPrevRelaxed(oldTail);    ## 入參node,成爲了AQS隊列新的尾節點
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return oldTail;
            }
        } else {
            initializeSyncQueue();    ## 初始化AQS隊列
        }
    }
}

signal中最神奇的莫過於enq(node)方法,它完成了節點的轉移,condition隊列頭結點 -> AQS隊列尾節點。

經過下圖觀察整個signal方法產生的各對象結構和屬性變化:

image.png

觀察可知,signal執行後節點轉移已經完成,線程T-1依然阻塞,此時ConditionObject已經完成了它的歷史使命。

線程T-1何時解除阻塞呢?其實這部分上篇文章已經分析過了,就是咱們的老朋友unlock()。
區別在於線程T-1被喚醒後,執行的是await後續的邏輯:

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {    ## 2.下次循環,node已經在AQS隊列中,返回true,跳出循環
        LockSupport.park(this);    ## 1.線程T-1覺醒於此
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) ## 3.再度獲取到鎖
            && interruptMode != THROW_IE)    
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

後記

至此,咱們已經瞭解了ReentrantLock的主邏輯的源碼實現(公平、非公平、condition),本系列的下篇文章將進入下一副本——CountDownLatch,敬請期待!

相關文章
相關標籤/搜索