圖解併發包中鎖的通用實現

導讀

這篇文章咱們來聊聊Java併發包中鎖的實現。 由於這其中涉及到了一點數據結構和線程掛起、喚醒等處理流程,我將源碼中的關鍵邏輯繪製成圖片的格式,方便你們有一個更加直觀的理解。 html

閱讀完本篇文章,你將瞭解到: java

  1. 抽象同步器AQS的實現原理 node

  2. ReentrantLock實現原理 算法

  3. 非公平鎖和公平鎖實現的區別 數據庫

  4. 基於這些內容,您也能夠本身進一步探索可中斷鎖的實現原理 編程

  5. AQS的核心是state字段以及雙端等待隊列 segmentfault

  6. 如何優雅的中斷一個線程 後端

一、包結構介紹

如下內容是基於JDK 1.8進行分析的。 bash

咱們查看下java.util.concurrent.locks包下面,發現主要包含以下類: 網絡

image-20200301111726969
image-20200301111726969

咱們來構建他們的UML圖:

image-20200301113556313
image-20200301113556313

如上圖,拋開內部類,抽象類,接口,主要實現了三把鎖: ReentrantLockStampedLockReentrantReadWriteLock最經常使用的就是ReentrantLock了,關於ReentrantLock的詳細說明以及使用案例: ReentrantLock介紹與使用

咱們能夠發現ReentrantLock和ReentrantReadWriteLock頂層都是AbstractQueueSynchronizer類。 咱們先來介紹下AbstractQueuedSynchronizer類。

二、AbstractQueuedSynchronizer

AbstractQueuedSynchronizer,簡寫爲AQS,抽象隊列同步器。 它是一個用於構建鎖和同步器的框架,許多同步器均可以經過AQS很容易而且高效的構造出來,如下都是經過ASQ構造出來的: ReentrantLockSemaphoreCountDownLatchReentrantReadWriteLockSynchronousQueueFutureTask

接下來,咱們先來看看這個抽象同步隊列的原理。

2.一、AQS原理

說到AQS,咱們必需要先知道它是幹嗎的,而後再去研究它。 那我直接先講重點了: AQS是經過隊列來輔助實現線程同步的。 線程併發爭奪state資源,爭奪失敗的則進入等待隊列(同步隊列)並進入阻塞狀態,在state資源被釋放以後,從隊列頭喚醒被阻塞的線程節點,進行state資源的競爭。

這樣勢必會涉及很頻繁的隊列入隊出隊操做,以及線程的阻塞喚醒操做。 這些操做偏偏是最難編寫,最容易出錯的,爲此AQS把這些操做作了封裝,以模板的方式提供出來,咱們能夠經過實現模板的相關方法,實現不同的鎖或者同步器。

AQS使用了模板方法,把同步隊列都封裝起來了,同時提供瞭如下五個未實現的方法,用於子類的重寫:

AQS數據結構

AQS同步器數據結構

image-20200307215156707
image-20200307215156707

如上圖,AQS中:

  • state全部線程經過經過CAS嘗試給state設值,當state>0時表示被線程佔用; 同一個線程屢次獲取state,會疊加state的值,從而實現了可重入;

  • exclusiveOwnerThread在獨佔模式下該屬性會用到,當線程嘗試以獨佔模式成功給state設值,該線程會把本身設置到exclusiveOwnerThread變量中,代表當前的state被當前線程獨佔了;

  • 等待隊列(同步隊列)等待隊列中存放了全部爭奪state失敗的線程,是一個雙向鏈表結構。 state被某一個線程佔用以後,其餘線程會進入等待隊列; 一旦state被釋放(state=0),則釋放state的線程會喚醒等待隊列中的線程繼續嘗試cas設值state;

  • head指向等待隊列的頭節點,延遲初始化,除了初始化以外,只能經過setHead方法進行修改;

  • tail指向等待隊列的隊尾,延遲初始化,只能經過enq方法修改tail,該方法主要是往隊列後面添加等待節點。

等待隊列中的節點結構是怎樣子的呢? 下面咱們來看看。

AQS隊列節點數據結構

