【Interview】什麼是AQS隊列同步器

什麼是AQS

  • AbstractQueuedSynchronizer是一個隊列同步器,是用來構建鎖和其它同步組件的基礎框架,它使用一個volatile修飾的int成員變量表示同步狀態,經過內置的FIFO隊列來完成資源獲取線程排隊的工做
  • 經過改變int成員變量state來表示鎖是否獲取成功,當state>0表示鎖獲取成功,當state=0時說明鎖釋放成功。提供了三個方法(getState()setState(int newState)compareAndSetState(int expect,int update))來對同步狀態state進行操做,AQS確保對state操做時線程安全的。
  • 主要使用方式是繼承,子類經過繼承同步器並實現它的抽像方法來管理同步狀態。
  • 提供獨佔式和共享式兩種方式來操做同步狀態的獲取與釋放
  • ReentrantLock、ReentrantReadWriteLock、Semaphore等就併發工具就是基於護一個內部幫助器類集成AQS來實現的的

AQS提供的方法(列出主要幾個)

  • acquire(int arg) 以獨佔模式獲取對象,忽略中斷。
  • acquireInterruptibly(int arg) 以獨佔模式獲取對象,若是被中斷則停止。
  • acquire(int arg) 以獨佔模式獲取對象,忽略中斷。
  • acquireShared(int arg) 以共享模式獲取對象,忽略中斷。
  • acquireSharedInterruptibly(int arg) 以共享模式獲取對象,若是被中斷則停止。
  • compareAndSetState(int expect, int update) 若是當前狀態值等於預期值,則以原子方式將同步狀態設置爲給定的更新值。
  • getState() 返回同步狀態的當前值。
  • release(int arg) 以獨佔模式釋放對象。
  • releaseShared(int arg) 以共享模式釋放對象。
  • setState(int newState) 設置同步狀態的值。
  • tryAcquire(int arg) 試圖在獨佔模式下獲取對象狀態。
  • tryAcquireNanos(int arg, long nanosTimeout) 試圖以獨佔模式獲取對象,若是被中斷則停止,若是到了給定超時時間,則會失敗。
  • tryAcquireShared(int arg) 試圖在共享模式下獲取對象狀態。
  • tryAcquireSharedNanos(int arg, long nanosTimeout) 試圖以共享模式獲取對象,若是被中斷則停止,若是到了給定超時時間,則會失敗。
  • tryReleaseShared(int arg) 試圖設置狀態來反映共享模式下的一個釋放。

同步器可重寫的方法

隊列同步器的實現分析

  • 同步器依賴內部的同步隊列(一個FIFO雙向隊列)來完成同步狀態的管理,當前線程獲取同步狀態失敗時,同步器會將當前線程以及等待狀態等信息構形成爲一個節點(Node)並將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點中的線程喚醒,使其再次嘗試獲取同步狀態。
static final class Node {
        /** 表示節點正在共享模式中等待 */
        static final Node SHARED = new Node();
        /** 表示節點正在獨佔模式下等待 */
        static final Node EXCLUSIVE = null;

        /** 表示取消狀態,同步隊列中等待的線程等待超時或中斷,須要從同步隊列中取消等待,節點進入該值不會發生變化 */
        static final int CANCELLED =  1;
        /** 後續節點的線程處於等待狀態,而當前節點的線程若是釋放了同步狀態或者取消,將會通知後續節點運行*/
        static final int SIGNAL    = -1;
        /** 節點在等待中,節點線程等待在Conditions上。當其餘線程對Condition調用了signal()後,該節點將會從等待隊列中轉移到同步隊列中,加入到同步狀態的獲取中 */
        static final int CONDITION = -2;
        /** * 表示下一次共享式同步狀態獲取將會無條件傳播下去 */
        static final int PROPAGATE = -3;

        volatile int waitStatus;

       /**前驅節點**/
        volatile Node prev;
        /**後繼節點**/
        volatile Node next;

  
        volatile Thread thread;


        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

複製代碼

節點是構成同步隊列的基礎,同步器擁有首節點(head)和尾節點(tail),沒有成功獲取同步狀態的線程將會成爲節點加入該隊列的尾部,同步隊列的java

同步器的acquire方法(獲取)

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼
  • 調用自定義同步器實現的tryAcquire(int arg)方法,該方法保證線程安全的獲取同步狀態,若是同步狀態獲取失敗,則構造同步節點並經過addWaiter(Node node)方法將該節點加入到同步隊列的尾部,最後調用acquireQueued(Node node,int arg)方法,使得該 節點以"死循環"(自旋)的方式獲取同步狀態。

獨佔式的獲取與釋放總結

