這篇文章咱們來聊聊Java併發包中鎖的實現。
閱讀完本篇文章,你將瞭解到:
抽象同步器AQS的實現原理
ReentrantLock實現原理
非公平鎖和公平鎖實現的區別
基於這些內容,您也能夠本身進一步探索可中斷鎖的實現原理
AQS的核心是state字段以及雙端等待隊列
如何優雅的中斷一個線程
如下內容是基於JDK 1.8進行分析的。
bash
咱們查看下java.util.concurrent.locks
包下面,發現主要包含以下類:
咱們來構建他們的UML圖:
如上圖,拋開內部類,抽象類,接口,主要實現了三把鎖:
ReentrantLock
,StampedLock
,ReentrantReadWriteLock
。
ReentrantLock
了,關於ReentrantLock
的詳細說明以及使用案例:
咱們能夠發現ReentrantLock和ReentrantReadWriteLock頂層都是AbstractQueueSynchronizer類。
AbstractQueuedSynchronizer
類。
AbstractQueuedSynchronizer
,簡寫爲AQS
,抽象隊列同步器。
ReentrantLock
,Semaphore
,CountDownLatch
,ReentrantReadWriteLock
,SynchronousQueue
,FutureTask
。
接下來,咱們先來看看這個抽象同步隊列的原理。
說到AQS,咱們必需要先知道它是幹嗎的,而後再去研究它。
這樣勢必會涉及很頻繁的隊列入隊出隊操做,以及線程的阻塞喚醒操做。
AQS使用了模板方法,把同步隊列都封裝起來了,同時提供瞭如下五個未實現的方法,用於子類的重寫:
AQS同步器數據結構
如上圖,AQS中:
state
:
exclusiveOwnerThread
:
等待隊列(同步隊列)
:
雙向鏈表
結構。
head
:
tail
:
等待隊列中的節點結構是怎樣子的呢?
AQS隊列節點數據結構
pre
:
waitStatus
:
CANCELLED
:
SIGNAL
:
CONDITION
:
PROPAGATE
:
thread
:
nextWaiter
:
next
:
若是想要了解AQS的實現,您須要先知道如下這些內容,由於源碼中會大量使用:
AQS中線程的阻塞和喚醒基本上都使用這兩個方法實現的。
LockSupport是用來建立鎖和其餘同步類的基本線程阻塞
的原語。
此類與使用它的每一個線程關聯一個許可(permit: 0表示無許可,1 表示有許可),若是有許可,將馬上返回對park()的調用,而且在此過程化消耗掉它。
該類中常見的兩個方法兩個方法:
park(Object blocker)
:
unpark(Thread thread)
:
這兩個方法底層是調用了Unsafe中的park和unpark的native方法。
具體底層實現,能夠參考這裏[1]
咱們知道,計算機中提供了cas相關指令,這是一種樂觀的併發策略,須要硬件指令集的發展才能支持,實現了:
IA64 和 X86 使用cmpxchg指令完成CAS功能。
cas 內存位置 舊預期值 新值
CAS存在ABA問題,可使用版本號進行控制,保證其正確性。
JDK中的CAS,相關類:
Unsafe
裏面的compareAndSwapInt()
以及compareAndSwapLong()
等幾個方法包裝提供。只有啓動類加載器加載的class才能訪問他,或者經過反射獲取。
詳細說明:
相關閱讀:
爲了分析AQS的實現原理,咱們先挑一個方法來分析。
爲了弄清楚AQS中是如何進行隊列同步的,咱們先從一個簡單的獨佔加鎖方法提及。
這個方法是使用獨佔模式獲取鎖,忽略中斷。
咱們先看一下這個方法的入口代碼:
1public final void acquire(int arg) {2 if (!tryAcquire(arg) && // 嘗試獲取鎖,這裏是一個在AQS中未實現的方法,具體由子類實現3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 獲取不到鎖,則 1.添加到等待隊列 2.不斷循環等待重試4 selfInterrupt();5}複製代碼
一開始,會嘗試調用AQS中未實現的方法tryAcquire()
嘗試獲取鎖,獲取成功則表示獲取鎖了,該方法的實現通常經過CAS進行設置state嘗試獲取鎖:
不一樣的鎖能夠有不一樣的tryAcquire()
實現,因此,你能夠看到ReentrantLock鎖裏面會有非公平鎖和公平鎖的實現方式。
ReentrantLock公平鎖的實現代碼在獲取鎖以前多了一個判斷:
!hasQueuedPredecessors(),這個是判斷若是當前線程節點以前沒有其餘節點了,那麼咱們才能夠嘗試獲取鎖,這就是公平鎖的體現。
獲取鎖失敗以後,則會進入這一步,這裏會嘗試把線程節點追加到等待隊列後面,是經過CAS進行追加的,追加失敗的狀況下,會循環重試,直至追加成功爲止。
1private Node addWaiter(Node mode) { 2 // 將當期線程構形成Node節點 3 Node node = new Node(Thread.currentThread(), mode); 4 // Try the fast path of enq; backup to full enq on failure 5 Node pred = tail; 6 if (pred != null) { 7 // 將原來尾節點設置爲新節點的上一個節點 8 node.prev = pred; 9 // 嘗試用新節點取代原來的尾節點10 if (compareAndSetTail(pred, node)) {11 // 取代成功,則將原來尾指針的下一個節點指向新節點12 pred.next = node;13 return node;14 }15 }16 // 若是當前尾指針爲空,則調用enq方法17 enq(node);18 return node;19}複製代碼
加入等待隊列以後,會執行該方法,不斷循環地判斷當前線程節點是否在head後面一位,若是是則調用tryAcquire()獲取鎖,若是獲取成功,則把線程節點做爲Node head,並把原Node head的next設置爲空,斷開原來的Node head。
1final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true; 3 try { 4 boolean interrupted = false; 5 for (;;) { 6 // 獲取該節點的上一個節點,判斷是否頭節點,若是是則嘗試獲取鎖 7 final Node p = node.predecessor(); 8 if (p == head && tryAcquire(arg)) { 9 // 獲取鎖成功,把當前節點變爲頭節點10 setHead(node);11 p.next = null; // help GC12 failed = false;13 return interrupted;14 }15 // 判斷是否須要阻塞線程,該方法中會把取消狀態的節點移除掉,而且把當前節點的前一個節點設置爲SIGNAL16 if (shouldParkAfterFailedAcquire(p, node) &&17 parkAndCheckInterrupt())18 interrupted = true;19 }20 } finally {21 if (failed)22 cancelAcquire(node);23 }24}複製代碼
若是當前節點的pre不是head,或者爭搶失敗,則會將前面節點的狀態設置爲SIGNAL。
若是前面的節點狀態大於0,表示節點被取消,這個時候會把該節點從隊列中移除掉。
下圖爲嘗試CAS爭搶鎖,但失敗了,而後把head節點狀態設置爲SIGNAL:
而後再會循環一次嘗試獲取鎖,若是獲取失敗了,就調用LockSupport.park(this)
掛起線程。
那麼時候纔會觸發喚起線程呢?
你們看AQS的源碼的時候,能夠發現這裏的線程阻塞與喚醒基本上是用一個循環+LockSupport.park+LockSupport.unpark實現的,你們知道爲何要這樣作?
相關閱讀:
如何優雅的掛起線程
入口代碼以下:
1public final boolean release(int arg) {2 if (tryRelease(arg)) { // 嘗試釋放鎖3 Node h = head; 4 if (h != null && h.waitStatus != 0) // 若是頭節點waitStatus不爲0,則喚醒後續線程節點繼續處理5 unparkSuccessor(h);6 return true;7 }8 return false;9}複製代碼
tryRelease()具體由子類實現。
若是釋放鎖成功,而且頭節點waitStatus!=0,那麼會調用unparkSuccessor()
通知喚醒後續的線程節點進行處理。
注意:
CANCELLED
那麼就會忽略這個節點,而後從隊列尾部向前遍歷,找到與頭結點最近的沒有被取消的節點進行喚醒操做。
喚醒以後,節點對應的線程2又從acquireQueued()
方法的阻塞處醒來繼續參與爭搶鎖。
這樣一個AQS獨佔式、非中斷的搶佔鎖的流程就結束了。
最後咱們再以另外一個維度的流程來演示下這個過程。
首先有4個線程爭搶鎖,線程1,成功了,其餘三個失敗了,分別依次入等待隊列:
線程二、線程3依次入隊列:
如今忽然發生了點事情,假設線程3用的是帶有超時時間的tryLock,超過了等待時間,線程3狀態變爲取消狀態了,這個時候,線程4追加到等待隊列中後,發現前一個節點的狀態是1取消狀態,那麼會執行操做把線程3節點從隊列中移除掉:
最後,線程1釋放了鎖,而後把head節點ws設置爲0,而且找到了離head最靠近的一個waitStatus<=0的線程並喚醒,而後參與競爭獲取鎖:
最終,線程2獲取到了鎖,而後把本身變爲了Head節點,並取代了原來的Head節點:
接着就一直這樣循環,我就再也不畫圖了,聰明的你應該對此瞭如指掌了。
好了,有了這個AQS,咱們就能夠很快速的構造屬於本身的鎖了。
咱們來分別構造一個獨佔不可中斷公平鎖和非公平鎖吧?
發現裏面分別有一個公平鎖和非公平鎖的實現。
除此以外,ReentrantLock同時提供瞭如下幾個經常使用的API:
lock()
: 調用該方法會使鎖計數器加1,若是共享資源最初是空閒的,則將鎖定並授予線程;
unlock()
: 調用該方法使鎖計數器減1,當計數達到0的時候,將釋放資源;
tryLock()
: 若是資源沒有被任何其餘線程佔用,那麼該方法返回true,而且鎖計數器加1。
lockInterruptible()
: 該方法使得資源空閒時容許該線程在獲取資源時被其餘線程中斷。
感興趣能夠看個人這篇文章進一步瞭解:
咱們能夠看到,ReentrantLock是經過委託AQS的子類FairSync
和NonfairSync
來調用AQS的方法,而不是直接擴展AQS,這樣作能夠避免ASQ中的方法污染了鎖的API,破壞鎖接口的簡潔性。
好了,咱們今天就講到這裏了,最後,我留下兩個課堂做業給你們思考下:
一、ReentrantLock的公平鎖是怎麼實現的?
如何作到公平的? 二、ReentrantLock的非公平鎖是怎麼實現的?
爲何說它是非公平的? 三、ReentrantLock的可中斷鎖是如何實現的?
interrupt()函數執行原理是什麼? 四、ReentrantLock的可超時的鎖是如何實現的?
相信聰明的你很快能夠找到答案。
本文爲arthinking基於相關技術資料和官方文檔撰寫而成,確保內容的準確性,若是你發現了有何錯漏之處,煩請高擡貴手幫忙指正,萬分感激。
你們能夠關注個人博客:
itzhai.com
獲取更多文章,我將持續更新後端相關技術,涉及JVM、Java基礎、架構設計、網絡編程、數據結構、數據庫、算法、併發編程、分佈式系統等相關內容。
若是您以爲讀完本文有所收穫的話,能夠關注個人帳號,或者點個贊吧,碼字不易,您的支持就是我寫做的最大動力,再次感謝!
關注個人公衆號,及時獲取最新的文章。
更多文章
JVM系列專題:
[1]: 淺談Java併發編程系列(八)—— LockSupport原理剖析
[2]: Java Thread Primitive Deprecation
本文做者:
arthinking
博客連接:https://www.itzhai.com/cpj/aqs-and-lock-implementation-in-concurrent-packages.html
AQS與併發包中鎖的實現
版權聲明:BY-NC-SA許可協議: 創做不易,如需轉載,請務必附加上博客連接,謝謝!
·END·
訪問IT宅(itzhai.com)查看個人博客更多文章
掃碼關注及時獲取新內容↓↓↓
Java後端技術架構 · 技術專題 · 經驗分享
碼字不易,若有收穫,點個「贊」哦~