image-20200307220200532
image-20200307220200532
  • pre指向隊列中的上一個節點;

  • waitStatus節點的等待狀態,初始化爲0,表示正常同步等待:

  • CANCELLED1 節點因超時或者被中斷而取消時設置爲取消狀態;

  • SIGNAL-1 指示當前節點被釋放後,須要調用unpark通知後面節點,若是後面節點發生競爭致使獲取鎖失敗,也會將當前節點設置爲SIGNAL;

  • CONDITION-2 指示該線程正在進行條件等待,條件隊列中會用到;

  • PROPAGATE-3 共享模式下釋放節點時設置的狀態,表示無限傳播下去。

  • thread當前節點操做的線程;

  • nextWaiter該字段在Condition條件等待中會用到,指向條件隊列的下一個節點。 或者連接到SHARED常量,表示節點正在以共享模式等待;

  • next指向隊列中的下一個節點。

若是想要了解AQS的實現,您須要先知道如下這些內容,由於源碼中會大量使用:

LockSupport.park(Object blocker)和LockSupport.unpark(Thread thread)

AQS中線程的阻塞和喚醒基本上都使用這兩個方法實現的。 其底層都是依賴Unsafe實現的。

LockSupport是用來建立鎖和其餘同步類的基本線程阻塞的原語。

此類與使用它的每一個線程關聯一個許可(permit: 0表示無許可,1 表示有許可),若是有許可,將馬上返回對park()的調用,而且在此過程化消耗掉它。 不然,park()會致使線程進入阻塞; 調用 unpark() 可以使許可證可用,若是尚不可用。 不過與信號量不一樣的是,許可證不會累加,最多隻有一個。

該類中常見的兩個方法兩個方法:

  • park(Object blocker)實現線程的阻塞。 除非有許可,不然出於線程調度目的將阻塞線程; 若是有許可,則將許可消耗,而後線程往下繼續執行;

  • unpark(Thread thread)實現解除線程的阻塞。 若是線程在park方法上被阻塞,則調用該方法將取消阻塞。 不然,許可變爲1,保證下一次調用park方法不會阻塞。

這兩個方法底層是調用了Unsafe中的park和unpark的native方法。

具體底層實現,能夠參考這裏[1]

cas

咱們知道,計算機中提供了cas相關指令,這是一種樂觀的併發策略,須要硬件指令集的發展才能支持,實現了: 操做+衝突檢測的原子性。

IA64 和 X86 使用cmpxchg指令完成CAS功能。

cas 內存位置 舊預期值 新值

CAS存在ABA問題,可使用版本號進行控制,保證其正確性。

JDK中的CAS,相關類: Unsafe裏面的compareAndSwapInt()以及compareAndSwapLong()等幾個方法包裝提供。 只有啓動類加載器加載的class才能訪問他,或者經過反射獲取。

詳細說明: 一文帶你完全理解同步和鎖的本質(乾貨)#2.1.五、基於硬件指令

interrupt

相關閱讀: 如何優雅的中斷線程

爲了分析AQS的實現原理,咱們先挑一個方法來分析。

2.二、AQS中的通常處理流程

爲了弄清楚AQS中是如何進行隊列同步的,咱們先從一個簡單的獨佔加鎖方法提及。

2.2.一、public final void acquire(int arg)

這個方法是使用獨佔模式獲取鎖,忽略中斷。 經過至少調用一次tryAcquire成功返回來實現。 不然,線程將排隊,並可能反覆阻塞和解除阻塞,並調用tryAcquire直到成功。

咱們先看一下這個方法的入口代碼:

