J.U.C之AQS

AQS是J.U.C的核心java

AQS(AbstractQueuedSynchronizer)隊列同步器,AQS是JDK下提供的一套用於實現基於FIFO等待隊列的阻塞鎖和相關的同步器的一個同步框架。程序員

同步器面向的是鎖的實現者,它簡化了鎖的實現方式,屏蔽了同步狀態管理、線程的排隊、等待和喚醒等底層操做。數據庫

同步隊列中的節點用來保存獲取同步狀態失敗的線程引用、等待狀態以及前驅和後繼節點。編程

同步器包含了兩個節點類型的引用,一個指向頭節點,而另外一個指向尾節點。
若是一個線程沒有得到同步狀態,那麼包裝它的節點將被加入到隊尾,顯然這個過程應該是線程安全的。所以同步器提供了一個基於CAS的設置尾節點的方法:compareAndSetTail(Node expect,Node update),它須要傳遞一個它認爲的尾節點和當前節點,只有設置成功,當前節點才被加入隊尾。這個過程以下所示
同步隊列遵循FIFO,首節點是獲取同步狀態成功的節點,首節點線程在釋放同步狀態時,將會喚醒後繼節點,然後繼節點將會在獲取同步狀態成功時將本身設置爲首節點,這一過程以下:
 
獨佔式同步狀態獲取
節點進入同步隊列後,就進入了自旋的過程,每一個節點都在自省的觀察,頭結點出隊列時,本身的前驅節點是不是頭結點,若是是,嘗試獲取同步狀態。能夠看見節點和節點之間在循環檢查的過程當中基本不相互通訊,而是簡單的判斷本身的前驅節點是不是頭結點,這樣就使得節點的釋放符合FIFO。
 

總結:在獲取同步狀態時,同步器維護這一個同步隊列,並持有對頭節點和尾節點的引用。獲取狀態失敗的線程會被包裝成節點加入到尾節點後面稱爲新的尾節點,在進入同步隊列後開始自旋,中止自旋的條件就是前驅節點爲頭節點而且成功獲取到同步狀態。在釋放同步狀態時,同步器調用tryRelease方法釋放同步狀態,而後喚醒頭節點的後繼節點。segmentfault

共享式同步狀態獲取緩存

 共享式獲取與獨佔式獲取的區別就是同一時刻是否能夠多個線程同時獲取到同步狀態。

設計原理安全

  • 使用Node實現FIFO隊列數據結構

  • 維護了一個volatile int state(表明共享資源)多線程

  • 使用方法是繼承,基於模板方法併發

  • 子類經過繼承同步器並實現它的抽象方法來管理同步狀態

  • 能夠實現排它鎖和共享鎖的模式(獨佔、共享)

具體實現的思路

1.首先 AQS內部維護了一個CLH隊列,多線程爭用資源被阻塞時會進入此隊列。同時AQS管理一個關於共享資源狀態信息的單一整數volatile int state,該整數能夠表現任何狀態,同時配合Unsafe工具對其原子性的操做來實現對當前鎖的狀態進行修改。。好比, Semaphore 用它來表現剩餘的許可數,ReentrantLock 用它來表現擁有它的線程已經請求了多少次鎖;FutureTask 用它來表現任務的狀態(還沒有開始、運行、完成和取消)

2.線程嘗試獲取鎖,若是獲取失敗,則將等待信息等包裝成一個Node結點,加入到同步隊列Sync queue裏

3.不斷從新嘗試獲取鎖(當前結點爲head的直接後繼纔會 嘗試),若是獲取失敗,則會阻塞本身,直到被喚醒

4.當持有鎖的線程釋放鎖的時候,會喚醒隊列中的後繼線程

AQS定義兩種資源共享方式:Exclusive(獨佔,只有一個線程能執行,如ReentrantLock)和Share(共享,多個線程可同時執行,如Semaphore/CountDownLatch),獨佔式或者共享式獲取同步狀態state。

