高併發 - AbstractQueuedSynchronizer

舒適提醒

  1. AbstractQueuedSynchronizer隊列是CLH隊列的變種,CLH隊列等待採用自旋,AQS的隊列等待採用LockSupport#park。
  2. Node.waitStatus表示對應線程是否應當阻塞,
  3. head節點是正佔有鎖的線程的,其thread值爲null,處於head後驅節點的線程纔會去tryAcquire,tryAcquire由子類實現。
  4. 入隊在tail,出隊在head
如下必需要子類實現:
/**
 * exclusive mode 
 */
boolean tryAcquire(int arg) 
/**
 * exclusive mode.
 */
boolean tryRelease(int arg) 
/**
 * shared mode.
 */
int tryAcquireShared(int arg)
/**
 * shared mode.
 */
boolean tryReleaseShared(int arg) 
/**
 * Returns true if synchronization is held exclusively with
 * respect to the current (calling) thread.  
 */
boolean isHeldExclusively()

獨佔模式acquire

private Node enq(final Node node) {
// 無限循環,即step 1返回false(有別的線程將它的node鏈接到tail)也會從新將node鏈接到tail
    for (;;) {
        Node t = tail;
        if (t == null) {     // new一個不帶任何狀態的Node做爲頭節點
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {    // step 1
                t.next = node;
                return t;    // 返回當前tail
            }
        }
    }
}


private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    //-- 和#enq邏輯比,只是取消了循環,爲了更快?
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
//--
    enq(node);
    return node;    // 返回當前線程的node
}


final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
    // 無限循環,直到當前線程node的前驅node是head,不然對於node狀態爲SIGNAL的線程會park
        for (;;) {
            final Node p = node.predecessor();
        // 若是當前線程node的前驅是head
            if (p == head && tryAcquire(arg)) {
            // head = node; 從而讓其餘線程也能走入該if
        // node.thread = null; 因此head永遠是一個不帶Thread的空節點 
        // node.prev = null;
                setHead(node);        
                p.next = null;     // 配合上面的 node.prev = null; for GC
                failed = false;
                return interrupted;
            }
        // 判斷在tryAcquire失敗後是否應該park,如果,則執行park
            if (shouldParkAfterFailedAcquire(p, node) && 
          // LockSupport#park,返回Thread#interrupted
          parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
// 表示當前節點應當park
    if (ws == Node.SIGNAL) return true;
// 當前節點不斷向前找,直到找到一個前驅節點waitStats不是CANCELLED的爲止(狀態值裏面只有CANCELLED是大於0的)
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
    // 中間CANCELLED的node做廢
        pred.next = node;
    } 
// 0 or PROPAGATE 須要設置爲SIGNAL,但仍然返回false,即don’t park
else { 
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}


// Acquires in exclusive mode, ignoring interrupts
public final void acquire(int arg) {
// 這裏提早tryAcquire爲了省去入隊列操做,提升性能,由於大部分狀況下可能都沒有鎖競爭
    if (!tryAcquire(arg) &&
    // 入隊列,返回當前線程中斷狀態
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

獨佔模式release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)    // step 1
            unparkSuccessor(h);
        return true;
    }
    return false;
}


private void unparkSuccessor(Node node) {
   
    int ws = node.waitStatus;
// 這裏是SIGNAL,將head的waitStatus設爲0,是爲了避免重複step 1
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
// 若是head(當前線程)無後驅node,或後驅node爲CANCELLED
    if (s == null || s.waitStatus > 0) {
        s = null;
    // 從鏈表tail開始遍歷,取出非CANCELLED的node
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
// 非CANCELLED的線程unpark,繼續#acquireQueued的for循環
    if (s != null)
        LockSupport.unpark(s.thread);
}
相關文章
相關標籤/搜索