java.util.concurrent 同步器框架詳解

做者簡介java

善光,一個半年纔打一次籃球的程序猿。一直從事 Java 化開發,目前負責物流壓力平衡、千里眼和風暴眼 Java 化項目。node

引言

通常的應用系統中,存在着大量的計算和大量的 I/O 處理,經過多線程可讓系統運行得更快。但在 Java 多線程編程中,會面臨不少的難題,好比線程安全、上下文切換、死鎖等問題。算法

線程安全

引用 《Java Concurrency in Practice》 的做者 Brian Goetz 對線程安全的定義:express

線程安全,當多個線程訪問一個對象時,若是不用考慮這些線程在運行時環境下的調度和交替執行,也不須要進行額外的同步,或者在調用方進行任何其餘的協調操做,調用這個對象的行爲均可以得到正確的結果,那這個對象就是線程安全的編程

那麼如何實現線程安全呢?互斥同步是最多見的一種線程安全保障手段,同步是指在多個線程併發訪問共享數據時,保證共享數據在同一時刻只被一條線程使用。而互斥是實現同步的一種手段,臨界區、互斥量和信號量都是主要的互斥實現方式。安全

在 Java 中,最基本的互斥同步手段就是 synchronized 關鍵字,synchronized 有兩種使用形式,分別爲同步方法和同步代碼塊:bash

/**
 * 同步方法在執行前先獲取一個監視器,若是是一個靜態方法,監視器關聯這個類,若是是一個實例方法, 
 * 則關聯這個調用方法的對象。同步方法是隱式的,同步方法常量池中會有一個 ACC_SYNCHRONIZED 標 
 * 志,當某個線程訪問某個方法的時候,會檢查是否有 ACC_SYNCHRONIZED,若是有,則須要先得到監 
 * 視器鎖,而後開始執行方法,方法執行以後再釋放監視器鎖,這時候若是有其餘線程來請求執行該方法, 
 * 會由於沒法得到監視器鎖而被阻塞住。
 */
class Test {
    int count;
    synchronized void bump(){
        count++;
    }
    static int classCount;
    static synchronized void classBump(){
        classCount ++;
    }
}
複製代碼
/** 
 * 同步塊在執行前先獲取一個監視器,監視器關聯括號裏面 expression 引用的對象。 同步代碼塊則是使用 
 * monitorenter 和 monitorexit 兩個指令實現的,monitorenter 能夠理解爲加鎖,monitorexit 能夠理解爲 
 * 釋放鎖,每一個對象自身維護着一個被加鎖次數的計數器,當計數器爲 1 時,只有得到鎖的線程才能再次獲 
 * 得鎖,便可重入鎖,當計數器爲0時表示任意線程能夠得到該鎖。
 * synchronized 的 expression 只能是引用類型,不然會發生編譯錯誤,若是爲空,則會拋出空指針異常,
 * 無論同步塊中的代碼是否正常執行完,監視器都會解鎖。
 */
synchronized (expression){
   // block code
}
複製代碼

那麼對象如何與監視器關聯呢,在 Java 中,對象包含三塊:對象頭、實例數據、填充數據,其中對象頭中就包含 Mark Word,Mark Word 通常存儲對象的 hashCode、GC分代年齡以及鎖信息,鎖信息就包含指向互斥量(重量級鎖)的指針,指向了一個監視器;監視器是經過 ObjectMonitor 來實現的,代碼以下:多線程

ObjectMonitor() {
    _header       = NULL;
    _count        = 0; //記錄個數
    _waiters      = 0,
    _recursions   = 0;
    _object       = NULL;
    _owner        = NULL;
    _WaitSet      = NULL; //處於wait狀態的線程,會被加入到_WaitSet
    _WaitSetLock  = 0 ;
    _Responsible  = NULL ;
    _succ         = NULL ;
    _cxq          = NULL ;
    FreeNext      = NULL ;
    _EntryList    = NULL ; //處於等待鎖block狀態的線程,會被加入到該列表
    _SpinFreq     = 0 ;
    _SpinClock    = 0 ;
    OwnerIsThread = 0 ;
}
複製代碼