以ReentrantLock爲例,state初始化爲0,表示未鎖定狀態。A線程lock()時,會調用tryAcquire()獨佔該鎖並將state+1。此後,其餘線程再tryAcquire()時就會失敗,直到A線程unlock()到state=0(即釋放鎖)爲止,其它線程纔有機會獲取該鎖。固然,釋放鎖以前,A線程本身是能夠重複獲取此鎖的(state會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多麼次,這樣才能保證state是能回到零態的。

再以CountDownLatch以例,任務分爲N個子線程去執行,state也初始化爲N(注意N要與線程個數一致)。這N個子線程是並行執行的,每一個子線程執行完後countDown()一次,state會CAS減1。等到全部子線程都執行完後(即state=0),會unpark()主調用線程,而後主調用線程就會從await()函數返回,繼續後餘動做。

AQS同步組件

  • CountDownLatch
  • Semaphore
  • CyclicBarrier
  • ReentrantLock
  • Condition
  • FutureTask

獨佔鎖:ReentrantLock

共享鎖:CountDownLatch, CyclicBarrier, Semaphore

共享和獨佔:ReentrantReadWriteLock

CountDownLatch

同步阻塞類,能夠完成阻塞線程的功能

CountDownLatch是經過一個計數器來實現的,計數器的初始值爲線程的數量。每當一個線程完成了本身的任務後,計數器的值就會減1。當計數器值到達0時,它表示全部的線程已經完成了任務,而後在閉鎖上等待的線程就能夠恢復執行任務。
構造器中的計數值(count)實際上就是閉鎖須要等待的線程數量。這個值只能被設置一次,並且CountDownLatch沒有提供任何機制去從新設置這個計數值。

與CountDownLatch的第一次交互是主線程等待其餘線程。主線程必須在啓動其餘線程後當即調用CountDownLatch.await()方法。這樣主線程的操做就會在這個方法上阻塞,直到其餘線程完成各自的任務。

 

使用場景

1.程序執行須要等待某個條件完成後,才能進行後面的操做。好比父任務等待全部子任務都完成的時候,在繼續往下進行

實例1:基本用法

@Slf4j
public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    // 爲防止出現異常,放在finally更保險一些
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }
}
View Code

2.好比有多個線程完成一個任務,可是這個任務只想給他一個指定的時間,超過這個任務就不繼續等待了。完成多少算多少

@Slf4j
public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
           // 放在這裏沒有用的,由於這時候仍是在主線程中阻塞,阻塞完之後纔開始執行下面的await
           // Thread.sleep(1);
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
       // 等待指定的時間 參數1:等待時間 參數2:時間單位
        countDownLatch.await(10, TimeUnit.MILLISECONDS);
        log.info("finish");
       // 並非第一時間內銷燬掉全部線程,而是先讓正在執行線程執行完
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
    }
}
View Code

Semaphore

控制某個資源能被併發訪問的次數

使用場景

1.僅能提供有限訪問的資源:好比數據庫的鏈接數最大隻有20,而上層的併發數遠遠大於20,這時候若是不作限制,可能會因爲沒法獲取鏈接而致使併發異常,這時候可使用Semaphore來進行控制,當信號量設置爲1的時候,就和單線程很類似了

實例1:每次獲取1個許可

@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(); // 獲取一個許可
                    test(threadNum);
                    semaphore.release(); // 釋放一個許可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
View Code

實例2:一次性獲取多個許可

@Slf4j
public class SemaphoreExample2 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(3); // 獲取多個許可
                    test(threadNum);
                    semaphore.release(3); // 釋放多個許可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
View Code

2.併發很高,想要超過容許的併發數以後,就拋棄

