package java.util.concurrent.locks; import java.util.concurrent.TimeUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import sun.misc.Unsafe;
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { private static final long serialVersionUID = 7373984972572414691L; /** * Creates a new {[@code](https://my.oschina.net/codeo) AbstractQueuedSynchronizer} instance * with initial synchronization state of zero. */ protected AbstractQueuedSynchronizer() { } static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; // -------------------------------- 同步狀態 --------------------------------------- // 因爲在同步隊列中等待的線程等待超時或者被中斷,須要從同步隊列中取消等待,節點進入該狀態將不會發生變化。 /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; // 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者被取消,將會通知後繼節點,使後繼節點的線程得以運行。 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; // 節點在等待隊列中,節點線程等待在Condition上,當其它線程對Condition調用了signal()方法後,該節點將會從等待隊列中轉移到同步隊列中,加入到對同步狀態的獲取中。 /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; // 表示下一次共享式同步狀態獲取將會無條件地被傳播下去。 /** waitStatus value to indicate the next acquireShared should unconditionally propagate */ static final int PROPAGATE = -3; // 初始狀態(初始值)爲 0 // -------------------------------- 同步狀態 --------------------------------------- /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. 
* It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened. * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to signal. * So, most code doesn't need to check for particular values, just for sign. * * The field is initialized to 0 for normal sync nodes, and CONDITION for condition nodes. * It is modified using CAS (or when possible, unconditional volatile writes). */ volatile int waitStatus; /** * Link to predecessor node that current node/thread relies on for checking waitStatus. Assigned during enqueuing, * and nulled out (for sake of GC) only upon dequeuing. * Also, upon cancellation of a predecessor, we short-circuit while finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes head only as a result of successful acquire. * A cancelled thread never succeeds in acquiring, and a thread only cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread unparks upon release. * Assigned during enqueuing, adjusted when bypassing cancelled predecessors, and nulled out (for sake of GC) when dequeued. * The enq operation does not assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that node is at end of queue. * However, if a next field appears to be null, we can scan prev's from the tail to double-check. * The next field of cancelled nodes is set to point to the node itself instead of null, to make life easier for isOnSyncQueue. 
*/ volatile Node next; // 獲取同步狀態的線程 /** The thread that enqueued this node. Initialized on construction and nulled out after use. */ volatile Thread thread; /** * Link to next node waiting on condition, or the special value SHARED. * Because condition queues are accessed only when holding in exclusive mode, * we just need a simple linked queue to hold nodes while they are waiting on conditions. * They are then transferred to the queue to re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could be elided, but is present to help the VM. * * [@return](https://my.oschina.net/u/556800) the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } /** * Head of the wait queue, lazily initialized. Except for initialization, it is modified only via method setHead. * Note: If head exists, its waitStatus is guaranteed not to be CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via method enq to add new wait node. */ private transient volatile Node tail; // 同步狀態 /** The synchronization state. */ private volatile int state; /** * Returns the current value of synchronization state. * This operation has memory semantics of a {[@code](https://my.oschina.net/codeo) volatile} read. 
* [@return](https://my.oschina.net/u/556800) current state value */ protected final int getState() { return state; } /** * Sets the value of synchronization state. * This operation has memory semantics of a {[@code](https://my.oschina.net/codeo) volatile} write. * @param newState the new state value */ protected final void setState(int newState) { state = newState; } /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } /** * Acquires in shared mode, aborting if interrupted. Implemented by first checking interrupt status, * then invoking at least once {@link #tryAcquireShared}, returning on success. * Otherwise the thread is queued, possibly repeatedly blocking and unblocking, * invoking {@link #tryAcquireShared} until success or the thread is interrupted. * @param arg the acquire argument. * This value is conveyed to {@link #tryAcquireShared} but is otherwise uninterpreted and can represent anything you like. * @throws InterruptedException if the current thread is interrupted */ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } /** * Acquires in shared interruptible mode. 
* @param arg the acquire argument */ private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } /** * Releases in shared mode. Implemented by unblocking one or more threads if {@link #tryReleaseShared} returns true. * * @param arg the release argument. This value is conveyed to {@link #tryReleaseShared} but is otherwise uninterpreted and can represent anything you like. * @return the value returned from {@link #tryReleaseShared} */ public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } /** * Release action for shared mode -- signals successor and ensures * propagation. (Note: For exclusive mode, release just amounts * to calling unparkSuccessor of head if it needs signal.) */ private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. 
*/ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } // 這裏省略掉了其它的源碼。。
}node