【併發編程】J.U.C 之 AQS 介紹、實現及其子類使用演示

AQS 介紹

AQS全稱AbstractQueuedSynchronizer.java

AQS底層的數據結構是雙向鏈表,是隊列的一種實現,所以也能夠把它看成一個隊列。數組

其中Sync queue是同步隊列,其head節點主要用於後期調度。這裏的head節點就是佔用資源的線程,後面的都是等待資源的線程。安全

下面的Condition queue 是一個單項鍊表,它不是必須的,只有當程序中須要使用到condition的時候纔會存在,而且可能會有多個condition queue數據結構

AQS的設計思想:併發

  • 1.使用Node實現FIFO隊列,可用於構建鎖或者其餘同步裝置的基礎框架。
  • 2.利用了一個int類型表示狀態。(state)
  • 3.使用方法是繼承。
  • 4.子類經過繼承並經過實現它的方法管理狀態(acquirerelease)的方法操縱狀態。
  • 5.能夠同時實現排他鎖和共享鎖模式。

具體實現的大體思路:框架

AQS內部維護了一個CLH隊列來管理鎖,線程會首先嚐試獲取鎖,若是失敗就將當前線程以及等待狀態等信息包成一個Node節點,加入到同步隊列Sync queue中,接着會不斷循環嘗試獲取鎖,條件是當前節點爲head的直接後繼纔會嘗試,若是失敗就會阻塞本身,直到本身被喚醒。當持有鎖的線程釋放鎖的時候會喚醒隊列中的後繼線程。jvm

AQS同步組件性能

基於這些設計思路JDK提供了不少基於AQS的子類。學習

  • CountDownLatch:閉鎖,經過計數來保證線程是否須要一直阻塞。
  • Semaphore:能控制同一時間併發線程的數目。
  • CyclicBarrier
  • ReentrantLock
  • Condition
  • FutureTask

CountDownLatch

CountDownLatch是一個同步輔助類,經過它能夠完成相似於阻塞當前線程的功能,也就是一個或多個線程一直等待直到其餘線程執行完成。CountDownLatch用了一個給定的計數器來進行初始化,該計數器的操做是原子操做,即同時只能有一個線程操做該計數器,調用該類await方法的線程會一直處於阻塞狀態,直到其餘線程調用countDown方法時計數器的值變成0,每次調用countDown時計數器的值會減1,當計數器的值爲0時全部因await方法而處於等待狀態的線程就會繼續執行。這種操做至多出現一次,由於這裏的計數器是不能被重置的。優化

使用場景

並行計算,處理量很大時能夠將運算任務拆分紅多個子任務,當全部子任務都完成以後,父任務再將全部子任務都結果進行彙總。

Coding演示

@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++){
            final int thradNum = i;
            executorService.execute(() -> {
                try {
                    test(thradNum);
                } catch (Exception e){
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        executorService.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
複製代碼

運行結果

能夠看到finish是在全部方法都執行完後才執行的,也就是計數器減到0了才能執行。

Semaphore

相似於操做系統裏的信號量,能夠控制某個資源可被同時訪問的線程數。

Coding演示

@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++){
            final int thradNum = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire(); // 獲取一個許可
                    test(thradNum);
                    semaphore.release(); // 釋放一個許可
                } catch (Exception e){
                    log.error("exception", e);
                }
            });
        }
        executorService.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
複製代碼

運行結果是以每次3個輸出的,總共輸出20個。

實際開發中有時須要拿到多個許可才容許執行,下面來演示一下須要獲取多個許可的狀況。

semaphore.acquire(3); // 獲取3個許可
test(thradNum);
semaphore.release(3); // 釋放3個許可
複製代碼

這樣運行之後就是每次只輸出1條結果,由於Semaphore總共只有3個許可,而這裏一次操做須要獲取3個許可。

當線程數過多時咱們可使用嘗試獲取許可的方法

if (semaphore.tryAcquire()){
    test(thradNum);
    semaphore.release(); // 釋放一個許可
}
複製代碼

這種狀況下若是獲取不到許可就會被直接丟棄。

運行結果只輸出了3條結果。

咱們來看一下tryAcquire方法的實現

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
複製代碼

能夠看到傳入的參數爲申請的許可數,也就是獲取多個許可。

public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
複製代碼

這裏的參數爲超時時間和時間單位,意味着在嘗試獲取許可時能夠等待一段時間。

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
複製代碼

這裏就是結合上面兩種來使用的狀況。

CyclicBarrier

CyclicBarrier也是一個同步輔助類,它容許一組線程相互等待直到到達某個公共的屏障點,經過它能夠完成多個線程之間相互等待時,只有當每一個線程都準備就緒後才能各自繼續執行後面的操做。它也是經過計數器來實現,當某個線程調用await方法後就進入等待狀態,計數器執行加一操做。當計數器的值達到了設置的初始值時等待狀態的線程會被喚醒繼續執行。因爲CyclicBarrier在釋放等待線程後能夠重用,因此能夠稱之爲循環屏障。

注意: CyclicBarrier的計數器能夠重置。

Coding演示

@Slf4j
public class CyclicBarrierExample1 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}
複製代碼

運行結果