@Slf4j
public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try{
                    if (semaphore.tryAcquire()) { // 嘗試獲取一個許可
   // 本例中只有一個三個線程能夠執行到這裏
                        test(threadNum);
                        semaphore.release(); // 釋放一個許可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
View Code

3.嘗試獲取獲取許可的次數以及超時時間均可以設置

@Slf4j
public class SemaphoreExample4 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 嘗試獲取一個許可
                        test(threadNum);
                        semaphore.release(); // 釋放一個許可
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}
View Code

CyclicBarrier

同步輔助類,容許一組線程相互等待,知道全部線程都準備就緒後,才能繼續操做,當某個線程調用了await方法以後,就會進入等待狀態,並將計數器-1,直到全部線程調用await方法使計數器爲0,才能夠繼續執行,因爲計數器能夠重複使用,因此咱們又叫他循環屏障。

CyclicBarrier與CountDownLatch區別

1.CyclicBarrier能夠重複使用(使用reset方法),CountDownLatch只能用一次
2.CountDownLatch主要用於實現一個或n個線程須要等待其餘線程完成某項操做以後,才能繼續往下執行,描述的是一個或n個線程等待其餘線程的關係,而CyclicBarrier是多個線程相互等待,知道知足條件之後再一塊兒往下執行。描述的是多個線程相互等待的場景

能夠設置等待時間

@Slf4j
public class CyclicBarrierExample1 {

   // 1.給定一個值,說明有多少個線程同步等待
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            // 延遲1秒,方便觀察
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
       // 2.使用await方法進行等待
        barrier.await();
        log.info("{} continue", threadNum);
    }
}
View Code
@Slf4j
public class CyclicBarrierExample2 {

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        try {
            // 因爲狀態可能會改變,因此會拋出BarrierException異常,若是想繼續往下執行,須要加上try-catch
            barrier.await(2000, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            log.warn("BarrierException", e);
        }
        log.info("{} continue", threadNum);
    }
}
View Code
@Slf4j
public class CyclicBarrierExample3 {

    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
       // 當線程所有到達屏障時,優先執行這裏的runable
        log.info("callback is running");
    });

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}
View Code

Lock

ReentrantLock與Condition

java一共分爲兩類鎖,一類是由synchornized修飾的鎖,還有一種是JUC裏提供的鎖,核心就是ReentrantLock

synchornized與ReentrantLock的區別對比:

 

對比維度 synchornized ReentrantLock
可重入性(進入鎖的時候計數器自增1) 可重入 可重入
鎖的實現 JVM實現,很難操做源碼,獲得實現 JDK實現
性能 在引入輕量級鎖後性能大大提高,建議均可以選擇的時候選擇synchornized -
功能區別 方便簡潔,由編譯器負責加鎖和釋放鎖 手工操做
粒度、靈活度 粗粒度,不靈活
能否指定公平所 不能夠 能夠
能否放棄鎖 不能夠 能夠

基本使用

@Slf4j
@ThreadSafe
public class LockExample2 {

    // 請求總數
    public static int clientTotal = 5000;

    // 同時併發執行的線程數
    public static int threadTotal = 200;

    public static int count = 0;

    private final static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }

    private static void add() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}
View Code

公平鎖非公平鎖以及可重入的理解

輕鬆學習java可重入鎖(ReentrantLock)的實現原理

Condition

Condition的特性:

1.Condition中的await()方法至關於Object的wait()方法,Condition中的signal()方法至關於Object的notify()方法,Condition中的signalAll()至關於Object的notifyAll()方法。不一樣的是,Object中的這些方法是和同步鎖捆綁使用的;而Condition是須要與互斥鎖/共享鎖捆綁使用的。

2.Condition它更強大的地方在於:可以更加精細的控制多線程的休眠與喚醒。對於同一個鎖,咱們能夠建立多個Condition,在不一樣的狀況下使用不一樣的Condition。
例如,假如多線程讀/寫同一個緩衝區:當向緩衝區中寫入數據以後,喚醒"讀線程";當從緩衝區讀出數據以後,喚醒"寫線程";而且當緩衝區滿的時候,"寫線程"須要等待;當緩衝區爲空時,"讀線程"須要等待。      

 若是採用Object類中的wait(), notify(), notifyAll()實現該緩衝區,當向緩衝區寫入數據以後須要喚醒"讀線程"時,不可能經過notify()或notifyAll()明確的指定喚醒"讀線程",而只能經過notifyAll喚醒全部線程(可是notifyAll沒法區分喚醒的線程是讀線程,仍是寫線程)。  可是,經過Condition,就能明確的指定喚醒讀線程。