從上面代碼能夠看到有 ObjectMonitor 兩個隊列,分別是 _WaitSet 和 _EntryList,_owner 指向持有 ObjectMonitor 對象的線程,當多個線程獲取到對象 monitor 後進入 _owner 區域,並把 _owner 設置爲指向當前線程,並把 _count 數量加1;當調用 wait() 方法後,將釋放當前持有的 monitor,_owner 置爲空,_count 減 1 操做,同時,將該線程進入 _WaitSet 集合中等待喚醒,總結以下圖: 架構

image

上下文切換

現代操做系統中,運行一個程序,系統會爲它建立一個進程。現代操做系統調度的最小單元是線程,也叫輕量級進程(Light Weight Process),在一個進程裏能夠建立多個線程,這些線程都擁有各自的計數器、堆棧和局部變量等屬性,而且可以訪問共享內存變量。併發

主要有三種方式實現線程:內核線程實現、用戶線程實現、用戶線程加輕量級進程混合實現。Java 裏面的 Thread 類,它的全部關鍵方法都是聲明 Native 的,和平臺有關的方法。從 JDK 1.2 起,對於 Sun JDK 來講,它的 Windows 版與 Linux 版都使用一對一的線程模型實現,一條 Java 線程就映射到一條輕量級進程中,程序通常不會直接去使用內核線程,而是去使用內核線程的一種高級接口——輕量級進程,輕量級進程就是咱們一般意義上所說的線程,因爲每一個輕量級進程都由一個內核線程支持,所以,只有先支持內核級線程,纔能有輕量級進程。這種輕量級進程與內核線程之間 1:1 的關係稱爲一對一的線程模型。

image

因爲有內核線程的支持,每一個輕量級進程都成爲一個獨立的調度單元,即便有一個輕量級進程阻塞了,也不會影響整個進程的工做,可是輕量級進程有它的侷限性,每一個輕量級進程都須要一個內核線程的支持,所以輕量級進程要消耗必定的內核資源,所以一個系統支持輕量級進程的數量是有限的,其次,系統調用的代價相對較高,須要在用戶態(User Mode)和內核態(Kernal Mode)中來回切換,線程上下文切換直接的損耗CPU寄存器須要保存和加載, 系統調度器的代碼須要執行, TLB實例須要從新加載, CPU 的pipeline須要刷掉。對於搶佔式操做系統來講:

  • 當前執行任務的時間片用完以後,系統CPU正常調度下一個任務
  • 當前執行任務碰到IO阻塞,調度器將此任務掛起,繼續下一任務
  • 多個任務搶佔鎖資源,當前任務沒有搶到鎖資源,被調度器掛起,繼續下一任務
  • 用戶代碼掛起當前任務,讓出CPU時間
  • 硬件中斷

綜上所述,在 JDK 1.5 以前,經過 synchronized 關鍵字是保證線程安全的一種重要手段,可是 synchronized 是一個重量級鎖,爲何說 synchronized 是一個重量級鎖呢?由於 synchronized 依賴於操做系統的 MutexLock(互斥鎖)來實現的,且等待獲取鎖的線程將會阻塞,被阻塞的線程不會消耗 CPU,可是阻塞或喚醒一個線程都須要涉及到上下文切換,涉及到用戶態和內核態的切換,因此是比較耗時的。

那除了 synchronized 以外,咱們還有其餘的方案嗎?答案是確定的,咱們可使用 java.util.concurrent 的 ReentrantLock,可是在瞭解 ReentrantLock 以前,咱們先了解下同步器框架。

同步器框架

概要

