JUC鎖框架——Condition

Condition介紹

Condition的做用是對鎖進行更精確的控制。Condition中的await()方法至關於Object的wait()方法,Condition中的signal()方法至關於Object的notify()方法,Condition中的signalAll()至關於Object的notifyAll()方法。不一樣的是,Object中的wait(),notify(),notifyAll()方法是和"同步鎖"(synchronized關鍵字)捆綁使用的;而Condition是須要與"互斥鎖"/"共享鎖"捆綁使用的html

Condition接口中的方法

public interface Condition {

// 形成當前線程在接到信號或被中斷以前一直處於等待狀態。會釋放鎖
void await()
// 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。
boolean await(long time, TimeUnit unit)
// 形成當前線程在接到信號、被中斷或到達指定等待時間以前一直處於等待狀態。
long awaitNanos(long nanosTimeout)
// 形成當前線程在接到信號以前一直處於等待狀態。
void awaitUninterruptibly()
// 形成當前線程在接到信號、被中斷或到達指定最後期限以前一直處於等待狀態。
boolean awaitUntil(Date deadline)
// 喚醒一個等待線程。
void signal()
// 喚醒全部等待線程。
void signalAll()

Condition的簡單應用

public class BoundedQueue<T> {
    public List<T> q; //這個列表用來存隊列的元素
    private int maxSize; //隊列的最大長度
    private Lock lock = new ReentrantLock();
    private Condition addConditoin = lock.newCondition();
    private Condition removeConditoin = lock.newCondition();

    public BoundedQueue(int size) {
        q = new ArrayList<>(size);
        maxSize = size;
    }

