📦 本文以及示例源碼已歸檔在 javacorehtml
確保線程安全最多見的作法是利用鎖機制(Lock
、sychronized
)來對共享數據作互斥同步,這樣在同一個時刻,只有一個線程能夠執行某個方法或者某個代碼塊,那麼操做必然是原子性的,線程安全的。java
在工做、面試中,常常會聽到各類五花八門的鎖,聽的人云裏霧裏。鎖的概念術語不少,它們是針對不一樣的問題所提出的,經過簡單的梳理,也不難理解。git
可重入鎖又名遞歸鎖,是指 同一個線程在外層方法獲取了鎖,在進入內層方法會自動獲取鎖。github
可重入鎖能夠在必定程度上避免死鎖。面試
ReentrantLock
、ReentrantReadWriteLock
是可重入鎖。這點,從其命名也不難看出。synchronized
也是一個可重入鎖。synchronized void setA() throws Exception{ Thread.sleep(1000); setB(); } synchronized void setB() throws Exception{ Thread.sleep(1000); }
上面的代碼就是一個典型場景:若是使用的鎖不是可重入鎖的話,setB
可能不會被當前線程執行,從而形成死鎖。編程
公平鎖爲了保證線程申請順序,勢必要付出必定的性能代價,所以其吞吐量通常低於非公平鎖。api
公平鎖與非公平鎖 在 Java 中的典型實現:數組
synchronized
只支持非公平鎖。ReentrantLock
、ReentrantReadWriteLock
,默認是非公平鎖,但支持公平鎖。獨享鎖與共享鎖是一種廣義上的說法,從實際用途上來看,也常被稱爲互斥鎖與讀寫鎖。緩存
獨享鎖與共享鎖在 Java 中的典型實現:安全
synchronized
、ReentrantLock
只支持獨享鎖。ReentrantReadWriteLock
其寫鎖是獨享鎖,其讀鎖是共享鎖。讀鎖是共享鎖使得併發讀是很是高效的,讀寫,寫讀 ,寫寫的過程是互斥的。樂觀鎖與悲觀鎖不是指具體的什麼類型的鎖,而是處理併發同步的策略。
悲觀鎖與樂觀鎖在 Java 中的典型實現:
悲觀鎖在 Java 中的應用就是經過使用 synchronized
和 Lock
顯示加鎖來進行互斥同步,這是一種阻塞同步。
樂觀鎖在 Java 中的應用就是採用 CAS 機制(CAS 操做經過 Unsafe
類提供,但這個類不直接暴露爲 API,因此都是間接使用,如各類原子類)。
所謂輕量級鎖與重量級鎖,指的是鎖控制粒度的粗細。顯然,控制粒度越細,阻塞開銷越小,併發性也就越高。
Java 1.6 之前,重量級鎖通常指的是 synchronized
,而輕量級鎖指的是 volatile
。
Java 1.6 之後,針對 synchronized
作了大量優化,引入 4 種鎖狀態: 無鎖狀態、偏向鎖、輕量級鎖和重量級鎖。鎖能夠單向的從偏向鎖升級到輕量級鎖,再從輕量級鎖升級到重量級鎖 。
輕量級鎖 - 是指當鎖是偏向鎖的時候,被另外一個線程所訪問,偏向鎖就會升級爲輕量級鎖,其餘線程會經過自旋的形式嘗試獲取鎖,不會阻塞,提升性能。
重量級鎖 - 是指當鎖爲輕量級鎖的時候,另外一個線程雖然是自旋,但自旋不會一直持續下去,當自旋必定次數的時候,尚未獲取到鎖,就會進入阻塞,該鎖膨脹爲重量級鎖。重量級鎖會讓其餘申請的線程進入阻塞,性能下降。
分段鎖實際上是一種鎖的設計,並非具體的一種鎖。所謂分段鎖,就是把鎖的對象分紅多段,每段獨立控制,使得鎖粒度更細,減小阻塞開銷,從而提升併發性。這其實很好理解,就像高速公路上的收費站,若是隻有一個收費口,那全部的車只能排成一條隊繳費;若是有多個收費口,就能夠分流了。
Hashtable
使用 synchronized
修飾方法來保證線程安全性,那麼面對線程的訪問,Hashtable 就會鎖住整個對象,全部的其它線程只能等待,這種阻塞方式的吞吐量顯然很低。
Java 1.7 之前的 ConcurrentHashMap
就是分段鎖的典型案例。ConcurrentHashMap
維護了一個 Segment
數組,通常稱爲分段桶。
final Segment<K,V>[] segments;
當有線程訪問 ConcurrentHashMap
的數據時,ConcurrentHashMap
會先根據 hashCode 計算出數據在哪一個桶(即哪一個 Segment),而後鎖住這個 Segment
。
Java 1.5 以前,協調對共享對象的訪問時可使用的機制只有 synchronized
和 volatile
。這兩個都屬於內置鎖,即鎖的申請和釋放都是由 JVM 所控制。
Java 1.5 以後,增長了新的機制:ReentrantLock
、ReentrantReadWriteLock
,這類鎖的申請和釋放均可以由程序所控制,因此常被稱爲顯示鎖。
💡
synchronized
的用法和原理能夠參考:Java 併發基礎機制 - synchronized 。🔔 注意:若是不須要
ReentrantLock
、ReentrantReadWriteLock
所提供的高級同步特性,應該優先考慮使用synchronized
。理由以下:
- Java 1.6 之後,
synchronized
作了大量的優化,其性能已經與ReentrantLock
、ReentrantReadWriteLock
基本上持平。- 從趨勢來看,Java 將來更可能會優化
synchronized
,而不是ReentrantLock
、ReentrantReadWriteLock
,由於synchronized
是 JVM 內置屬性,它能執行一些優化。ReentrantLock
、ReentrantReadWriteLock
申請和釋放鎖都是由程序控制,若是使用不當,可能形成死鎖,這是很危險的。
如下對比一下顯示鎖和內置鎖的差別:
synchronized
不能主動獲取鎖和釋放鎖。獲取鎖和釋放鎖都是 JVM 控制的。ReentrantLock
能夠主動獲取鎖和釋放鎖。(若是忘記釋放鎖,就可能產生死鎖)。synchronized
不能響應中斷。ReentrantLock
能夠響應中斷。synchronized
沒有超時機制。ReentrantLock
有超時機制。ReentrantLock
能夠設置超時時間,超時後自動釋放鎖,避免一直等待。synchronized
只支持非公平鎖。ReentrantLock
支持非公平鎖和公平鎖。synchronized
修飾的方法或代碼塊,只能被一個線程訪問(獨享)。若是這個線程被阻塞,其餘線程也只能等待ReentrantLock
能夠基於 Condition
靈活的控制同步條件。synchronized
不支持讀寫鎖分離;ReentrantReadWriteLock
支持讀寫鎖,從而使阻塞讀寫的操做分開,有效提升併發性。
AbstractQueuedSynchronizer
(簡稱 AQS)是隊列同步器,顧名思義,其主要做用是處理同步。它是併發鎖和不少同步工具類的實現基石(如ReentrantLock
、ReentrantReadWriteLock
、Semaphore
等)。所以,要想深刻理解
ReentrantLock
、ReentrantReadWriteLock
等併發鎖和同步工具,必須先理解 AQS 的要點和原理。
在 java.util.concurrent.locks
包中的相關鎖(經常使用的有 ReentrantLock
、 ReadWriteLock
)都是基於 AQS 來實現。這些鎖都沒有直接繼承 AQS,而是定義了一個 Sync
類去繼承 AQS。爲何要這樣呢?由於鎖面向的是使用用戶,而同步器面向的則是線程控制,那麼在鎖的實現中聚合同步器而不是直接繼承 AQS 就能夠很好的隔離兩者所關注的事情。
AQS 提供了對獨享鎖與共享鎖的支持。
獲取、釋放獨享鎖的主要 API 以下:
public final void acquire(int arg) public final void acquireInterruptibly(int arg) public final boolean tryAcquireNanos(int arg, long nanosTimeout) public final boolean release(int arg)
acquire
- 獲取獨佔鎖。acquireInterruptibly
- 獲取可中斷的獨佔鎖。tryAcquireNanos
- 嘗試在指定時間內獲取可中斷的獨佔鎖。在如下三種狀況下回返回:
release
- 釋放獨佔鎖。獲取、釋放共享鎖的主要 API 以下:
public final void acquireShared(int arg) public final void acquireSharedInterruptibly(int arg) public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) public final boolean releaseShared(int arg)
acquireShared
- 獲取共享鎖。acquireSharedInterruptibly
- 獲取可中斷的共享鎖。tryAcquireSharedNanos
- 嘗試在指定時間內獲取可中斷的共享鎖。release
- 釋放共享鎖。閱讀 AQS 的源碼,能夠發現:AQS 繼承自 AbstractOwnableSynchronize
。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** 等待隊列的隊頭,懶加載。只能經過 setHead 方法修改。 */ private transient volatile Node head; /** 等待隊列的隊尾,懶加載。只能經過 enq 方法添加新的等待節點。*/ private transient volatile Node tail; /** 同步狀態 */ private volatile int state; }
state
- AQS 使用一個整型的 volatile
變量來 維護同步狀態。
ReentrantLock
中該狀態值表示全部者線程已經重複獲取該鎖的次數,Semaphore
中該狀態值表示剩餘的許可數量。head
和 tail
- AQS 維護了一個 Node
類型(AQS 的內部類)的雙鏈表來完成同步狀態的管理。這個雙鏈表是一個雙向的 FIFO 隊列,經過 head
和 tail
指針進行訪問。當 有線程獲取鎖失敗後,就被添加到隊列末尾。再來看一下 Node
的源碼
static final class Node { /** 該等待同步的節點處於共享模式 */ static final Node SHARED = new Node(); /** 該等待同步的節點處於獨佔模式 */ static final Node EXCLUSIVE = null; /** 線程等待狀態,狀態值有: 0、一、-一、-二、-3 */ volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; /** 前驅節點 */ volatile Node prev; /** 後繼節點 */ volatile Node next; /** 等待鎖的線程 */ volatile Thread thread; /** 和節點是否共享有關 */ Node nextWaiter; }
很顯然,Node 是一個雙鏈表結構。
waitStatus
- Node
使用一個整型的 volatile
變量來 維護 AQS 同步隊列中線程節點的狀態。waitStatus
有五個狀態值:
CANCELLED(1)
- 此狀態表示:該節點的線程可能因爲超時或被中斷而 處於被取消(做廢)狀態,一旦處於這個狀態,表示這個節點應該從等待隊列中移除。SIGNAL(-1)
- 此狀態表示:後繼節點會被掛起,所以在當前節點釋放鎖或被取消以後,必須喚醒(unparking
)其後繼結點。CONDITION(-2)
- 此狀態表示:該節點的線程 處於等待條件狀態,不會被看成是同步隊列上的節點,直到被喚醒(signal
),設置其值爲 0,再從新進入阻塞狀態。PROPAGATE(-3)
- 此狀態表示:下一個 acquireShared
應無條件傳播。AQS 中使用 acquire(int arg)
方法獲取獨佔鎖,其大體流程以下:
詳細流程能夠用下圖來表示,請結合源碼來理解(一圖勝千言):
AQS 中使用 release(int arg)
方法釋放獨佔鎖,其大體流程以下:
AQS 中使用 acquireInterruptibly(int arg)
方法獲取可中斷的獨佔鎖。
acquireInterruptibly(int arg)
實現方式相較於獲取獨佔鎖方法( acquire
)很是類似,區別僅在於它會經過 Thread.interrupted
檢測當前線程是否被中斷,若是是,則當即拋出中斷異常(InterruptedException
)。
AQS 中使用 tryAcquireNanos(int arg)
方法獲取超時等待的獨佔鎖。
doAcquireNanos 的實現方式 相較於獲取獨佔鎖方法( acquire
)很是類似,區別在於它會根據超時時間和當前時間計算出截止時間。在獲取鎖的流程中,會不斷判斷是否超時,若是超時,直接返回 false;若是沒超時,則用 LockSupport.parkNanos
來阻塞當前線程。
AQS 中使用 acquireShared(int arg)
方法獲取共享鎖。
acquireShared
方法和 acquire
方法的邏輯很類似,區別僅在於自旋的條件以及節點出隊的操做有所不一樣。
成功得到共享鎖的條件以下:
tryAcquireShared(arg)
返回值大於等於 0 (這意味着共享鎖的 permit 尚未用完)。AQS 中使用 releaseShared(int arg)
方法釋放共享鎖。
releaseShared
首先會嘗試釋放同步狀態,若是成功,則解鎖一個或多個後繼線程節點。釋放共享鎖和釋放獨享鎖流程大致類似,區別在於:
對於獨享模式,若是須要 SIGNAL,釋放僅至關於調用頭節點的 unparkSuccessor
。
AQS 中使用 acquireSharedInterruptibly(int arg)
方法獲取可中斷的共享鎖。
acquireSharedInterruptibly
方法與 acquireInterruptibly
幾乎一致,再也不贅述。
AQS 中使用 tryAcquireSharedNanos(int arg)
方法獲取超時等待式的共享鎖。
tryAcquireSharedNanos
方法與 tryAcquireNanos
幾乎一致,再也不贅述。
ReentrantLock
類是Lock
接口的具體實現,它是一個可重入鎖。與內置鎖synchronized
不一樣,ReentrantLock
提供了一組無條件的、可輪詢的、定時的以及可中斷的鎖操做,全部獲取鎖、釋放鎖的操做都是顯式的操做。
ReentrantLock
的特性以下:
ReentrantLock
提供了與 synchronized
相同的互斥性、內存可見性和可重入性。ReentrantLock
支持公平鎖和非公平鎖(默認)兩種模式。ReentrantLock
實現了 Lock
接口,支持了 synchronized
所不具有的靈活性。
synchronized
沒法中斷一個正在等待獲取鎖的線程synchronized
沒法在請求獲取一個鎖時無休止地等待Lock
的接口定義以下:
public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void unlock(); Condition newCondition(); }
lock()
- 獲取鎖。unlock()
- 釋放鎖。tryLock()
- 嘗試獲取鎖,僅在調用時鎖未被另外一個線程持有的狀況下,才獲取該鎖。tryLock(long time, TimeUnit unit)
- 和 tryLock()
相似,區別僅在於限定時間,若是限定時間內未獲取到鎖,視爲失敗。lockInterruptibly()
- 鎖未被另外一個線程持有,且線程沒有被中斷的狀況下,才能獲取鎖。newCondition()
- 返回一個綁定到 Lock
對象上的 Condition
實例。前文了解了 ReentrantLock
的特性,接下來,咱們要講述其具體用法。
ReentrantLock
有兩個構造方法:
public ReentrantLock() {} public ReentrantLock(boolean fair) {}
ReentrantLock()
- 默認構造方法會初始化一個非公平鎖(NonfairSync);ReentrantLock(boolean)
- new ReentrantLock(true)
會初始化一個公平鎖(FairSync)。lock()
- 無條件獲取鎖。若是當前線程沒法獲取鎖,則當前線程進入休眠狀態不可用,直至當前線程獲取到鎖。若是該鎖沒有被另外一個線程持有,則獲取該鎖並當即返回,將鎖的持有計數設置爲 1。unlock()
- 用於釋放鎖。🔔 注意:請務必牢記,獲取鎖操做
lock()
必須在try catch
塊中進行,而且將釋放鎖操做unlock()
放在finally
塊中進行,以保證鎖必定被被釋放,防止死鎖的發生。
示例:ReentrantLock
的基本操做
public class ReentrantLockDemo { public static void main(String[] args) { Task task = new Task(); MyThread tA = new MyThread("Thread-A", task); MyThread tB = new MyThread("Thread-B", task); MyThread tC = new MyThread("Thread-C", task); tA.start(); tB.start(); tC.start(); } static class MyThread extends Thread { private Task task; public MyThread(String name, Task task) { super(name); this.task = task; } @Override public void run() { task.execute(); } } static class Task { private ReentrantLock lock = new ReentrantLock(); public void execute() { lock.lock(); try { for (int i = 0; i < 3; i++) { System.out.println(lock.toString()); // 查詢當前線程 hold 住此鎖的次數 System.out.println("\t holdCount: " + lock.getHoldCount()); // 查詢正等待獲取此鎖的線程數 System.out.println("\t queuedLength: " + lock.getQueueLength()); // 是否爲公平鎖 System.out.println("\t isFair: " + lock.isFair()); // 是否被鎖住 System.out.println("\t isLocked: " + lock.isLocked()); // 是否被當前線程持有鎖 System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread()); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { lock.unlock(); } } } }
輸出結果:
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A] holdCount: 1 queuedLength: 2 isFair: false isLocked: true isHeldByCurrentThread: true java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C] holdCount: 1 queuedLength: 1 isFair: false isLocked: true isHeldByCurrentThread: true // ...
與無條件獲取鎖相比,tryLock 有更完善的容錯機制。
tryLock()
- 可輪詢獲取鎖。若是成功,則返回 true;若是失敗,則返回 false。也就是說,這個方法不管成敗都會當即返回,獲取不到鎖(鎖已被其餘線程獲取)時不會一直等待。tryLock(long, TimeUnit)
- 可定時獲取鎖。和 tryLock()
相似,區別僅在於這個方法在獲取不到鎖時會等待必定的時間,在時間期限以內若是還獲取不到鎖,就返回 false。若是若是一開始拿到鎖或者在等待期間內拿到了鎖,則返回 true。示例:ReentrantLock
的 tryLock()
操做
修改上個示例中的 execute()
方法
public void execute() { if (lock.tryLock()) { try { for (int i = 0; i < 3; i++) { // 略... } } finally { lock.unlock(); } } else { System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗"); } }
示例:ReentrantLock
的 tryLock(long, TimeUnit)
操做
修改上個示例中的 execute()
方法
public void execute() { try { if (lock.tryLock(2, TimeUnit.SECONDS)) { try { for (int i = 0; i < 3; i++) { // 略... } } finally { lock.unlock(); } } else { System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗"); } } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + " 獲取鎖超時"); e.printStackTrace(); } }
lockInterruptibly()
- 可中斷獲取鎖。可中斷獲取鎖能夠在得到鎖的同時保持對中斷的響應。可中斷獲取鎖比其它獲取鎖的方式稍微複雜一些,須要兩個 try-catch
塊(若是在獲取鎖的操做中拋出了 InterruptedException
,那麼可使用標準的 try-finally
加鎖模式)。
lock.lockInterruptibly()
獲取某個鎖時,若線程 A 獲取到了鎖,則線程 B 只能等待。若此時對線程 B 調用 threadB.interrupt()
方法可以中斷線程 B 的等待過程。因爲 lockInterruptibly()
的聲明中拋出了異常,因此 lock.lockInterruptibly()
必須放在 try
塊中或者在調用 lockInterruptibly()
的方法外聲明拋出 InterruptedException
。🔔 注意:當一個線程獲取了鎖以後,是不會被
interrupt()
方法中斷的。單獨調用interrupt()
方法不能中斷正在運行狀態中的線程,只能中斷阻塞狀態中的線程。所以當經過lockInterruptibly()
方法獲取某個鎖時,若是未獲取到鎖,只有在等待的狀態下,才能夠響應中斷。
示例:ReentrantLock
的 lockInterruptibly()
操做
修改上個示例中的 execute()
方法
public void execute() { try { lock.lockInterruptibly(); for (int i = 0; i < 3; i++) { // 略... } } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName() + "被中斷"); e.printStackTrace(); } finally { lock.unlock(); } }
newCondition()
- 返回一個綁定到 Lock
對象上的 Condition
實例。Condition
的特性和具體方法請閱讀下文 Condition
。
閱讀 ReentrantLock
的源碼,能夠發現它有一個核心字段:
private final Sync sync;
sync
- 內部抽象類 ReentrantLock.Sync
對象,Sync
繼承自 AQS。它有兩個子類:ReentrantLock.FairSync
- 公平鎖。ReentrantLock.NonfairSync
- 非公平鎖。查看源碼能夠發現,ReentrantLock
實現 Lock
接口實際上是調用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的實現,這裏不一一列舉。
ReentrantLock 獲取鎖和釋放鎖的接口,從表象看,是調用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的實現;從本質上看,是基於 AQS 的實現。
仔細閱讀源碼很容易發現:
void lock()
調用 Sync 的 lock() 方法。void lockInterruptibly()
直接調用 AQS 的 獲取可中斷的獨佔鎖 方法 lockInterruptibly()
。
boolean tryLock()
調用 Sync 的 nonfairTryAcquire()
。boolean tryLock(long time, TimeUnit unit)
直接調用 AQS 的 獲取超時等待式的獨佔鎖 方法 tryAcquireNanos(int arg, long nanosTimeout)
。void unlock()
直接調用 AQS 的 釋放獨佔鎖 方法 release(int arg)
。
直接調用 AQS 接口的方法就再也不贅述了,其原理在 AQS 的原理 中已經用很大篇幅進行過講解。
nonfairTryAcquire
方法源碼以下:
// 公平鎖和非公平鎖都會用這個方法區嘗試獲取鎖 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { // 若是同步狀態爲0,將其設爲 acquires,並設置當前線程爲排它線程 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
處理流程很簡單:
lock 方法在公平鎖和非公平鎖中的實現:
兩者的區別僅在於申請非公平鎖時,若是同步狀態爲 0,嘗試將其設爲 1,若是成功,直接將當前線程置爲排它線程;不然和公平鎖同樣,調用 AQS 獲取獨佔鎖方法 acquire
。
// 非公平鎖實現 final void lock() { if (compareAndSetState(0, 1)) // 若是同步狀態爲0,將其設爲1,並設置當前線程爲排它線程 setExclusiveOwnerThread(Thread.currentThread()); else // 調用 AQS 獲取獨佔鎖方法 acquire acquire(1); } // 公平鎖實現 final void lock() { // 調用 AQS 獲取獨佔鎖方法 acquire acquire(1); }
ReentrantReadWriteLock
類是ReadWriteLock
接口的具體實現,它是一個可重入的讀寫鎖。ReentrantReadWriteLock
維護了一對讀寫鎖,將讀寫鎖分開,有利於提升併發效率。
ReentrantLock
實現了一種標準的互斥鎖:每次最多隻有一個線程能持有ReentrantLock
。但對於維護數據的完整性來講,互斥一般是一種過於強硬的加鎖策略,所以也就沒必要要地限制了併發性。大多數場景下,讀操做比寫操做頻繁,只要保證每一個線程都能讀取到最新數據,而且在讀數據時不會有其它線程在修改數據,那麼就不會出現線程安全問題。這種策略減小了互斥同步,天然也提高了併發性能,ReentrantReadWriteLock
就是這種策略的具體實現。
ReentrantReadWriteLock 的特性以下:
ReentrantReadWriteLock
適用於讀多寫少的場景。若是是寫多讀少的場景,因爲 ReentrantReadWriteLock
其內部實現比 ReentrantLock
複雜,性能可能反而要差一些。若是存在這樣的問題,須要具體問題具體分析。因爲 ReentrantReadWriteLock
的讀寫鎖(ReadLock
、WriteLock
)都實現了 Lock
接口,因此要替換爲 ReentrantLock
也較爲容易。ReentrantReadWriteLock
實現了 ReadWriteLock
接口,支持了 ReentrantLock
所不具有的讀寫鎖分離。ReentrantReadWriteLock
維護了一對讀寫鎖(ReadLock
、WriteLock
)。將讀寫鎖分開,有利於提升併發效率。ReentrantReadWriteLock
的加鎖策略是:容許多個讀操做併發執行,但每次只容許一個寫操做。ReentrantReadWriteLock
爲讀寫鎖都提供了可重入的加鎖語義。ReentrantReadWriteLock
支持公平鎖和非公平鎖(默認)兩種模式。ReadWriteLock
接口定義以下:
public interface ReadWriteLock { Lock readLock(); Lock writeLock(); }
readLock
- 返回用於讀操做的鎖(ReadLock
)。writeLock
- 返回用於寫操做的鎖(WriteLock
)。在讀寫鎖和寫入鎖之間的交互能夠採用多種實現方式,ReadWriteLock
的一些可選實現包括:
前文了解了 ReentrantReadWriteLock
的特性,接下來,咱們要講述其具體用法。
ReentrantReadWriteLock
和 ReentrantLock
同樣,也有兩個構造方法,且用法類似。
public ReentrantReadWriteLock() {} public ReentrantReadWriteLock(boolean fair) {}
ReentrantReadWriteLock()
- 默認構造方法會初始化一個非公平鎖(NonfairSync)。在非公平的鎖中,線程得到鎖的順序是不肯定的。寫線程降級爲讀線程是能夠的,但讀線程升級爲寫線程是不能夠的(這樣會致使死鎖)。ReentrantReadWriteLock(boolean)
- new ReentrantLock(true)
會初始化一個公平鎖(FairSync)。對於公平鎖,等待時間最長的線程將優先得到鎖。若是這個鎖是讀線程持有,則另外一個線程請求寫鎖,那麼其餘讀線程都不能得到讀鎖,直到寫線程釋放寫鎖。在 ReentrantReadWriteLock
的特性 中已經介紹過,ReentrantReadWriteLock
的讀寫鎖(ReadLock
、WriteLock
)都實現了 Lock
接口,因此其各自獨立的使用方式與 ReentrantLock
同樣,這裏再也不贅述。
ReentrantReadWriteLock
與 ReentrantLock
用法上的差別,主要在於讀寫鎖的配合使用。本文以一個典型使用場景來進行講解。
示例:基於 ReentrantReadWriteLock
實現一個簡單的本地緩存
/** * 簡單的無界緩存實現 * <p> * 使用 WeakHashMap 存儲鍵值對。WeakHashMap 中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。 */ static class UnboundedCache<K, V> { private final Map<K, V> cacheMap = new WeakHashMap<>(); private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); public V get(K key) { cacheLock.readLock().lock(); V value; try { value = cacheMap.get(key); String log = String.format("%s 讀數據 %s:%s", Thread.currentThread().getName(), key, value); System.out.println(log); } finally { cacheLock.readLock().unlock(); } return value; } public V put(K key, V value) { cacheLock.writeLock().lock(); try { cacheMap.put(key, value); String log = String.format("%s 寫入數據 %s:%s", Thread.currentThread().getName(), key, value); System.out.println(log); } finally { cacheLock.writeLock().unlock(); } return value; } public V remove(K key) { cacheLock.writeLock().lock(); try { return cacheMap.remove(key); } finally { cacheLock.writeLock().unlock(); } } public void clear() { cacheLock.writeLock().lock(); try { this.cacheMap.clear(); } finally { cacheLock.writeLock().unlock(); } } }
說明:
WeakHashMap
而不是 HashMap
來存儲鍵值對。WeakHashMap
中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。Map
寫數據前加寫鎖,寫完後,釋放寫鎖。Map
讀數據前加讀鎖,讀完後,釋放讀鎖。測試其線程安全性:
/** * @author <a href="mailto:forbreak@163.com">Zhang Peng</a> * @since 2020-01-01 */ public class ReentrantReadWriteLockDemo { static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>(); public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 20; i++) { executorService.execute(new MyThread()); cache.get(0); } executorService.shutdown(); } /** 線程任務每次向緩存中寫入 3 個隨機值,key 固定 */ static class MyThread implements Runnable { @Override public void run() { Random random = new Random(); for (int i = 0; i < 3; i++) { cache.put(i, random.nextInt(100)); } } } }
說明:示例中,經過線程池啓動 20 個併發任務。任務每次向緩存中寫入 3 個隨機值,key 固定;而後主線程每次固定讀取緩存中第一個 key 的值。
輸出結果:
main 讀數據 0:null pool-1-thread-1 寫入數據 0:16 pool-1-thread-1 寫入數據 1:58 pool-1-thread-1 寫入數據 2:50 main 讀數據 0:16 pool-1-thread-1 寫入數據 0:85 pool-1-thread-1 寫入數據 1:76 pool-1-thread-1 寫入數據 2:46 pool-1-thread-2 寫入數據 0:21 pool-1-thread-2 寫入數據 1:41 pool-1-thread-2 寫入數據 2:63 main 讀數據 0:21 main 讀數據 0:21 // ...
前面瞭解了 ReentrantLock
的原理,理解 ReentrantReadWriteLock
就容易多了。
閱讀 ReentrantReadWriteLock 的源碼,能夠發現它有三個核心字段:
/** Inner class providing readlock */ private final ReentrantReadWriteLock.ReadLock readerLock; /** Inner class providing writelock */ private final ReentrantReadWriteLock.WriteLock writerLock; /** Performs all synchronization mechanics */ final Sync sync; public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
sync
- 內部類 ReentrantReadWriteLock.Sync
對象。與 ReentrantLock
相似,它有兩個子類:ReentrantReadWriteLock.FairSync
和 ReentrantReadWriteLock.NonfairSync
,分別表示公平鎖和非公平鎖的實現。readerLock
- 內部類 ReentrantReadWriteLock.ReadLock
對象,這是一把讀鎖。writerLock
- 內部類 ReentrantReadWriteLock.WriteLock
對象,這是一把寫鎖。public static class ReadLock implements Lock, java.io.Serializable { // 調用 AQS 獲取共享鎖方法 public void lock() { sync.acquireShared(1); } // 調用 AQS 釋放共享鎖方法 public void unlock() { sync.releaseShared(1); } } public static class WriteLock implements Lock, java.io.Serializable { // 調用 AQS 獲取獨佔鎖方法 public void lock() { sync.acquire(1); } // 調用 AQS 釋放獨佔鎖方法 public void unlock() { sync.release(1); } }
前文中提過 Lock
接口中 有一個 newCondition()
方法用於返回一個綁定到 Lock
對象上的 Condition
實例。Condition
是什麼?有什麼做用?本節將一一講解。
在單線程中,一段代碼的執行可能依賴於某個狀態,若是不知足狀態條件,代碼就不會被執行(典型的場景,如:if ... else ...)。在併發環境中,當一個線程判斷某個狀態條件時,其狀態多是因爲其餘線程的操做而改變,這時就須要有必定的協調機制來確保在同一時刻,數據只能被一個線程鎖修改,且修改的數據狀態被全部線程所感知。
Java 1.5 以前,主要是利用 Object
類中的 wait
、notify
、notifyAll
配合 synchronized
來進行線程間通訊(若是不瞭解其特性,能夠參考:Java 線程基礎 - wait/notify/notifyAll)。
wait
、notify
、notifyAll
須要配合 synchronized
使用,不適用於 Lock
。而使用 Lock
的線程,彼此間通訊應該使用 Condition
。這能夠理解爲,什麼樣的鎖配什麼樣的鑰匙。內置鎖(synchronized
)配合內置條件隊列(wait
、notify
、notifyAll
),顯式鎖(Lock
)配合顯式條件隊列(Condition
)。
Condition
接口定義以下:
public interface Condition { void await() throws InterruptedException; void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); void signalAll(); }
其中,await
、signal
、signalAll
與 wait
、notify
、notifyAll
相對應,功能也類似。除此之外,Condition
相比內置條件隊列( wait
、notify
、notifyAll
),提供了更爲豐富的功能:
Lock
)上能夠存在多個 Condition
,這意味着鎖的狀態條件能夠有多個。awaitUninterruptibly()
。awaitNanos(long)
、await(long, TimeUnit)
、awaitUntil(Date)
。這裏以 Condition
來實現一個消費者、生產者模式。
🔔 注意:事實上,解決此類問題使用
CountDownLatch
、Semaphore
等工具更爲便捷、安全。想了解詳情,能夠參考 Java 併發工具類 。
產品類
class Message { private final Lock lock = new ReentrantLock(); private final Condition producedMsg = lock.newCondition(); private final Condition consumedMsg = lock.newCondition(); private String message; private boolean state; private boolean end; public void consume() { //lock lock.lock(); try { // no new message wait for new message while (!state) { producedMsg.await(); } System.out.println("consume message : " + message); state = false; // message consumed, notify waiting thread consumedMsg.signal(); } catch (InterruptedException ie) { System.out.println("Thread interrupted - viewMessage"); } finally { lock.unlock(); } } public void produce(String message) { lock.lock(); try { // last message not consumed, wait for it be consumed while (state) { consumedMsg.await(); } System.out.println("produce msg: " + message); this.message = message; state = true; // new message added, notify waiting thread producedMsg.signal(); } catch (InterruptedException ie) { System.out.println("Thread interrupted - publishMessage"); } finally { lock.unlock(); } } public boolean isEnd() { return end; } public void setEnd(boolean end) { this.end = end; } }
消費者
class MessageConsumer implements Runnable { private Message message; public MessageConsumer(Message msg) { message = msg; } @Override public void run() { while (!message.isEnd()) { message.consume(); } } }
生產者
class MessageProducer implements Runnable { private Message message; public MessageProducer(Message msg) { message = msg; } @Override public void run() { produce(); } public void produce() { List<String> msgs = new ArrayList<>(); msgs.add("Begin"); msgs.add("Msg1"); msgs.add("Msg2"); for (String msg : msgs) { message.produce(msg); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } message.produce("End"); message.setEnd(true); } }
測試
public class LockConditionDemo { public static void main(String[] args) { Message msg = new Message(); Thread producer = new Thread(new MessageProducer(msg)); Thread consumer = new Thread(new MessageConsumer(msg)); producer.start(); consumer.start(); } }