public class Task {
    
    private final Lock lock = new ReentrantLock();
    
    private final Condition addCondition = lock.newCondition();
    
    private final Condition subCondition = lock.newCondition();
    
    
    private static int num = 0;
    private List<String> lists = new LinkedList<String>();
    
    public void add() {
        lock.lock();
        
        try {
            while(lists.size() == 10) {//當集合已滿,則"添加"線程等待
                addCondition.await();
            }
            
            num++;
            lists.add("add Banana" + num);
            System.out.println("The Lists Size is " + lists.size());
            System.out.println("The Current Thread is " + Thread.currentThread().getName());
            System.out.println("==============================");
            this.subCondition.signal();
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {//釋放鎖
            lock.unlock();
        }
    }
    
    
    public void sub() {
        lock.lock();
        
        try {
            while(lists.size() == 0) {//當集合爲空時,"減小"線程等待
                subCondition.await();
            }
            
            String str = lists.get(0);
            lists.remove(0);
            System.out.println("The Token Banana is [" + str + "]");
            System.out.println("The Current Thread is " + Thread.currentThread().getName());
            System.out.println("==============================");
            num--;
            addCondition.signal();
            
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
}
View Code

Condition的實現分析

 ConditionObject類是AQS的內部類,實現了Condition接口。每一個Condition對象都包含着一個等待隊列,這個隊列是實現等待/通知功能的關鍵。在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而並不是包中的Lock(同步器)擁有一個同步隊列和多個等待隊列。等待隊列和同步隊列同樣,使用的都是同步器AQS中的節點類Node。 一樣擁有首節點和尾節點, 每一個Condition對象都包含着一個FIFO隊列。

 等待:

若是一個線程調用了Condition.await()方法,那麼該線程就會釋放鎖,構成節點加入等待隊列並進入等待狀態。至關於同步隊列的首節點(獲取了鎖的節點)移動到Condition的等待隊列中。

 

通知:

調用Condition.signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點以前,會將節點移到同步隊列中,加入到獲取同步狀態的競爭中,成功獲取同步狀態(或者說鎖以後),被喚醒的線程將從先前調用的await()方法返回,此時該線程已經成功獲取了鎖。

 

=============================================================================

ReentrantReadWriteLock

ReadWriteLock,顧名思義,是讀寫鎖。它維護了一對相關的鎖 — — 「讀取鎖」和「寫入鎖」,一個用於讀取操做,另外一個用於寫入操做。
「讀取鎖」用於只讀操做,它是「共享鎖」,能同時被多個線程獲取。
「寫入鎖」用於寫入操做,它是「獨佔鎖」,寫入鎖只能被一個線程鎖獲取。

@Slf4j
public class LockExample3 {

    private final Map<String, Data> map = new TreeMap<>();

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock readLock = lock.readLock();

    private final Lock writeLock = lock.writeLock();

    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Set<String> getAllKeys() {
        readLock.lock();
        try {
            return map.keySet();
        } finally {
            readLock.unlock();
        }
    }

// 在沒有任何讀寫鎖的時候才能夠進行寫入操做
    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            readLock.unlock();
        }
    }

    class Data {

    }
}
View Code

特性:

  1. 公平性選擇:支持公平和非公平(默認)兩種獲取鎖的方式,非公平鎖的吞吐量優於公平鎖;
  2. 可重入:支持可重入,讀線程在獲取讀鎖以後可以再次獲取讀鎖,寫線程在獲取了寫鎖以後可以再次獲取寫鎖,同時也能夠獲取讀鎖(同一線程)
  3. 鎖降級:線程獲取鎖的順序遵循獲取寫鎖,獲取讀鎖,釋放寫鎖,寫鎖能夠降級成爲讀鎖。

