上篇文章 AQS系列一:源碼分析非公平ReentrantLock 中,咱們分析了ReentrantLock的非公平實現,本篇會承接上文,繼續分析ReentrantLock的公平鎖實現(以及Condition的實現)。java
在此以前咱們要先弄明白,「不公平」體如今哪裏。node
好吧,我也不清楚。
因而我對比了ReentrantLock的非公平和公平實現,即NonfairSync
vs FairSync
,發現差異主要體現在加鎖,更確切的說是獲取鎖環節。segmentfault
## 非公平獲取鎖 final boolean nonfairTryAcquire(int acquires) { ... if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } ... } ## 公平獲取鎖 protected final boolean tryAcquire(int acquires) { ... if (!hasQueuedPredecessors() //### 差異體如今此處 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } ... }
顯然,公平鎖多執行了!hasQueuedPredecessors()
,看看此方法的邏輯。併發
public final boolean hasQueuedPredecessors() { Node h, s; if ((h = head) != null) { ## h頭結點 if ((s = h.next) == null ... ) { ## s二號節點 ... } if (s != null && s.thread != Thread.currentThread()) ##檢查2號節點綁定線程,是否當前線程 return true; } return false; }
hasQueuedPredecessors方法只有在 2號節點不爲空,且綁定線程非當前線程的前提下,會返回true。
返回ture意味着!hasQueuedPredecessors() = false
,沒有資格獲取鎖(就是沒機會執行compareAndSetState——嘗試修改state)函數
反過來說,沒有隊列(無線程正在執行),或者沒有2號節點(取消或者臨時狀態),再或者2號節點的綁定線程就是當前線程時,才會嘗試獲取鎖。工具
咱們分析下最後這種狀況,2號節點綁定的線程是第1個等待的線程(第1個獲取鎖失敗的線程),第1個等待的線程在hasQueuedPredecessors()的運做下,成爲了第1個有資格嘗試獲取鎖的線程。而這,就是公平!源碼分析
那麼沒有hasQueuedPredecessors方法的非公平鎖,到底「不公平」在哪兒呢?
咱們回想一下,在加 / 解鎖的過程當中,nonfairTryAcquire方法被調用的位置就能獲得答案了。ui
public final void acquire(int arg) { if (!tryAcquire(arg) ### 位置1,嘗試獲取 && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; ... for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { ### 位置2,嘗試獲取 setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } ... }
在上述代碼中,tryAcquire(非公平實現會調用nonfairTryAcquire)會在位置一、2兩處觸發。試想以下場景:this
線程T-3
執行完畢,調用了unlock;隨着線程T-2
被喚醒,位置2處代碼可能會被執行。線程T-1
的介入,位置1處的代碼也有可能被執行。所以線程T-2
和T-1
誰能在併發中搶到鎖,存在不肯定性。spa
原理說完了,那具體怎麼構建公平的ReentrantLock呢?構造函數傳參便可:
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); ## 入參fair傳入true,構建公平鎖 }
ReentrantLock的加解鎖過程已詳細分析了一遍,若是你常用這個工具,確定對衍生出另外一個大咖condition
有所瞭解。
二話不說,先甩出demo:
static Lock lock = new ReentrantLock(); Condition condition = lock.newCondition();; public void doSomething(){ lock.lock(); System.out.println(String.format("%s線程,獲取到鎖了",Thread.currentThread().getName())); try { System.out.println(String.format("%s線程,await",Thread.currentThread().getName())); TimeUnit.SECONDS.sleep(2L); //模擬耗時業務邏輯執行 condition.await(); //await System.out.println(String.format("%s線程,await被喚醒",Thread.currentThread().getName())); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("%s線程,業務執行完畢",Thread.currentThread().getName())); lock.unlock(); } public static void main(String[] args) throws InterruptedException { ReentrantLockTest test = new ReentrantLockTest(); int total = 1; while (total>0){ Thread t = new Thread(()->{ test.doSomething(); },"T-"+total); t.start(); TimeUnit.MILLISECONDS.sleep(200L); //讓子線程T-1率先獲取到鎖 lock.lock(); System.out.println(String.format("%s線程,獲取到鎖了",Thread.currentThread().getName())); test.condition.signal(); System.out.println(String.format("%s線程,signal",Thread.currentThread().getName())); lock.unlock(); total--; } }
結合已掌握的加解鎖原理,分析demo執行過程:
線程T-1
先獲取到鎖,200ms後main線程
也會嘗試獲取鎖,固然main線程
獲取不到——因爲耗時達2s的業務邏輯瘋狂執行中。(sleep處,此時main線程應該構建了同步隊列,main線程
做爲2號節點的綁定線程被無情阻塞,下圖)線程T-1
搞定了難纏的業務邏輯,卻又遭遇condition.await()
的伏擊線程main
發現本身神奇的不被阻塞了,又神奇的獲取到了鎖。因而投桃報李,condition.signal()
接unlock
二連招喚醒了線程T-1
線程T-1
覺醒於await
處,執行完剩餘邏輯demo的執行結果,能初步證實上述分析:
T-1線程,獲取到鎖了 T-1線程,await main線程,獲取到鎖了 main線程,signal T-1線程,await被喚醒 T-1線程,業務執行完畢
從構造函數出發:
public Condition newCondition() { return sync.newCondition(); } ## Sync類建立ConditionObject final ConditionObject newCondition() { return new ConditionObject(); }
ConditionObject是AQS中的另外一內部類,看看它的屬性:
## ConditionObject類 private transient Node firstWaiter; private transient Node lastWaiter;
感受上和AQS的設定上有些像?
## AQS類 private transient volatile Node head; private transient volatile Node tail;
先大膽猜想一下,condition中極可能會再次構建同步隊列。
接下來就是驗證咱們的猜想的過程:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); ## 建立等待隊列,node是尾節點。 詳情參看[addConditionWaiter詳情] int savedState = fullyRelease(node); ## 重置state,返回重置前的state值。 詳情參看[fullyRelease詳情] int interruptMode = 0; while (!isOnSyncQueue(node)) { ## 是否在AQS同步隊列中 LockSupport.park(this); ## 不在AQS同步隊列的節點,阻塞當前線程 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
private Node addConditionWaiter() { if (!isHeldExclusively()) ## 當前線程是否owner線程,若是不是,拋異常——這兒決定了await必須用在lock()方法以後 throw new IllegalMonitorStateException(); Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Node.CONDITION); ## 建立新節點,原子形賦值waitStatus=CONDITION=-2,並綁定當前線程到node節點 ## node會做爲尾節點,置於隊列最後 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
final int fullyRelease(Node node) { try { int savedState = getState(); ## 獲取當前state if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { node.waitStatus = Node.CANCELLED; throw t; } } public final boolean release(int arg) { if (tryRelease(arg)) { ## 嘗試「清0」state Node h = head; ## 此處head不爲空,unpark線程main,return true if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) ## 當前線程驗證,若是當前線程!=owner,拋異常 throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { ## 若是state清0,同時清空owner線程,return true free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
有了以前分析ReentrantLock的經驗,與之很是相像的condition代碼應該不難拿下。
這裏畫出await方法中fullyRelease(node)執行先後的節點和關鍵屬性的變化:
圖右側(await方法執行到了LockSupport.park(this)時),線程T-1
已經阻塞,線程main
則解除阻塞狀態。
經過上圖很容易看出,咱們以前的猜想是正確的:await
方法又構建了一個同步隊列,不過此次的頭、尾指針在ConditionObject
類中。
再來看看signal方法做了什麼:
public final void signal() { if (!isHeldExclusively()) ## 和await()方法中的同樣,先驗證owner線程 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) ## condition頭結點傳遞 && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false; Node p = enq(node); ## 將ConditionObject頭結點移動到AQS隊列尾部。 詳情參看[enq詳情] int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) ## 取消或修改waitStatus失敗才做unpark操做,此處unpark不會觸發 LockSupport.unpark(node.thread); return true; }
private Node enq(Node node) { for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); ## 入參node,成爲了AQS隊列新的尾節點 if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { initializeSyncQueue(); ## 初始化AQS隊列 } } }
signal中最神奇的莫過於enq(node)方法,它完成了節點的轉移,condition隊列頭結點 -> AQS隊列尾節點。
經過下圖觀察整個signal方法產生的各對象結構和屬性變化:
觀察可知,signal執行後節點轉移已經完成,線程T-1依然阻塞,此時ConditionObject已經完成了它的歷史使命。
線程T-1何時解除阻塞呢?其實這部分上篇文章已經分析過了,就是咱們的老朋友unlock()。
區別在於線程T-1被喚醒後,執行的是await後續的邏輯:
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { ## 2.下次循環,node已經在AQS隊列中,返回true,跳出循環 LockSupport.park(this); ## 1.線程T-1覺醒於此 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) ## 3.再度獲取到鎖 && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
至此,咱們已經瞭解了ReentrantLock的主邏輯的源碼實現(公平、非公平、condition),本系列的下篇文章將進入下一副本——CountDownLatch
,敬請期待!