【併發編程】【JDK源碼】J.U.C--AQS 及其同步組件(2/2)

原文:慕課網高併發實戰(七)- J.U.C之AQShtml

【併發編程】【JDK源碼】AQS (AbstractQueuedSynchronizer)(1/2)中簡要介紹了AQS的概念和基本原理,下面繼續對AQS進行分析。java

AQS設計原理

數據結構

  • 底層是雙向鏈表,隊列的一種實現。
  • Sync queue:同步隊列,head節點主要負責後面的調度。
  • Condition queue:單向鏈表,不是必須的的,也能夠有多個。

設計原理

  • 使用Node實現FIFO隊列,能夠用於構建鎖或者其餘同步裝置的基礎框架
  • 利用了一個int類型標示狀態,有一個state的成員變量,表示獲取鎖的線程數(0沒有線程獲取鎖,1有線程獲取鎖,大於1表示重入鎖的數量),和一個同步組件ReentrantLock
  • 使用方法是繼承,基於模板方法
  • 子類經過繼承並經過實現它的方法管理其狀態{acquire和release}的方法操做狀態
  • 能夠實現排它鎖和共享鎖的模式(獨佔、共享)

具體實現的思路

一、首先 AQS內部維護了一個CLH隊列,來管理鎖。
二、線程嘗試獲取鎖,若是獲取失敗,則將等待信息等包裝成一個Node結點,加入到同步隊列Sync queue裏。
三、不斷從新嘗試獲取鎖(當前結點爲head的直接後繼纔會 嘗試),若是獲取失敗,則會阻塞本身,直到被喚醒。
四、當持有鎖的線程釋放鎖的時候,會喚醒隊列中的後繼線程。數據庫

同步組件

下面幾個主要同步組件:編程

CountDownLatch
Semaphore
CyclicBarrier安全

ReentrantLock
Condition
FutureTask數據結構

CountDownLatch

同步阻塞類,能夠完成阻塞線程的功能
多線程

程序執行須要等待某個條件完成後,才能進行後面的操做。好比父任務等待全部子任務都完成的時候,再繼續往下進行。
實例一併發

@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    // 爲防止出現異常,放在finally更保險一些
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}

實例二
好比有多個線程完成一個任務,可是這個任務只想給他一個指定的時間,超過這個任務就不繼續等待了。完成多少算多少。框架

@Slf4j
public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
           // 放在這裏沒有用的,由於這時候仍是在主線程中阻塞,阻塞完之後纔開始執行下面的await
           // Thread.sleep(1);
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
       // 等待指定的時間 參數1:等待時間 參數2:時間單位
        countDownLatch.await(10, TimeUnit.MILLISECONDS);
        log.info("finish");
       // 並非第一時間內銷燬掉全部線程,而是先讓正在執行線程執行完
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }
}

Semaphore

僅能提供有限訪問的資源:好比數據庫的鏈接數最大隻有20,而上層的併發數遠遠大於20,這時候若是不作限制,可能會因爲沒法獲取鏈接而致使併發異常,這時候可使用Semaphore來進行控制,當信號量設置爲1的時候,就和單線程很類似了。高併發

實例一:每次獲取1個許可

@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(); // 獲取一個許可
                    test(threadNum);
                    semaphore.release(); // 釋放一個許可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

實例2:一次性獲取多個許可

@Slf4j
public class SemaphoreExample2 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(3); // 獲取多個許可
                    test(threadNum);
                    semaphore.release(3); // 釋放多個許可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

實例三:併發很高,想要超過容許的併發數以後,就拋棄