 優勢:

  1. 經過分離讀鎖和寫鎖,可以提供比排它鎖更好的併發性和吞吐量。
  2. 讀寫鎖可以簡化讀寫交互場景的編程。

針對第二點,好比說一個共享的用做緩存數據結構,大部分時間提供讀服務,而寫操做佔有的時間較少,可是寫操做完成後的更新須要對後序的讀服務可見。

在沒有讀寫鎖支持的時候,若是須要完成上述工做就要使用Java的等待通知機制,就是當寫操做開始時,全部晚於寫操做的讀操做均會進入等待狀態,只有寫操做完成並進行 通知以後,全部等待的讀操做才能繼續執行(寫操做之間依靠synchronized關鍵字進行同步),這樣作的目的是使讀操做都能讀取到正確的數據,而不會出現髒讀。

改用讀寫鎖實現上述功能,只須要在讀操做時獲取讀鎖,而寫操做時獲取寫鎖便可,當寫鎖被獲取到時,後續(非當前寫操做線程)的讀寫操做都會被 阻塞,寫鎖釋放以後,全部操做繼續執行,編程方式相對於使用等待通知機制的實現方式而言,變得簡單明瞭。

讀寫鎖實現簡單的Cache

public class Cache { 
  static Map<String, Object> map = new HashMap<String, Object>(); 
  static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); 
  static Lock r = rwl.readLock(); 
  static Lock w = rwl.writeLock(); 
  // 獲取一個key對應的value 
  public static final Object get(String key) { 
    r.lock(); 
    try { 
      return map.get(key); 
    } finally { 
      r.unlock(); 
    } 
  } 

  // 設置key對應的value,並返回舊有的value 
  public static final Object put(String key, Object value) { 
    w.lock(); 
    try { 
      return map.put(key, value); 
    } finally { 
      w.unlock(); 
    } 
  } 

  // 清空全部的內容 
  public static final void clear() { 
    w.lock(); 
    try { 
      map.clear(); 
    } finally { 
      w.unlock(); 
    } 
  } 
} 
View Code

Cache使用讀寫鎖提高讀操做併發性,也保證每次寫操做對全部的讀寫操做的可見性,同時簡化了編程方式。

讀寫鎖的實現分析:

1.讀寫狀態設計

讀寫鎖一樣依賴自定義同步器來實現同步功能,而讀寫狀態就是其同步器的同步狀態。回想ReentrantLock中自定義同步器的實現,同步狀態 表示鎖被一個線程重複獲取的次數,而讀寫鎖的自定義同步器須要在同步狀態(一個整型變量)上維護多個讀線程和一個寫線程的狀態,使得該狀態的設計成爲讀寫 鎖實現的關鍵。

若是在一個整型變量上維護多種狀態,就必定須要「按位切割使用」這個變量,讀寫鎖是將變量切分紅了兩個部分,高16位表示讀,低16位表示寫,劃分方式如圖1所示。

2.鎖降級

鎖降級指的是寫鎖降級成爲讀鎖。若是當前線程擁有寫鎖,而後將其釋放,最後再獲取讀鎖,這種分段完成的過程不能稱之爲鎖降級。鎖降級是指把持住(當前擁有的)寫鎖,再獲取到讀鎖,隨後釋放(先前擁有的)寫鎖的過程。

class CachedData {
   Object data;
   volatile boolean cacheValid;
   final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

   void processCachedData() {
     rwl.readLock().lock();
     if (!cacheValid) {
       // Must release read lock before acquiring write lock
       rwl.readLock().unlock();
       rwl.writeLock().lock();
       try {
         // Recheck state because another thread might have
         // acquired write lock and changed state before we did.
         if (!cacheValid) {
           data = ...
           cacheValid = true;
         }
         // Downgrade by acquiring read lock before releasing write lock
         rwl.readLock().lock();
       } finally {
         rwl.writeLock().unlock(); // Unlock write, still hold read
       }
     }

     try {
       use(data);
     } finally {
       rwl.readLock().unlock();
     }
   }
 }

