AQS解析與實戰

前言

前段時間在面試,發現面試官都有問到同步器AQS的相關問題。AQS爲Java中幾乎全部的鎖和同步器提供一個基礎框架,派生出如ReentrantLock、Semaphore、CountDownLatch等AQS全家桶。本文基於AQS原理的幾個核心點,談談對AbstractQueuedSynchronizer的理解,並實現一個自定義同步器。java

AQS原理面試題的核心回答要點

  1. state 狀態的維護。
  2. CLH隊列
  3. ConditionObject通知
  4. 模板方法設計模式
  5. 獨佔與共享模式。
  6. 自定義同步器。
  7. AQS全家桶的一些延伸,如:ReentrantLock等。

AQS的類圖結構

AQS全稱是AbstractQueuedSynchronizer,即抽象同步隊列。下面看一下AQS的類圖結構: node

爲了方便下面幾個關鍵點的理解,你們先熟悉一下AQS的類圖結構面試

state 狀態的維護

在AQS中維持了一個單一的共享狀態state,來實現同步器同步。看一下state的相關代碼以下:
複製代碼

state源碼

/** * The synchronization state. */
  private volatile int state;

  /** * Returns the current value of synchronization state. * This operation has memory semantics of a {@code volatile} read. * @return current state value */
  protected final int getState() {
      return state;
  }

  /** * Sets the value of synchronization state. * This operation has memory semantics of a {@code volatile} write. * @param newState the new state value */
  protected final void setState(int newState) {
      state = newState;
  }

  /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */
  protected final boolean compareAndSetState(int expect, int update) {
      // See below for intrinsics setup to support this
      return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
  }
複製代碼

state 源碼設計幾個回答要點:

  • state用volatile修飾,保證多線程中的可見性。
  • getState()和setState()方法採用final修飾,限制AQS的子類重寫它們兩。
  • compareAndSetState()方法採用樂觀鎖思想的CAS算法,也是採用final修飾的,不容許子類重寫。

CLH隊列

談到CLH隊列,咱們結合以上state狀態,先來看一下AQS原理圖算法

CLH(Craig, Landin, and Hagersten locks) 同步隊列 是一個FIFO雙向隊列,其內部經過節點head和tail記錄隊首和隊尾元素,隊列元素的類型爲Node。AQS依賴它來完成同步狀態state的管理,當前線程若是獲取同步狀態失敗時,AQS則會將當前線程已經等待狀態等信息構形成一個節點(Node)並將其加入到CLH同步隊列,同時會阻塞當前線程,當同步狀態釋放時,會把首節點喚醒(公平鎖),使其再次嘗試獲取同步狀態。編程

Node節點

CLH同步隊列中,一個節點表示一個線程,它保存着線程的引用(thread)、狀態(waitStatus)、前驅節點(prev)、後繼節點(next),condition隊列的後續節點(nextWaiter)以下圖:設計模式

waitStatus幾種狀態狀態:安全

咱們再看一下CLH隊列入列以及出列的代碼:bash

入列

CLH隊列入列就是tail指向新節點、新節點的prev指向當前最後的節點,當前最後一個節點的next指向當前節點。addWaiter方法以下:多線程

//構造Node
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure(快速嘗試添加尾節點)
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS設置尾節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //屢次嘗試
        enq(node);
        return node;
        }
複製代碼

由以上代碼可得,addWaiter設置尾節點失敗的話,調用enq(Node node)方法設置尾節點,enq方法以下:併發

