AQS
- AbstractQueuedSynchronizer隊列同步器,Lock接口實現的核心,可自定義同步器。
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
- CLH鎖實現圖,將每一個線程構形成Node節點,加入鏈表,新加入線程在列表隊列最後,每次頭結點獲取到所,後續節點繼續獲取鎖。
- AQS暴露操做方法,隱藏實現細節,集成AQS,重寫獲取鎖方法,可自定義同步鎖。
- 內部經過voliate修飾的int變量,
unsafe
方法提供的compareAndSet方法提供同步,逐步包裝成同步隊列,同步鎖。
- 原理總結,用AQS的volatile的同步特性,設置int鎖數量,unsafe對象的CAS同步設置方法,構造同步隊列,最後unsafe的park,unpark直接操做線程掛起釋放。
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
//實現AQS重寫鎖
//Lock的全部操做,經過代理內部類調用AQS,先看非公平鎖
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
//2. sync的非公平實現,快速調用父類AQS的CAS設置state是,若是失敗,進入acquire構造Node節點,加入CLK
//
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//3. AQS的acruird,tryAcquire交給子類自定實現,addWaiter加入CLK節點,acquireQueued,全部節點自旋,獲取同步狀態
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//1. 外部調用Lock
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public Condition newCondition() {
return sync.newCondition();
}
}
//4. AQS中的實現,tryAcquire交給子類
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//5. ReentranLock裏Sync非公平鎖實現
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//獲取AQS中state
int c = getState();
if (c == 0) {
//AQS的CAS同步設置state,並設置持有線程
if (compareAndSetState(0, acquires)) {
//設置持有線程
setExclusiveOwnerThread(current);
return true;
}
}
//可重入關鍵,同一線程可重複獲取鎖
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//6. 加入CLK鏈表尾部,經過AQS,CAS設置tail節點,構造鏈表見自定容器Stack
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
//快速設置tail,若是失敗進入enq
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
//死循環設置尾節點,直到成功,以死循環的方式,同步設置tail節點
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//7. 最後,每一個構造的節點,都以自旋的方式,獲取前一節點,若是前一節點是頭節點,而且獲取到鎖,把當前本身設置成頭結點,獲的鎖。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//parkAndCheckInterrupt 調用LockSupport掛起線程,進入阻塞狀態
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//unlock,調用AQS釋放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//LockSupport的unparking,釋放線程掛起狀態
unparkSuccessor(h);
return true;
}
return false;
}
//1. 交給子類實現
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}