condition實現原理

  condition是對線程進行控制管理的接口,具體實現是AQS的一個內部類ConditionObject,主要功能是控制線程的啓/停(這麼說並不嚴格,還要有鎖的競爭排隊)。
condition主要方法:
 
void await() throws InterruptedException
進入等待,直到被通知或中斷
void awaitUninterruptibly()
進入等待,直到被通知,不響應中斷
long awaitNanos(long nanosTimeout) throws InterruptedException
等待xxx納秒
boolean awaitUntil(Date deadline) throws InterruptedException
等待,直到某個時間點
void signal()
喚醒
void signalAll()
喚醒全部等待在condition上的線程,會從等待隊列挨個signal()
使用示例:
   經過實現一個有界隊列來深刻理解Condition的使用方式。有界隊列:一種特殊的隊列,當隊列爲空時,隊列的獲取操做將會阻塞獲取線程,直到隊列中有新增元素,當隊列已滿時,隊列的插入操做將會阻塞插入線程,
,直到隊列出現空位。
public class BoundedQueue<T> {
    private Object[] items;
    //添加的下標,刪除的下標和數組當前數量
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();
    private Condition empty = lock.newCondition();
    private Condition full = lock.newCondition();
    //構造方法
    public BoundedQueue(int size){
        items = new Object[size];
    }
    //添加元素,若是數組滿,則添加線程進入等待,直到有空位
    public void add(T t) throws InterruptedException{
        lock.lock();
        try {
            while (count == items.length)  //改爲if會如何
                full.await();
            items[addIndex] = t;
            if(++addIndex == items.length)
                addIndex = 0;
            ++count;
            empty.signal();
        }finally {
            lock.unlock();
        }
    }

    //從頭部刪除一個元素,若是數組空,則刪除線程進入等待狀態,直到添加新元素
    public T remove() throws InterruptedException{
        lock.lock();
        try{
            while (count == 0)
                empty.await();
            Object x = items[removeIndex];
            if(++removeIndex == items.length)
                removeIndex = 0;
            --count;
            full.signal();
            return (T)x;
        }finally {
            lock.unlock();
        }
    }
}
  在這裏,阻塞隊列的數據存放是在items數組中,注意幾個下標的賦值操做,當addIndex到頭的時候,由於以前可能有remove操做,故items數組的頭部或者中間位置多是空的,若是繼續添加數據,數據應添加在頭部,至關於造成了一個「環」,這就是19-20行的含義。至於16行代碼中的while替換成if,書中給出的解釋是:使用while目的是爲了防止過早或意外的通知,只有條件符合才能退出循環。這個地方沒有想出相應的場景,僅從目前的代碼邏輯來講,換成if也是能夠的,但若是考慮異常致使等待線程被喚醒,那阻塞隊列就沒法正常工做了。
原理分析:
  ConditionObject是同步器AQS的內部類,由於Condition的操做須要獲取相關聯的鎖,因此做爲同步器的內部類也較爲合理。每一個Condition對象都包含着一個隊列(等待隊列),該隊列是condition對象實現等待/通知的功能的關鍵。
   等待隊列:
  等待隊列是一個FIFO的隊列,在隊列中的每一個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,若是一個線程調用了Condition.await()方法,那麼該線程將會釋放鎖、構形成節點加入等待隊列並進入等待狀態。事實上,節點的定義複用了同步器中節點的定義, 也就是說,同步隊列和等待隊列中節點類型都是同步器的靜態內部類AbstractQueuedSynchronizer.Node

  如圖所示,Condition擁有首尾節點的引用,而新增節點只須要將原有的尾節點nextWaiter指向它,而且更新尾節點便可。 上述節點引用更新的過程並無使用CAS保證,緣由在於調用await()方法的線程一定是獲取了鎖的線程,也就是說該過程是由鎖來保證線程安全的。
  在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列, 而併發包中的Lock實現類擁有一個同步隊列和多個等待隊列:

  如上圖所示,Condition的實現是同步器的內部類,所以每一個Condition實例都可以訪問同步器提供的方法,至關於每一個Condition都擁有所屬同步器的引用。
