Pepper Metrics是我與同事開發的一個開源工具(github.com/zrbcool/pep…),其經過收集jedis/mybatis/httpservlet/dubbo/motan的運行性能統計,並暴露成prometheus等主流時序數據庫兼容數據,經過grafana展現趨勢。其插件化的架構也很是方便使用者擴展並集成其餘開源組件。
請你們給個star,同時歡迎你們成爲開發者提交PR一塊兒完善項目。java
ReentrantLock,重入鎖,是JDK5中添加在併發包下的一個高性能的工具。顧名思義,ReentrantLock支持同一個線程在未釋放鎖的狀況下重複獲取鎖。node
每個東西的出現必定是有價值的。既然已經有了元老級的synchronized,並且synchronized也支持重入,爲何Doug Lea還要專門寫一個ReentrantLock呢?git
首先,ReentrantLock的性能要優於synchronized。下面經過兩段代碼比價一下。 首先是synchronized:github
public class LockDemo2 {
private static final Object lock = new Object(); // 定義鎖對象
private static int count = 0; // 累加數
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
CountDownLatch cdl = new CountDownLatch(100);
// 啓動100個線程對count累加,每一個線程累加1000000次
// 調用add函數累加,經過synchronized保證多線程之間的同步
for (int i=0;i<100;i++) {
new Thread(() -> {
for (int i1 = 0; i1 <1000000; i1++) {
add();
}
cdl.countDown();
}).start();
}
cdl.await();
System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count);
}
private static void add() {
synchronized (lock) {
count++;
}
}
}
複製代碼
而後是ReentrantLock:數據庫
public class LockDemo3 {
private static Lock lock = new ReentrantLock(); // 重入鎖
private static int count = 0;
public static void main(String[] args) throws InterruptedException {
long start = System.currentTimeMillis();
CountDownLatch cdl = new CountDownLatch(100);
for (int i=0;i<100;i++) {
new Thread(() -> {
for (int i1 = 0; i1 <1000000; i1++) {
add();
}
cdl.countDown();
}).start();
}
cdl.await();
System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count);
}
// 經過ReentrantLock保證線程之間的同步
private static void add() {
lock.lock();
count++;
lock.unlock();
}
}
複製代碼
下面是運行屢次的結果對比:微信
synchronized | ReentrantLock | |
---|---|---|
第一次 | 4620 ms | 3360 ms |
第二次 | 4086 ms | 3138 ms |
第三次 | 4650 ms | 3408 ms |
整體來看,ReentrantLock的平均性能要比synchronized好20%左右。mybatis
PS:感謝 @荒野七叔 的指正。更嚴謹的描述一下這個性能的對比:當存在大量線程競爭鎖時,多數狀況下ReentrantLock的性能優於synchronized。多線程
由於在JDK6中對synchronized作了優化,在鎖競爭不激烈的時候,多數狀況下鎖會停留在偏向鎖和輕量級鎖階段,這兩個階段性能是很好的。當存在大量競爭時,可能會膨脹爲重量級鎖,性能降低,此時的ReentrantLock應該是優於synchronized的。架構
公平性是啥概念呢?若是是公平的獲取鎖,就是說多個線程之間獲取鎖的時候要排隊,依次獲取鎖;若是是不公平的獲取鎖,就是說多個線程獲取鎖的時候一哄而上,誰搶到是誰的。併發
因爲synchronized是基於monitor機制實現的,它只支持非公平鎖;但ReentrantLock同時支持公平鎖和非公平鎖。
除了上文所述,ReentrantLock還有一些其餘synchronized不具有的特性,這裏來總結一下。
synchronized | ReentrantLock | |
---|---|---|
性能 | 相對較差 | 優於synchronized 20%左右 |
公平性 | 只支持非公平鎖 | 同時支持公平鎖與非公平鎖 |
嘗試獲取鎖的支持 | 不支持,一旦到了同步塊,且沒有獲取到鎖,就阻塞在這裏 | 支持,經過tryLock方法實現,可經過其返回值判斷是否成功獲取鎖,因此即便獲取鎖失敗也不會阻塞在這裏 |
超時的獲取鎖 | 不支持,若是一直獲取不到鎖,就會一直等待下去 | 支持,經過tryLock(time, TimeUnit)方法實現,若是超時了還沒獲取鎖,就放棄獲取鎖,不會一直阻塞下去 |
是否可響應中斷 | 不支持,不可響應線程的interrupt信號 | 支持,經過lockInterruptibly方法實現,經過此方法獲取鎖以後,線程可響應interrupt信號,並拋出InterruptedException異常 |
等待條件的支持 | 支持,經過wait、notify、notifyAll來實現 | 支持,經過Conditon接口實現,支持多個Condition,比synchronized更加靈活 |
ReentrantLock的實現基於隊列同步器(AbstractQueuedSynchronizer,後面簡稱AQS),關於AQS的實現原理,能夠看筆者的另外一篇文章: Java隊列同步器(AQS)究竟是怎麼一回事
ReentrantLock的可重入功能基於AQS的同步狀態:state。
其原理大體爲:當某一線程獲取鎖後,將state值+1,並記錄下當前持有鎖的線程,再有線程來獲取鎖時,判斷這個線程與持有鎖的線程是不是同一個線程,若是是,將state值再+1,若是不是,阻塞線程。 當線程釋放鎖時,將state值-1,當state值減爲0時,表示當前線程完全釋放了鎖,而後將記錄當前持有鎖的線程的那個字段設置爲null,並喚醒其餘線程,使其從新競爭鎖。
// acquires的值是1
final boolean nonfairTryAcquire(int acquires) {
// 獲取當前線程
final Thread current = Thread.currentThread();
// 獲取state的值
int c = getState();
// 若是state的值等於0,表示當前沒有線程持有鎖
// 嘗試將state的值改成1,若是修改爲功,則成功獲取鎖,並設置當前線程爲持有鎖的線程,返回true
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// state的值不等於0,表示已經有其餘線程持有鎖
// 判斷當前線程是否等於持有鎖的線程,若是等於,將state的值+1,並設置到state上,獲取鎖成功,返回true
// 若是不是當前線程,獲取鎖失敗,返回false
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
ReentrantLock有兩個構造函數:
// 無參構造,默認使用非公平鎖(NonfairSync)
public ReentrantLock() {
sync = new NonfairSync();
}
// 經過fair參數指定使用公平鎖(FairSync)仍是非公平鎖(NonfairSync)
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
複製代碼
sync是ReentrantLock的成員變量,是其內部類Sync的實例。NonfairSync和FairSync都是Sync類的子類。能夠參考以下類關係圖:
Sync繼承了AQS,因此他具有了AQS的功能。一樣的,NonfairSync和FairSync都是AQS的子類。
當咱們經過無參構造函數獲取ReentrantLock實例後,默認用的就是非公平鎖。
下面將經過以下場景描述非公平鎖的實現原理:假設一個線程(t1)獲取到了鎖,其餘不少沒獲取到鎖的線程(others_t)加入到了AQS的同步隊列中等待,當這個線程執行完,釋放鎖後,其餘線程從新非公平的競爭鎖。
先來描述一下獲取鎖的方法:
final void lock() {
// 線程t1成功的將state的值從0改成1,表示獲取鎖成功
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// others_t線程們沒有獲取到鎖
acquire(1);
}
複製代碼
若是獲取鎖失敗,會調用AQS的acquire方法
public final void acquire(int arg) {
// tryAcquire是個模板方法,在NonfairSync中實現,若是在tryAcquire方法中依然獲取鎖失敗,會將當前線程加入同步隊列中等待(addWaiter)
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
tryAcquire的實現以下,實際上是調用了上面的nonfairTryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
複製代碼
OK,此時t1獲取到了鎖,others_t線程們都跑到同步隊列裏等着了。
某一時刻,t1本身的任務執行完成,調用了釋放鎖的方法(unlock)。
public void unlock() {
// 調用AQS的release方法釋放資源
sync.release(1);
}
複製代碼
public final boolean release(int arg) {
// tryRelease也是模板方法,在Sync中實現
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 成功釋放鎖後,喚醒同步隊列中的下一個節點,使之能夠從新競爭鎖
// 注意此時不會喚醒隊列第一個節點以後的節點,這些節點此時仍是沒法競爭鎖
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
protected final boolean tryRelease(int releases) {
// 將state的值-1,若是-1以後等於0,釋放鎖成功
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
複製代碼
這時鎖被釋放了,被喚醒的線程和新來的線程從新競爭鎖(不包含同步隊列後面的那些線程)。
回到lock方法中,因爲此時全部線程都能經過CAS來獲取鎖,並不能保證被喚醒的那個線程能競爭過新來的線程,因此是非公平的。這就是非公平鎖的實現。
這個過程大概能夠描述爲下圖這樣子:
公平鎖與非公平鎖的釋放鎖的邏輯是同樣的,都是調用上述的unlock方法,最大區別在於獲取鎖的時候。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 獲取鎖,與非公平鎖的不一樣的地方在於,這裏直接調用的AQS的acquire方法,沒有先嚐試獲取鎖
// acquire又調用了下面的tryAcquire方法,核心在於這個方法
final void lock() {
acquire(1);
}
/** * 這個方法和nonfairTryAcquire方法只有一點不一樣,在標註爲#1的地方 * 多了一個判斷hasQueuedPredecessors,這個方法是判斷當前AQS的同步隊列中是否還有等待的線程 * 若是有,返回true,不然返回false。 * 由此可知,當隊列中沒有等待的線程時,當前線程才能嘗試經過CAS的方式獲取鎖。 * 不然就讓這個線程去隊列後面排隊。 */
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// #1
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
複製代碼
經過註釋可知,在公平鎖的機制下,任何線程想要獲取鎖,都要排隊,不可能出現插隊的狀況。這就是公平鎖的實現原理。
這個過程大概能夠描述爲下圖這樣子:
tryLock作的事情很簡單:讓當前線程嘗試獲取一次鎖,成功的話返回true,不然false。
其實現,其實就是調用了nonfairTryAcquire方法來獲取鎖。
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
複製代碼
至於獲取失敗的話,他也不會將本身添加到同步隊列中等待,直接返回false,讓業務調用代碼本身處理。
中斷,也就是經過Thread的interrupt方法將某個線程中斷,中斷一個阻塞狀態的線程,會拋出一個InterruptedException異常。
若是獲取鎖是可中斷的,當一個線程長時間獲取不到鎖時,咱們能夠主動將其中斷,可避免死鎖的產生。
其實現方式以下:
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
複製代碼
會調用AQS的acquireInterruptibly方法
public final void acquireInterruptibly(int arg) throws InterruptedException {
// 判斷當前線程是否已經中斷,若是已中斷,拋出InterruptedException異常
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
複製代碼
此時會優先經過tryAcquire嘗試獲取鎖,若是獲取失敗,會將本身加入到隊列中等待,並可隨時響應中斷。
private void doAcquireInterruptibly(int arg) throws InterruptedException {
// 將本身添加到隊列中等待
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 自旋的獲取鎖
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 獲取鎖失敗,在parkAndCheckInterrupt方法中,經過LockSupport.park()阻塞當前線程,
// 並調用Thread.interrupted()判斷當前線程是否已經被中斷
// 若是被中斷,直接拋出InterruptedException異常,退出鎖的競爭隊列
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// #1
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
PS:不可中斷的方式下,代碼#1位置不會拋出InterruptedException異常,只是簡單的記錄一下當前線程被中斷了。
經過以下方法實現,timeout是超時時間,unit表明時間的單位(毫秒、秒...)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
複製代碼
能夠發現,這也是一個能夠響應中斷的方法。而後調用AQS的tryAcquireNanos方法:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
複製代碼
doAcquireNanos方法與中斷裏面的方法大同小異,下面在註釋中說明一下不一樣的地方:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 計算超時截止時間
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 計算到截止時間的剩餘時間
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 超時了,獲取失敗
return false;
// 超時時間大於1000納秒時,才阻塞
// 由於若是小於1000納秒,基本能夠認爲超時了(系統調用的時間可能都比這個長)
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 響應中斷
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
本文首先對比了元老級的鎖synchronized與ReentrantLock的不一樣,ReentrantLock具備一下優點:
而後介紹了幾個主要特性的實現原理,這些都是基於AQS的。
ReentrantLock的核心,是經過修改AQS中state的值來同步鎖的狀態。 經過這個方式,實現了可重入。
ReentrantLock具有公平鎖和非公平鎖,默認使用非公平鎖。其實現原理主要依賴於AQS中的同步隊列。
最後,可中斷的機制是內部經過Thread.interrupted()判斷當前線程是否已被中斷,若是被中斷就拋出InterruptedException異常來實現的。