1public final void acquire(int arg) {2  if (!tryAcquire(arg) &&  // 嘗試獲取鎖,這裏是一個在AQS中未實現的方法,具體由子類實現3      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // 獲取不到鎖,則 1.添加到等待隊列 2.不斷循環等待重試4    selfInterrupt();5}複製代碼

tryAcquire

一開始,會嘗試調用AQS中未實現的方法tryAcquire()嘗試獲取鎖,獲取成功則表示獲取鎖了,該方法的實現通常經過CAS進行設置state嘗試獲取鎖:

image-20200311103257880
image-20200311103257880

不一樣的鎖能夠有不一樣的tryAcquire()實現,因此,你能夠看到ReentrantLock鎖裏面會有非公平鎖和公平鎖的實現方式。

ReentrantLock公平鎖的實現代碼在獲取鎖以前多了一個判斷: !hasQueuedPredecessors(),這個是判斷若是當前線程節點以前沒有其餘節點了,那麼咱們才能夠嘗試獲取鎖,這就是公平鎖的體現。

addWaiter

獲取鎖失敗以後,則會進入這一步,這裏會嘗試把線程節點追加到等待隊列後面,是經過CAS進行追加的,追加失敗的狀況下,會循環重試,直至追加成功爲止。 若是追加的時候,發現head節點還不存在,則先初始化一個head節點,而後追加上去:

1private Node addWaiter(Node mode) { 2  // 將當期線程構形成Node節點 3  Node node = new Node(Thread.currentThread(), mode); 4  // Try the fast path of enq; backup to full enq on failure 5  Node pred = tail; 6  if (pred != null) { 7    // 將原來尾節點設置爲新節點的上一個節點 8    node.prev = pred; 9    // 嘗試用新節點取代原來的尾節點10    if (compareAndSetTail(pred, node)) {11      // 取代成功,則將原來尾指針的下一個節點指向新節點12      pred.next = node;13      return node;14    }15  }16  // 若是當前尾指針爲空,則調用enq方法17  enq(node);18  return node;19}複製代碼
image-20200311103646009
image-20200311103646009

acquireQueued

加入等待隊列以後,會執行該方法,不斷循環地判斷當前線程節點是否在head後面一位,若是是則調用tryAcquire()獲取鎖,若是獲取成功,則把線程節點做爲Node head,並把原Node head的next設置爲空,斷開原來的Node head。 注意這個Node head只是佔位做用,每次處理的都是Node head的下一個節點:

1final boolean acquireQueued(final Node node, int arg) { 2  boolean failed = true; 3  try { 4    boolean interrupted = false; 5    for (;;) { 6      // 獲取該節點的上一個節點,判斷是否頭節點,若是是則嘗試獲取鎖 7      final Node p = node.predecessor(); 8      if (p == head && tryAcquire(arg)) { 9        // 獲取鎖成功,把當前節點變爲頭節點10        setHead(node);11        p.next = null; // help GC12        failed = false;13        return interrupted;14      }15      // 判斷是否須要阻塞線程,該方法中會把取消狀態的節點移除掉,而且把當前節點的前一個節點設置爲SIGNAL16      if (shouldParkAfterFailedAcquire(p, node) &&17          parkAndCheckInterrupt())18        interrupted = true;19    }20  } finally {21    if (failed)22      cancelAcquire(node);23  }24}複製代碼

若是當前節點的pre不是head,或者爭搶失敗,則會將前面節點的狀態設置爲SIGNAL。

若是前面的節點狀態大於0,表示節點被取消,這個時候會把該節點從隊列中移除掉。

下圖爲嘗試CAS爭搶鎖,但失敗了,而後把head節點狀態設置爲SIGNAL:

image-20200311103510923
image-20200311103510923

而後再會循環一次嘗試獲取鎖,若是獲取失敗了,就調用LockSupport.park(this)掛起線程。

那麼時候纔會觸發喚起線程呢? 這個時候咱們得先看看釋放鎖是怎麼作的了。

你們看AQS的源碼的時候,能夠發現這裏的線程阻塞與喚醒基本上是用一個循環+LockSupport.park+LockSupport.unpark實現的,你們知道爲何要這樣作?

相關閱讀: 如何優雅的掛起線程

2.2.二、public final boolean release(int arg)

入口代碼以下:

1public final boolean release(int arg) {2  if (tryRelease(arg)) { // 嘗試釋放鎖3    Node h = head; 4    if (h != null && h.waitStatus != 0)  // 若是頭節點waitStatus不爲0,則喚醒後續線程節點繼續處理5      unparkSuccessor(h);6    return true;7  }8  return false;9}複製代碼

tryRelease()具體由子類實現。 通常處理流程是讓state減1。

若是釋放鎖成功,而且頭節點waitStatus!=0,那麼會調用unparkSuccessor()通知喚醒後續的線程節點進行處理。

注意: 在遍歷隊列查找喚醒下一個節點的過程當中,若是發現下一個節點狀態是CANCELLED那麼就會忽略這個節點,而後從隊列尾部向前遍歷,找到與頭結點最近的沒有被取消的節點進行喚醒操做。

image-20200311103813051
image-20200311103813051

喚醒以後,節點對應的線程2又從acquireQueued()方法的阻塞處醒來繼續參與爭搶鎖。 而且爭搶成功了,那麼會把head節點的下一個節點設置爲null,讓本身所處的節點變爲head節點:

image-20200311104443722
image-20200311104443722

這樣一個AQS獨佔式、非中斷的搶佔鎖的流程就結束了。

2.2.三、完整流程

最後咱們再以另外一個維度的流程來演示下這個過程。

首先有4個線程爭搶鎖,線程1,成功了,其餘三個失敗了,分別依次入等待隊列:

image-20200311105159906
image-20200311105159906

線程二、線程3依次入隊列:

image-20200311110811069
image-20200311110811069

如今忽然發生了點事情,假設線程3用的是帶有超時時間的tryLock,超過了等待時間,線程3狀態變爲取消狀態了,這個時候,線程4追加到等待隊列中後,發現前一個節點的狀態是1取消狀態,那麼會執行操做把線程3節點從隊列中移除掉:

image-20200311110905140
image-20200311110905140

最後,線程1釋放了鎖,而後把head節點ws設置爲0,而且找到了離head最靠近的一個waitStatus<=0的線程並喚醒,而後參與競爭獲取鎖:

image-20200311110947681
image-20200311110947681

最終,線程2獲取到了鎖,而後把本身變爲了Head節點,並取代了原來的Head節點:

image-20200311111049725
image-20200311111049725

接着就一直這樣循環,我就再也不畫圖了,聰明的你應該對此瞭如指掌了。

三、使用AQS實現的鎖

好了,有了這個AQS,咱們就能夠很快速的構造屬於本身的鎖了。

咱們來分別構造一個獨佔不可中斷公平鎖和非公平鎖吧? 沒必要了,其實ReentrantLock正是這樣一個鎖:

發現裏面分別有一個公平鎖和非公平鎖的實現。 相信通過本文介紹,你能夠很快看懂他們的源碼,並知道實現原理了。

除此以外,ReentrantLock同時提供瞭如下幾個經常使用的API:

  • lock(): 調用該方法會使鎖計數器加1,若是共享資源最初是空閒的,則將鎖定並授予線程;

  • unlock(): 調用該方法使鎖計數器減1,當計數達到0的時候,將釋放資源;

  • tryLock(): 若是資源沒有被任何其餘線程佔用,那麼該方法返回true,而且鎖計數器加1。 若是資源不是空閒的,則該方法返回false。 這個時候線程不會阻塞,而是直接退出返回結果;

  • lockInterruptible(): 該方法使得資源空閒時容許該線程在獲取資源時被其餘線程中斷。 也就是說: 若是當前線程正在等待鎖,但其餘線程請求該鎖,則當前線程將被中斷並當即返回,不會繼續等待獲取鎖;

感興趣能夠看個人這篇文章進一步瞭解: https://www.itzhai.com/cpj/introduction-and-use-of-reentrantlock.html

3.一、組合大於繼承原則

咱們能夠看到,ReentrantLock是經過委託AQS的子類FairSyncNonfairSync來調用AQS的方法,而不是直接擴展AQS,這樣作能夠避免ASQ中的方法污染了鎖的API,破壞鎖接口的簡潔性。 

結語

好了,咱們今天就講到這裏了,最後,我留下兩個課堂做業給你們思考下:

一、ReentrantLock的公平鎖是怎麼實現的? 如何作到公平的?

二、ReentrantLock的非公平鎖是怎麼實現的? 爲何說它是非公平的?

三、ReentrantLock的可中斷鎖是如何實現的? interrupt()函數執行原理是什麼?

四、ReentrantLock的可超時的鎖是如何實現的?

相信聰明的你很快能夠找到答案。

本文爲arthinking基於相關技術資料和官方文檔撰寫而成,確保內容的準確性,若是你發現了有何錯漏之處,煩請高擡貴手幫忙指正,萬分感激。

你們能夠關注個人博客: itzhai.com 獲取更多文章,我將持續更新後端相關技術,涉及JVM、Java基礎、架構設計、網絡編程、數據結構、數據庫、算法、併發編程、分佈式系統等相關內容。

若是您以爲讀完本文有所收穫的話,能夠關注個人帳號,或者點個贊吧,碼字不易,您的支持就是我寫做的最大動力,再次感謝!

關注個人公衆號,及時獲取最新的文章。



更多文章

JVM系列專題: 公衆號發送 JVM


References

[1]: 淺談Java併發編程系列(八)—— LockSupport原理剖析

[2]: Java Thread Primitive Deprecation


本文做者: arthinking
博客連接: https://www.itzhai.com/cpj/aqs-and-lock-implementation-in-concurrent-packages.html
AQS與併發包中鎖的實現
版權聲明: BY-NC-SA許可協議: 創做不易,如需轉載,請務必附加上博客連接,謝謝!



·END·

 訪問IT宅(itzhai.com)查看個人博客更多文章

掃碼關注及時獲取新內容↓↓↓


Java架構雜談

Java後端技術架構 · 技術專題 · 經驗分享

blog: itzhai.com


碼字不易,若有收穫,點個
「贊」哦~

相關文章
相關標籤/搜索