//AQS成員變量,同步狀態 private volatile int state; //獲取當前同步狀態 protected final int getState() { return state; } //設置當前同步狀態 protected final void setState(int newState) { state = newState; } //使用CAS設置當前狀態,該方法可以保證狀態設置的原子性 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
當有多個線程競爭獲取鎖時,只有一個線程能獲取到鎖,那麼這些沒有獲取到鎖的線程就須要等待,等到線程把鎖釋放了再喚醒等待線程去獲取鎖,爲了實現等待-喚醒機制,AQS提供了基於CLH隊列(Craig, Landin,Hagersten)實現的等待隊列,是一個先入先出的雙向隊列。同步隊列是一個非阻塞的 FIFO 隊列。也就是說往裏面插入或移除一個節點的時候,在併發條件下不會阻塞,而是經過自旋鎖和CAS保證節點插入和移除的原子性。
static final class Node { //等待狀態 volatile int waitStatus; //前驅結點 volatile Node prev; //後繼節點 volatile Node next; //等待獲取鎖的線程 volatile Thread thread; //condition隊列的後繼節點 Node nextWaiter; }
private final Sync sync; public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
public void lock() { sync.lock(); }
final void lock() { acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { //獲取當前線程 final Thread current = Thread.currentThread(); //獲取同步狀態 int c = getState(); //同步狀態爲0,沒有其餘線程佔據鎖 if (c == 0) { //檢測同步隊列沒有其餘線程等待(確保公平性),若是沒有獲取鎖就以CAS方式嘗試改變同步狀態 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //設置鎖的擁有者爲當前線程 setExclusiveOwnerThread(current); return true; } } //同步狀態不爲0,檢測是不是當前線程擁有鎖 else if (current == getExclusiveOwnerThread()) { //當前線程擁有鎖,直接更新同步狀態,重入鎖 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
public final boolean hasQueuedPredecessors() { //同步隊列尾節點 Node t = tail; //同步隊列頭節點 Node h = head; Node s; //h!=t 頭節點和尾節點不一樣,說明同步隊列不爲空 //同步隊列不爲空,檢測下一個等待獲取鎖的線程(h.next.thread)是否是當前線程 return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; }
private Node addWaiter(Node mode) { //以當前線程和給定模式構成節點Node Node node = new Node(Thread.currentThread(), mode); // 同步隊列不爲空,以CAS方式把當前線程加入到隊列末尾 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } //隊列爲空,創建同步隊列,再把當前線程加入同步隊列 enq(node); return node; }
private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //當前節點的前驅結點 final Node p = node.predecessor(); //前驅結點是head頭節點,嘗試獲取同步狀態 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { // c是本次釋放鎖以後的同步狀態 int c = getState() - releases; //當前線程不是鎖的擁有者,拋出IllegalMonitorStateException異常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //若是「鎖」已經被當前線程完全釋放,則設置「鎖」的持有者爲null,即鎖是可獲取狀態。 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) { // 獲取當前線程(要釋放鎖)的等待狀態 int ws = node.waitStatus; if (ws < 0) //設置爲初始狀態 compareAndSetWaitStatus(node, ws, 0); //同步隊列頭節點的下一個等待節點 Node s = node.next; //等待節點無效,從同步節點尾部開始遍歷找到有效的等待節點 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } //喚醒等待節點的線程 if (s != null) LockSupport.unpark(s.thread); }
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //同步狀態爲0,嘗試以CAS方式改變同步狀態 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //重入鎖 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }