AbstractQueuedSynchronizer源碼分析

概述

AbstractQueuedSynchronizer,簡稱AQS,它提供了同步器框架的抽象實現,ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier等併發類均是基於AQS來實現的,具體用法是經過繼承AQS實現其模板方法,而後將子類做爲同步組件的內部類。node

源碼分析

成員變量

head和tail節點

這倆個節點是Node類型的,後面會說先看代碼性能優化

//等待隊列的頭節點,惰性初始化。除了初始化外只能被setHead方法修改,而且節點的waitStatus不能爲CANCELLED
    private transient volatile Node head;
    //等待隊列的尾節點,惰性初始化,只會在調用enq方法添加等待節點時修改
    private transient volatile Node tail;

state

AQS維持了一個單一的volatile的state變量,並經過unsafe方法來原子性的獲取和設置該值併發

/**
     * The synchronization state.
     */
    private volatile int state;

    protected final int getState() {
        return state;
    }

    protected final void setState(int newState) {
        state = newState;
    }
    
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

spinForTimeoutThreshold

static final long spinForTimeoutThreshold = 1000L;

實現方法

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            //第一次入隊,head和tail都爲null
            if (t == null) { // Must initialize
                //原子設置頭節點
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //原子設置尾節點
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

使用傳入的mode節點來讓當前線程入隊app

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure 性能優化嘗試enq方法的快速執行路徑,不然嘗試完整的enq方法
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //這裏都是對共享狀態tail的操做,須要CAS保證
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

設置頭節點,會把入參節點的thread屬性和前指針賦null框架

/**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

喚醒節點的後繼者源碼分析

/**
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
相關文章
相關標籤/搜索