  • 在獲取同步狀態時,同步器維護一個同步隊列,獲取狀態失敗的線程都會被加入到隊列中並在隊列中進行自旋;移出隊列(或中止自旋)的條件是前驅節點爲頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int arg)方法釋放同步狀態,而後喚醒頭節點的後繼節點

同步器的release方法(釋放)

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
複製代碼
  • 該方法執行時,會喚醒頭節點的後繼節點線程,unparkSuccessor(Node node)方法使用LockSupport來喚醒處於等待狀態的線程。

基於AQS實現一個簡單的可重入的獨佔式鎖的獲取與釋放

package com.example.juc;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/** * 基於AQS實現一個簡單的鎖 * * @author qinxuewu * @create 19/3/18下午11:44 * @since 1.0.0 */
public class MyAQSLock implements Lock {
    private  final  MySync sync=new MySync();

    /** * 構建一個內部幫助器 集成AQS */
    private  static  class MySync extends AbstractQueuedSynchronizer{
        //狀態爲0時獲取鎖,

        /*** * 一個線程進來時,若是狀態爲0,就更改state變量,返回true表示拿到鎖 * * 當state大於0說明當前鎖已經被持有,直接返回false,若是重複進來,就累加state,返回true * @param arg * @return */
        @Override
        protected boolean tryAcquire(int arg) {
            //獲取同步狀態狀態的成員變量的值
            int state=getState();
            Thread cru=Thread.currentThread();
            if(state==0){
                //CAS方式更新state,保證原子性,指望值,更新的值
                if( compareAndSetState(0,arg)){
                    //設置成功
                    //設置當前線程
                    setExclusiveOwnerThread(Thread.currentThread());
                    return  true;
                }
            }else if(Thread.currentThread()==getExclusiveOwnerThread()){
                    //若是仍是當前線程進來,累加state,返回true 可重入
                    setState(state+1);
                    return  true;
            }
            return false;
        }

        /** * 釋放同步狀態 * @param arg * @return */
        @Override
        protected boolean tryRelease(int arg) {
            boolean flag=false;
            //判斷釋放操做是不是當前線程,
            if(Thread.currentThread()==getExclusiveOwnerThread()){

                    //獲取同步狀態成員變量,若是大於0 才釋放
                    int state=getState();
                    if(getState()==0){
                        //當前線程置爲null
                        setExclusiveOwnerThread(null);
                        flag=true;
                    }
                    setState(arg);

            }else{
                //不是當線程拋出異常
                throw  new RuntimeException();
            }
            return flag;
        }
        Condition newCondition(){
            return  new ConditionObject();
        }
    }

    @Override
    public void lock() {
            sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /** * 加鎖 * @return */
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1,unit.toNanos(time));
    }

    /** * 釋放鎖 */
    @Override
    public void unlock() {
        sync.tryRelease(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

複製代碼

測試node

public class MyAQSLockTest {
    MyAQSLock lock=new MyAQSLock();
    private    int i=0;
    public int next() {
        try {
            lock.lock();
            try {
                Thread.sleep(300);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return i++;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return 0;
    }

    public void test1(){
        System.out.println("test1");
        test2();
    }
    public void test2(){
        System.out.println("test2");
    }

    public static void main(String[] args){
        MyAQSLockTest test=new MyAQSLockTest();
// Thread thread = new Thread(new Runnable() {
// @Override
// public void run() {
// while (true) {
//
// System.out.println(Thread.currentThread().getName() + "-" + test.next());
//
// }
//
// }
// });
// thread.start();
//
// Thread thread2 = new Thread(new Runnable() {
// @Override
// public void run() {
// while (true) {
//
// System.out.println(Thread.currentThread().getName() + "-" + test.next());
//
// }
//
// }
// });
// thread2.start();

        //可重複鎖演示
        Thread thread3 = new Thread(new Runnable() {
            @Override
            public void run() {
                test.test1();

            }
        });
        thread3.start();
    }
}

複製代碼
相關文章
相關標籤/搜索