從ReentrantLock看AbstractQueuedSynchronized源碼之Condition條件隊列

上一篇:

從ReentrantLock分析AbstractQueuedSynchronized源碼

1.示例代碼

演示一個生產者消費者的場景,await阻塞,signal喚醒node

細節點:ui

aqs裏面鎖是什麼,簡單來講就是state的值,state>0線程持有了鎖,state=0線程釋放了鎖。this

aqs裏面線程是什麼,簡單來講就是Node節點,Node節點經過構成鏈表結構來表明線程的執行的前後順序,這個鏈表有雙向的,有單向的,其屬性prev和next以及全局head,tail屬性用於構建雙向鏈表,其nextWaiter以及firstWaiter,lastWaiter構成單向鏈表,waitStatus的屬性用於表示當前線程的等待狀態;spa

調用await的方法的線程,必然持有鎖;.net

生產者put和消費者take的方法使用同一個lock,存在資源競爭;線程

await方法會阻塞當前線程,signal會喚醒對應的處於條件隊列中等待的線程,並將處於條件隊列中的的節點移動到雙向同步隊列裏面;對象

有了這幾個概念,有助於理解下面所說的。blog

final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];

int putIndex, takeIndex, count;

public void put(Object item) throws InterruptedException {
    //這裏會將state值修改,state值在aqs中是全局的,無論同步隊列仍是條件隊列都共享一個state
    lock.lock();
    try {
        //items滿了,阻塞
        while (count == items.length) {
            //調用await方法的線程必須持有鎖
            //思考:這裏調用了await方法是否應該釋放當前線程的鎖呢?由於全局共用一個state值。
            notFull.await();
        }
        items[putIndex] = item;
        if (++putIndex == items.length) {
            putIndex = 0;
        }
        ++count;
        notEmpty.signal();//items中已經有值,通知notEmpty隊列消費
    } finally {
        lock.unlock();
    }
}

public Object take() throws InterruptedException {
    lock.lock();
    try {
        //items爲空
        while (count == 0) {
            //調用await方法的線程必須持有鎖
            //思考:這裏調用了await方法是否應該釋放當前線程的鎖呢?
            notEmpty.await();
        }
        Object item = items[takeIndex];
        if (++takeIndex == items.length) {
            takeIndex = 0;
        }
        --count;
        notFull.signal();//items中已經被消費掉,通知notFull生產
        return item;
    } finally {
        lock.unlock();
    }
}

2.雙向同步隊列和Condition條件隊列

2.1.雙向同步隊列結構

這是lock加鎖和unLock釋放鎖過程當中造成的隊列,有一個虛的頭節點,虛節點不存儲線程等相關信息,nextWaiter屬性在這種隊列中無實際做用。經過prev和next屬性串聯節點,thread保存當前線程,waitStatus表示當前線程的等待狀態。隊列

2.2.Condition條件隊列結構

由示例代碼能夠看出,每一個Condition對象都對應了一個Condition條件隊列,並且每一個條件隊列是相互獨立的。資源

Condition條件隊列是一個單向的隊列,並且firstWaiter也有實際做用,彼此以前是經過nextWaiter關聯的,在Condition條件隊列中,prev和next屬性也並不會用到。

噹噹前線程調用了notFull.await()方法的時候,當前線程就會被包裝成Node被加入到notFull隊列的末尾。

在條件隊列中,咱們須要關注的是waitStatus的屬性爲CONDITION,當waitStatus=CONDITION的時候,咱們就認爲當前線程不須要等待,能夠出隊了。

2.3.雙向同步隊列和條件隊列的聯繫

通常狀況下,雙向同步隊列和條件隊列是相互獨立的。可是,當咱們調用條件隊列的signal方法的時候,會將條件隊列中的處於等待的線程喚醒,被喚醒的線程一樣和普通線程同樣要去爭搶鎖資源。若是爭搶失敗,一樣要被加到雙向同步隊列中去。這個時候,就須要將條件隊列中的節點一個個轉移到雙向同步隊列中去了。注意,這個節點從條件隊列遷移到雙向同步隊列是一個一個遷移的。

3.分析ConditionObject

final Condition notFull = lock.newCondition();

該類位於AbstractQueuedSynchronized裏面

核心屬性:

//條件隊列首節點
private transient Node firstWaiter;
//條件隊列尾節點
private transient Node lastWaiter;

構造方法:

public ConditionObject() { }

3.1.分析ConditionObject.await()方法

await方法執行過程:

(1)lock加鎖;

(2)await方法檢測中斷狀態;

(3)fullyRelease方法釋放鎖;

(4)節點若是不位於同步隊列,掛起線程,等待被喚醒;

(5)節點被signal喚醒或者中斷喚醒;

(6)checkInterruptWhileWaiting檢查中斷模式,根據節點位於同步隊列仍是條件隊列設置中斷模式,並將節點加入到同步隊列中;

(7)acquireQueued嘗試加鎖,加鎖失敗在同步隊列中等待被喚醒,掛起線程;

