在 Java5
以前,只能使用 synchronized
關鍵字來實現鎖。它使用起來比較簡單,可是有一些侷限性:java
而在 Java5
中,併發包中增長了 Lock
接口及其實現類,它的功能與 synchronized
相似,須要進行顯示地獲取和釋放鎖,可是卻提供了不少 synchronized
不具備的特性。舉一個例子:node
Lock lock = new ReentrantLock();
lock.lock();
try {
//
} finally {
lock.unlock();
}
複製代碼
注意的是獲取鎖的 lock
方法應該寫在 try
塊以外,由於若是寫在 try
塊中,獲取鎖時發生了異常,拋出異常的同時也會致使鎖無端釋放,而不是等到執行 finally
語句時才釋放鎖。編程
在 Lock
接口中,定義了鎖獲取和釋放的基本操做,包括可中斷的獲取鎖、超時獲取鎖等特性:安全
public interface Lock {
// 獲取鎖
void lock();
// 可中斷地獲取鎖,即獲取鎖時,其餘線程能夠中斷當前線程
void lockInterruptibly() throws InterruptedException;
// 嘗試獲取鎖,調用後會當即返回,能獲取就返回 true,不然返回 false
boolean tryLock();
// 在給定時間內可中斷地嘗試獲取鎖
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 釋放鎖
void unlock();
// 返回一個綁定到該 Lock 實例上的 Condition
// 只有當前線程持有了鎖,才能調用 await 方法,await 方法的調用將會自動釋放鎖
Condition newCondition();
}
複製代碼
Lock
接口的主要實現就是 ReentrantLock
。而 Lock
接口的實現基本都是經過內部實現了一個同步器 AQS
的子類來實現線程訪問控制的。併發
同步器 AbstractQueuedSynchronizer
,是用來構建鎖或其餘同步組件的基礎框架。它使用一個 int
成員變量表示同步狀態,經過內置的 FIFO
同步隊列來完成線程獲取資源時的排隊等待工做。框架
在自定義同步組件時,推薦定義一個靜態內部類,使其繼承自同步器 AQS
並實現它的抽象方法來管理同步狀態,在實現抽象方法時,對同步狀態的管理可使用同步器提供的三個方法。oop
private volatile int state;
// 獲取當前同步狀態
protected final int getState() {
return state;
}
// 設置當前同步狀態
protected final void setState(int newState) {
state = newState;
}
// 使用 CAS 設置當前狀態,保證原子性
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
複製代碼
同步器是實現同步組件的關,它們兩者的關係以下:ui
同步器是基於模板方法模式的。使用者須要繼承同步器並重寫指定的方法。而可重寫的方法主要有:this
方法名 | 描述 |
---|---|
tryAcquire | 獨佔式獲取同步狀態 |
tryRelease | 獨佔式釋放同步狀態 |
tryAcquireShared | 共享式獲取同步狀態 |
tryReleaseShared | 共享式釋放同步狀態 |
isHeldExclusively | 判斷同步器是否被線程獨佔 |
隨後將同步器組合到自定義同步組件的實現中,並調用同步器提供的模板方法,而這些模板方法會調用使用者重寫的方法。spa
可調用的模板方法主要有三類:獨佔式獲取與釋放同步狀態、共享式獲取與釋放狀態、以及查詢同步隊列中的等待線程狀況。下文會介紹它們,並簡單分析其實現原理。
同步器內部使用一個 FIFO
同步隊列來管理同步狀態,在線程獲取同步狀態失敗時,同步器會將當前線程與等待狀態等信息構形成一個節點,將其加入到同步隊列中,同時會阻塞當前線程。當釋放同步狀態時,則會喚醒隊列中首節點的線程,使其再次嘗試獲取同步狀態。
同步隊列中的節點的主要屬性有:
static final class Node {
// 等待狀態
volatile int waitStatus;
// 前驅節點,在入隊時被賦值
volatile Node prev;
// 後繼節點,
volatile Node next;
// 加入節點的線程,該線程獲取到同步狀態
volatile Thread thread;
}
複製代碼
等待狀態 waitStatus
的取值主要有:
// 同步隊列中等待的線程等待超時或被中斷,須要取消等待,以後節點的狀態將不會再改變
static final int CANCELLED = 1;
// 後繼節點的線程處於等待狀態
// 當前節點的線程釋放或取消同步狀態時,會喚醒它的後繼節點
static final int SIGNAL = -1;
// 節點目前在等待隊列中
// 當節點被喚醒時,從等待隊列轉移到同步隊列中,嘗試獲取同步狀態
static final int CONDITION = -2;
// 共享式同步狀態被傳播給其餘節點
static final int PROPAGATE = -3;
//初始化 waitStatus 值爲 0
複製代碼
同步器中包含兩個引用,分別指向同步隊列的首節點和尾節點:
// 頭節點,惰性初始化
private transient volatile Node head;
// 尾節點,惰性初始化
private transient volatile Node tail;
複製代碼
當線程沒法獲取同步狀態,會將該線程構形成一個節點加入同步隊列中,使用 addWaiter
方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 快速嘗試
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
複製代碼
若是快速嘗試添加尾節點失敗,則調用 enq
方法經過死循環來保證節點的正確添加:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 若是未初始化,則會先初始化,再繼續嘗試
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
複製代碼
而這個過程可能會有多個線程同時執行,因此必需要保證線程安全,提供了基於 CAS
的設置尾節點的方法:
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
複製代碼
同步隊列中,首節點是獲取同步狀態成功的節點,線程在釋放同步狀態時,會喚醒後繼節點,後繼節點成功獲取同步狀態時將本身設置爲首節點,因爲只有一個線程能獲取到同步狀態,因此設置頭節點的方法不須要 CAS
方法保證:
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
複製代碼
獨佔式獲取與釋放同步狀態主要有四個模板方法,分別是:
方法名 | 描述 |
---|---|
void acquire(int arg) | 獨佔式獲取同步狀態 |
void acquireInterruptibly(int arg) | 可響應中斷的獨佔式獲取同步狀態 |
boolean tryAcquireNanos(int arg, long nanos) | 可響應中斷的獨佔式超時獲取同步狀態 |
boolean release(int arg) | 獨佔式釋放同步狀態 |
acquire
方法能夠獲取同步狀態,該方法爲獨佔式獲取,不可中斷,也就是若是線程獲取同步狀態失敗,加入到同步隊列中,後續對線程進行中斷操做,線程並不會被移除。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
複製代碼
acquire
方法中,首先調用 tryAcquire
方法嘗試獲取同步狀態,該方法由自定義組件本身實現。若是獲取失敗,調用 addWaiter
方法將當前線程加入到同步隊列末尾。最後調用 acquiredQueued
方法經過死循環的方式來獲取同步狀態:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
該方法中,經過死循環的方式來獲取同步狀態,而且只有前驅節點是頭節點時,纔可以嘗試獲取同步狀態,這樣作就是爲了保持 FIFO
同步隊列原則,即先加入到同步隊列中的線程先嚐試獲取同步狀態。
另外,在自旋時首先會調用 shouldParkAfterFailedAcquire
方法判斷是否應該被阻塞:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 前驅節點狀態爲 SIGNAL ,則當前節點能夠被阻塞
return true;
if (ws > 0) {
// 前驅節點處於取消狀態,也就是超時或被中斷,須要從同步隊列中刪除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 不然,將當前節點設置爲 SIGNAL,不會阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
複製代碼
該方法主要是根據前驅節點的 waitStatus
來判斷當前節點的線程,若是當前節點應該被阻塞,則會調用 parkAndCheckInterrupt
方法阻塞:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
複製代碼
該方法調用 LockSupport.park()
方法阻塞當前線程,並返回當前線程的中斷狀態。
acquireInterruptibly
方法以可響應中斷的方式獲取同步狀態,其中調用 tryAcquire
方法失敗後,會調用 doAcquireInterruptibly
方法自旋式獲取。
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
複製代碼
doAcquireInterruptibly
方法與普通地獨佔式獲取同步狀態很是相似,只是再也不使用 interrupt
標誌,而是直接拋出 InterruptedException
異常。
tryAcquireNanos
方法能夠超時獲取同步狀態,即在指定時間內可中斷地獲取同步狀態。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
複製代碼
該方法首先調用 tryAcquire
方法嘗試獲取同步狀態,若是獲取失敗,則會調用 doAcquireNanos
方法:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 計算總的超時時間
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 剩餘的超時時間
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 若是超時時間大於 臨界值,則會阻塞線程,不然快速自旋
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
該方法中,首先計算出超時的最終時間,而後將當前節點加入到同步隊列中。
而後自旋進行判斷,若是當前節點爲頭節點,則會調用 tryAcquire
方法嘗試獲取同步狀態;不然從新計算超時時間,若是 nanosTimeout
小於 0
,則獲取失敗。不然繼續判斷超時時間是否大於 spinForTimeoutThreshold
臨界值,若是大於表示時間較長,調用 LockSupport.parkNanos
使線程阻塞。
若是時間較短,則直接進入自旋過程,繼續判斷。另外,還會判斷線程是否被中斷。
release
方法用來釋放同步狀態,該方法釋放了同步狀態後,會喚醒後繼節點,使其從新嘗試獲取同步狀態。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
該方法中,首先調用 tryRelease
方法嘗試釋放同步狀態,該方法由自定義同步組件本身實現。而後調用 unparkSuccessor
方法來喚醒後繼節點:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) // 節點狀態設置爲 0
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 若是後繼節點超時或者被中斷
if (s == null || s.waitStatus > 0) {
s = null;
// 從 tail 向前,找最靠近 head 的可用節點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
複製代碼
該方法首先找到一個可用的 waitStatus
值大於 0
的節點,而後調用 LockSupport.unpark
方法喚醒該線程。
共享式與獨佔式最大的區別就是同一時刻有多個線程同時獲取到同步狀態。
共享式獲取與釋放同步狀態主要有四個模板方法,分別是:
方法名 | 描述 |
---|---|
acquireShared(int arg) | 共享式獲取同步狀態 |
acquireSharedInterruptibly(int arg) | 可響應中斷的共享式獲取同步狀態 |
tryAcquireSharedNanos(int arg, long anos) | 可響應中斷的共享式超時獲取同步狀態 |
releaseShared(int arg) | 共享式釋放同步狀態 |
acquireShared
方法能夠共享式地獲取同步狀態:
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
複製代碼
該方法中,首先調用 tryAcquireShared
方法嘗試獲取同步狀態,若是返回值大於等於 0
,則表示獲取成功。不然獲取失敗,則會調用 doAcquireShared
方法:
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取前驅節點
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 大於等於 0,表示獲取成功
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
首先以共享節點加入到等待隊列中,而後以死循環的方式進行判斷,若是當前節點的前驅節點爲頭節點,則調用 doAcquireShared
方法嘗試獲取同步狀態,直到其返回值大於等於 0
。
可響應中斷、超時獲取的共享式獲取同步狀態與以前相似,這裏也就很少介紹。
releaseShared
方法用於共享式釋放同步狀態,
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
複製代碼
該方法首先調用 tryReleaseShared
嘗試釋放同步狀態,若是釋放失敗,則會調用 doReleaseShared
方法;
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
複製代碼
該方法中在釋放同步狀態時,因爲有多個線程,須要保證線程安全。首先,若是後繼節點的線程須要喚醒,則將當前節點的狀態設置爲 0
,而後調用 unparkSuccessor
方法喚醒後繼節點。