在 JDK1.5 的 java.util.concurrent 包中,大部分的併發類都是基於 AbstractQueuedSynchronizer(簡稱 AQS) 這個簡單的同步器框架構建的。這個框架爲原子性管理同步狀態、阻塞和喚醒線程、排隊等提供了一種通用的機制。

提供了一個框架,用於實現依賴先進先出(FIFO)等待隊列的阻塞鎖和相關同步器(semaphores、events 等),這些同步器依賴於單個原子 int 值來表示同步狀態,提供了原子方法更新狀態 getState()、 setState(int)、compareAndSetState(int, int)。

AQS 是一個抽象類,並無對併發類提供了一個統一的接口定義,而是由子類根據自身的狀況實現相應的方法,AQS 中通常包含兩個方法 acquire(int)、release(int),獲取同步狀態和釋放同步狀態,AQS 根據其狀態是否獨佔分爲獨佔模式共享模式

  • 獨佔模式:同一時刻最多隻有一個線程獲取同步狀態,處於該模式下,其餘線程試圖獲取該鎖將沒法獲取成功。
  • 共享模式:同一時刻會有多個線程獲取共享同步狀態,處於該模式下,其餘線程試圖獲取該鎖可能會獲取成功。

這個類還定義了 AbstractQueuedSynchronizer.ConditionObject,實現了 Condition 接口,用於支持管程形式的 await、signal 操做,這些操做與獨佔模式的 Lock 類有關,且 Condition 的實現天生就和與其關聯的Lock 類緊密相關。

設計與實現

同步器的背後的基本思想很是簡單,acquire 操做以下:

while (synchronization state does not allow acquire) {
    enqueue current thread if not already queued;
    possibly block current thread;
}
dequeue current thread if it was queued;
複製代碼

release 操做以下:

update synchronization state;
if (state may permit a blocked thread to acquire)
    unblock one or more queued threads;
複製代碼

爲了實現上述 acquire、release 操做,須要完成如下三個功能:

  • 同步狀態的原子性管理;
  • 線程的阻塞與喚醒;
  • 排隊機制;

同步狀態

AQS 類經過使用單個 int 類型來保存同步狀態,並提供了 getState()、 setState(int)、compareAndSetState(int, int) 三個方法來讀取和更新狀態,而且此同步狀態經過關鍵字 volatile 修飾,保證了多線程環境下的可見性,compareAndSetState(int, int) 是經過 CAS(Compare and swap,比較並交換) 實現的,當多個線程同時對某個資源進行 CAS 操做的時候,只能有一個線程操做成功,但並不會阻塞其餘線程,其餘線程會收到操做失敗的信號,CAS 是一個輕量級的樂觀鎖。CAS 的底層經過 Unsafe 類實現的,利用處理器提供的 CMPXCHG 指令實現其原子性,使得僅當同步器狀態爲一個指望值的時候,纔會被原子的更新成目標值,相比 synchronized 不會致使過多的上下文切換切換和掛起線程。在 java.util.concurrent 包中,大量地使用 CAS 來實現原子性。

線程的阻塞和喚醒

LockSupport 類是一個很是方便的線程阻塞工具類,它能夠在線程任意位置讓線程阻塞。和Thread.suspend() 相比,它彌補了因爲 resume() 在前發生,致使線程沒法繼續執行的狀況。和Object.wait() 相比,它不須要先得到某個對象的鎖,也不會拋出 InterruptedException 異常。LockSupport.park() 方法阻塞當前線程,這是由於 LockSupport 類使用相似信號量的機制,它爲每個線程準備了一個許可,若是許可可用,那麼 park() 會當即返回,而且消費這個許可(設置許可不可用),就會阻塞,而 unpark() 則使得一個許可變爲可用(可是和信號量不一樣的是,許可不可累加可用,你不可能擁有超過一個許可,它永遠只有一個)。

排隊機制

