AbstractQueuedSynchronizer源碼分析

package java.util.concurrent.locks; import java.util.concurrent.TimeUnit; import java.util.ArrayList; import java.util.Collection; import java.util.Date; import sun.misc.Unsafe;

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

/** Serialization version identifier; this class implements {@code java.io.Serializable}. */
private static final long serialVersionUID = 7373984972572414691L;

/**
 * Constructs a new {@code AbstractQueuedSynchronizer} whose
 * synchronization state starts out as zero.
 */
protected AbstractQueuedSynchronizer() {
}

static final class Node {
    /** Marker value: the node is waiting in shared mode. */
    static final Node SHARED = new Node();
    /** Marker value: the node is waiting in exclusive mode. */
    static final Node EXCLUSIVE = null;

    // ------------------------- waitStatus values -------------------------

    /**
     * waitStatus value: the thread has cancelled. A thread waiting in the
     * sync queue timed out or was interrupted and gave up waiting; once a
     * node enters this status it never changes again.
     */
    static final int CANCELLED =  1;

    /**
     * waitStatus value: the successor's thread needs unparking. The
     * successor is waiting, so when this node's thread releases the
     * synchronization state or is cancelled it notifies the successor,
     * allowing the successor's thread to run.
     */
    static final int SIGNAL    = -1;

    /**
     * waitStatus value: the thread is waiting on a condition. The node sits
     * on a condition queue; after another thread calls signal() on the
     * Condition, the node is transferred from the condition queue to the
     * sync queue and takes part in acquiring the synchronization state.
     */
    static final int CONDITION = -2;

    /**
     * waitStatus value: the next acquireShared should propagate
     * unconditionally.
     */
    static final int PROPAGATE = -3;

    // The initial status of a node is 0.

    // ---------------------------------------------------------------------

    /**
     * Status of this node. It only ever holds one of:
     * <ul>
     *   <li>SIGNAL: the successor is (or soon will be) blocked via park, so
     *       this node must unpark its successor when it releases or
     *       cancels. To avoid races, acquire methods first indicate that
     *       they need a signal, then retry the atomic acquire, and only
     *       block if that retry fails.</li>
     *   <li>CANCELLED: the node was cancelled because of a timeout or an
     *       interrupt. Nodes never leave this state; in particular, a
     *       thread with a cancelled node never blocks again.</li>
     *   <li>CONDITION: the node is currently on a condition queue. It is
     *       not used as a sync queue node until it is transferred, at which
     *       point the status is set to 0. (This value is unrelated to the
     *       other uses of the field, but simplifies the mechanics.)</li>
     *   <li>PROPAGATE: a releaseShared should be propagated to other nodes.
     *       Set (for the head node only) in doReleaseShared so that
     *       propagation continues even if other operations have since
     *       intervened.</li>
     *   <li>0: none of the above.</li>
     * </ul>
     *
     * The values are ordered numerically to simplify use: a non-negative
     * value means the node does not need to signal, so most code only
     * tests the sign rather than a particular value.
     *
     * The field starts at 0 for normal sync nodes and at CONDITION for
     * condition nodes. It is modified using CAS (or, when possible,
     * unconditional volatile writes).
     */
    volatile int waitStatus;

    /**
     * Predecessor link, which the current node/thread relies on for
     * checking waitStatus. It is assigned during enqueuing and nulled out
     * (for the sake of GC) only upon dequeuing. When a predecessor is
     * cancelled, the link is short-circuited to a non-cancelled one; such a
     * node always exists because the head node is never cancelled: a node
     * becomes head only through a successful acquire, a cancelled thread
     * never succeeds in acquiring, and a thread only cancels itself, never
     * another node.
     */
    volatile Node prev;

    /**
     * Successor link, i.e. the node whose thread this node unparks on
     * release. It is assigned during enqueuing, adjusted when bypassing
     * cancelled predecessors, and nulled out (for the sake of GC) when
     * dequeued. The enq operation sets a predecessor's next field only
     * after attachment, so a null next does not necessarily mean the node
     * is at the end of the queue; when next appears null, the prev links
     * can be scanned from the tail to double-check. For cancelled nodes,
     * next points at the node itself rather than null, which makes life
     * easier for isOnSyncQueue.
     */
    volatile Node next;

    /**
     * The thread that enqueued this node, i.e. the thread trying to obtain
     * the synchronization state. Set at construction and nulled out after
     * use.
     */
    volatile Thread thread;

    /**
     * Either the next node waiting on a condition, or the special value
     * SHARED. Condition queues are only accessed while holding in exclusive
     * mode, so a simple linked queue suffices to hold nodes while they wait
     * on conditions; they are then transferred to the queue to re-acquire.
     * Because conditions can only be exclusive, a field is saved by using
     * the special value to indicate shared mode.
     */
    Node nextWaiter;

    /**
     * Tells whether this node is waiting in shared mode.
     *
     * @return {@code true} if {@code nextWaiter} is the SHARED marker
     */
    final boolean isShared() {
        return SHARED == nextWaiter;
    }