鎖降級中讀鎖的獲取是否必要呢?答案是必要的。主要是爲了保證數據的可見性。就例子的代碼來講,是須要鎖降級獲取讀鎖的。若是不這樣,在釋放完寫鎖後,別的線程(假設B)可能在當前線程(假設A)尚未執行到user(data)時獲取到寫鎖,而後修改data的值,當線程A恢復運行後,因爲可見性問題,此時線程A的data已經不是正確的data了,當前線程沒法感知線程B的數據更新。使用讀鎖能夠保證在線程A獲取讀鎖時別的線程沒法修改data

這裏要着重講一講「沒法感知」是什麼意思:

也就是說,在另外一個線程(假設叫線程1)修改數據的那一個瞬間,當前線程(線程2)是不知道數據此時已經變化了,可是並不意味着以後線程2使用的數據就是舊的數據,相反線程2使用仍是被線程1更新以後的數據。也就是說,就算我不使用鎖降級,程序的運行結果也是正確的(這是由於鎖的機制和volatile關鍵字類似)。「感知」實際上是想強調讀的實時連續性,可是卻容易讓人誤導爲強調數據操做。

RentrantReadWriteLock不支持鎖升級(把持讀鎖、獲取寫鎖,最後釋放讀鎖的過程)。緣由也是保證數據可見性,若是讀鎖已被多個線程獲取,其中任意線程成功獲取了寫鎖並更新了數據,則其更新對其餘獲取到讀鎖的線程不可見。

使用java的ReentrantReadWriteLock讀寫鎖時,鎖降級是必須的麼?

不是必須的。

在這個問題裏,若是不想使用鎖降級

  1. 能夠繼續持有寫鎖,完成後續的操做。
  2. 也能夠先把寫鎖釋放,再獲取讀鎖。

但問題是

  1. 若是繼續持有寫鎖,若是 use 函數耗時較長,那麼就沒必要要的阻塞了可能的讀流程
  2.  若是先把寫鎖釋放,再獲取讀鎖。在有些邏輯裏,這個 cache 值可能被修改也可能被移除,這個取決於能不能接受了。
  3. 另外,降級鎖比釋放寫再獲取讀性能要好,由於當前只有一個寫鎖,能夠直接不競爭的降級。而釋放寫鎖,獲取讀鎖的過程就面對着其餘讀鎖請求的競爭,引入額外沒必要要的開銷。

downgrading 只是提供了一個手段,這個手段可讓流程不被中斷的下降到低級別鎖,而且相對一樣知足業務要求的其餘手段性能更爲良好

https://www.zhihu.com/question/265909728/answer/301363927

https://segmentfault.com/q/1010000009659039

 

StampLock

該類是一個讀寫鎖的改進,它的思想是讀寫鎖中讀不只不阻塞讀,同時也不該該阻塞寫。

讀不阻塞寫的實現思路:在讀的時候若是發生了寫,則應當重讀而不是在讀的時候直接阻塞寫!由於在讀線程很是多而寫線程比較少的狀況下寫線程可能發生飢餓現象,也就是由於大量的讀線程存在而且讀線程都阻塞寫線程,所以寫線程可能幾乎不多被調度成功!當讀執行的時候另外一個線程執行了寫,則讀線程發現數據不一致則執行重讀便可。