同步隊列 AQS 整個框架的關鍵都是如何管理被阻塞線程的隊列,在 AQS 中,運用到了 CLH 鎖的思想,CLH 鎖被用於自旋鎖,能夠確保沒有飢餓感,提供先到先得的公平服務,CLH鎖是基於列表的可伸縮,高性能,公平和自旋鎖,應用程序線程僅在局部變量上旋轉,它不斷輪詢前驅狀態,若是發現預釋放鎖定結束旋轉。

AQS 同步隊列是一個 FIFO 隊列,在此同步隊列中,一個節點表示一個線程,它保存着線程的引用、狀態、前驅節點、後繼節點。同步隊列經過兩個節點 tail 和 head 來存取,初始化時,tail、head 初始化爲一個空節點,線程要加入到同步隊列中,經過 CAS 原子地拼接爲新的 tail 節點,線程要退出隊列,只需設置 head 節點指向當前線程節點。

image

同步隊列的優勢在於其出隊和入隊的操做都是無鎖的、快速的。爲了將 CLH 鎖隊列用於阻塞同步器,該同步隊列須要作些額外的修改以提供一種高效的方式定位某個節點的後繼節點,在自旋鎖中,一個節點只需改變其狀態,下一次自旋中其後繼節點就能注意到這個改變。可是在阻塞式同步器中,一個節點須要顯示地喚醒其後繼節點。同步隊列包含一個 next 連接到它的後繼節點。第二個對 CLH 鎖隊列主要的修改是將每一個節點都有的狀態字段用於控制阻塞而非自旋。

基本的 acquire 操做的最終實現通常形式(互斥、非中斷、無超時):

if(!tryAcquire(arg)) {
	node = create and enqueue new node;
    pred = node's effective predecessor; while (pred is not head node || !tryAcquire(arg)) { if (pred's signal bit is set)
            park();
        else
            compareAndSet pred's signal bit to true; pred = node's effective predecessor;
    }
    head = node;
}
複製代碼

release 操做以下:

