Java併發——AbstractQueuedSynchronizer(AQS)同步器

簡介

在此以前介紹ReentrantLockReentrantReadWriteLock中都有sync屬性,而sync正是繼承了AQS(AbstractQueuedSynchronizer)同步器。AQS採用模板設計模式,調用其模板方法(獨佔式獲取與釋放同步狀態、共享式獲取與釋放同步狀態和查詢同步隊列中的等待線程狀況),重寫指定方法,咱們自身就能利用AQS構造出自定義同步組件。java

AQS解析

重要屬性

//等待隊列的頭節點
    private transient volatile Node head;
    //等待隊列的尾節點
    private transient volatile Node tail;
    //同步狀態
    private volatile int state;
複製代碼

AQS內部經過head、tail定義了一個FIFO隊列,state表示同步狀態(0表示未有線程獲取同步狀態或鎖,大於0表示有線程佔有),都經過volatile修飾,保證了內存可見性node

重要內部類

static final class Node {
    /** 共享模式 */
    static final Node SHARED = new Node();
    /** 獨佔模式 */
    static final Node EXCLUSIVE = null;
    /**由於在同步隊列中等待的線程等待超時或者被中斷,須要從同步隊列中取消等待,節點進入該狀態將不會變化 */
    static final int CANCELLED =  1;
    /** 
     * 後繼節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者取消 
     * 將會通知後繼節點,使後繼節點的線程運行
     */ 
    static final int SIGNAL    = -1;
    /**
     * 節點在等待隊列中,節點線程等待在Condtion上,當其餘線程對Condtion調用了signal()方法後
     * 該節點將會從等待隊列中轉移到同步隊列中,加入到對同步狀態的獲取中 
     */
    static final int CONDITION = -2;
    /**  
     * 表示下一次共享式同步狀態獲取將會無條件被傳播下去
     */
    static final int PROPAGATE = -3;
    /** 當前節點等待狀態 */
    volatile int waitStatus;
    /** 前驅節點 */
    volatile Node prev;
    /** 後繼節點 */
    volatile Node next;
    /** 節點關聯的線程 */
    volatile Thread thread;
    /** 
     * 等待隊列中的後繼節點,若是當前節點是共享的,那麼這個字段是一個SHARED常量,即節點類型(獨佔和共享)
     * 和等待隊列中的後繼節點公用同一個字段
     */
    Node nextWaiter;
    }
複製代碼

node節點是構成同步隊列的基礎,pre、next前驅後繼維護了一個雙向隊列,同步隊列結構如圖:編程

當一個線程成功地獲取了同步狀態(或者鎖),其餘線程將沒法獲取到同步狀態,轉而被構形成爲節點並加入到同步隊列中