因此讀寫都存在的狀況下,使用StampedLock就能夠實現一種無障礙操做,即讀寫之間不會阻塞對方,可是寫和寫之間仍是阻塞的!

  StampedLock有三種模式的鎖,用於控制讀取/寫入訪問。StampedLock的狀態由版本和模式組成。鎖獲取操做返回一個用於展現和訪問鎖狀態的票據(stamp)變量,它用相應的鎖狀態表示並控制訪問,數字0表示沒有寫鎖被受權訪問。在讀鎖上分爲悲觀鎖和樂觀鎖。鎖釋放以及其餘相關方法須要使用郵編(stamps)變量做爲參數,若是他們和當前鎖狀態不符則失敗,這三種模式爲:
       • 寫入:方法writeLock可能爲了獲取獨佔訪問而阻塞當前線程,返回一個stamp變量,可以在unlockWrite方法中使用從而釋放鎖。也提供了tryWriteLock。當鎖被寫模式所佔有,沒有讀或者樂觀的讀操做可以成功。
       • 讀取:方法readLock可能爲了獲取非獨佔訪問而阻塞當前線程,返回一個stamp變量,可以在unlockRead方法中用於釋放鎖。也提供了tryReadLock。
       • 樂觀讀取:方法tryOptimisticRead返回一個非0郵編變量,僅在當前鎖沒有以寫入模式被持有。若是在得到stamp變量以後沒有被寫模式持有,方法validate將返回true。這種模式能夠被看作一種弱版本的讀鎖,能夠被一個寫入者在任什麼時候間打斷。樂觀讀取模式僅用於短期讀取操做時常常可以下降競爭和提升吞吐量。

程序舉例:

public class Point {
    //一個點的x,y座標
    private double x, y;
    /**
     * Stamped相似一個時間戳的做用,每次寫的時候對其+1來改變被操做對象的Stamped值
     * 這樣其它線程讀的時候發現目標對象的Stamped改變,則執行重讀
     */
    private final StampedLock stampedLock = new StampedLock();
 
    // an exclusively locked method
    void move(doubledeltaX, doubledeltaY) {
        /**stampedLock調用writeLock和unlockWrite時候都會致使stampedLock的stamp值的變化
         * 即每次+1,直到加到最大值,而後從0從新開始 */
        long stamp = stampedLock.writeLock(); //寫鎖
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            stampedLock.unlockWrite(stamp);//釋放寫鎖
        }
    }
 
    double distanceFromOrigin() {    // A read-only method
        /**tryOptimisticRead是一個樂觀的讀,使用這種鎖的讀不阻塞寫
         * 每次讀的時候獲得一個當前的stamp值(相似時間戳的做用)*/
        long stamp = stampedLock.tryOptimisticRead();
 
        //這裏就是讀操做,讀取x和y,由於讀取x時,y可能被寫了新的值,因此下面須要判斷
        double currentX = x, currentY = y;
 
        /**若是讀取的時候發生了寫,則stampedLock的stamp屬性值會變化,此時須要重讀,
         * 再重讀的時候須要加讀鎖(而且重讀時使用的應當是悲觀的讀鎖,即阻塞寫的讀鎖)
         * 固然重讀的時候還可使用tryOptimisticRead,此時須要結合循環了,即相似CAS方式
         * 讀鎖又從新返回一個stampe值*/
        if (!stampedLock.validate(stamp)) {
            stamp = stampedLock.readLock(); //讀鎖
            try {
                currentX = x;
                currentY = y;
            } finally {
                stampedLock.unlockRead(stamp);//釋放讀鎖
            }
        }
        //讀鎖驗證成功後才執行計算,即讀的時候沒有發生寫
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }
}
View Code

==================================================================

Synchronize和ReentrantLock的比較:

功能比較:

便利性:很明顯Synchronized的使用比較方便簡潔,而且由編譯器去保證鎖的加鎖和釋放,而ReenTrantLock須要手工聲明來加鎖和釋放鎖,爲了不忘記手工釋放鎖形成死鎖,因此最好在finally中聲明釋放鎖。

鎖的細粒度和靈活度:很明顯ReenTrantLock優於Synchronized

ReenTrantLock獨有的能力:

1.      ReenTrantLock能夠指定是公平鎖仍是非公平鎖。而synchronized只能是非公平鎖。所謂的公平鎖就是先等待的線程先得到鎖。

public class TestReentrantLock2 {
    private static Lock lock = new ReentrantLock(true);  //lock爲公平鎖