private Node enq(final Node node) {
        //死循環嘗試,知道成功爲止
        for (;;) {
            Node t = tail;
            //tail 不存在,設置爲首節點
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
複製代碼

出列

首節點的線程釋放同步狀態後,將會喚醒它的後繼節點(next),然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點。能夠看一下如下兩段源碼:

Node h = head;
  if (h != null && h.waitStatus != 0)
  unparkSuccessor(h);
複製代碼
private void unparkSuccessor(Node node) {
        /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

複製代碼

CLH核心幾個回答要點

  • 雙向鏈表入列出列
  • CAS算法設置尾節點+死循環自旋。

CAS算法,能夠看一下我工做實戰中仿造CAS算法解決併發問題的實現 juejin.im/post/5d0616…

ConditionObject

ConditionObject簡介

咱們都知道,synchronized控制同步的時候,能夠配合Object的wait()、notify(),notifyAll() 系列方法能夠實現等待/通知模式。而Lock呢?它提供了條件Condition接口,配合await(),signal(),signalAll() 等方法也能夠實現等待/通知機制。ConditionObject實現了Condition接口,給AQS提供條件變量的支持

Condition隊列與CLH隊列的那些事

咱們先來看一下圖:

ConditionObject隊列與CLH隊列的愛恨情仇:

  • 調用了await()方法的線程,會被加入到conditionObject等待隊列中,而且喚醒CLH隊列中head節點的下一個節點。
  • 線程在某個ConditionObject對象上調用了singnal()方法後,等待隊列中的firstWaiter會被加入到AQS的CLH隊列中,等待被喚醒。
  • 當線程調用unLock()方法釋放鎖時,CLH隊列中的head節點的下一個節點(在本例中是firtWaiter),會被喚醒。

區別:

  • ConditionObject對象都維護了一個單獨的等待隊列 ,AQS所維護的CLH隊列是同步隊列,它們節點類型相同,都是Node。

獨佔與共享模式。

AQS支持兩種同步模式:獨佔式和共享式。

獨佔式

同一時刻僅有一個線程持有同步狀態,如ReentrantLock。又可分爲公平鎖和非公平鎖。

公平鎖: 按照線程在隊列中的排隊順序,有禮貌的,先到者先拿到鎖。

非公平鎖: 當線程要獲取鎖時,無視隊列順序直接去搶鎖,不講道理的,誰搶到就是誰的。

acquire(int arg)是獨佔式獲取同步狀態的方法,咱們來看一下源碼:

  • acquire(long arg)方法
public final void acquire(long arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
複製代碼
  • addWaiter方法
//構造Node
private Node addWaiter(Node mode) {
  Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure(快速嘗試添加尾節點)
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            //CAS設置尾節點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //屢次嘗試
        enq(node);
        return node;
        }
複製代碼
  • acquireQueued(final Node node, long arg)方法
final boolean acquireQueued(final Node node, long arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼
  • selfInterrupt()方法
static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

複製代碼

結合源代碼,可得acquire(int arg)方法流程圖,以下:

共享式

多個線程可同時執行,如Semaphore/CountDownLatch等都是共享式的產物。

acquireShared(long arg)是共享式獲取同步狀態的方法,能夠看一下源碼:

public final void acquireShared(long arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
複製代碼

由上可得,先調用tryAcquireShared(int arg)方法嘗試獲取同步狀態,若是獲取失敗,調用doAcquireShared(int arg)自旋方式獲取同步狀態,方法源碼以下:

private void doAcquireShared(long arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    long r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
複製代碼

AQS的模板方法設計模式

模板方法模式

模板方法模式: 在一個方法中定義一個算法的骨架,而將一些步驟延遲到子類中。模板方法使得子類能夠在不改變算法結構的狀況下,從新定義算法中的某些步驟。

模板方法模式生活中的例子: 假設咱們要去北京旅遊,那麼咱們能夠坐高鐵或者飛機,或者火車,那麼定義交通方式的抽象類,能夠有如下模板:買票->安檢->乘坐xx交通工具->到達北京。讓子類繼承該抽象類,實現對應的模板方法。

AQS定義的一些模板方法以下:

isHeldExclusively()//該線程是否正在獨佔資源。只有用到condition才須要去實現它。
tryAcquire(int)//獨佔方式。嘗試獲取資源,成功則返回true,失敗則返回false。
tryRelease(int)//獨佔方式。嘗試釋放資源,成功則返回true,失敗則返回false。 tryAcquireShared(int)//共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩餘可用資源;正數表示成功,且有剩餘資源。
tryReleaseShared(int)//共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。

簡言之,就是AQS提供tryAcquire,tryAcquireShared等模板方法,給子類實現自定義的同步器

自定義同步器。

基於以上分析,咱們都知道state,CLH隊列,ConditionObject隊列 等這些關鍵點,你要實現自定義鎖的話,首先須要肯定你要實現的是獨佔鎖仍是共享鎖,定義原子變量state的含義,再定義一個內部類去繼承AQS,重寫對應的模板方法

咱們來看一下基於 AQS 實現的不可重入的獨佔鎖的demo,來自《Java併發編程之美》:

public class NonReentrantLock implements Lock,Serializable{

    //內部類,自定義同步器
    static class Sync extends AbstractQueuedSynchronizer {
        //是否鎖已經被持有
        public boolean isHeldExclusively() {
            return getState() == 1;
        }
        //若是state爲0 則嘗試獲取鎖
        public boolean tryAcquire(int arg) {
            assert arg== 1 ;
            //CAS設置狀態,能保證操做的原子性,當前爲狀態爲0,操做成功狀態改成1
            if(compareAndSetState(0, 1)){
                //設置當前獨佔的線程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        //嘗試釋放鎖,設置state爲0
        public boolean tryRelease(int arg) {
            assert arg ==1;
            //若是同步器同步器狀態等於0,則拋出監視器非法狀態異常
            if(getState() == 0)
                throw new IllegalMonitorStateException();
            //設置獨佔鎖的線程爲null
            setExclusiveOwnerThread(null);
            //設置同步狀態爲0
            setState(0);
            return true;
        }
        //返回Condition,每一個Condition都包含了一個Condition隊列
        Condition newCondition(){
            return new ConditionObject();
        }
    }
    //建立一個Sync來作具體的工做
    private final Sync sync= new Sync ();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
        @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }


    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
    }
複製代碼

NonReentrantLockDemoTest:

public class NonReentrantLockDemoTest {

    private static NonReentrantLock nonReentrantLock = new NonReentrantLock();

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(() -> {
                nonReentrantLock.lock();
                try {
                    System.out.println(Thread.currentThread().getName());
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    nonReentrantLock.unlock();
                }
            });
            thread.start();
        }
    }
}

複製代碼

運行結果:

AQS全家桶實戰

AQS派生出如ReentrantLock、Semaphore等AQS全家桶,接下來能夠看一下它們的使用案例。

ReentrantLock

ReentrantLock介紹

  • ReentrantLock爲重入鎖,可以對共享資源可以重複加鎖,是實現Lock接口的一個類。
  • ReentrantLock支持公平鎖和非公平鎖兩種方式

ReentrantLock案例

使用ReentrantLock來實現個簡單線程安全的list,以下:

public class ReentrantLockList {
    // 線程不安全的list
    private ArrayList<String> array = new ArrayList<>();
    //獨佔鎖
    private volatile ReentrantLock lock = new ReentrantLock();

    //添加元素
    public  void add(String e){
        lock.lock();
        try {
            array.add(e);
        }finally {
            lock.unlock();
        }
    }

    //刪除元素
    public void remove(String e){
        lock.lock();
        try {
            array.remove(e);
        }finally {
            lock.unlock();
        }
    }
    //獲取元素
    public String get(int index){
        lock.lock();
        try {
            return array.get(index);
        }finally {
            lock.unlock();
        }
    }
}
複製代碼

Semaphore

Semaphore介紹

  • Semaphore也叫信號量,能夠用來控制資源併發訪問的線程數量,經過協調各個線程,以保證合理的使用資源。

Semaphore案例

Java多線程有一到比較經典的面試題:ABC三個線程順序輸出,循環10遍。

public class ABCSemaphore {

    private static Semaphore A = new Semaphore(1);
    private static Semaphore B = new Semaphore(1);
    private static Semaphore C = new Semaphore(1);


    static class ThreadA extends Thread {

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    A.acquire();
                    System.out.print("A");
                    B.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    static class ThreadB extends Thread {

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    B.acquire();
                    System.out.print("B");
                    C.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    static class ThreadC extends Thread {

        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    C.acquire();
                    System.out.print("C");
                    A.release();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) throws InterruptedException {
        // 開始只有A能夠獲取, BC都不能夠獲取, 保證了A最早執行
        B.acquire();
        C.acquire();
        new ThreadA().start();
        new ThreadB().start();
        new ThreadC().start();
    }
複製代碼

參考

我的公衆號

歡迎你們關注,你們一塊兒學習,一塊兒討論。

相關文章
相關標籤/搜索