早些時候(jdk 1.5以前),併發環境下作同步控制,你的選擇很少,多半是使用synchronized
關鍵字。不論是同步方法仍是同步塊,總之遇到這個關鍵字,未獲取鎖線程就會乖乖等候,直到已獲取鎖的線程釋放掉鎖。java
而jdk 1.5推出ReenntrantLock
以後,此工具一度很風靡,當時人們更喜歡用Lock而不是synchronized,主要是由於它用起來靈活吧。(本人到如今爲止,用synchronized的場景仍是Lock的時候多)直到後來,愈來愈多的文章,從性能、是否公平、實現原理各個方面對兩者比較,你們纔對他們有了更直觀的認識。node
本文旨在分析ReenntrantLock的主要實現邏輯,並初步窺探AQS結構。若是不犯懶的話,但願後續能將AQS作成系列,真正理解Doug Lea大神的這個經典實現。segmentfault
研究工具的原理以前,要先會使用工具。併發
public class ReentrantLockTest { Lock lock = new ReentrantLock(); //建立鎖 public void doSomething(){ //### 1-嘗試獲取鎖,成功 if(lock.tryLock()){ System.out.println(String.format("%s線程,獲取到鎖了",Thread.currentThread().getName())); try { //模擬邏輯執行 TimeUnit.MILLISECONDS.sleep(1100L); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("%s線程,業務執行完畢",Thread.currentThread().getName())); lock.unlock(); //### 1.1-邏輯執行完,釋放鎖 } //### 2-嘗試獲取鎖,失敗 else { System.out.println(String.format("%s線程,獲取鎖失敗",Thread.currentThread().getName())); } } public static void main(String[] args) throws InterruptedException { ReentrantLockTest test = new ReentrantLockTest(); int total = 3; while (total>0){ Thread t = new Thread(()->{ test.doSomething(); },"T-"+total); t.start(); total--; TimeUnit.MILLISECONDS.sleep(1000L); } } }
tryLock()
方法會嘗試獲取鎖,若是獲取不到,直接return false
(不會阻斷);若是獲取到鎖,return true
。工具
上面的例子,執行結果爲:源碼分析
T-3線程,獲取到鎖了 T-2線程,獲取鎖失敗 T-3線程,業務執行完畢 T-1線程,獲取到鎖了 T-1線程,業務執行完畢
修改下上例中的加鎖方式:性能
Lock lock = new ReentrantLock(); public void doSomething2(){ lock.lock(); System.out.println(String.format("%s線程,獲取到鎖了",Thread.currentThread().getName())); try { TimeUnit.MILLISECONDS.sleep(1000L); //模擬業務邏輯 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(String.format("%s線程,業務執行完畢",Thread.currentThread().getName())); lock.unlock(); } public static void main(String[] args) throws InterruptedException { ReentrantLockTest test = new ReentrantLockTest(); int total = 3; while (total>0){ Thread t = new Thread(()->{ test.doSomething2(); },"T-"+total); t.start(); total--; } }
與tryLock()不通,lock()
方式嘗試獲取鎖,若是獲取不到會持續等待。ui
執行結果會變爲:編碼
T-3線程,獲取到鎖了 T-3線程,業務執行完畢 T-2線程,獲取到鎖了 T-2線程,業務執行完畢 T-1線程,獲取到鎖了 T-1線程,業務執行完畢
ReenntrantLock 加 / 解鎖的使用方式就這些,而它是靠編碼實現的。下圖給出了ReenntrantLock類部分結構:spa
ReenntrantLock默認實現的是非公平鎖(本文也只分析非公平實現)。
final Sync sync; public ReentrantLock() { sync = new NonfairSync(); //成員變量sync,賦值成NonfairSync的對象 }
先從實現較簡單的tryLock()
研究:
## ReentrantLock類 public boolean tryLock() { return sync.nonfairTryAcquire(1); } ↓↓↓↓↓ ↓↓↓↓↓ ## ReentrantLock.Sync類 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 1- 獲取AQS類中的state狀態值 if (c == 0) { // 2- 若是state是0(默認值),將state原子形修改爲1 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); // 2.1- 原子修改爲功,標記AOS中的exclusiveOwnerThread爲當前線程 return true; } } // 3- 此時state不是0,當前線程 == AOS中的exclusiveOwnerThread,將state修改成1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
tryLock()方法,核心邏輯就是原子修改AQS中的state
值,volatile
+CAS
(jdk9 VarHandle實現)。
具體一些:
實現過程當中,只在首次修改state
值,即將其從0改爲1的時候,採用了原子的CAS
方式。
以後只判斷當前線程和owner線程
(AOS中的exclusiveOwnerThread)是否一致,若是一致state++;不一致,直接return false
。
unLock()實現一樣簡單
## ReentrantLock類 public void unlock() { sync.release(1); } ↓↓↓↓↓ ↓↓↓↓↓ ## ReentrantLock.Sync類 public final boolean release(int arg) { ... tryRelease(arg) //嘗試釋放 ... } ↓↓↓↓↓ ↓↓↓↓↓ ## ReentrantLock.Sync類 protected final boolean tryRelease(int releases) { int c = getState() - releases; // state-- // 1-驗證線程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { //2-若是state==0時,將結果賦值爲true,清空owner線程 free = true; setExclusiveOwnerThread(null); } setState(c); //state賦值 return free; }
若是操做線程是owner
線程(首次tryLock()時會記錄owner):tryLock()
每次調用,state++
;unLock()
每次調用,state--
(state=0時,清空owner線程)。
Tip: 註釋1處,若是當前線程非owner線程,會直接拋出異常!
對於 tryLock() 而言,它在實現上,徹底沒用到AQS的精華。既然叫Abstract Queued Synchronizer——抽象隊列同步器,隊列、同步什麼的纔是重點。別急,lock()
方法會用到這些。
public void lock() { sync.acquire(1); } public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //中斷interrupt }
對於默認的非公平鎖實現,acquire(int arg)
徹底可替換
成以下寫法:
public final void acquire(int arg) { ##### tryAcquire(arg) 改爲了 tryLock(arg) if (!tryLock(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); //線程 interrupt }
如此替換後,邏輯就很好理解了:在用tryLock()
獲取鎖失敗的狀況下,會調用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
而 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 顯然也分紅了兩個方法addWaiter
和acquireQueued
addWaiter(Node.EXCLUSIVE)
部分:private Node addWaiter(Node mode) { Node node = new Node(mode); //建立node,建立的同時綁定線程 for (;;) { Node oldTail = tail; if (oldTail != null) { //循環2-將node節點和首次循環中初始化的隊列關聯 node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); //循環1-初始化同步隊列 } } }
這裏需關注 AQS.Node 類 的一些關鍵屬性(已文字標明各屬性用途):
## 表示Node節點的狀態,有CANCELLED(待取消)、SIGNAL(待喚醒)、CONDITION或默認的0幾個狀態 volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; volatile Node prev; //prev指向前節點 volatile Node next; //next指向後節點 ## 節點綁定線程 volatile Thread thread;
經過下圖,可更清楚的看出addWaiter方法的執行過程(此時線程T-3
在執行中):
結論1:
`addWaiter`會建立隊列,並返回尾節點,即圖中的`Node2`
acquireQueued(final Node node, int arg)
方法:final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor(); //獲取pre節點,就是Node1 if (p == head && tryAcquire(arg)) { //### 註釋1-再次嘗試獲取鎖 setHead(node); //獲取到鎖了,去掉Node1,Node2變成新的head節點 p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } ... } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //2次循環,將waitStatus==Node.SIGNAL,renturn true return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { pred.compareAndSetWaitStatus(ws, Node.SIGNAL); //首次循環,將pre節點Node1的waitStatus修改爲SIGNAL } return false; }
這裏依照上圖,詳細解釋下:acquireQueued
方法的入參前面提到了,就是 addWaiter方法 新增的尾節點,即入參node
= Node2,那麼node.predecessor()天然是Node1了——p
= Node1。
註釋1位置,先判斷p
是否是 頭結點:
若是p
是頭節點(上圖中,p就是頭結點),tryAcquire(arg)
會再次嘗試獲取鎖。此時也有兩種狀況:
線程T-3
已經執行完並釋放了鎖,那麼當前線程T-2
能夠獲取到鎖;以後去掉當前頭結點Node1,將Node2設置成頭結點。線程T-3
未執行完,那麼當前線程T-2
沒法獲取鎖,以後會執行shouldParkAfterFailedAcquire(Node pred, Node node)方法p
不是頭結點,一樣會執行shouldParkAfterFailedAcquire(Node pred, Node node)方法而因爲shouldParkAfterFailedAcquire(Node pred, Node node)方法在循環中,可能會執行兩次:
waitStatus
修改爲SIGNAL
(注意,因爲循環的原故,還會再次執行到註釋1
處,也就會再次嘗試獲取鎖——上次線程T-3未結束,此次就有可能結束了);waitStatus
已是SIGNAL
,直接return true
。後面的parkAndCheckInterrupt()方法會將當前線程T-2
阻塞。給出線程T-2
未獲取鎖狀況下的隊列狀況:
列出線程T-1
也參與其中的完整隊列圖。可看到尾節點以前的節點,綁定的線程都是阻塞
狀態(park),而waitStatus
都是待喚醒
狀態(waitStatus = SIGNAL = -1):
總結以上內容,做爲結論2:
`acquireQueued`方法,若是當前線程是第1個獲取鎖失敗的線程(例子中「線程T-3」正在執行,「線程T-2」就是第一個獲取鎖失敗的線程),會再嘗試2次獲取鎖; 獲取鎖失敗 或 當前線程非第1個獲取鎖失敗的線程(例子中T-1就不是第一個獲取鎖失敗的線程),將前置節點狀態修改爲待喚醒,並阻塞關聯線程。
爲了便於理解,畫出整個acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
方法的邏輯圖:
阻塞並不是終點,還要再次看下unlock()
時作了什麼。
## ReentrantLock類 public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { //嘗試釋放,前面的已經分析過了 Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); // ### 重點看unparkSuccessor(h)方法,入參是`頭節點` return true; } return false; } ## AQS類 private void unparkSuccessor(Node node) { // 獲取Node節點的waitStatus,若是<0(好比待喚醒SIGNA = -1),原子形還原成0 int ws = node.waitStatus; if (ws < 0) node.compareAndSetWaitStatus(ws, 0); // 獲取頭結點的下一個節點,若是是空(CANCELLED可能產生空),鏈表尾部遍歷,取最前面一個waitStatus<0的節點 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node p = tail; p != node && p != null; p = p.prev) if (p.waitStatus <= 0) s = p; } if (s != null) LockSupport.unpark(s.thread); // 喚醒 }
先不考慮CANCELLED狀況,那麼第二個節點對應的線程會被喚醒。第二個節點是什麼來路?前面已經分析了,第1個獲取鎖失敗的線程會和第二個節點綁定(例子中的Node2,對應的線程天然是T-2,下圖):
線程T-2
被喚醒後,會作什麼?
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { //循環 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); //### 線程T-2本來被阻塞於此 } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } }
很顯然,若是線程T-2
被喚醒後,因爲循環的原故,會再次進入以下邏輯:
final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); //head易主 p.next = null; return interrupted; } private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
tryAcquire(arg)
再次嘗試獲取鎖,顯然此時線程T-3
已經執行完了(否則也不會執行unlock),那麼線程T-2
極可能會獲取到鎖——
那麼,head易主,隊列發生以下變化:
最後給出加 / 解鎖過程當中的隊列變化,幫助理解。
以上,終於分析完了 ReentrantLock的主要方法的實現。(有點細碎哈)
本系列的下一篇文章 AQS系列二:源碼分析「公平」ReentrantLock和Condition 會繼續探索ReentrantLock
的公平鎖實現,敬請期待!