    /**
     * Returns the previous node, throwing NullPointerException when there
     * is none. Intended for call sites where the predecessor cannot be
     * null; the null check could be elided, but is present to help the VM.
     *
     * @return the predecessor of this node
     */
    final Node predecessor() throws NullPointerException {
        final Node previous = prev; // single read of the volatile field
        if (previous != null) {
            return previous;
        }
        throw new NullPointerException();
    }

    /** Used to establish the initial head or the SHARED marker. */
    Node() {
    }

    /** Used by addWaiter. */
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /** Used by Condition. */
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

/**
 * Head of the wait queue, lazily initialized.  Except for initialization, it is modified only via method setHead.
 * Note: If head exists, its waitStatus is guaranteed not to be CANCELLED.
 */
private transient volatile Node head;

/**
 * Tail of the wait queue, lazily initialized.  Modified only via method enq to add new wait node.
 */
private transient volatile Node tail;

/**
 * The synchronization state. Read by {@code getState} and written by
 * {@code setState}; starts at 0 because no initializer is given.
 */
private volatile int state;

/**
 * Reads the current synchronization state.
 * The read has the memory semantics of a {@code volatile} read.
 *
 * @return the current state value
 */
protected final int getState() {
    return this.state;
}

/**
 * Overwrites the synchronization state with the given value.
 * The write has the memory semantics of a {@code volatile} write.
 *
 * @param newState the new state value
 */
protected final void setState(int newState) {
    this.state = newState;
}

/**
 * Atomically replaces the synchronization state with {@code update},
 * provided the current state equals {@code expect}.
 * The operation has the memory semantics of a {@code volatile} read and write.
 *
 * @param expect the value the state is expected to hold
 * @param update the value to store on success
 * @return {@code true} if the swap happened; {@code false} means the actual
 *         value was not equal to the expected value
 */
protected final boolean compareAndSetState(int expect, int update) {
    // The intrinsics setup backing this call (unsafe, stateOffset) is
    // declared elsewhere in this class.
    final boolean swapped = unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    return swapped;
}


/**
 * Acquires in shared mode, aborting if interrupted. The interrupt status is
 * checked first; then {@link #tryAcquireShared} is invoked at least once and
 * the method returns if that succeeds. Otherwise the thread is queued,
 * possibly blocking and unblocking repeatedly while invoking
 * {@link #tryAcquireShared}, until it succeeds or the thread is interrupted.
 *
 * @param arg the acquire argument. It is passed through to
 *        {@link #tryAcquireShared} but is otherwise uninterpreted and can
 *        represent anything you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    if (tryAcquireShared(arg) >= 0) {
        return; // fast path: acquired without queuing
    }
    doAcquireSharedInterruptibly(arg);
}

/**
 * Acquires in shared interruptible mode: enqueues a SHARED node, then loops,
 * retrying the acquire whenever the node's predecessor is the head and
 * parking otherwise. If the method exits without having acquired (for
 * example because of an interrupt), the node is cancelled.
 *
 * @param arg the acquire argument
 * @throws InterruptedException if the thread is interrupted while parked
 */
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean acquired = false;
    try {
        while (true) {
            final Node pred = node.predecessor();
            if (pred == head) {
                final int result = tryAcquireShared(arg);
                if (result >= 0) {
                    setHeadAndPropagate(node, result);
                    pred.next = null; // help GC
                    acquired = true;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(pred, node) && parkAndCheckInterrupt()) {
                throw new InterruptedException();
            }
        }
    } finally {
        // Reached without a successful acquire only on an exceptional exit.
        if (!acquired) {
            cancelAcquire(node);
        }
    }
}

/**
 * Releases in shared mode. When {@link #tryReleaseShared} returns true, one
 * or more threads are unblocked.
 *
 * @param arg the release argument. It is passed through to
 *        {@link #tryReleaseShared} but is otherwise uninterpreted and can
 *        represent anything you like.
 * @return the value returned from {@link #tryReleaseShared}
 */
public final boolean releaseShared(int arg) {
    if (!tryReleaseShared(arg)) {
        return false;
    }
    doReleaseShared();
    return true;
}

/**
 * Release action for shared mode -- signals successor and ensures
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 *
 * Takes no arguments and returns nothing; it loops until an iteration
 * completes with the head unchanged.
 */
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        // Snapshot the head once; it is compared against the live head below.
        Node h = head;
        // Only act when the queue is initialized and holds more than the head node.
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // Reset SIGNAL -> 0 before unparking; a failed CAS means the
                // status changed underneath us, so start over.
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // Status 0: nothing to signal, so mark the head PROPAGATE instead.
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // Exit only if the head is still the node we examined; otherwise repeat
        // with the new head.
        if (h == head)                   // loop if head changed
            break;
    }
}


// The rest of the source code is omitted here.

}

相關文章
相關標籤/搜索