@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try{
                    if (semaphore.tryAcquire()) { // 嘗試獲取一個許可
   // 本例中只有一個三個線程能夠執行到這裏
                        test(threadNum);
                        semaphore.release(); // 釋放一個許可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

下面是Semaphore的方法列表

嘗試獲取獲取許可的時候等一段時間

嘗試獲取獲取許可的次數以及超時時間均可以設置

@Slf4j
public class SemaphoreExample4 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 嘗試獲取一個許可
                        test(threadNum);
                        semaphore.release(); // 釋放一個許可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

CyclicBarrier

同步輔助類,容許一組線程相互等待,知道全部線程都準備就緒後,才能繼續操做,當某個線程調用了await方法以後,就會進入等待狀態,並將計數器-1,直到全部線程調用await方法使計數器爲0,才能夠繼續執行,因爲計數器能夠重複使用,因此咱們又叫他循環屏障。

使用場景
多線程計算數據,最後合併計算結果的應用場景,好比用Excel保存了用戶的銀行流水,每一頁保存了一個用戶近一年的每一筆銀行流水,如今須要統計用戶的日均銀行流水,這時候咱們就能夠用多線程處理每一頁裏的銀行流水,都執行完之後,獲得每個頁的日均銀行流水,以後經過CyclicBarrier的action,利用這些線程的計算結果,計算出整個excel的日均流水。

CyclicBarrier與CountDownLatch區別
一、CyclicBarrier能夠重複使用(使用reset方法),CountDownLatch只能用一次
二、CountDownLatch主要用於實現一個或n個線程須要等待其餘線程完成某項操做以後,才能繼續往下執行,描述的是一個或n個線程等待其餘線程的關係,而CyclicBarrier是多個線程相互等待,知道知足條件之後再一塊兒往下執行。描述的是多個線程相互等待的場景

實例一:能夠設置等待時間

@Slf4j
public class CyclicBarrierExample1 {

   // 1.給定一個值,說明有多少個線程同步等待
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            // 延遲1秒,方便觀察
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
       // 2.使用await方法進行等待
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

實例二

@Slf4j
public class CyclicBarrierExample2 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            // 因爲狀態可能會改變,因此會拋出BarrierException異常,若是想繼續往下執行,須要加上try-catch
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
        log.info("{} continue", threadNum);
    }
}

實例三

@Slf4j
public class CyclicBarrierExample3 {

    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
       // 當線程所有到達屏障時,優先執行這裏的runable
        log.info("callback is running");
    });

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

ReentrantLock與Condition

Java一共分爲兩類鎖,一類是由synchornized修飾的鎖,還有一種是JUC裏提供的鎖,核心就是ReentrantLock

synchornized與ReentrantLock的區別對比:

對比維度 synchornized ReentrantLock
可重入性(進入鎖的時候計數器自增1) 可重入 可重入
鎖的實現 JVM實現,很難操做源碼,獲得實現 JDK實現
性能 在引入輕量級鎖後性能大大提高,建議均可以選擇的時候選擇synchornized -
功能區別 方便簡潔,由編譯器負責加鎖和釋放鎖 手工操做
粗粒度,不靈活 細粒度,可靈活控制 -
能否指定公平所 不能夠 能夠
能否放棄鎖 不能夠 能夠

ReentrantLock獨有的功能
能夠指定是公平鎖仍是非公平鎖
提供了一個Condition類,能夠分組喚醒須要喚醒的線程
提供可以中斷等待鎖的線程的機制,lock.lockInterruptibly()

ReentrantLock實現:自旋鎖,循環調用CAS操做來實現加鎖,避免了使線程進入內核態的阻塞狀態。想辦法組織線程進入內核態的阻塞狀態,是咱們分析和理解鎖的關鍵鑰匙。

基本用法

@Slf4j
@ThreadSafe
public class LockExample2 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    public static int count = 0;

    private final static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}

源碼分析

默認使用非公平鎖,能夠傳入true和false來使用公平所仍是非公平鎖。

tryLock,能夠設置等待時間,或者直接返回

ReentrantReadWriteLock

在沒有任何讀寫鎖的時候才能取得寫入的鎖,可用於實現悲觀讀取,讀多寫少的場景下可能會出現線程飢餓。

@Slf4j
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

// 在沒有任何讀寫鎖的時候才能夠進行寫入操做
    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            readLock.unlock();
        }
    }

    class Data {

    }
}

StempedLock

StempedLock控制鎖有三種形式,分別是寫,讀,和樂觀讀,重點在樂觀鎖。一個StempedLock,狀態是由版本和模式兩個部分組成;鎖獲取的方法返回的是一個數字做爲票據(Stempe),他用相應的鎖狀態來表示並控制相關的訪問,數字0表示沒有寫鎖被受權訪問;在讀鎖上分爲悲觀讀和樂觀讀;
樂觀讀:若是讀的操做不少,寫操做不多的狀況下,咱們能夠樂觀的認爲,讀寫同時發生的概率很小,所以不悲觀的使用讀取鎖定很小,程序能夠在查看相關的狀態以後,判斷有沒有寫操做的變動,再採起相應的措施,這一小小的改進,能夠大大提高執行效率。