    public void add(T e) {
        lock.lock();
        try {
            while (q.size() == maxSize) {
                addConditoin.await();
            }
            q.add(e);
            removeConditoin.signal(); //執行了添加操做後喚醒因隊列空被阻塞的刪除操做
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public T remove() {
        lock.lock();
        try {
            while (q.size() == 0) {
                removeConditoin.await();
            }
            T e = q.remove(0);
            addConditoin.signal(); //執行刪除操做後喚醒因隊列滿而被阻塞的添加操做
            return e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }
}

Condition的實現類ConditionObject源碼分析

ConditionObject整體分析

ConditionObject是Condition在java併發中的具體的實現,它是AQS的內部類。由於Condition相關操做都須要獲取鎖,因此做爲AQS的內部類也比較合理。接下來就以ConditionObject的等待隊列、等待、通知爲切入點分析ConditionObject的具體實現。java

有關AQS中的內容能夠參考https://my.oschina.net/cqqcqqok/blog/1931790 在此咱們在來看看AQS中Node節點node

static final class Node {
    //標識節點當前在共享模式下
    static final Node SHARED = new Node();

    //標識節點當前在獨佔模式下
    static final Node EXCLUSIVE = null;


    /*
        這幾個int常量表示當前節點的等待狀態(waitStatus)
    */
    //結束狀態,線程取消鎖的爭搶,通常在超時或被中斷設置爲CANCELLED狀態而該狀態表示的節點會被踢出隊列
    static final int CANCELLED =  1;

    //其表示當前node的後繼節點對應的線程須要被喚醒(unpark)。若是當前線程的後繼線程處於阻塞狀態,而當前線程被release或cancel掉,所以須要喚醒當前線程的後繼線程.
    static final int SIGNAL    = -1;

    //與Condition相關,該標識的結點處於等待隊列中,結點的線程等待在Condition上,當其餘線程調用了Condition的signal()方法後,CONDITION狀態的結點將從等待隊列轉移到同步隊列中,等待獲取同步鎖。
    static final int CONDITION = -2;

    //與共享模式相關,在共享模式中,該狀態標識結點的線程處於可運行狀態。
    static final int PROPAGATE = -3;

    //節點的等待狀態,新節點的waitStatus=0
    volatile int waitStatus;
    volatile Node prev;//當前節點的前一個節點。
    volatile Node next;//當前節點的後一個節點
    volatile Thread thread;//當前節點對應的線程
    Node nextWaiter;//存儲condition隊列中的後繼節點
}

Condition實現等待的時候內部也有一個等待隊列,等待隊列是一個隱式的單向隊列,等待隊列中的每個節點也是一個AbstractQueuedSynchronizer.Node實例。安全

每一個Condition對象中保存了firstWaiter和lastWaiter做爲隊列首節點和尾節點,每一個節點使用Node.nextWaiter保存下一個節點的引用,所以等待隊列是一個單向隊列。併發

每當一個線程調用Condition.await()方法,那麼該線程會釋放鎖,構形成一個Node節點加入到等待隊列的隊尾。源碼分析

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
        ..................
}

ConditionObject的await方法

public final void await() throws InterruptedException {
    if (Thread.interrupted())throw new InterruptedException();
    Node node = addConditionWaiter();//構造一個新的等待隊列Node加入到隊尾
    long savedState = fullyRelease(node);//釋放當前線程的獨佔鎖,無論重入幾回,都把state釋放爲0
    int interruptMode = 0;
    //若是當前節點沒有在同步隊列上,即尚未被signal,則將當前線程阻塞
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //被中斷則直接退出自旋
        //注意區分兩種中斷:是在被signal前中斷仍是在被signal後中斷,
        //若是是被signal前就被中斷則拋出 InterruptedException,
        //不然執行 Thread.currentThread().interrupt();
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //退出了上面自旋說明當前節點已經在同步隊列上,可是當前節點不必定在同步隊列隊首。
    //acquireQueued將阻塞直到當前節點成爲隊首,即當前線程得到了鎖。
    //acquireQueued源碼介紹可參考AQS源碼分析https://my.oschina.net/cqqcqqok/blog/1931790
    //而後await()方法就能夠退出了,讓線程繼續執行await()後的代碼。
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
final long fullyRelease(Node node) {
    boolean failed = true;
    try {
        long savedState = getState();
        if (release(savedState)) {//真正的釋放邏輯在AQS的子類中
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
final boolean isOnSyncQueue(Node node) {
    //若是當前節點狀態是CONDITION或node.prev是null,則證實當前節點在等待隊列上而不是同步隊列上。
    //之因此能夠用node.prev來判斷,是由於一個節點若是要加入同步隊列,在加入前就會設置好prev字段。
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null)  //若是node.next不爲null,則必定在同步隊列上,由於node.next是在節點加入同步隊列後設置的
        return true;
    //前面的兩個判斷沒有返回的話,就從同步隊列隊尾遍歷一個一個看是否是當前節點。
    return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

Condition.signal()方法

public final void signal() {
    //若是同步狀態不是被當前線程獨佔,直接拋出異常。從這裏也能看出來,Condition只能配合獨佔類同步組件使用。
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);//通知等待隊列隊首的節點。
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    }
    //transferForSignal方法嘗試喚醒當前節點,若是喚醒失敗,則繼續嘗試喚醒當前節點的後繼節點。
    while (!transferForSignal(first) && (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
       //若是當前節點狀態爲CONDITION,則將狀態改成0準備加入同步隊列;若是當前狀態不爲CONDITION,說明該節點等待已被中斷,則該方法返回false,doSignal()方法會繼續嘗試喚醒當前節點的後繼節點
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        //將節點加入同步隊列,返回的p是節點在同步隊列中的先驅節點
        Node p = enq(node);
        int ws = p.waitStatus;
        //若是先驅節點的狀態爲CANCELLED(>0) 或設置先驅節點的狀態爲SIGNAL失敗,那麼就當即喚醒當前節點對應的線程,線程被喚醒後會執行acquireQueued方法,該方法會從新嘗試將節點的先驅狀態設爲SIGNAL並再次park線程;若是當前設置前驅節點狀態爲SIGNAL成功,那麼就不須要立刻喚醒線程了,當它的前驅節點成爲同步隊列的首節點且釋放同步狀態後,會自動喚醒它。
        //其實筆者認爲這裏不加這個判斷條件應該也是能夠的。只是對於CAS修改前驅節點狀態爲SIGNAL成功這種狀況來講,若是不加這個判斷條件,提早喚醒了線程,等進入acquireQueued方法了節點發現本身的前驅不是首節點,還要再阻塞,等到其前驅節點成爲首節點並釋放鎖時再喚醒一次;而若是加了這個條件,線程被喚醒的時候它的前驅節點確定是首節點了,線程就有機會直接獲取同步狀態從而避免二次阻塞,節省了硬件資源。
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

Condition等待通知的本質

總的來講,Condition的本質就是等待隊列和同步隊列的交互:ui

當一個持有鎖的線程調用Condition.await()時,它會執行如下步驟:this

  • 一、構造一個新的等待隊列節點加入到等待隊列隊尾
  • 二、釋放鎖,也就是將它的同步隊列節點從同步隊列隊首移除
  • 三、自旋,直到它在等待隊列上的節點移動到了同步隊列(經過其餘線程調用signal())或被中斷
  • 四、阻塞當前節點,直到它獲取到了鎖,也就是它在同步隊列上的節點排隊排到了隊首。

當一個持有鎖的線程調用Condition.signal()時,它會執行如下操做:.net

  • 從等待隊列的隊首開始,嘗試對隊首節點執行喚醒操做;若是節點CANCELLED,就嘗試喚醒下一個節點;若是再CANCELLED則繼續迭代。線程

  • 對每一個節點執行喚醒操做時,首先將節點加入同步隊列,此時await()操做的步驟3的解鎖條件就已經開啓了。而後分兩種狀況討論:

    1. 若是先驅節點的狀態爲CANCELLED(>0) 或設置先驅節點的狀態爲SIGNAL失敗,那麼就當即喚醒當前節點對應的線程,此時await()方法就會完成步驟3,進入步驟4.

    2. 若是成功把先驅節點的狀態設置爲了SIGNAL,那麼就不當即喚醒了。等到先驅節點成爲同步隊列首節點並釋放了同步狀態後,會自動喚醒當前節點對應線程的,這時候await()的步驟3才執行完成,進入步驟4

總結

對Condition的源碼理解,主要就是理解等待隊列,等待隊列能夠類比同步隊列,並且等待隊列比同步隊列要簡單,由於等待隊列是單向隊列,同步隊列是雙向隊列。

如下是筆者對等待隊列是單向隊列、同步隊列是雙向隊列的一些思考,歡迎提出不一樣意見:

之因此同步隊列要設計成雙向的,是由於在同步隊列中,節點喚醒是接力式的,由每個節點喚醒它的下一個節點,若是是由next指針獲取下一個節點,是有可能獲取失敗的,由於虛擬隊列每添加一個節點,是先用CAS把tail設置爲新節點,而後才修改原tail的next指針到新節點的。所以用next向後遍歷是不安全的,可是若是在設置新節點爲tail前,爲新節點設置prev,則能夠保證從tail往前遍歷是安全的。所以要安全的獲取一個節點Node的下一個節點,先要看next是否是null,若是是null,還要從tail往前遍歷看看能不能遍歷到Node。

而等待隊列就簡單多了,等待的線程就是等待者,只負責等待,喚醒的線程就是喚醒者,只負責喚醒,所以每次要執行喚醒操做的時候,直接喚醒等待隊列的首節點就好了。等待隊列的實現中不須要遍歷隊列,所以也不須要prev指針。

參考地址

相關文章
相關標籤/搜索