if(tryRelease(arg) && head node's signal bit is set) { compareAndSet head's bit to false;
    unpark head's successor, if one exist } 複製代碼

acquire 的操做的主循環次數依賴於具體實現類 tryAcquire 的實現方式。支持取消或超時的操做主要是在 acquire 循環裏的 park() 返回時檢查中斷或超時,由超時或中斷而被取消等待的線程會設置其節點狀態,而後 unpark 其後繼節點。因爲「取消」操做,該線程不再會被阻塞,節點的連接和狀態字段能夠被快速重建。

儘管同步隊列是基於 FIFO 的,但它們並不必定是公平的,能夠注意到基礎的 acquire 算法中,tryAcquire 是在入隊前被執行的,所以一個新的 acquire 線程可以」竊取「本該屬於隊列頭部的第一個線程獲取經過同步器的機會,可闖入FIFO 策略一般會提供比其餘技術更高的吞吐率。固然,須要嚴格的公平性需求時,能夠改寫 tryAcquire 方法定義,能夠經過框架提供的 getFirstQueuedThread() 方法檢查是不是頭節點,若是是則獲取同步狀態,若是不是則返回失敗。

等待隊列 AQS 框架提供了一個 ConditionObject 類,給維護獨佔同步的類及實現 Lock 接口的類使用。一個鎖對象能夠關聯任意數目的條件對象,能夠提升相似管程風格的 await、signal 和 signalAll 操做,包括帶有超時的以及一些檢測、監控的方法。

等待隊列是一個 FIFO 隊列,在隊列中的每一個節點都包含一個線程引用,該線程就是在 Condition 對象上等待的線程,若是一個線程調用了 Condition.await() 方法,那麼線程將會釋放鎖,構形成節點將加入等待隊列進入等待狀態。事實上,節點的定義複用了同步器中節點的定義,也就是說同步隊列和等待隊列中節點類型都是同步器的靜態內部類 java.util.concurrent.locks.AbstractQueuedSynchronizer.Node。

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
複製代碼

如圖所示,Condition 擁有首尾節點的引用,而新增節點只要將原有的尾節點 nextWaiter 指向它,而且更新尾節點便可。上述節點的更新並不須要使用 CAS 保證,緣由在於調用 await() 方法的線程一定獲取了鎖的線程,也就是該過程是由鎖來保證線程安全的。調用 sigal() 方法將會喚醒在等待隊列中等待時間最長的節點(首節點),主要將 CONDITION 狀態節點從等待隊列中移到同步隊列中。 AQS 的圖結構以下:

image

用法

AQS 是一個抽象類,提供了一些重要的方法,以下: 同步狀態管理

方法 說明
getState() 返回同步狀態的當前值
setState(int newState) 設置同步狀態的值
compareAndSetState(int expect, int update) 若是當前狀態值等於指望值,則自動將同步狀態設置爲給定的更新值

同步方法重寫

方法 說明
tryAcquire(int arg) 試圖以獨佔模式獲取
tryRelease(int arg) 試圖以獨佔模式釋放
tryAcquireShared(int arg) 試圖在共享模式下獲取
tryReleaseShared(int arg) 試圖在共享模式下釋放
isHeldExclusively() 檢查當前線程是否在獨佔模式下佔用同步狀態

同步模板方法

方法 說明
acquire(int arg) 以獨佔模式獲取,忽略中斷
acquireInterruptibly(int arg) 以獨佔模式獲取,響應中斷
tryAcquireNanos(int arg, long nanosTimeout) 嘗試以獨佔模式獲取,響應中斷,若是超時超時將失敗
acquireShared(int arg) 以共享模式獲取,忽略中斷
acquireSharedInterruptibly(int arg) 以共享模式獲取,響應中斷
tryAcquireSharedNanos(int arg, long nanosTimeout) 嘗試以共享模式獲取,響應中斷,若是超時超時將失敗
release(int arg) 以獨佔模式釋放
releaseShared(int arg) 以共享模式釋放

同步器

同步器根據同步狀態分爲獨佔模式和共享模式,獨佔模式包括類:ReentrantLock、ReentrantReadWriteLock.WriteLock,共享模式包括:Semaphore、CountDownLatch、ReentrantReadWriteLock.ReadLock,這裏面着重介紹 ReentrantLock 的具體實現,其餘有興趣的同窗能夠本身研究下。

ReentrantLock

概述

一個可重入互斥 Lock 具備與使用 synchronized 方法和語句訪問的隱式監視鎖相同的基本行爲和語義,但具備擴展功能。ReentrantLock 支持可重入性,並提供了公平鎖和非公平鎖兩種實現方式(默認非公平鎖)。由於是獨佔模式,重寫了 tryAcquire、tryRelease、isHeldExclusively 三個方法,支持響應中斷、超時機制,下面詳細解讀下 JDK 1.8 中的 ReentrantLock 的源代碼實現。

構造方法

提供兩種構造方法:(1)無參構造方法,構造非公平鎖(2)有參構造方法,根據參數值來構造公平、非公平鎖。

public ReentrantLock() {
    sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
複製代碼

從上面代碼看出,公平鎖和非公平鎖基於類 FairSync、NonfairSync 實現的,而 FairSync、NonfairSync 繼承了 Sync,Sync 又繼承了 AbstractQueuedSynchronizer,能夠看出,Sync 定義了一個抽象方法 lock(),FairSync、NonfairSync 均實現了這個抽象方法,類圖關係以下:

image

lock()

從上面經過構造方法建立了一個可重入鎖對象,接下來咱們開始看下 ReentrantLock.lock() 方法的源代碼實現:

/**
 * 得到鎖的方法,若是鎖沒有被另外一個線程佔用而且當即返回,則將鎖定計數設置爲1。
 * 若是當前線程已經保持鎖定,則保持計數增長1,該方法當即返回。
 * 若是鎖被另外一個線程保持,則當前線程將被禁用以進行線程調度,而且在鎖定已被獲取以前處於休眠狀態,此時鎖定保持計數被設置爲1。
 **/
public void lock() {
    sync.lock();
}
複製代碼

默認實現非公平鎖,那咱們看下 NonfairSync.lock() 方法實現:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
複製代碼

調用 compareAndSetState(int,int) 方法,若是同步狀態指望值是 0,則原子的設置同步狀態值爲 1,表示當前線程獲取同步狀態成功,並設置當前鎖的佔用線程爲當前線程。這就體現了非公平鎖的性質,無論同步隊列是否有線程在排隊,直接調用同步狀態,這樣的設計加強了鎖的吞吐率,然而對後面的線程而言不能體現出公平的按照 FIFO 原則獲取鎖。

若是調用 compareAndSetState(int,int) 失敗,表示當前同步狀態已經有線程佔用了,多是其餘線程也多是當前線程重入佔用了,接着調用 AbstractQueuedSynchronizer.acquire(int) 方法。

/**
 * 以獨佔模式獲取(忽略中斷),經過調用至少一次 tryAcquire(int) 實現,成功返回。
 * 不然線程排隊,可能會重複阻塞和解除阻塞,調用 tryAcquire(int) 直到成功
 **/
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
複製代碼

tryAcquire(int) 方法會調用 Sync.nonfairTryAcquire(int) 方法,代碼以下:

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
複製代碼

從上面代碼看出,若是當前同步狀態爲 0,表示未被線程佔用,則調用 compareAndSetState(int,int) 原子設置同步狀態爲 1,若是設置成功,並設置同步狀態佔用線程爲當前線程,並返回爲 true。

若是同步狀態的佔用線程爲當前線程,則爲重入性,將當前 state 加 1,不然返回爲 false。緊接着會調用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法,addWaiter(Node.EXCLUSIVE) 方法的源代碼以下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
複製代碼

上述方法在獲取同步狀態失敗的基礎上,建立獨佔模式的節點,若是尾節點不爲空,則將新節點的 prev 指向當前最後一個節點,當前的最後一個節點的 next 指向當前節點,而後 tail 指向當前節點。若是調用失敗,則調用 enq() 方法設置尾節點。

在隊列中加入節點成功後,接下來會調用 acquireQueued(Node,int) 方法,自旋的獲取同步狀態,固然前提是當前節點的 prev 節點是 head 節點才能獲取同步狀態,調用 tryAcquire(int) 方法獲取同步狀態,若是成功獲取到,並將當前節點設置爲 head 節點,並將當前節點 prev 設置爲 null(由於按照 FIFO 的同步隊列原則,由於是非公平鎖的模式,因此即便是第一個節點也有可能被新來的線程搶佔到)。

若是獲取同步狀態失敗,則調用 shouldParkAfterFailedAcquire(node,node) ,則根據條件判斷是否應該阻塞,防止 CPU 處於忙碌狀態,會浪費 CPU 資源,若是 prev 節點狀態是 SIGNAL 則阻塞,若是是 CANCELLED 狀態則向前遍歷,移除前面全部爲該狀態的節點 ,此方法裏面阻塞當前節點用的方法是 LockSupport.park()。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } 複製代碼

綜上所述 acquireQueued(Node,int) 方法,若是當前線程被中斷,則返回爲 true,而後調用 selfInterrupt() 方法中斷當前線程。

若是是實現公平鎖,與非公平鎖的不一樣的是 tryAcquire(int) 方法中,若是同步狀態未被佔用,在調用 compareAndSetState(int,int) 方法以前,會調用 AQS 中的 hasQueuedPredecessors() 方法,源代碼以下:

if (c == 0) {
    if (!hasQueuedPredecessors() &&
        compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(current);
        return true;
    }
}
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
複製代碼

此方法被設計用做是公平鎖來用,查詢任何線程是否等待獲取比當前線程更長的時間,若是當前線程以前有一個排隊的線程,則返回爲 true;若是當前線程在隊列的頭部或隊列爲空,則返回爲 false。一樣下面 lockInterruptibly()、tryLock(long,long) 方法中公平鎖的實現模式也是 tryAcquire(int) 方法與非公平鎖不一樣,關鍵之處也是調用了 hasQueuedPredecessors() 方法。

lockInterruptibly()

從方法名字來看,與 lock() 方法不一樣的是,此方法獲取同步鎖狀態並響應中斷,並拋出 InterruptedException 異常,下面咱們看下 lockInterruptibly() 方法與 lock() 方法的差異,源代碼以下:

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
複製代碼

首先判斷當前線程是否已經中斷,若是已中斷,拋出InterruptedException異常;緊接着和 lock() 方法同樣,調用 tryAcquire(int) 方法,非阻塞獲取同步狀態,若是獲取到,則返回,反之獲取不到調用 doAcquireInterruptibly(int) 方法,與 lock(int) 方法不一樣的是,當前方法若是線程中斷則直接拋出 InterruptedException 異常,源碼以下:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
複製代碼

tryLock(long,long)

獲取同步狀態,可響應超時、中斷,若是在超時以前獲取到同步狀態,則返回爲 true,若是在超時後還未獲取同步狀態則返回爲 false,或者被其餘線程中斷了,則拋出 InterruptedException 異常。

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
複製代碼

與 lockInterruptibly() 方法大同小異,不一樣的是調用 doAcquireNanos(int,long) 方法,源代碼以下:

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 計算超時截止時間(單位:納秒)
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 計算到截止時間的剩餘時間(單位:納秒)
                nanosTimeout = deadline - System.nanoTime();
                // 超時了,返回 false
                if (nanosTimeout <= 0L)
                    return false;
                // 若是須要阻塞,而且時間大於 1000 納秒纔會阻塞(小於 1000 納秒認爲超時)必定的時間,若是線程中斷,則拋出 InterruptedException 異常
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

unlock()

嘗試釋放該鎖,若是當前線程是該鎖的持有者,則保持計數遞減。 若是保持計數如今爲零,則鎖定被釋放。 若是當前線程不是該鎖的持有者,則拋出IllegalMonitorStateException,源代碼以下:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼

tryRelease(int) 屬於 AQS 中的抽象方法,源代碼以下:

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
複製代碼

先將當前同步狀態減 1 並記錄爲 c,並判斷當前持有鎖的線程是否是當前線程,若是不是,則拋出 IllegalMonitorStateException 異常。若是 c 等於 0,並設置當前線程爲 null,並設置同步狀態爲 c,僅當 c 等於 0 才返回 true。

再看 release() 方法,成功釋放鎖後,喚醒同步隊列中的下一個節點,會調用 LockSupport.unpark() 方法釋放下一個節點。

綜上所述,介紹了 ReentrantLock 了主要方法實現,在 JDK 1.5 及以前,ReentrantLock 提供了比 synchronized 更多的功能,而且在性能上也高。但在 JDK 1.6 及以後,synchronized 的性能和 ReentrantLock 大概持平,JDK 1.6 中加入了不少針對鎖的優化,感興趣的同窗能夠了解下鎖的優化,包括偏向鎖、輕量級鎖等。

總結

j.u.c 大多數同步器都是基於 AQS 同步框架實現的,大量運用樂觀鎖 CAS、排隊機制、阻塞與喚醒工具實現多線程的同步,使得開發人員在多線程編程中更加簡單和高效。理解同步器框架的實現與設計,有助於開發人員理解和分析其餘同步器。

參考





閱讀博客還不過癮?

歡迎你們掃二維碼經過添加羣助手,加入交流羣,討論和博客有關的技術問題,還能夠和博主有更多互動

博客轉載、線下活動及合做等問題請郵件至 shadowfly_zyl@hotmail.com 進行溝通
相關文章
相關標籤/搜索