(8)加鎖成功持有鎖,reportInterruptAfterWait處理中斷;

 

可以調用到await方法的線程都已經獲取到鎖。

調用await方法這裏有兩種狀況:

1.中斷髮生時,線程尚未被signal過,則線程恢復後,拋出異常;

2.中斷髮生時,線程已經被signal過,那麼線程已經被從condition隊列中移動到同步隊列中了,那麼await方法調用以後,

只是再補一下中斷,也就僅僅改變了線程的中斷狀態。

從await方法咱們能夠看出來,這個中斷在這裏僅僅只判斷中斷狀態。咱們直到thread.interrupt()方法只改變線程的中斷狀態爲true而並不能真正意義上的中止線程,再配上LockSupport.unpark喚醒線程,就可使得await方法拋出異常。

await方法退出的時候根據interruptMode來處理這兩種狀況:

先看interruptMode屬性值:

//若是是默認值0,表明線程整個過程當中沒有發生中斷

//=1表示退出await方法時再自我中斷,這種模式發生在signal方法調用以後,由於當前節點被加入到了同步隊列
//因此還須要中斷
private static final int REINTERRUPT =  1;
//=-1說明退出await方法的時候拋出InterruptedException
private static final int THROW_IE    = -1;
public final void await() throws InterruptedException {
    //判斷當前線程是不是中斷狀態,若是是直接拋出異常
    //await方法中發現了線程中斷,則拋出異常,後續reportInterruptAfterWait方法也會處理中斷模式
    if (Thread.interrupted())
        throw new InterruptedException();
    //新建一個節點並加入到條件隊列中
    Node node = addConditionWaiter();
    //徹底釋放當前線程的鎖,等待其餘線程爭搶鎖
    long savedState = fullyRelease(node);
    int interruptMode = 0;//中斷屬性默認值,表明不須要中斷
    //判斷節點是否處於同步隊列中,返回false則表示在條件隊列中
    //在調用signal方法以前,新加的節點都是位於條件隊列中
    //調用signal方法會將處於條件隊列中的節點遷移到同步隊列中
    while (!isOnSyncQueue(node)) {
        //節點位於條件隊列則掛起當前線程,等待signal方法喚醒,線程當前被掛起在這個位置
        LockSupport.park(this);
        //線程如何被喚醒:
        //1.線程被signal喚醒後,馬上走到這裏
        //3.被其餘線程中斷+喚醒,thread.interrupt() + LockSupport.unpark()方法;
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //能執行到這裏,說明node已經被遷移到同步隊列中了,並且當前線程也被喚醒了
    //條件1:嘗試加鎖
    //條件2:中斷屬性不爲-1
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        //嘗試加鎖成功或中斷屬性不爲-1,則設置屬性爲1,表明須要自我中斷
        interruptMode = REINTERRUPT;
    //node被加入到同步隊列中的時候,並無設置nextWaiter=null
    //若是有條件隊列中還有其餘節點,清除條件隊列中取消狀態的節點
    if (node.nextWaiter != null) 
        unlinkCancelledWaiters();
    //根據中斷模式處理中斷
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

3.1.1.分析ConditionObject.addConditionWaiter方法

//建立一個新節點並將它加入到條件隊列中,並返回該節點
//執行過程當中剔除了已取消的節點
private Node addConditionWaiter() {
    //獲取條件隊列最後一個節點
    Node t = lastWaiter;
    //條件1:若是t!=null說明條件隊列中已經有Node節點了
    //條件2:t.waitStatus != Node.CONDITION成立,說明當前尾節點取消了等待
    //那麼咱們新加的節點就不該該在當前尾節點的後面了
    if (t != null && t.waitStatus != Node.CONDITION) {
        //剔除全部取消狀態的節點
        unlinkCancelledWaiters();
        //更新臨時變量的值,可能由於上一個方法改變尾節點
        t = lastWaiter;
    }
    //建立一個新節點並設置waitStatus=CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //若是沒有最終節點
    if (t == null)
        //設置第一個節點爲當前節點
        firstWaiter = node;
    else
        //若是有最終節點,設置當前節點的後續節點爲新建立的節點
        t.nextWaiter = node;
    //更新最終節點爲當前節點
    lastWaiter = node;
    return node;
}

3.1.2.分析unlinkCancelledWaiters方法

//剔除取消等待的節點
private void unlinkCancelledWaiters() {
    //獲取首節點
    Node t = firstWaiter;
    Node trail = null;
    //循環遍歷鏈表
    while (t != null) {
        //獲取下一個節點
        Node next = t.nextWaiter;
        //條件成立,說明當前節點取消了等待
        if (t.waitStatus != Node.CONDITION) {
            //將首節點的的nextWaiter設置爲空,斷鏈操做,幫助gc
            t.nextWaiter = null;
            //若是鏈表尚未找到正常狀態的節點
            if (trail == null)
                //將next節點設置爲首節點
                firstWaiter = next;
            else
                //讓上一個正常節點指向取消等待節點的下一個節點
                trail.nextWaiter = next;
            //若是不存在下一個節點,當前尾節點設置爲正常節點
            if (next == null)
                lastWaiter = trail;
        }
        //條件不成立說明當前t節點是正常節點
        else
            //給正常節點賦值,這裏就找出了全部的正常節點
            trail = t;
        //循環繼續向下遍歷
        t = next;
    }
}

3.1.3.分析ConditionObject.fullRelease方法

//徹底釋放鎖,當failed=true的時候,說明當前線程未持有鎖調用了await方法
//調用await方法必須持有鎖
//假如failed=true,finally中會將加入到隊列中的節點狀態設置爲取消狀態
//後續節點入隊的過程會調用unlinkCancelledWaiters方法清除取消狀態的節點
//釋放鎖成功後返回當前鎖的state的值
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //獲取當前線程state值的總值
        int savedState = getState();
        //直接調用release方法釋放鎖
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

3.1.4.分析ConditionObject.isOnSyncQueue方法

判斷節點是否處於同步隊列中,前面已經說過,當調用signal方法的時候,可能會將條件隊列中的節點遷移到同步隊列中。

final boolean isOnSyncQueue(Node node) {
    //當前節點的狀態爲CONDITION,說明處於條件隊列中
    //當前節點的前驅節點爲空,說明處於條件隊列中,由於雙向同步隊列存在虛的首節點
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //當前節點的後續節點不爲空,說明處於同步隊列中,由於條件隊列的prev和next屬性爲空
    if (node.next != null)
        return true;
    //若是上面條件都不成立,說明當前節點知足成爲同步隊列節點的屬性,即prev和next屬性不爲空
    //循環鏈表從後向前遍歷,看是否能找到當前節點,找到則返回true
    return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

3.1.5.分析checkInterruptWhileWaiting方法

該方法返回線程的中斷的模式

private int checkInterruptWhileWaiting(Node node) {
    //Thread.interrupted返回當前線程的中斷狀態並將中斷狀態清除
    //被喚醒的線程當前中斷狀態仍是中斷,須要清除
    //以後調用transferAfterCancelledWait方法判斷節點是由signal喚醒仍是中斷喚醒
    //位於condition隊列說明是中斷喚醒,而後中斷模式返回THROW_IE
    //位於同步隊列則說明是signal喚醒,返回中斷模式REINTERRUPT
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

3.1.6.分析transferAfterCancelledWait方法

該方法能夠判斷當前節點是否位於條件隊列,位於條件隊列,則須要將節點加入到同步隊列,並返回true;

若是位於同步隊列則返回false;

final boolean transferAfterCancelledWait(Node node) {
    //條件成立,說明當前node的waitStatus值爲CONDITION,當前node位於條件隊列中
    //爲何會出現這種狀況,被signal方法喚醒後的線程不是在同步隊列中了嗎?
    //若是咱們是經過中斷喚醒,那麼這裏仍是須要將
    //節點加入到同步隊列中的
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //中斷喚醒的node也會被加到阻塞隊列中
        enq(node);
        return true;
    }
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

3.1.7.分析reportInterruptAfterWait方法

根據中斷模式處理中斷

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();//線程自我中斷,僅僅改變了中斷狀態
}

3.2.分析ConditionObject.signal方法

public final void signal() {
    //是否獨佔鎖線程,不是則拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    //條件隊列有node節點,則進行條件隊列遷移到同步隊列的操做。
    if (first != null)
        doSignal(first);
}
//將條件隊列中全部節點遷移到同步隊列中
private void doSignal(Node first) {
    do {
        //firstWaiter = first.nextWaiter,處理完當前節點,則讓firstWaiter指向下一個節點
        if ( (firstWaiter = first.nextWaiter) == null)
            //條件隊列爲空,則更新lastWaiter爲空
            lastWaiter = null;
        
        //若是下一個節點爲空,則出while循環
        first.nextWaiter = null;
        //while循環遷移某個節點成功和條件隊列爲空纔會退出循環
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    //經過CAS操做修改當前條件隊列中節點狀態爲0,由於節點要被遷移到同步隊列了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
 //將節點加入到同步隊列尾節點並返回其前一個節點
    Node p = enq(node);
    int ws = p.waitStatus;
    //在同步隊列中,waitStatus只有1,0,-1三種狀態,大於0說明前驅節點是取消狀態
    //若是前驅節點是取消狀態,則喚醒當前線程
    //若是前驅節點不是取消狀態,則設置前驅節點狀態爲-1,表明須要喚醒後續節點
    //若是compareAndSetWaitStatus(p, ws, Node.SIGNAL)返回false,說明當前node是
    //lockInterrupt入隊的node,是會響應中斷的,若是外部線程中斷該node,前驅node會
    //將節點狀態修改成取消狀態,並執行出隊邏輯
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
相關文章
相關標籤/搜索