等待:
  調用Condition的await()方法(或者以await開頭的方法),會使當前線程進入等待隊列並釋放鎖,同時線程狀態變爲等待狀態。當從await()方法返回時,當前線程必定獲取了Condition相關聯的鎖。
  若是從隊列(同步隊列和等待隊列)的角度看await()方法,當調用await()方法時,至關於同步隊列的首節點(獲取了鎖的節點)移動到Condition的等待隊列中。
   Condition的await()方法:
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); //當前線程加入等待隊列
    int savedState = fullyRelease(node); //釋放同步狀態,也就是釋放鎖
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  調用該方法的線程成功獲取了鎖的線程,也就是同步隊列中的首節點,該方法會將當前線程構形成節點並加入等待隊列中,( 由於已經獲取了同步狀態,因此無需經過cas,在隊列尾部添加等待節點 )而後釋放同步狀態,喚醒同步隊列中的後繼節點,而後當前線程會進入等待狀態。
  當等待隊列中的節點被喚醒,則喚醒節點的線程開始嘗試獲取同步狀態。若是不是經過其餘線程調用Condition.signal()方法喚醒,而是對等待線程進行中斷,則會拋出InterruptedException。
  若是從隊列的角度去看,當前線程加入Condition的等待隊列,如圖所示,同步隊列的首節點並不會直接加入等待隊列,而是經過addConditionWaiter()方法把當前線程構形成一個新的節點並將其加入等待隊列中:
通知:
  調用Condition的signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點以前,會將節點移到同步隊列末尾。
  signal方法:
public final void signal() {
    if (!isHeldExclusively()) //是否獲取了鎖(重入鎖中是直接 return  獨佔鎖線程==當前線程)
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; //等待隊列頭節點
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null) //等待隊列首節點的後繼節點爲空,說明只有一個節點,那麼尾節點也置空
            lastWaiter = null; 
        first.nextWaiter = null;
    } while (!transferForSignal(first)  && (first = firstWaiter) != null); // 等待隊列首節點喚醒失敗,則喚醒下個節點
}
final boolean transferForSignal(Node node) {// 將節點加入同步隊列
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 設置節點狀態爲0----(等待隊列狀態只能爲-2,-3,用-2進行cas設置失敗,說明是-3)
        return false;
    Node p = enq(node); // 放入同步隊列尾部
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
  調用signal方法的前置條件是當前線程必須獲取了鎖,能夠看到signal()方法進行了isHeldExclusively()檢查,也就是當前線程必須是獲取了鎖的線程。接着獲取等待隊列的首節點,將其移動到同步隊列並使用LockSupport喚醒節點中的線程。
  節點從等待隊列移動到同步隊列的過程以下圖所示:
  被喚醒後的線程,將從await()方法中的while循環中退出(isOnSyncQueue(Node node)方法返回true,節點已經在同步隊列中),進而調用同步器的acquireQueued()方法加入到獲取同步狀態的競爭中。成功獲取同步狀態以後,被喚醒的線程將從先前調用的await()方法返回,此時該線程已經成功地獲取了鎖。
  Condition的signalAll()方法,至關於對等待隊列中的每一個節點均執行一次signal()方法,效果就是將等待隊列中全部節點所有移動到同步隊列中,並喚醒每一個節點的線程。
-----------------------------------------------------------------------------------
想到的問題:
  一、wait,sleep,park都有啥區別,這個讓出cpu資源的操做底層實現是同樣的麼?
  wait是Object的方法,會釋放鎖,原理是monitor機制(搶佔對象頭信息的鎖標誌位),是個native方法,也就是是java的一套機制,底層是用c編寫的,具體如何掛起線程的邏輯未知。
  sleep是Thread的方法,不會釋放鎖,也是個native方法,具體底層邏輯未知。
  park是unsafe類的方法,這個unsafe類提供了不少直接操做內存的方法,這個park也是個native方法,openjdk源碼顯示底層調用了操做系統的函數,停掉了線程。wait跟sleep應該也是相似原理。
  二、線程的喚醒是怎麼實現的,在線程很是多的狀況下,喚醒了就能立刻執行麼?
  喚醒確定也是最終調用os的函數來實現的,線程很是多的狀況下,硬件線程數固定,操做系統或者jvm確定也要有相應機制來調用線程,這個喚醒只是讓線程「醒」了而已,醒了!=執行,因此,喚醒了可能並不能立刻執行,而是要等待別的線程執行完後進行搶佔,成功後才能執行。
相關文章
相關標籤/搜索