AbstractQueuedSynchronizer維護了一個volatile int類型的變量,用戶表示當前同步狀態。java
private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
這三個方法都是原子性操做。node
其中AQS類繼承AbstractOwnableSynchronizer併發
private transient Thread exclusiveOwnerThread; protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; }
當中的這兩個方法能夠分別記錄當前AQS所同步着的線程。app
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire()嘗試直接去獲取資源,若是成功則直接返回;(tryAcquire是抽象方法,暴露給子類實現)less
addWaiter()方法負責把當前沒法得到鎖的線程包裝爲一個Node添加到隊尾,SHARED:共享鎖,EXCLUSIVE:獨佔鎖。ui
static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
acquireQueued()的主要做用是把已經追加到隊列的線程節點經過調用parkAndCheckInterrupt()進行阻塞,等被喚醒後再去競爭資源。this
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
檢查前一個節點的線程狀態spa
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
release:若是能夠釋放鎖,則喚醒隊列第一個線程(Head)線程
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease:若是線程屢次鎖定,則進行屢次釋放,直至status==0則真正釋放鎖,所謂釋放鎖即設置status爲0,由於無競爭因此沒有使用CAS。 code
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
ReentrantLock()有公平鎖和非公平鎖兩種
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.lock(); }
非公平鎖:
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平鎖:
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
hasQueuedPredecessors()就是判斷鎖是否公平的關鍵,若是在當前線程以前還有排隊的線程就返回true,這時候當前線程就不會去競爭鎖。從而保證了鎖的公平性。
public void lock() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); int c = getState(); //獲取寫鎖,寫鎖存在且不是當前線程 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; //獲取讀鎖 int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && //MAX_COUNT爲獲取讀鎖的最大數量,爲16位的最大值 compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { //若是有資源,繼續釋放後面的節點 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
tryAcquireShared()流程:
doAcquireShared():
tryAcquireShared()嘗試加鎖失敗,調用doAcquireShared()加入阻塞隊列以前,還會嘗試一次加鎖,有剩餘的話還會喚醒以後的隊友。那麼問題就來了,假如老大用完後釋放了5個資源,而老二須要6個,老三須要1個,老四須要2個。老大先喚醒老二,老二一看資源不夠,他是把資源讓給老三呢,仍是不讓?答案是否認的!老二會繼續park()等待其餘線程釋放資源,也更不會去喚醒老三和老四了。獨佔模式,同一時刻只有一個線程去執行,這樣作何嘗不可;但共享模式下,多個線程是能夠同時執行的,如今由於老二的資源需求量大,而把後面量小的老三和老四也都卡住了。固然,這並非問題,只是AQS保證嚴格按照入隊順序喚醒罷了(保證公平,但下降了併發)