源碼案例解釋

import java.util.concurrent.locks.StampedLock;

public class LockExample4 {

    class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();

        void move(double deltaX, double deltaY) { // an exclusively locked method
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        //下面看看樂觀讀鎖案例
        double distanceFromOrigin() { // A read-only method
            long stamp = sl.tryOptimisticRead(); //得到一個樂觀讀鎖
            double currentX = x, currentY = y;  //將兩個字段讀入本地局部變量
            if (!sl.validate(stamp)) { //檢查發出樂觀讀鎖後同時是否有其餘寫鎖發生?
                stamp = sl.readLock();  //若是沒有,咱們再次得到一個讀悲觀鎖
                try {
                    currentX = x; // 將兩個字段讀入本地局部變量
                    currentY = y; // 將兩個字段讀入本地局部變量
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        //下面是悲觀讀鎖案例
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) { //循環,檢查當前狀態是否符合
                    long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉爲寫鎖
                    if (ws != 0L) { //這是確認轉爲寫鎖是否成功
                        stamp = ws; //若是成功 替換票據
                        x = newX; //進行狀態改變
                        y = newY;  //進行狀態改變
                        break;
                    } else { //若是不能成功轉換爲寫鎖
                        sl.unlockRead(stamp);  //咱們顯式釋放讀鎖
                        stamp = sl.writeLock();  //顯式直接進行寫鎖 而後再經過循環再試
                    }
                }
            } finally {
                sl.unlock(stamp); //釋放讀鎖或寫鎖
            }
        }
    }
}

簡單使用

@Slf4j
@ThreadSafe
public class LockExample5 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    public static int count = 0;

    private final static StampedLock lock = new StampedLock();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
      // 會返回一個stamp的值
        long stamp = lock.writeLock();
        try {
            count++;
        } finally {
//釋放的時候要釋放
            lock.unlock(stamp);
        }
    }
}

總結關於鎖的幾個類:
synchronized:JVM實現,不但能夠經過一些監控工具監控,並且在出現未知異常的時候JVM也會自動幫咱們釋放鎖
ReentrantLock、ReentrantRead/WriteLock、StempedLock 他們都是對象層面的鎖定,要想保證鎖必定被釋放,要放到finally裏面,纔會更安全一些;StempedLock對性能有很大的改進,特別是在讀線程愈來愈多的狀況下,StempedLock有一個複雜的API。要注意使用

如何使用:
1.在只有少許競爭者的時候,synchronized是一個很好的鎖的實現
2.競爭者很多,可是增加量是能夠競爭的,ReentrantLock是一個很好的鎖的實現(適合本身的纔是最好的,不是越高級越好)

Condition

@Slf4j
public class LockExample6 {

    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();
// 從reentrantLock實例裏獲取了condition
        Condition condition = reentrantLock.newCondition();

        new Thread(() -> {
            try {
              // 線程1調用了lock方法,加入到了AQS的等待隊裏裏面去
                reentrantLock.lock();
                log.info("wait signal"); // 1 等待信號
          // 調用await方法後,從AQS隊列裏移除了,進入到了condition隊列裏面去,等待一個信號
                condition.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("get signal"); // 4 獲得信號
// 線程1釋放鎖
            reentrantLock.unlock();
        }).start();

        new Thread(() -> {
// 線程1await釋放鎖之後,這裏就獲取了鎖,加入到了AQS等待隊列中
            reentrantLock.lock();
            log.info("get lock"); // 2 獲取鎖
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
//調用signalAll發送信號的方法,Condition節點的線程1節點元素被取出,放在了AQS等待隊列裏(注意並無被喚醒)
            condition.signalAll();
            log.info("send signal ~ "); // 3 發送信號
// 線程2釋放鎖,這時候AQS隊列中只剩下線程1,線程1開始執行
            reentrantLock.unlock();
        }).start();
    }
}
相關文章
相關標籤/搜索