ready狀態時日誌是每秒輸出一條,當有5條ready時會一次性輸出5條continue。這就是前面講的所有線程準備就緒後同時開始執行。

在初始化CyclicBarrier時還能夠在等待線程數後指定一個runnable,含義是當線程到達這個屏障時優先執行這裏的runnable

private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
    log.info("call back is ready.");
});
複製代碼

運行結果

ReentrantLock

Java中的鎖主要分紅兩類:

  • synchronized修飾的鎖
  • J.U.C 裏提供的鎖

ReentrantLock屬於第二類,下面經過與第一類鎖的對比來學習這個鎖

ReentrantLock(可重入鎖)和synchronized 區別

  • 1.鎖的實現:synchronized是依賴於jvm實現的,ReentrantLock是依賴jdk實現的。
  • 2.性能的區別:在sync優化之後,二者性能差很少,均可用時更建議使用sync,由於它的寫法更容易。
  • 3.功能的區別:鎖的靈活性ReentrantLock優於sync。
  • 4.ReentrantLock能夠指定是公平鎖仍是非公平鎖,而sync只能是非公平鎖。
  • 5.ReentrantLock提供了一個Condition類,能夠分組喚醒須要的線程。sync只能隨機喚醒一個線程或者喚醒所有線程。
  • 6.ReentrantLock提供了可以中斷等待鎖的線程的機制。

ReentrantLock的實現是一種自旋鎖,經過循環調用CAS操做來實現加鎖,它性能比較好的緣由之一就是避免了線程進入內核態的阻塞狀態。

Coding演示

@Slf4j
public class LockExample2 {

    /** * 請求總數 */
    public static int clientTotal = 5000;
    /** * 同時併發執行線程數 */
    public static int threadTotal = 200;

    public static int count = 0;

    private static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add(){
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }

    }
}
複製代碼

執行結果始終是5000。

咱們點進ReentrantLock來看一下它作了什麼事情

/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */
public ReentrantLock() {
    sync = new NonfairSync();
}
複製代碼

這裏默認給了一個不公平的鎖。

也能夠經過傳入true或者false來選擇公平鎖仍是不公平鎖

/** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}
複製代碼

下面來看tryLock方法

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
複製代碼

tryLock的含義是僅在調用時鎖定未被另外一個線程保持的狀況下才獲取鎖定。第二個傳入時間參數的含義是若是鎖定在給定的等待時間內沒有被另外一個線程保持且當前線程沒有被中斷,則獲取這個鎖定。使用這兩個方法時必定要理解清楚這兩個方法的含義。

ReentrantReadWriteLock

下面來介紹JUClocks包內另一個鎖ReentrantReadWriteLock

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
    /** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** Performs all synchronization mechanics */
    final Sync sync;
}
複製代碼

能夠看到它裏面有兩個鎖:讀鎖和寫鎖。

它是在沒有任何讀寫鎖的狀況下才能取得寫入的鎖。

它能夠用於實現了悲觀讀取,即當執行中進行讀取時,可能有另外一個進程要寫入的需求,爲了保持同步就須要ReentrantReadWriteLock的讀取鎖定。可是若是讀取不少,寫入不多的狀況下,使用ReentrantReadWriteLock可能會使寫入線程遭遇飢餓,即寫入線程長期處於等待狀態。

Coding演示

@Slf4j
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

}
複製代碼

這裏實現了對Map的讀寫操做的同步,經過對內部方法的封裝使外部在調用方法時不須要考慮同步問題。

須要注意的是ReentrantReadWriteLock實現的是悲觀讀取,若是想得到寫入鎖時堅定不容許有任何讀鎖還保持着,即當全部讀操做作完時才容許寫操做,所以可能會形成寫操做的線程飢餓。

StampedLock

StampedLock控制鎖有三種方式,分別是:寫,讀,樂觀讀。

StampedLock的狀態由版本和模式兩個部分組成。鎖獲取方法返回的是一個數組做爲票據(Stamp),用相應的鎖狀態來表示和控制相關的訪問,輸出0表示沒有寫鎖被受權訪問,在讀鎖上分爲悲觀鎖和樂觀鎖。

Coding演示

@Slf4j
public class LockExample5 {

    /** * 請求總數 */
    public static int clientTotal = 5000;
    /** * 同時併發執行線程數 */
    public static int threadTotal = 200;

    public static int count = 0;

    private static StampedLock lock = new StampedLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal; i++){
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e){
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add(){
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
            lock.unlock(stamp);
        }

    }
}
複製代碼

運行結果始終爲5000,線程安全。

StampLock對吞吐量有巨大改進,特別是在讀線程愈來愈多的場景下。

注意:除了sync,其餘鎖都要在使用完後釋放鎖。

總結

介紹了這麼多種鎖,咱們來總結一下各個鎖使用的場景

  • 當只有少許競爭者時sync是很好的選擇。
  • 競爭者很多,但線程增加的趨勢可以預估時,ReentrantLock適合。

sync因爲是jvm自動解鎖,因此確定不會形成死鎖,而其餘鎖可能由於使用不當形成死鎖。

Written by Autu

2019.7.20

相關文章
相關標籤/搜索