獨佔式獲取與釋放同步狀態

  • 獨佔式獲取同步狀態
  • 獨佔鎖的lock方法都會調用AQS所提供的模板方法acquire(), 當線程獲取同步狀態失敗後進入同步隊列中,後續對線程進行中斷操做時,線程不會從同步隊列中移出
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    複製代碼
    模板主要思路:

    ①.調用tryAcquire()方法保證線程安全地獲取同步狀態(或者鎖),此方法自定義同步器本身實現
    ②.若獲取失敗,則構造同步節點,調用addWaiter()將該節點加入同步隊列尾部
    ③.最後調用acquireQueued()死循環獲取同步狀態
    ④.若是獲取不到,調用shouldParkAfterFailedAcquire()方法判斷是否須要阻塞,若返回true阻塞節點中的線程,能夠依靠前驅節點的出隊或阻塞線程被中斷來喚醒阻塞線程

    設計模式

    tryAcquire

    tryAcquire()方法體內部只拋異常,若自定義同步器爲獨佔式獲取同步狀態必須重寫此方法安全

    protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    複製代碼

    addWaiter

    將節點加入同步隊列併發

    private Node addWaiter(Node mode) {
            //新建Node
            Node node = new Node(Thread.currentThread(), mode);
            // CAS快速嘗試尾插節點
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //屢次嘗試
            enq(node);
            return node;
        }
    複製代碼

    若隊列爲空或者cas設置失敗後,調用enq自旋再次設置工具

    private Node enq(final Node node) {
            // 死循環
            for (;;) {
                // 獲取尾結點
                Node t = tail;
                // 若隊列爲空,初始化
                if (t == null) { // Must initialize
                    // cas設置頭節點
                    if (compareAndSetHead(new Node()))
                        // 設置尾結點
                        tail = head;
                } else {
                    // CAS設置尾結點
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    複製代碼

    從源碼中能夠發現若同步隊列添加節點失敗後,會死循環一直尾插下去直至添加成功

    post

    acquireQueued

    節點進入同步隊列以後,就進入了一個自旋的過程,當條件知足,獲取到了同步狀態,就能夠從這個自旋過程當中退出,不然會一直執行下去ui

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                // 中斷標記
                boolean interrupted = false;
                // 自旋死循環
                for (;;) {
                    // 獲取當前節點的前驅節點
                    final Node p = node.predecessor();
                    // 若前驅節點爲頭節點且獲取同步狀態成功
                    if (p == head && tryAcquire(arg)) {
                        // 設置頭節點
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 判斷線程是否須要阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    複製代碼

    從源碼中能夠發現只有當前節點的前驅節點是頭節點才能嘗試獲取同步狀態,其緣由在於:
    ①.頭節點是成功獲取到同步狀態的節點,而頭節點釋放同步狀態後,將會喚醒其後繼節點,後繼節點被喚醒後須要檢查本身是否爲頭節點
    ②.保持FIFO同步隊列原則
    this

    阻塞

    加入隊列後,會自旋不斷獲取同步狀態,可是自旋過程當中須要判斷當前線程是否須要阻塞

    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
        
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 前驅節點等待狀態
            int ws = pred.waitStatus;
            // 若前驅節點狀態爲SIGNAL,代表當前節點處於等待狀態,返回true
            if (ws == Node.SIGNAL)
                return true;
            // 若前驅節點狀態>0即取消狀態,代表前驅節點已經等待超時或者被中斷了,須要從同步隊列中取消
            if (ws > 0) {
                // 循環遍歷,直至處於當前節點前面的節點不爲取消狀態爲止
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            // 前驅節點狀態爲CONDITION,PROPAGATE
            } else {
                // CAS設置前驅節點狀態爲SINNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    複製代碼

    若shouldParkAfterFailedAcquire返回true,會調用parkAndCheckInterrupt方法,其方法內部主要調用LockSupport工具類的park()方法阻塞線程

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            // 返回當前線程的中斷狀態
            return Thread.interrupted();
        }
    複製代碼

    acquire()方法流程

  • 獨佔式釋放同步狀態
  • 當線程獲取同步狀態後,執行完相應邏輯後就須要釋放同步狀態,AQS提供了release()方法釋放同步狀態,方法在釋放了同步狀態以後,會喚醒其後繼節點(進而使後繼節點從新嘗試獲取同步狀態),一樣自定義同步器須要重寫tryRelease()釋放同步狀態

    public final boolean release(int arg) { // 若釋放同步狀態成功 if (tryRelease(arg)) { // 獲取同步隊列頭節點 Node h = head; if (h != null && h.waitStatus != 0) // 喚醒頭節點的後繼節點 unparkSuccessor(h); return true; } return false; }
    private void unparkSuccessor(Node node) {
        // 獲取當前節點等待狀態
        int ws = node.waitStatus;
        // 若狀態爲SIGNAL、CONDITION或PROPAGATE,CAS將其狀態置爲0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        // 獲取後繼節點
        Node s = node.next;
        // 若後繼節點爲null或其狀態爲CANCELLED(等待超市或者被中斷)
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 從尾結點遍歷
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 喚醒節點所關聯的線程
            LockSupport.unpark(s.thread);
    }
    複製代碼
    複製代碼private void unparkSuccessor(Node node) { // 獲取當前節點等待狀態 int ws = node.waitStatus; // 若狀態爲SIGNAL、CONDITION或PROPAGATE,CAS將其狀態置爲0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 獲取後繼節點 Node s = node.next; // 若後繼節點爲null或其狀態爲CANCELLED(等待超市或者被中斷) if (s == null || s.waitStatus > 0) { s = null; // 從尾結點遍歷 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒節點所關聯的線程 LockSupport.unpark(s.thread); } 複製代碼複製代碼

    從源碼中能夠發現喚醒的節點從尾遍歷而不是從頭遍歷,緣由是當前節點的後繼可能爲null、等待超時或被中斷,因此從尾部向前進行遍

    共享式同步狀態獲取與釋放

    共享式獲取與獨佔式獲取最主要的區別在於同一時刻可否有多個線程同時獲取到同步狀 態,例如ReentrantReadWriteLock中的讀鎖

  • 共享式獲取同步狀態
  • public final void acquireShared(int arg) {
            // 若獲取失敗自旋再次嘗試
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    複製代碼
    首先tryAcquireShared()嘗試獲取同步狀態,若返回值大於等於0時代表獲取成功,不然調用doAcquireShared()自旋獲取同步狀態
    private void doAcquireShared(int arg) {
            // 將共享節點加入同步隊列
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                // 中斷標記
                boolean interrupted = false;
                // 死循環
                for (;;) {
                    // 獲取前驅節點
                    final Node p = node.predecessor();
                    // 若前驅節點爲頭節點
                    if (p == head) {
                        // 獲取同步
                        int r = tryAcquireShared(arg);
                        // 若獲取成功
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    // 判斷線程是否須要阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    複製代碼

  • 共享式釋放同步狀態
  • public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    複製代碼
    釋放了同步狀態以後,會喚醒後續處於等待狀態的節點,一樣自定義同步器須要重寫tryRelease()釋放同步狀態。不過由於是共享,會存在多個線程同時釋放同步狀態,因此 採用CAS,當CAS操做失敗自旋循重試

    超時獲取同步狀態

    使用內置鎖synchronized同步,可能會形成死鎖,而AQS提供了超時獲取同步狀態,即在指定時間段內獲取同步狀態

  • 獨佔式超時獲取同步狀態
  • 相對於上面介紹的acquire()方法(此方法沒法響應中斷),AQS爲了響應中斷額外提供了acquireInterruptibly()方法,若當前線程被中斷會當即響應中斷拋出異常
    public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
    複製代碼
    該方法首先判斷線程是否中斷,如果拋出異常;不然執行tryAcquire()方法獲取同步狀態,獲取成功直接結束不然執行doAcquireInterruptibly(),與acquireQueued()相似,最大區別在於其再也不使用interrupted標誌,直接拋出InterruptedException異常
    private void doAcquireInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    複製代碼

    tryAcquireNanos()方法超時獲取同步狀態是響應中斷獲取同步狀態的"加強版",增長了 超時控制

    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        // 超時時間    
        final long deadline = System.nanoTime() + nanosTimeout;
        // 將獨佔節點
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            // 死循環自旋
            for (;;) {
                // 獲取前驅節點
                final Node p = node.predecessor();
                // 若前驅節點爲頭節點且獲取同步狀態成功
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 若獲取失敗,判斷是否超時
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                // 判斷線程是否中斷    
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    複製代碼
    複製代碼private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 超時時間 final long deadline = System.nanoTime() + nanosTimeout; // 將獨佔節點 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 死循環自旋 for (;;) { // 獲取前驅節點 final Node p = node.predecessor(); // 若前驅節點爲頭節點且獲取同步狀態成功 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 若獲取失敗,判斷是否超時 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 判斷線程是否中斷 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } 複製代碼複製代碼
    其思路:首先記錄deadline超時時間獲取同步狀態,若獲取失敗判斷是否超時,沒有超時則計算剩餘等待時間,若剩餘時間小於等於0代表已經超時,若沒有則判斷是否大於spinForTimeoutThreshold(1000L),若是大於使用阻塞方式等待,不然仍然自旋等待,使用了LockSupport.parkNanos()方法來實現限時地等待,並支持中斷

  • 共享式超時獲取同步狀態
  • 共享式獲取響應中斷doAcquireSharedInterruptibly()方法與共享式獲取同步狀態也相似,區別也是再也不使用interrupted標誌,直接拋出InterruptedException異常。共享式超時獲取大致思路也差很少,再也不多述。

    感謝

    《java併發編程的藝術》

    相關文章
    相關標籤/搜索