📦 本文以及示例源碼已歸檔在 javacorehtml
確保線程安全最多見的作法是利用鎖機制(Lock
、sychronized
)來對共享數據作互斥同步,這樣在同一個時刻,只有一個線程能夠執行某個方法或者某個代碼塊,那麼操做必然是原子性的,線程安全的。java
在工做、面試中,常常會聽到各類五花八門的鎖,聽的人云裏霧裏。鎖的概念術語不少,它們是針對不一樣的問題所提出的,經過簡單的梳理,也不難理解。git
可重入鎖又名遞歸鎖,是指 同一個線程在外層方法獲取了鎖,在進入內層方法會自動獲取鎖。github
可重入鎖能夠在必定程度上避免死鎖。面試
ReentrantLock
、ReentrantReadWriteLock
是可重入鎖。這點,從其命名也不難看出。 synchronized
也是一個可重入鎖。 synchronized void setA() throws Exception{
Thread.sleep(1000);
setB();
}
synchronized void setB() throws Exception{
Thread.sleep(1000);
}複製代碼
上面的代碼就是一個典型場景:若是使用的鎖不是可重入鎖的話,setB
可能不會被當前線程執行,從而形成死鎖。編程
公平鎖爲了保證線程申請順序,勢必要付出必定的性能代價,所以其吞吐量通常低於非公平鎖。數組
公平鎖與非公平鎖 在 Java 中的典型實現:緩存
synchronized
只支持非公平鎖。 ReentrantLock
、ReentrantReadWriteLock
,默認是非公平鎖,但支持公平鎖。 獨享鎖與共享鎖是一種廣義上的說法,從實際用途上來看,也常被稱爲互斥鎖與讀寫鎖。安全
獨享鎖與共享鎖在 Java 中的典型實現:markdown
synchronized
、ReentrantLock
只支持獨享鎖。 ReentrantReadWriteLock
其寫鎖是獨享鎖,其讀鎖是共享鎖。讀鎖是共享鎖使得併發讀是很是高效的,讀寫,寫讀 ,寫寫的過程是互斥的。 樂觀鎖與悲觀鎖不是指具體的什麼類型的鎖,而是處理併發同步的策略。
悲觀鎖與樂觀鎖在 Java 中的典型實現:
synchronized
和 Lock
顯示加鎖來進行互斥同步,這是一種阻塞同步。 Unsafe
類提供,但這個類不直接暴露爲 API,因此都是間接使用,如各類原子類)。 所謂輕量級鎖與重量級鎖,指的是鎖控制粒度的粗細。顯然,控制粒度越細,阻塞開銷越小,併發性也就越高。
Java 1.6 之前,重量級鎖通常指的是 synchronized
,而輕量級鎖指的是 volatile
。
Java 1.6 之後,針對 synchronized
作了大量優化,引入 4 種鎖狀態: 無鎖狀態、偏向鎖、輕量級鎖和重量級鎖。鎖能夠單向的從偏向鎖升級到輕量級鎖,再從輕量級鎖升級到重量級鎖 。
分段鎖實際上是一種鎖的設計,並非具體的一種鎖。所謂分段鎖,就是把鎖的對象分紅多段,每段獨立控制,使得鎖粒度更細,減小阻塞開銷,從而提升併發性。這其實很好理解,就像高速公路上的收費站,若是隻有一個收費口,那全部的車只能排成一條隊繳費;若是有多個收費口,就能夠分流了。
Hashtable
使用 synchronized
修飾方法來保證線程安全性,那麼面對線程的訪問,Hashtable 就會鎖住整個對象,全部的其它線程只能等待,這種阻塞方式的吞吐量顯然很低。
Java 1.7 之前的 ConcurrentHashMap
就是分段鎖的典型案例。ConcurrentHashMap
維護了一個 Segment
數組,通常稱爲分段桶。
final Segment<K,V>[] segments;複製代碼
當有線程訪問 ConcurrentHashMap
的數據時,ConcurrentHashMap
會先根據 hashCode 計算出數據在哪一個桶(即哪一個 Segment),而後鎖住這個 Segment
。
Java 1.5 以前,協調對共享對象的訪問時可使用的機制只有 synchronized
和 volatile
。這兩個都屬於內置鎖,即鎖的申請和釋放都是由 JVM 所控制。
Java 1.5 以後,增長了新的機制:ReentrantLock
、ReentrantReadWriteLock
,這類鎖的申請和釋放均可以由程序所控制,因此常被稱爲顯示鎖。
💡
synchronized
的用法和原理能夠參考:Java 併發基礎機制 - synchronized 。:bell: 注意:若是不須要
ReentrantLock
、ReentrantReadWriteLock
所提供的高級同步特性,應該優先考慮使用synchronized
。理由以下:- Java 1.6 之後,
synchronized
作了大量的優化,其性能已經與ReentrantLock
、ReentrantReadWriteLock
基本上持平。- 從趨勢來看,Java 將來更可能會優化
synchronized
,而不是ReentrantLock
、ReentrantReadWriteLock
,由於synchronized
是 JVM 內置屬性,它能執行一些優化。-
ReentrantLock
、ReentrantReadWriteLock
申請和釋放鎖都是由程序控制,若是使用不當,可能形成死鎖,這是很危險的。
如下對比一下顯示鎖和內置鎖的差別:
synchronized
不能主動獲取鎖和釋放鎖。獲取鎖和釋放鎖都是 JVM 控制的。 ReentrantLock
能夠主動獲取鎖和釋放鎖。(若是忘記釋放鎖,就可能產生死鎖)。 synchronized
不能響應中斷。 ReentrantLock
能夠響應中斷。 synchronized
沒有超時機制。 ReentrantLock
有超時機制。ReentrantLock
能夠設置超時時間,超時後自動釋放鎖,避免一直等待。 synchronized
只支持非公平鎖。 ReentrantLock
支持非公平鎖和公平鎖。 synchronized
修飾的方法或代碼塊,只能被一個線程訪問(獨享)。若是這個線程被阻塞,其餘線程也只能等待 ReentrantLock
能夠基於 Condition
靈活的控制同步條件。 synchronized
不支持讀寫鎖分離; ReentrantReadWriteLock
支持讀寫鎖,從而使阻塞讀寫的操做分開,有效提升併發性。
AbstractQueuedSynchronizer
(簡稱 AQS)是隊列同步器,顧名思義,其主要做用是處理同步。它是併發鎖和不少同步工具類的實現基石(如ReentrantLock
、ReentrantReadWriteLock
、Semaphore
等)。所以,要想深刻理解
ReentrantLock
、ReentrantReadWriteLock
等併發鎖和同步工具,必須先理解 AQS 的要點和原理。
在 java.util.concurrent.locks
包中的相關鎖(經常使用的有 ReentrantLock
、 ReadWriteLock
)都是基於 AQS 來實現。這些鎖都沒有直接繼承 AQS,而是定義了一個 Sync
類去繼承 AQS。爲何要這樣呢?由於鎖面向的是使用用戶,而同步器面向的則是線程控制,那麼在鎖的實現中聚合同步器而不是直接繼承 AQS 就能夠很好的隔離兩者所關注的事情。
AQS 提供了對獨享鎖與共享鎖的支持。
獲取、釋放獨享鎖的主要 API 以下:
public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)複製代碼
acquire
- 獲取獨佔鎖。 acquireInterruptibly
- 獲取可中斷的獨佔鎖。 tryAcquireNanos
- 嘗試在指定時間內獲取可中斷的獨佔鎖。在如下三種狀況下回返回:
release
- 釋放獨佔鎖。 獲取、釋放共享鎖的主要 API 以下:
public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)複製代碼
acquireShared
- 獲取共享鎖。 acquireSharedInterruptibly
- 獲取可中斷的共享鎖。 tryAcquireSharedNanos
- 嘗試在指定時間內獲取可中斷的共享鎖。 release
- 釋放共享鎖。 閱讀 AQS 的源碼,能夠發現:AQS 繼承自 AbstractOwnableSynchronize
。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
/** 等待隊列的隊頭,懶加載。只能經過 setHead 方法修改。 */
private transient volatile Node head;
/** 等待隊列的隊尾,懶加載。只能經過 enq 方法添加新的等待節點。*/
private transient volatile Node tail;
/** 同步狀態 */
private volatile int state;
}複製代碼
state
- AQS 使用一個整型的 volatile
變量來 維護同步狀態。
ReentrantLock
中該狀態值表示全部者線程已經重複獲取該鎖的次數,Semaphore
中該狀態值表示剩餘的許可數量。 head
和 tail
- AQS 維護了一個 Node
類型(AQS 的內部類)的雙鏈表來完成同步狀態的管理。這個雙鏈表是一個雙向的 FIFO 隊列,經過 head
和 tail
指針進行訪問。當 有線程獲取鎖失敗後,就被添加到隊列末尾。 再來看一下 Node
的源碼
static final class Node {
/** 該等待同步的節點處於共享模式 */
static final Node SHARED = new Node();
/** 該等待同步的節點處於獨佔模式 */
static final Node EXCLUSIVE = null;
/** 線程等待狀態,狀態值有: 0、一、-一、-二、-3 */
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 前驅節點 */
volatile Node prev;
/** 後繼節點 */
volatile Node next;
/** 等待鎖的線程 */
volatile Thread thread;
/** 和節點是否共享有關 */
Node nextWaiter;
}複製代碼
很顯然,Node 是一個雙鏈表結構。
waitStatus
- Node
使用一個整型的 volatile
變量來 維護 AQS 同步隊列中線程節點的狀態。waitStatus
有五個狀態值:
CANCELLED(1)
- 此狀態表示:該節點的線程可能因爲超時或被中斷而 處於被取消(做廢)狀態,一旦處於這個狀態,表示這個節點應該從等待隊列中移除。 SIGNAL(-1)
- 此狀態表示:後繼節點會被掛起,所以在當前節點釋放鎖或被取消以後,必須喚醒(unparking
)其後繼結點。 CONDITION(-2)
- 此狀態表示:該節點的線程 處於等待條件狀態,不會被看成是同步隊列上的節點,直到被喚醒(signal
),設置其值爲 0,再從新進入阻塞狀態。 PROPAGATE(-3)
- 此狀態表示:下一個 acquireShared
應無條件傳播。 AQS 中使用 acquire(int arg)
方法獲取獨佔鎖,其大體流程以下:
詳細流程能夠用下圖來表示,請結合源碼來理解(一圖勝千言):
AQS 中使用 release(int arg)
方法釋放獨佔鎖,其大體流程以下:
AQS 中使用 acquireInterruptibly(int arg)
方法獲取可中斷的獨佔鎖。
acquireInterruptibly(int arg)
實現方式相較於獲取獨佔鎖方法( acquire
)很是類似,區別僅在於它會經過 Thread.interrupted
檢測當前線程是否被中斷,若是是,則當即拋出中斷異常(InterruptedException
)。
AQS 中使用 tryAcquireNanos(int arg)
方法獲取超時等待的獨佔鎖。
doAcquireNanos 的實現方式 相較於獲取獨佔鎖方法( acquire
)很是類似,區別在於它會根據超時時間和當前時間計算出截止時間。在獲取鎖的流程中,會不斷判斷是否超時,若是超時,直接返回 false;若是沒超時,則用 LockSupport.parkNanos
來阻塞當前線程。
AQS 中使用 acquireShared(int arg)
方法獲取共享鎖。
acquireShared
方法和 acquire
方法的邏輯很類似,區別僅在於自旋的條件以及節點出隊的操做有所不一樣。
成功得到共享鎖的條件以下:
tryAcquireShared(arg)
返回值大於等於 0 (這意味着共享鎖的 permit 尚未用完)。 AQS 中使用 releaseShared(int arg)
方法釋放共享鎖。
releaseShared
首先會嘗試釋放同步狀態,若是成功,則解鎖一個或多個後繼線程節點。釋放共享鎖和釋放獨享鎖流程大致類似,區別在於:
對於獨享模式,若是須要 SIGNAL,釋放僅至關於調用頭節點的 unparkSuccessor
。
AQS 中使用 acquireSharedInterruptibly(int arg)
方法獲取可中斷的共享鎖。
acquireSharedInterruptibly
方法與 acquireInterruptibly
幾乎一致,再也不贅述。
AQS 中使用 tryAcquireSharedNanos(int arg)
方法獲取超時等待式的共享鎖。
tryAcquireSharedNanos
方法與 tryAcquireNanos
幾乎一致,再也不贅述。
ReentrantLock
類是Lock
接口的具體實現,它是一個可重入鎖。與內置鎖synchronized
不一樣,ReentrantLock
提供了一組無條件的、可輪詢的、定時的以及可中斷的鎖操做,全部獲取鎖、釋放鎖的操做都是顯式的操做。
ReentrantLock
的特性以下:
ReentrantLock
提供了與 synchronized
相同的互斥性、內存可見性和可重入性。 ReentrantLock
支持公平鎖和非公平鎖(默認)兩種模式。 ReentrantLock
實現了 Lock
接口,支持了 synchronized
所不具有的靈活性。
synchronized
沒法中斷一個正在等待獲取鎖的線程 synchronized
沒法在請求獲取一個鎖時無休止地等待 Lock
的接口定義以下:
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}複製代碼
lock()
- 獲取鎖。 unlock()
- 釋放鎖。 tryLock()
- 嘗試獲取鎖,僅在調用時鎖未被另外一個線程持有的狀況下,才獲取該鎖。 tryLock(long time, TimeUnit unit)
- 和 tryLock()
相似,區別僅在於限定時間,若是限定時間內未獲取到鎖,視爲失敗。 lockInterruptibly()
- 鎖未被另外一個線程持有,且線程沒有被中斷的狀況下,才能獲取鎖。 newCondition()
- 返回一個綁定到 Lock
對象上的 Condition
實例。 前文了解了 ReentrantLock
的特性,接下來,咱們要講述其具體用法。
ReentrantLock
有兩個構造方法:
public ReentrantLock() {}
public ReentrantLock(boolean fair) {}複製代碼
ReentrantLock()
- 默認構造方法會初始化一個非公平鎖(NonfairSync); ReentrantLock(boolean)
- new ReentrantLock(true)
會初始化一個公平鎖(FairSync)。 lock()
- 無條件獲取鎖。若是當前線程沒法獲取鎖,則當前線程進入休眠狀態不可用,直至當前線程獲取到鎖。若是該鎖沒有被另外一個線程持有,則獲取該鎖並當即返回,將鎖的持有計數設置爲 1。 unlock()
- 用於釋放鎖。 :bell: 注意:請務必牢記,獲取鎖操做
lock()
必須在try catch
塊中進行,而且將釋放鎖操做unlock()
放在finally
塊中進行,以保證鎖必定被被釋放,防止死鎖的發生。
示例:ReentrantLock
的基本操做
public class ReentrantLockDemo {
public static void main(String[] args) {
Task task = new Task();
MyThread tA = new MyThread("Thread-A", task);
MyThread tB = new MyThread("Thread-B", task);
MyThread tC = new MyThread("Thread-C", task);
tA.start();
tB.start();
tC.start();
}
static class MyThread extends Thread {
private Task task;
public MyThread(String name, Task task) {
super(name);
this.task = task;
}
@Override
public void run() {
task.execute();
}
}
static class Task {
private ReentrantLock lock = new ReentrantLock();
public void execute() {
lock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println(lock.toString());
// 查詢當前線程 hold 住此鎖的次數
System.out.println("\t holdCount: " + lock.getHoldCount());
// 查詢正等待獲取此鎖的線程數
System.out.println("\t queuedLength: " + lock.getQueueLength());
// 是否爲公平鎖
System.out.println("\t isFair: " + lock.isFair());
// 是否被鎖住
System.out.println("\t isLocked: " + lock.isLocked());
// 是否被當前線程持有鎖
System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
lock.unlock();
}
}
}
}複製代碼
輸出結果:
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
holdCount: 1
queuedLength: 2
isFair: false
isLocked: true
isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
holdCount: 1
queuedLength: 1
isFair: false
isLocked: true
isHeldByCurrentThread: true
// ...複製代碼
與無條件獲取鎖相比,tryLock 有更完善的容錯機制。
tryLock()
- 可輪詢獲取鎖。若是成功,則返回 true;若是失敗,則返回 false。也就是說,這個方法不管成敗都會當即返回,獲取不到鎖(鎖已被其餘線程獲取)時不會一直等待。 tryLock(long, TimeUnit)
- 可定時獲取鎖。和 tryLock()
相似,區別僅在於這個方法在獲取不到鎖時會等待必定的時間,在時間期限以內若是還獲取不到鎖,就返回 false。若是若是一開始拿到鎖或者在等待期間內拿到了鎖,則返回 true。 示例:ReentrantLock
的 tryLock()
操做
修改上個示例中的 execute()
方法
public void execute() {
if (lock.tryLock()) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
}
}複製代碼
示例:ReentrantLock
的 tryLock(long, TimeUnit)
操做
修改上個示例中的 execute()
方法
public void execute() {
try {
if (lock.tryLock(2, TimeUnit.SECONDS)) {
try {
for (int i = 0; i < 3; i++) {
// 略...
}
} finally {
lock.unlock();
}
} else {
System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " 獲取鎖超時");
e.printStackTrace();
}
}複製代碼
lockInterruptibly()
- 可中斷獲取鎖。可中斷獲取鎖能夠在得到鎖的同時保持對中斷的響應。可中斷獲取鎖比其它獲取鎖的方式稍微複雜一些,須要兩個 try-catch
塊(若是在獲取鎖的操做中拋出了 InterruptedException
,那麼可使用標準的 try-finally
加鎖模式)。
lock.lockInterruptibly()
獲取某個鎖時,若線程 A 獲取到了鎖,則線程 B 只能等待。若此時對線程 B 調用 threadB.interrupt()
方法可以中斷線程 B 的等待過程。因爲 lockInterruptibly()
的聲明中拋出了異常,因此 lock.lockInterruptibly()
必須放在 try
塊中或者在調用 lockInterruptibly()
的方法外聲明拋出 InterruptedException
。
:bell: 注意:當一個線程獲取了鎖以後,是不會被
interrupt()
方法中斷的。單獨調用interrupt()
方法不能中斷正在運行狀態中的線程,只能中斷阻塞狀態中的線程。所以當經過lockInterruptibly()
方法獲取某個鎖時,若是未獲取到鎖,只有在等待的狀態下,才能夠響應中斷。
示例:ReentrantLock
的 lockInterruptibly()
操做
修改上個示例中的 execute()
方法
public void execute() {
try {
lock.lockInterruptibly();
for (int i = 0; i < 3; i++) {
// 略...
}
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + "被中斷");
e.printStackTrace();
} finally {
lock.unlock();
}
}複製代碼
newCondition()
- 返回一個綁定到 Lock
對象上的 Condition
實例。Condition
的特性和具體方法請閱讀下文 Condition
。
閱讀 ReentrantLock
的源碼,能夠發現它有一個核心字段:
private final Sync sync;複製代碼
sync
- 內部抽象類 ReentrantLock.Sync
對象,Sync
繼承自 AQS。它有兩個子類: ReentrantLock.FairSync
- 公平鎖。 ReentrantLock.NonfairSync
- 非公平鎖。 查看源碼能夠發現,ReentrantLock
實現 Lock
接口實際上是調用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的實現,這裏不一一列舉。
ReentrantLock 獲取鎖和釋放鎖的接口,從表象看,是調用 ReentrantLock.FairSync
或 ReentrantLock.NonfairSync
中各自的實現;從本質上看,是基於 AQS 的實現。
仔細閱讀源碼很容易發現:
void lock()
調用 Sync 的 lock() 方法。 void lockInterruptibly()
直接調用 AQS 的 獲取可中斷的獨佔鎖 方法 lockInterruptibly()
。 boolean tryLock()
調用 Sync 的 nonfairTryAcquire()
。 boolean tryLock(long time, TimeUnit unit)
直接調用 AQS 的 獲取超時等待式的獨佔鎖 方法 tryAcquireNanos(int arg, long nanosTimeout)
。 void unlock()
直接調用 AQS 的 釋放獨佔鎖 方法 release(int arg)
。 直接調用 AQS 接口的方法就再也不贅述了,其原理在 AQS 的原理 中已經用很大篇幅進行過講解。
nonfairTryAcquire
方法源碼以下:
// 公平鎖和非公平鎖都會用這個方法區嘗試獲取鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
// 若是同步狀態爲0,將其設爲 acquires,並設置當前線程爲排它線程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}複製代碼
處理流程很簡單:
lock 方法在公平鎖和非公平鎖中的實現:
兩者的區別僅在於申請非公平鎖時,若是同步狀態爲 0,嘗試將其設爲 1,若是成功,直接將當前線程置爲排它線程;不然和公平鎖同樣,調用 AQS 獲取獨佔鎖方法 acquire
。
// 非公平鎖實現
final void lock() {
if (compareAndSetState(0, 1))
// 若是同步狀態爲0,將其設爲1,並設置當前線程爲排它線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 調用 AQS 獲取獨佔鎖方法 acquire
acquire(1);
}
// 公平鎖實現
final void lock() {
// 調用 AQS 獲取獨佔鎖方法 acquire
acquire(1);
}複製代碼
ReentrantReadWriteLock
類是ReadWriteLock
接口的具體實現,它是一個可重入的讀寫鎖。ReentrantReadWriteLock
維護了一對讀寫鎖,將讀寫鎖分開,有利於提升併發效率。
ReentrantLock
實現了一種標準的互斥鎖:每次最多隻有一個線程能持有ReentrantLock
。但對於維護數據的完整性來講,互斥一般是一種過於強硬的加鎖策略,所以也就沒必要要地限制了併發性。大多數場景下,讀操做比寫操做頻繁,只要保證每一個線程都能讀取到最新數據,而且在讀數據時不會有其它線程在修改數據,那麼就不會出現線程安全問題。這種策略減小了互斥同步,天然也提高了併發性能,ReentrantReadWriteLock
就是這種策略的具體實現。
ReentrantReadWriteLock 的特性以下:
ReentrantReadWriteLock
適用於讀多寫少的場景。若是是寫多讀少的場景,因爲 ReentrantReadWriteLock
其內部實現比 ReentrantLock
複雜,性能可能反而要差一些。若是存在這樣的問題,須要具體問題具體分析。因爲 ReentrantReadWriteLock
的讀寫鎖(ReadLock
、WriteLock
)都實現了 Lock
接口,因此要替換爲 ReentrantLock
也較爲容易。 ReentrantReadWriteLock
實現了 ReadWriteLock
接口,支持了 ReentrantLock
所不具有的讀寫鎖分離。ReentrantReadWriteLock
維護了一對讀寫鎖(ReadLock
、WriteLock
)。將讀寫鎖分開,有利於提升併發效率。ReentrantReadWriteLock
的加鎖策略是:容許多個讀操做併發執行,但每次只容許一個寫操做。 ReentrantReadWriteLock
爲讀寫鎖都提供了可重入的加鎖語義。 ReentrantReadWriteLock
支持公平鎖和非公平鎖(默認)兩種模式。 ReadWriteLock
接口定義以下:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}複製代碼
readLock
- 返回用於讀操做的鎖(ReadLock
)。 writeLock
- 返回用於寫操做的鎖(WriteLock
)。 在讀寫鎖和寫入鎖之間的交互能夠採用多種實現方式,ReadWriteLock
的一些可選實現包括:
前文了解了 ReentrantReadWriteLock
的特性,接下來,咱們要講述其具體用法。
ReentrantReadWriteLock
和 ReentrantLock
同樣,也有兩個構造方法,且用法類似。
public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}複製代碼
ReentrantReadWriteLock()
- 默認構造方法會初始化一個非公平鎖(NonfairSync)。在非公平的鎖中,線程得到鎖的順序是不肯定的。寫線程降級爲讀線程是能夠的,但讀線程升級爲寫線程是不能夠的(這樣會致使死鎖)。 ReentrantReadWriteLock(boolean)
- new ReentrantLock(true)
會初始化一個公平鎖(FairSync)。對於公平鎖,等待時間最長的線程將優先得到鎖。若是這個鎖是讀線程持有,則另外一個線程請求寫鎖,那麼其餘讀線程都不能得到讀鎖,直到寫線程釋放寫鎖。 在 ReentrantReadWriteLock` 的特
中已經介紹過,ReentrantReadWriteLock
的讀寫鎖(ReadLock
、WriteLock
)都實現了 Lock
接口,因此其各自獨立的使用方式與 ReentrantLock
同樣,這裏再也不贅述。
ReentrantReadWriteLock
與 ReentrantLock
用法上的差別,主要在於讀寫鎖的配合使用。本文以一個典型使用場景來進行講解。
示例:基於 ReentrantReadWriteLock
實現一個簡單的本地緩存
/**
* 簡單的無界緩存實現
* <p>
* 使用 WeakHashMap 存儲鍵值對。WeakHashMap 中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。
*/
static class UnboundedCache<K, V> {
private final Map<K, V> cacheMap = new WeakHashMap<>();
private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
public V get(K key) {
cacheLock.readLock().lock();
V value;
try {
value = cacheMap.get(key);
String log = String.format("%s 讀數據 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.readLock().unlock();
}
return value;
}
public V put(K key, V value) {
cacheLock.writeLock().lock();
try {
cacheMap.put(key, value);
String log = String.format("%s 寫入數據 %s:%s", Thread.currentThread().getName(), key, value);
System.out.println(log);
} finally {
cacheLock.writeLock().unlock();
}
return value;
}
public V remove(K key) {
cacheLock.writeLock().lock();
try {
return cacheMap.remove(key);
} finally {
cacheLock.writeLock().unlock();
}
}
public void clear() {
cacheLock.writeLock().lock();
try {
this.cacheMap.clear();
} finally {
cacheLock.writeLock().unlock();
}
}
}複製代碼
說明:
WeakHashMap
而不是 HashMap
來存儲鍵值對。WeakHashMap
中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。 Map
寫數據前加寫鎖,寫完後,釋放寫鎖。 Map
讀數據前加讀鎖,讀完後,釋放讀鎖。 測試其線程安全性:
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2020-01-01
*/
public class ReentrantReadWriteLockDemo {
static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
executorService.execute(new MyThread());
cache.get(0);
}
executorService.shutdown();
}
/** 線程任務每次向緩存中寫入 3 個隨機值,key 固定 */
static class MyThread implements Runnable {
@Override
public void run() {
Random random = new Random();
for (int i = 0; i < 3; i++) {
cache.put(i, random.nextInt(100));
}
}
}
}複製代碼
說明:示例中,經過線程池啓動 20 個併發任務。任務每次向緩存中寫入 3 個隨機值,key 固定;而後主線程每次固定讀取緩存中第一個 key 的值。
輸出結果:
main 讀數據 0:null
pool-1-thread-1 寫入數據 0:16
pool-1-thread-1 寫入數據 1:58
pool-1-thread-1 寫入數據 2:50
main 讀數據 0:16
pool-1-thread-1 寫入數據 0:85
pool-1-thread-1 寫入數據 1:76
pool-1-thread-1 寫入數據 2:46
pool-1-thread-2 寫入數據 0:21
pool-1-thread-2 寫入數據 1:41
pool-1-thread-2 寫入數據 2:63
main 讀數據 0:21
main 讀數據 0:21
// ...複製代碼
前面瞭解了 ReentrantLock
的原理,理解 ReentrantReadWriteLock
就容易多了。
閱讀 ReentrantReadWriteLock 的源碼,能夠發現它有三個核心字段:
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }複製代碼
sync
- 內部類 ReentrantReadWriteLock.Sync
對象。與 ReentrantLock
相似,它有兩個子類:ReentrantReadWriteLock.FairSync
和 ReentrantReadWriteLock.NonfairSync
,分別表示公平鎖和非公平鎖的實現。 readerLock
- 內部類 ReentrantReadWriteLock.ReadLock
對象,這是一把讀鎖。 writerLock
- 內部類 ReentrantReadWriteLock.WriteLock
對象,這是一把寫鎖。 public static class ReadLock implements Lock, java.io.Serializable {
// 調用 AQS 獲取共享鎖方法
public void lock() {
sync.acquireShared(1);
}
// 調用 AQS 釋放共享鎖方法
public void unlock() {
sync.releaseShared(1);
}
}
public static class WriteLock implements Lock, java.io.Serializable {
// 調用 AQS 獲取獨佔鎖方法
public void lock() {
sync.acquire(1);
}
// 調用 AQS 釋放獨佔鎖方法
public void unlock() {
sync.release(1);
}
}複製代碼
前文中提過 Lock
接口中 有一個 newCondition()
方法用於返回一個綁定到 Lock
對象上的 Condition
實例。Condition
是什麼?有什麼做用?本節將一一講解。
在單線程中,一段代碼的執行可能依賴於某個狀態,若是不知足狀態條件,代碼就不會被執行(典型的場景,如:if ... else ...)。在併發環境中,當一個線程判斷某個狀態條件時,其狀態多是因爲其餘線程的操做而改變,這時就須要有必定的協調機制來確保在同一時刻,數據只能被一個線程鎖修改,且修改的數據狀態被全部線程所感知。
Java 1.5 以前,主要是利用 Object
類中的 wait
、notify
、notifyAll
配合 synchronized
來進行線程間通訊(若是不瞭解其特性,能夠參考:Java 線程基礎 - wait/notify/notifyAll)。
wait
、notify
、notifyAll
須要配合 synchronized
使用,不適用於 Lock
。而使用 Lock
的線程,彼此間通訊應該使用 Condition
。這能夠理解爲,什麼樣的鎖配什麼樣的鑰匙。內置鎖(synchronized
)配合內置條件隊列(wait
、notify
、notifyAll
),顯式鎖(Lock
)配合顯式條件隊列(Condition
)。
Condition
接口定義以下:
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}複製代碼
其中,await
、signal
、signalAll
與 wait
、notify
、notifyAll
相對應,功能也類似。除此之外,Condition
相比內置條件隊列( wait
、notify
、notifyAll
),提供了更爲豐富的功能:
Lock
)上能夠存在多個 Condition
,這意味着鎖的狀態條件能夠有多個。 awaitUninterruptibly()
。 awaitNanos(long)
、await(long, TimeUnit)
、awaitUntil(Date)
。 這裏以 Condition
來實現一個消費者、生產者模式。
:bell: 注意:事實上,解決此類問題使用
CountDownLatch
、Semaphore
等工具更爲便捷、安全。想了解詳情,能夠參考 Java 併發工具類 。
產品類
class Message {
private final Lock lock = new ReentrantLock();
private final Condition producedMsg = lock.newCondition();
private final Condition consumedMsg = lock.newCondition();
private String message;
private boolean state;
private boolean end;
public void consume() {
//lock
lock.lock();
try {
// no new message wait for new message
while (!state) { producedMsg.await(); }
System.out.println("consume message : " + message);
state = false;
// message consumed, notify waiting thread
consumedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - viewMessage");
} finally {
lock.unlock();
}
}
public void produce(String message) {
lock.lock();
try {
// last message not consumed, wait for it be consumed
while (state) { consumedMsg.await(); }
System.out.println("produce msg: " + message);
this.message = message;
state = true;
// new message added, notify waiting thread
producedMsg.signal();
} catch (InterruptedException ie) {
System.out.println("Thread interrupted - publishMessage");
} finally {
lock.unlock();
}
}
public boolean isEnd() {
return end;
}
public void setEnd(boolean end) {
this.end = end;
}
}複製代碼
消費者
class MessageConsumer implements Runnable {
private Message message;
public MessageConsumer(Message msg) {
message = msg;
}
@Override
public void run() {
while (!message.isEnd()) { message.consume(); }
}
}複製代碼
生產者
class MessageProducer implements Runnable {
private Message message;
public MessageProducer(Message msg) {
message = msg;
}
@Override
public void run() {
produce();
}
public void produce() {
List<String> msgs = new ArrayList<>();
msgs.add("Begin");
msgs.add("Msg1");
msgs.add("Msg2");
for (String msg : msgs) {
message.produce(msg);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
message.produce("End");
message.setEnd(true);
}
}複製代碼
測試
public class LockConditionDemo {
public static void main(String[] args) {
Message msg = new Message();
Thread producer = new Thread(new MessageProducer(msg));
Thread consumer = new Thread(new MessageConsumer(msg));
producer.start();
consumer.start();
}
}複製代碼