    public static void main(String[] args) {
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();

                try {
                    System.out.println("線程1啓動...");
                } finally {
                    lock.unlock();
                }
            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();

                try {
                    System.out.println("線程2啓動...");
                } finally {
                    lock.unlock();
                }
            }
        });

        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();

                try {
                    System.out.println("線程3啓動...");
                } finally {
                    lock.unlock();
                }
            }
        });

        t1.start();
        t3.start();
        t2.start();
    }
}
運行結果:
線程1啓動...
線程3啓動...
線程2啓動...
View Code

  在ReentrantLock中的構造函數中,提供了一個參數,指定是否爲公平鎖。

  公平鎖:線程將按照它們發出的請求順序來得到鎖 
  非公鎖:當一個線程請求非公平鎖的時候,若是發出請求時,得到鎖的線程恰好釋放鎖,則該線程將會得到鎖而跳過在該鎖上等待的線程。

2.      ReenTrantLock提供了一個Condition(條件)類,用來實現分組喚醒須要喚醒的線程們,而不是像synchronized要麼隨機喚醒一個線程要麼喚醒所有線程。

3.      ReenTrantLock提供了一種可以中斷等待鎖的線程的機制,經過lock.lockInterruptibly()來實現這個機制。可讓它中斷本身或者在別的線程中中斷它,中斷後能夠放棄等待,去  處理其餘事,而不可中斷鎖不會響應中斷,將一直等待,synchronized就是不可中斷。

4.      ReenTrantLock的tryLock(long time, TimeUnit unit)起到了定時鎖的做用,若是在指定時間內沒有獲取到鎖,將會返回false。應用:具備時間限制的操做時使用 

public class TestReentrantLock {
    public static void main(String[] args) {
        Lock r = new ReentrantLock();

        //線程1
        Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                //得到鎖
                r.lock();
                try {
                    System.out.println("線程1得到了鎖");
                    //睡眠5秒
                    Thread.currentThread().sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    r.unlock();
                }
            }
        });
        thread1.start();

        //線程2
        Thread thread2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if (r.tryLock(1000, TimeUnit.MILLISECONDS)) {
                        System.out.println("線程2得到了鎖");
                    } else {
                        System.out.println("獲取鎖失敗了");
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        thread2.start();
    }
}
View Code

 

什麼狀況下使用ReenTrantLock:

答案是,若是你須要實現ReenTrantLock的四個獨有功能時。由於對於 java.util.concurrent.lock 中的鎖定類來講,synchronized 仍然有一些優點。好比,在使用 synchronized 的時候,不可能忘記釋放鎖;在退出 synchronized 塊時,JVM 會爲您作這件事。您很容易忘記用 finally 塊釋放鎖,這對程序很是有害。

 

性能比較:

synchronized: 在資源競爭不是很激烈的狀況下,偶爾會有同步的情形下,synchronized是很合適的。緣由在於,編譯程序一般會盡量的進行優化synchronize,另外可讀性很是好,無論用沒用過5.0多線程包的程序員都能理解。 ReentrantLock: ReentrantLock提供了多樣化的同步,好比有時間限制的同步,能夠被Interrupt的同步(synchronized的同步是不能Interrupt的)等。在資源競爭不激烈的情形下,性能稍微比synchronized差點點。可是當同步很是激烈的時候,synchronized的性能一會兒能降低好幾十倍。而ReentrantLock確還能維持常態。 Atomic: 和上面的相似,不激烈狀況下,性能比synchronized略遜,而激烈的時候,也能維持常態。激烈的時候,Atomic的性能會優於ReentrantLock一倍左右。可是其有一個缺點,就是隻能同步一個值,一段代碼中只能出現一個Atomic的變量,多於一個同步無效。由於他不能在多個Atomic之間同步。 因此,咱們寫同步的時候,優先考慮synchronized,若是有特殊須要,再進一步優化。ReentrantLock和Atomic若是用的很差,不只不能提升性能,還可能帶來災難。 

相關文章
相關標籤/搜索