上一篇:
從ReentrantLock分析AbstractQueuedSynchronized源碼
1.示例代碼
演示一個生產者消費者的場景,await阻塞,signal喚醒node
細節點:ui
aqs裏面鎖是什麼,簡單來講就是state的值,state>0線程持有了鎖,state=0線程釋放了鎖。this
aqs裏面線程是什麼,簡單來講就是Node節點,Node節點經過構成鏈表結構來表明線程的執行的前後順序,這個鏈表有雙向的,有單向的,其屬性prev和next以及全局head,tail屬性用於構建雙向鏈表,其nextWaiter以及firstWaiter,lastWaiter構成單向鏈表,waitStatus的屬性用於表示當前線程的等待狀態;spa
調用await的方法的線程,必然持有鎖;.net
生產者put和消費者take的方法使用同一個lock,存在資源競爭;線程
await方法會阻塞當前線程,signal會喚醒對應的處於條件隊列中等待的線程,並將處於條件隊列中的的節點移動到雙向同步隊列裏面;對象
有了這幾個概念,有助於理解下面所說的。blog
final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putIndex, takeIndex, count; public void put(Object item) throws InterruptedException { //這裏會將state值修改,state值在aqs中是全局的,無論同步隊列仍是條件隊列都共享一個state lock.lock(); try { //items滿了,阻塞 while (count == items.length) { //調用await方法的線程必須持有鎖 //思考:這裏調用了await方法是否應該釋放當前線程的鎖呢?由於全局共用一個state值。 notFull.await(); } items[putIndex] = item; if (++putIndex == items.length) { putIndex = 0; } ++count; notEmpty.signal();//items中已經有值,通知notEmpty隊列消費 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { //items爲空 while (count == 0) { //調用await方法的線程必須持有鎖 //思考:這裏調用了await方法是否應該釋放當前線程的鎖呢? notEmpty.await(); } Object item = items[takeIndex]; if (++takeIndex == items.length) { takeIndex = 0; } --count; notFull.signal();//items中已經被消費掉,通知notFull生產 return item; } finally { lock.unlock(); } }
2.雙向同步隊列和Condition條件隊列
2.1.雙向同步隊列結構
這是lock加鎖和unLock釋放鎖過程當中造成的隊列,有一個虛的頭節點,虛節點不存儲線程等相關信息,nextWaiter屬性在這種隊列中無實際做用。經過prev和next屬性串聯節點,thread保存當前線程,waitStatus表示當前線程的等待狀態。隊列
2.2.Condition條件隊列結構
由示例代碼能夠看出,每一個Condition對象都對應了一個Condition條件隊列,並且每一個條件隊列是相互獨立的。資源
Condition條件隊列是一個單向的隊列,並且firstWaiter也有實際做用,彼此以前是經過nextWaiter關聯的,在Condition條件隊列中,prev和next屬性也並不會用到。
噹噹前線程調用了notFull.await()方法的時候,當前線程就會被包裝成Node被加入到notFull隊列的末尾。
在條件隊列中,咱們須要關注的是waitStatus的屬性爲CONDITION,當waitStatus=CONDITION的時候,咱們就認爲當前線程不須要等待,能夠出隊了。
2.3.雙向同步隊列和條件隊列的聯繫
通常狀況下,雙向同步隊列和條件隊列是相互獨立的。可是,當咱們調用條件隊列的signal方法的時候,會將條件隊列中的處於等待的線程喚醒,被喚醒的線程一樣和普通線程同樣要去爭搶鎖資源。若是爭搶失敗,一樣要被加到雙向同步隊列中去。這個時候,就須要將條件隊列中的節點一個個轉移到雙向同步隊列中去了。注意,這個節點從條件隊列遷移到雙向同步隊列是一個一個遷移的。
3.分析ConditionObject
final Condition notFull = lock.newCondition();
該類位於AbstractQueuedSynchronized裏面
核心屬性:
//條件隊列首節點 private transient Node firstWaiter; //條件隊列尾節點 private transient Node lastWaiter;
構造方法:
public ConditionObject() { }
3.1.分析ConditionObject.await()方法
await方法執行過程:
(1)lock加鎖;
(2)await方法檢測中斷狀態;
(3)fullyRelease方法釋放鎖;
(4)節點若是不位於同步隊列,掛起線程,等待被喚醒;
(5)節點被signal喚醒或者中斷喚醒;
(6)checkInterruptWhileWaiting檢查中斷模式,根據節點位於同步隊列仍是條件隊列設置中斷模式,並將節點加入到同步隊列中;
(7)acquireQueued嘗試加鎖,加鎖失敗在同步隊列中等待被喚醒,掛起線程;
(8)加鎖成功持有鎖,reportInterruptAfterWait處理中斷;
可以調用到await方法的線程都已經獲取到鎖。
調用await方法這裏有兩種狀況:
1.中斷髮生時,線程尚未被signal過,則線程恢復後,拋出異常;
2.中斷髮生時,線程已經被signal過,那麼線程已經被從condition隊列中移動到同步隊列中了,那麼await方法調用以後,
只是再補一下中斷,也就僅僅改變了線程的中斷狀態。
從await方法咱們能夠看出來,這個中斷在這裏僅僅只判斷中斷狀態。咱們直到thread.interrupt()方法只改變線程的中斷狀態爲true而並不能真正意義上的中止線程,再配上LockSupport.unpark喚醒線程,就可使得await方法拋出異常。
await方法退出的時候根據interruptMode來處理這兩種狀況:
先看interruptMode屬性值:
//若是是默認值0,表明線程整個過程當中沒有發生中斷 //=1表示退出await方法時再自我中斷,這種模式發生在signal方法調用以後,由於當前節點被加入到了同步隊列 //因此還須要中斷 private static final int REINTERRUPT = 1; //=-1說明退出await方法的時候拋出InterruptedException private static final int THROW_IE = -1;
public final void await() throws InterruptedException { //判斷當前線程是不是中斷狀態,若是是直接拋出異常 //await方法中發現了線程中斷,則拋出異常,後續reportInterruptAfterWait方法也會處理中斷模式 if (Thread.interrupted()) throw new InterruptedException(); //新建一個節點並加入到條件隊列中 Node node = addConditionWaiter(); //徹底釋放當前線程的鎖,等待其餘線程爭搶鎖 long savedState = fullyRelease(node); int interruptMode = 0;//中斷屬性默認值,表明不須要中斷 //判斷節點是否處於同步隊列中,返回false則表示在條件隊列中 //在調用signal方法以前,新加的節點都是位於條件隊列中 //調用signal方法會將處於條件隊列中的節點遷移到同步隊列中 while (!isOnSyncQueue(node)) { //節點位於條件隊列則掛起當前線程,等待signal方法喚醒,線程當前被掛起在這個位置 LockSupport.park(this); //線程如何被喚醒: //1.線程被signal喚醒後,馬上走到這裏 //3.被其餘線程中斷+喚醒,thread.interrupt() + LockSupport.unpark()方法; if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //能執行到這裏,說明node已經被遷移到同步隊列中了,並且當前線程也被喚醒了 //條件1:嘗試加鎖 //條件2:中斷屬性不爲-1 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //嘗試加鎖成功或中斷屬性不爲-1,則設置屬性爲1,表明須要自我中斷 interruptMode = REINTERRUPT; //node被加入到同步隊列中的時候,並無設置nextWaiter=null //若是有條件隊列中還有其餘節點,清除條件隊列中取消狀態的節點 if (node.nextWaiter != null) unlinkCancelledWaiters(); //根據中斷模式處理中斷 if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
3.1.1.分析ConditionObject.addConditionWaiter方法
//建立一個新節點並將它加入到條件隊列中,並返回該節點 //執行過程當中剔除了已取消的節點 private Node addConditionWaiter() { //獲取條件隊列最後一個節點 Node t = lastWaiter; //條件1:若是t!=null說明條件隊列中已經有Node節點了 //條件2:t.waitStatus != Node.CONDITION成立,說明當前尾節點取消了等待 //那麼咱們新加的節點就不該該在當前尾節點的後面了 if (t != null && t.waitStatus != Node.CONDITION) { //剔除全部取消狀態的節點 unlinkCancelledWaiters(); //更新臨時變量的值,可能由於上一個方法改變尾節點 t = lastWaiter; } //建立一個新節點並設置waitStatus=CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); //若是沒有最終節點 if (t == null) //設置第一個節點爲當前節點 firstWaiter = node; else //若是有最終節點,設置當前節點的後續節點爲新建立的節點 t.nextWaiter = node; //更新最終節點爲當前節點 lastWaiter = node; return node; }
3.1.2.分析unlinkCancelledWaiters方法
//剔除取消等待的節點 private void unlinkCancelledWaiters() { //獲取首節點 Node t = firstWaiter; Node trail = null; //循環遍歷鏈表 while (t != null) { //獲取下一個節點 Node next = t.nextWaiter; //條件成立,說明當前節點取消了等待 if (t.waitStatus != Node.CONDITION) { //將首節點的的nextWaiter設置爲空,斷鏈操做,幫助gc t.nextWaiter = null; //若是鏈表尚未找到正常狀態的節點 if (trail == null) //將next節點設置爲首節點 firstWaiter = next; else //讓上一個正常節點指向取消等待節點的下一個節點 trail.nextWaiter = next; //若是不存在下一個節點,當前尾節點設置爲正常節點 if (next == null) lastWaiter = trail; } //條件不成立說明當前t節點是正常節點 else //給正常節點賦值,這裏就找出了全部的正常節點 trail = t; //循環繼續向下遍歷 t = next; } }
3.1.3.分析ConditionObject.fullRelease方法
//徹底釋放鎖,當failed=true的時候,說明當前線程未持有鎖調用了await方法 //調用await方法必須持有鎖 //假如failed=true,finally中會將加入到隊列中的節點狀態設置爲取消狀態 //後續節點入隊的過程會調用unlinkCancelledWaiters方法清除取消狀態的節點 //釋放鎖成功後返回當前鎖的state的值 final int fullyRelease(Node node) { boolean failed = true; try { //獲取當前線程state值的總值 int savedState = getState(); //直接調用release方法釋放鎖 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
3.1.4.分析ConditionObject.isOnSyncQueue方法
判斷節點是否處於同步隊列中,前面已經說過,當調用signal方法的時候,可能會將條件隊列中的節點遷移到同步隊列中。
final boolean isOnSyncQueue(Node node) { //當前節點的狀態爲CONDITION,說明處於條件隊列中 //當前節點的前驅節點爲空,說明處於條件隊列中,由於雙向同步隊列存在虛的首節點 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //當前節點的後續節點不爲空,說明處於同步隊列中,由於條件隊列的prev和next屬性爲空 if (node.next != null) return true; //若是上面條件都不成立,說明當前節點知足成爲同步隊列節點的屬性,即prev和next屬性不爲空 //循環鏈表從後向前遍歷,看是否能找到當前節點,找到則返回true return findNodeFromTail(node); }
private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } }
3.1.5.分析checkInterruptWhileWaiting方法
該方法返回線程的中斷的模式
private int checkInterruptWhileWaiting(Node node) { //Thread.interrupted返回當前線程的中斷狀態並將中斷狀態清除 //被喚醒的線程當前中斷狀態仍是中斷,須要清除 //以後調用transferAfterCancelledWait方法判斷節點是由signal喚醒仍是中斷喚醒 //位於condition隊列說明是中斷喚醒,而後中斷模式返回THROW_IE //位於同步隊列則說明是signal喚醒,返回中斷模式REINTERRUPT return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
3.1.6.分析transferAfterCancelledWait方法
該方法能夠判斷當前節點是否位於條件隊列,位於條件隊列,則須要將節點加入到同步隊列,並返回true;
若是位於同步隊列則返回false;
final boolean transferAfterCancelledWait(Node node) { //條件成立,說明當前node的waitStatus值爲CONDITION,當前node位於條件隊列中 //爲何會出現這種狀況,被signal方法喚醒後的線程不是在同步隊列中了嗎? //若是咱們是經過中斷喚醒,那麼這裏仍是須要將 //節點加入到同步隊列中的 if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //中斷喚醒的node也會被加到阻塞隊列中 enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
3.1.7.分析reportInterruptAfterWait方法
根據中斷模式處理中斷
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt();//線程自我中斷,僅僅改變了中斷狀態 }
3.2.分析ConditionObject.signal方法
public final void signal() { //是否獨佔鎖線程,不是則拋出異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //條件隊列有node節點,則進行條件隊列遷移到同步隊列的操做。 if (first != null) doSignal(first); }
//將條件隊列中全部節點遷移到同步隊列中 private void doSignal(Node first) { do { //firstWaiter = first.nextWaiter,處理完當前節點,則讓firstWaiter指向下一個節點 if ( (firstWaiter = first.nextWaiter) == null) //條件隊列爲空,則更新lastWaiter爲空 lastWaiter = null; //若是下一個節點爲空,則出while循環 first.nextWaiter = null; //while循環遷移某個節點成功和條件隊列爲空纔會退出循環 } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) { //經過CAS操做修改當前條件隊列中節點狀態爲0,由於節點要被遷移到同步隊列了 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //將節點加入到同步隊列尾節點並返回其前一個節點 Node p = enq(node); int ws = p.waitStatus; //在同步隊列中,waitStatus只有1,0,-1三種狀態,大於0說明前驅節點是取消狀態 //若是前驅節點是取消狀態,則喚醒當前線程 //若是前驅節點不是取消狀態,則設置前驅節點狀態爲-1,表明須要喚醒後續節點 //若是compareAndSetWaitStatus(p, ws, Node.SIGNAL)返回false,說明當前node是 //lockInterrupt入隊的node,是會響應中斷的,若是外部線程中斷該node,前驅node會 //將節點狀態修改成取消狀態,並執行出隊邏輯 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }