AQS全稱AbstractQueuedSynchronizer
,它是實現 JCU包中幾乎全部的有關鎖、多線程併發以及線程同步器等重要組件的基石, 其核心思想就是volatile int state
這個屬性配合Unsafe
這個工具類來實現對當前鎖的狀態進行修改 。html
AQS內部維護着一個FIFO的CLH隊列,該隊列的基本結構以下。 java
AQS中使用Node來表示CLH隊列的每一個節點,源碼以下:node
static final class Node {
//表示共享模式(共享鎖)
static final Node SHARED = new Node();
//表示獨佔模式(獨佔鎖)
static final Node EXCLUSIVE = null;
//表示線程已取消
static final int CANCELLED = 1;
//表示當前結點的後繼節點須要被喚醒
static final int SIGNAL = -1;
//線程(處在Condition休眠狀態)在等待Condition喚醒
static final int CONDITION = -2;
//表示鎖的下一次獲取能夠無條件傳播,在共享模式頭結點有可能處於這種狀態
static final int PROPAGATE = -3;
//線程等待狀態
volatile int waitStatus;
//當前節點的前一個節點
volatile Node prev;
//當前節點的下一個節點
volatile Node next;
//當前節點所表明的的線程
volatile Thread thread;
//能夠理解爲當前是獨佔模式仍是共享模式
Node nextWaiter;
//若是節點在共享模式下等待,則返回true。
final boolean isShared() {
return nextWaiter == SHARED;
}
//獲取前一個節點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
...
}
複製代碼
若是當前線程經過CAS獲取鎖失敗,AQS會將該線程以及等待狀態等信息打包成一個Node節點,並將其加入同步隊列的尾部,同時將當前線程掛起。編程
public final void acquire(int arg) {
//tryAcquire是子類重寫的方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//將線程及狀態信息打包成一個節點
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//快速的將節點插入隊列尾部,
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//快速插入失敗,經過輪詢來插入尾部,性能比快速插入消耗要大一些
enq(node);
return node;
}
//經過輪詢方式將節點插入隊列尾部
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//掛起當前線程
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//經過LockSupport來掛起當前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
複製代碼
整體流程圖以下。 segmentfault
當釋放鎖時,執行出隊操做及喚醒後繼節點。安全
public final boolean release(int arg) {
//tryRelease是子類實現的方式
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//線程喚醒及出隊操做
unparkSuccessor(h);
return true;
}
return false;
}
複製代碼
整體流程圖以下。 多線程
或許對圖中的同步器有所疑惑。它究竟是什麼?其實很簡單,它就是給首尾兩個節點加上volatile
同步域,以下。併發
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
複製代碼
上面三個變量是AQS中很是重要的三個變量,前面兩個變量好理解,下面就來講一下state
變量,該變量是一個計數器,在獨佔鎖狀況下,獲取鎖後,state的值就會爲1,釋放鎖時就設置爲0(這種鎖屬於不可重入鎖);在共享鎖狀況下,每個線程獲取到鎖,就會state++,釋放鎖時就state--;在可重入鎖狀況下(獲取的鎖都是同一把鎖),每獲取一次鎖會state++,釋放鎖時state--。ide
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//使用CAS
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
複製代碼
經過上一節的講解,想必對AQS有了必定的瞭解,下面就經過AQS來實現一個獨佔鎖及共享鎖。工具
AQS很是強大,只須要重寫tryAcquire
、tryRelease
這兩個方法就能夠實現一個獨佔鎖。代碼以下:
public class SingleLock implements Lock {
//自定義的獨佔鎖
static class Sync extends AbstractQueuedSynchronizer {
//獨佔鎖
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//獨佔鎖
@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//判斷是是不是獨佔鎖。
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
Condition newCondition() {
return new ConditionObject();
}
}
private Sync sync;
public SingleLock() {
sync = new Sync();
}
//加鎖
@Override
public void lock() {
sync.acquire(1);
}
//獲取可中斷鎖
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//獲取鎖,可能失敗
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
//在time時間內不能獲取鎖則失敗
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}
//釋放鎖
@Override
public void unlock() {
sync.release(1);
}
//Condition來實現阻塞喚醒機制
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
複製代碼
很簡單的代碼就實現了一個獨佔鎖,SingleLock
擁有ReentrantLock
的大部分功能,而且用法如出一轍。是否是很簡單...
JUC包中提供的閉鎖(CountDownLatch)及信號量(Semaphore)就是典型的共享鎖的實現。共享鎖的實現也很簡單,須要重寫tryAcquireShared
、tryReleaseShared
這兩個方法。下面就來實現一個共享鎖。代碼以下:
public class ShareLock implements Lock {
static class Sync extends AbstractQueuedSynchronizer {
private int count;
Sync(int count) {
this.count = count;
}
@Override
protected int tryAcquireShared(int arg) {
for (; ; ) {
int current = getState();
int newCount = current - arg;
if (newCount < 0 || compareAndSetState(current, newCount)) {
return newCount;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for (; ; ) {
int current = getState();
int newCount = current + arg;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
Condition newCondition() {
return new ConditionObject();
}
}
private int count;
private Sync sync;
public ShareLock(int count) {
this.count = count;
sync = new Sync(count);
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquireShared(1) >= 0;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
}
複製代碼
ShareLock
容許count
個線程同時獲取鎖,它的實現也很簡單吧。經過上面這兩個例子,咱們就能夠按照本身需求來實現不一樣的鎖,但JUC包中提供的類基本上能知足絕大部分需求了。
SingleLock
是咱們本身實現的一種獨佔鎖,但若是把它用在遞歸中,就會產生死鎖。由於SingleLock
不具有可重入性。那麼該如何實現可重入性尼?來看ReentrantLock
的實現。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//可重入性的實現,若是當前線程已拿到鎖
else if (current == getExclusiveOwnerThread()) {
//狀態加1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//從新設置狀態
setState(nextc);
return true;
}
return false;
}
複製代碼
能夠發現可重入性的實現仍是蠻簡單的,首先判斷當前線程是否是已經拿到鎖,若是已經拿到鎖就將state的值加1。可重入性這一點很是重要,不然會產生沒必要要的死鎖問題,Synchronize
也具有可重入性。
SingleLock
屬於一個非公平鎖,那麼如何實現公平鎖尼?其實這更簡單,只須要加個判斷便可。來看ReentrantLock
的公平鎖的實現。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//若是當前線程以前還有節點則hasQueuedPredecessors返回true,就不會去競爭鎖
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
複製代碼
hasQueuedPredecessors
就是判斷鎖是否公平的關鍵,若是在當前線程以前還有排隊的線程就返回true,這時候當前線程就不會去競爭鎖。從而保證了鎖的公平性。
在使用wait/notify/notifyAll
時,喚醒線程都是使用notifyAll
來喚醒線程,由於notify
沒法喚醒指定線程,從而可能致使死鎖。但使用notifyAll
也有一個問題,那就是當大量線程來獲取鎖時,就會產生驚羣效應,大量的競爭必然形成資源的劇增和浪費,所以終究只能有一個線程競爭成功,其餘線程仍是要老老實實的回去等待。而AQS的FIFO的等待隊列給解決在鎖競爭方面的驚羣效應問題提供了一個思路:保持一個FIFO隊列,隊列每一個節點只關心其前一個節點的狀態,線程喚醒也只喚醒隊頭等待線程。
【參考資料】