深刻理解 Java 併發鎖

📦 本文以及示例源碼已歸檔在 javacorehtml

1、併發鎖簡介

確保線程安全最多見的作法是利用鎖機制(Locksychronized)來對共享數據作互斥同步,這樣在同一個時刻,只有一個線程能夠執行某個方法或者某個代碼塊,那麼操做必然是原子性的,線程安全的。java

在工做、面試中,常常會聽到各類五花八門的鎖,聽的人云裏霧裏。鎖的概念術語不少,它們是針對不一樣的問題所提出的,經過簡單的梳理,也不難理解。git

可重入鎖

可重入鎖又名遞歸鎖,是指 同一個線程在外層方法獲取了鎖,在進入內層方法會自動獲取鎖github

可重入鎖能夠在必定程度上避免死鎖面試

  • ReentrantLockReentrantReadWriteLock 是可重入鎖。這點,從其命名也不難看出。
  • synchronized 也是一個可重入鎖
synchronized void setA() throws Exception{
    Thread.sleep(1000);
    setB();
}

synchronized void setB() throws Exception{
    Thread.sleep(1000);
}

上面的代碼就是一個典型場景:若是使用的鎖不是可重入鎖的話,setB 可能不會被當前線程執行,從而形成死鎖。編程

公平鎖與非公平鎖

  • 公平鎖 - 公平鎖是指 多線程按照申請鎖的順序來獲取鎖
  • 非公平鎖 - 非公平鎖是指 多線程不按照申請鎖的順序來獲取鎖 。這就可能會出現優先級反轉(後來者居上)或者飢餓現象(某線程老是搶不過別的線程,致使始終沒法執行)。

公平鎖爲了保證線程申請順序,勢必要付出必定的性能代價,所以其吞吐量通常低於非公平鎖。api

公平鎖與非公平鎖 在 Java 中的典型實現:數組

  • synchronized 只支持非公平鎖
  • ReentrantLockReentrantReadWriteLock,默認是非公平鎖,但支持公平鎖

獨享鎖與共享鎖

獨享鎖與共享鎖是一種廣義上的說法,從實際用途上來看,也常被稱爲互斥鎖與讀寫鎖。緩存

  • 獨享鎖 - 獨享鎖是指 鎖一次只能被一個線程所持有
  • 共享鎖 - 共享鎖是指 鎖可被多個線程所持有

獨享鎖與共享鎖在 Java 中的典型實現:安全

  • synchronizedReentrantLock 只支持獨享鎖
  • ReentrantReadWriteLock 其寫鎖是獨享鎖,其讀鎖是共享鎖。讀鎖是共享鎖使得併發讀是很是高效的,讀寫,寫讀 ,寫寫的過程是互斥的。

悲觀鎖與樂觀鎖

樂觀鎖與悲觀鎖不是指具體的什麼類型的鎖,而是處理併發同步的策略

  • 悲觀鎖 - 悲觀鎖對於併發採起悲觀的態度,認爲:不加鎖的併發操做必定會出問題悲觀鎖適合寫操做頻繁的場景
  • 樂觀鎖 - 樂觀鎖對於併發採起樂觀的態度,認爲:不加鎖的併發操做也沒什麼問題。對於同一個數據的併發操做,是不會發生修改的。在更新數據的時候,會採用不斷嘗試更新的方式更新數據。樂觀鎖適合讀多寫少的場景

悲觀鎖與樂觀鎖在 Java 中的典型實現:

  • 悲觀鎖在 Java 中的應用就是經過使用 synchronizedLock 顯示加鎖來進行互斥同步,這是一種阻塞同步。

  • 樂觀鎖在 Java 中的應用就是採用 CAS 機制(CAS 操做經過 Unsafe 類提供,但這個類不直接暴露爲 API,因此都是間接使用,如各類原子類)。

輕量級鎖、重量級鎖與偏向鎖

所謂輕量級鎖與重量級鎖,指的是鎖控制粒度的粗細。顯然,控制粒度越細,阻塞開銷越小,併發性也就越高。

Java 1.6 之前,重量級鎖通常指的是 synchronized ,而輕量級鎖指的是 volatile

Java 1.6 之後,針對 synchronized 作了大量優化,引入 4 種鎖狀態: 無鎖狀態、偏向鎖、輕量級鎖和重量級鎖。鎖能夠單向的從偏向鎖升級到輕量級鎖,再從輕量級鎖升級到重量級鎖 。

  • 偏向鎖 - 偏向鎖是指一段同步代碼一直被一個線程所訪問,那麼該線程會自動獲取鎖。下降獲取鎖的代價。
  • 輕量級鎖 - 是指當鎖是偏向鎖的時候,被另外一個線程所訪問,偏向鎖就會升級爲輕量級鎖,其餘線程會經過自旋的形式嘗試獲取鎖,不會阻塞,提升性能。

  • 重量級鎖 - 是指當鎖爲輕量級鎖的時候,另外一個線程雖然是自旋,但自旋不會一直持續下去,當自旋必定次數的時候,尚未獲取到鎖,就會進入阻塞,該鎖膨脹爲重量級鎖。重量級鎖會讓其餘申請的線程進入阻塞,性能下降。

分段鎖

分段鎖實際上是一種鎖的設計,並非具體的一種鎖。所謂分段鎖,就是把鎖的對象分紅多段,每段獨立控制,使得鎖粒度更細,減小阻塞開銷,從而提升併發性。這其實很好理解,就像高速公路上的收費站,若是隻有一個收費口,那全部的車只能排成一條隊繳費;若是有多個收費口,就能夠分流了。

Hashtable 使用 synchronized 修飾方法來保證線程安全性,那麼面對線程的訪問,Hashtable 就會鎖住整個對象,全部的其它線程只能等待,這種阻塞方式的吞吐量顯然很低。

Java 1.7 之前的 ConcurrentHashMap 就是分段鎖的典型案例。ConcurrentHashMap 維護了一個 Segment 數組,通常稱爲分段桶。

final Segment<K,V>[] segments;

當有線程訪問 ConcurrentHashMap 的數據時,ConcurrentHashMap 會先根據 hashCode 計算出數據在哪一個桶(即哪一個 Segment),而後鎖住這個 Segment

顯示鎖和內置鎖

Java 1.5 以前,協調對共享對象的訪問時可使用的機制只有 synchronizedvolatile。這兩個都屬於內置鎖,即鎖的申請和釋放都是由 JVM 所控制。

Java 1.5 以後,增長了新的機制:ReentrantLockReentrantReadWriteLock ,這類鎖的申請和釋放均可以由程序所控制,因此常被稱爲顯示鎖。

💡 synchronized 的用法和原理能夠參考:Java 併發基礎機制 - synchronized

🔔 注意:若是不須要 ReentrantLockReentrantReadWriteLock 所提供的高級同步特性,應該優先考慮使用 synchronized 。理由以下:

  • Java 1.6 之後,synchronized 作了大量的優化,其性能已經與 ReentrantLockReentrantReadWriteLock 基本上持平。
  • 從趨勢來看,Java 將來更可能會優化 synchronized ,而不是 ReentrantLockReentrantReadWriteLock ,由於 synchronized 是 JVM 內置屬性,它能執行一些優化。
  • ReentrantLockReentrantReadWriteLock 申請和釋放鎖都是由程序控制,若是使用不當,可能形成死鎖,這是很危險的。

如下對比一下顯示鎖和內置鎖的差別:

  • 主動獲取鎖和釋放鎖
    • synchronized 不能主動獲取鎖和釋放鎖。獲取鎖和釋放鎖都是 JVM 控制的。
    • ReentrantLock 能夠主動獲取鎖和釋放鎖。(若是忘記釋放鎖,就可能產生死鎖)。
  • 響應中斷
    • synchronized 不能響應中斷。
    • ReentrantLock 能夠響應中斷。
  • 超時機制
    • synchronized 沒有超時機制。
    • ReentrantLock 有超時機制。ReentrantLock 能夠設置超時時間,超時後自動釋放鎖,避免一直等待。
  • 支持公平鎖
    • synchronized 只支持非公平鎖。
    • ReentrantLock 支持非公平鎖和公平鎖。
  • 是否支持共享
    • synchronized 修飾的方法或代碼塊,只能被一個線程訪問(獨享)。若是這個線程被阻塞,其餘線程也只能等待
    • ReentrantLock 能夠基於 Condition 靈活的控制同步條件。
  • 是否支持讀寫分離
    • synchronized 不支持讀寫鎖分離;
    • ReentrantReadWriteLock 支持讀寫鎖,從而使阻塞讀寫的操做分開,有效提升併發性。

2、AQS

AbstractQueuedSynchronizer(簡稱 AQS)是隊列同步器,顧名思義,其主要做用是處理同步。它是併發鎖和不少同步工具類的實現基石(如 ReentrantLockReentrantReadWriteLockSemaphore 等)。

所以,要想深刻理解 ReentrantLockReentrantReadWriteLock 等併發鎖和同步工具,必須先理解 AQS 的要點和原理。

AQS 的要點

java.util.concurrent.locks 包中的相關鎖(經常使用的有 ReentrantLockReadWriteLock)都是基於 AQS 來實現。這些鎖都沒有直接繼承 AQS,而是定義了一個 Sync 類去繼承 AQS。爲何要這樣呢?由於鎖面向的是使用用戶,而同步器面向的則是線程控制,那麼在鎖的實現中聚合同步器而不是直接繼承 AQS 就能夠很好的隔離兩者所關注的事情。

AQS 提供了對獨享鎖與共享鎖的支持

獨享鎖 API

獲取、釋放獨享鎖的主要 API 以下:

public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
  • acquire - 獲取獨佔鎖。
  • acquireInterruptibly - 獲取可中斷的獨佔鎖。
  • tryAcquireNanos - 嘗試在指定時間內獲取可中斷的獨佔鎖。在如下三種狀況下回返回:
    • 在超時時間內,當前線程成功獲取了鎖;
    • 當前線程在超時時間內被中斷;
    • 超時時間結束,仍未得到鎖返回 false。
  • release - 釋放獨佔鎖。

共享鎖 API

獲取、釋放共享鎖的主要 API 以下:

public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
  • acquireShared - 獲取共享鎖。
  • acquireSharedInterruptibly - 獲取可中斷的共享鎖。
  • tryAcquireSharedNanos - 嘗試在指定時間內獲取可中斷的共享鎖。
  • release - 釋放共享鎖。

AQS 的原理

AQS 的數據結構

閱讀 AQS 的源碼,能夠發現:AQS 繼承自 AbstractOwnableSynchronize

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** 等待隊列的隊頭,懶加載。只能經過 setHead 方法修改。 */
    private transient volatile Node head;
    /** 等待隊列的隊尾,懶加載。只能經過 enq 方法添加新的等待節點。*/
    private transient volatile Node tail;
    /** 同步狀態 */
    private volatile int state;
}
  • state - AQS 使用一個整型的 volatile 變量來 維護同步狀態
    • 這個整數狀態的意義由子類來賦予,如ReentrantLock 中該狀態值表示全部者線程已經重複獲取該鎖的次數,Semaphore 中該狀態值表示剩餘的許可數量。
  • headtail - AQS 維護了一個 Node 類型(AQS 的內部類)的雙鏈表來完成同步狀態的管理。這個雙鏈表是一個雙向的 FIFO 隊列,經過 headtail 指針進行訪問。當 有線程獲取鎖失敗後,就被添加到隊列末尾

img

再來看一下 Node 的源碼

static final class Node {
    /** 該等待同步的節點處於共享模式 */
    static final Node SHARED = new Node();
    /** 該等待同步的節點處於獨佔模式 */
    static final Node EXCLUSIVE = null;

    /** 線程等待狀態,狀態值有: 0、一、-一、-二、-3 */
    volatile int waitStatus;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /** 前驅節點 */
    volatile Node prev;
    /** 後繼節點 */
    volatile Node next;
    /** 等待鎖的線程 */
    volatile Thread thread;

    /** 和節點是否共享有關 */
    Node nextWaiter;
}

很顯然,Node 是一個雙鏈表結構。

  • waitStatus - Node 使用一個整型的 volatile 變量來 維護 AQS 同步隊列中線程節點的狀態。waitStatus 有五個狀態值:
    • CANCELLED(1) - 此狀態表示:該節點的線程可能因爲超時或被中斷而 處於被取消(做廢)狀態,一旦處於這個狀態,表示這個節點應該從等待隊列中移除。
    • SIGNAL(-1) - 此狀態表示:後繼節點會被掛起,所以在當前節點釋放鎖或被取消以後,必須喚醒(unparking)其後繼結點。
    • CONDITION(-2) - 此狀態表示:該節點的線程 處於等待條件狀態,不會被看成是同步隊列上的節點,直到被喚醒(signal),設置其值爲 0,再從新進入阻塞狀態。
    • PROPAGATE(-3) - 此狀態表示:下一個 acquireShared 應無條件傳播。
    • 0 - 非以上狀態。

獨佔鎖的獲取和釋放

獲取獨佔鎖

AQS 中使用 acquire(int arg) 方法獲取獨佔鎖,其大體流程以下:

  1. 先嚐試獲取同步狀態,若是獲取同步狀態成功,則結束方法,直接返回。
  2. 若是獲取同步狀態不成功,AQS 會不斷嘗試利用 CAS 操做將當前線程插入等待同步隊列的隊尾,直到成功爲止。
  3. 接着,不斷嘗試爲等待隊列中的線程節點獲取獨佔鎖。

img

img

詳細流程能夠用下圖來表示,請結合源碼來理解(一圖勝千言):

img

釋放獨佔鎖

AQS 中使用 release(int arg) 方法釋放獨佔鎖,其大體流程以下:

  1. 先嚐試獲取解鎖線程的同步狀態,若是獲取同步狀態不成功,則結束方法,直接返回。
  2. 若是獲取同步狀態成功,AQS 會嘗試喚醒當前線程節點的後繼節點。
獲取可中斷的獨佔鎖

AQS 中使用 acquireInterruptibly(int arg) 方法獲取可中斷的獨佔鎖。

acquireInterruptibly(int arg) 實現方式相較於獲取獨佔鎖方法( acquire)很是類似,區別僅在於它會經過 Thread.interrupted 檢測當前線程是否被中斷,若是是,則當即拋出中斷異常(InterruptedException)。

獲取超時等待式的獨佔鎖

AQS 中使用 tryAcquireNanos(int arg) 方法獲取超時等待的獨佔鎖。

doAcquireNanos 的實現方式 相較於獲取獨佔鎖方法( acquire)很是類似,區別在於它會根據超時時間和當前時間計算出截止時間。在獲取鎖的流程中,會不斷判斷是否超時,若是超時,直接返回 false;若是沒超時,則用 LockSupport.parkNanos 來阻塞當前線程。

共享鎖的獲取和釋放

獲取共享鎖

AQS 中使用 acquireShared(int arg) 方法獲取共享鎖。

acquireShared 方法和 acquire 方法的邏輯很類似,區別僅在於自旋的條件以及節點出隊的操做有所不一樣。

成功得到共享鎖的條件以下:

  • tryAcquireShared(arg) 返回值大於等於 0 (這意味着共享鎖的 permit 尚未用完)。
  • 當前節點的前驅節點是頭結點。
釋放共享鎖

AQS 中使用 releaseShared(int arg) 方法釋放共享鎖。

releaseShared 首先會嘗試釋放同步狀態,若是成功,則解鎖一個或多個後繼線程節點。釋放共享鎖和釋放獨享鎖流程大致類似,區別在於:

對於獨享模式,若是須要 SIGNAL,釋放僅至關於調用頭節點的 unparkSuccessor

獲取可中斷的共享鎖

AQS 中使用 acquireSharedInterruptibly(int arg) 方法獲取可中斷的共享鎖。

acquireSharedInterruptibly 方法與 acquireInterruptibly 幾乎一致,再也不贅述。

獲取超時等待式的共享鎖

AQS 中使用 tryAcquireSharedNanos(int arg) 方法獲取超時等待式的共享鎖。

tryAcquireSharedNanos 方法與 tryAcquireNanos 幾乎一致,再也不贅述。

3、ReentrantLock

ReentrantLock 類是 Lock 接口的具體實現,它是一個可重入鎖。與內置鎖 synchronized 不一樣,ReentrantLock 提供了一組無條件的、可輪詢的、定時的以及可中斷的鎖操做,全部獲取鎖、釋放鎖的操做都是顯式的操做。

ReentrantLock 的特性

ReentrantLock 的特性以下:

  • ReentrantLock 提供了與 synchronized 相同的互斥性、內存可見性和可重入性
  • ReentrantLock 支持公平鎖和非公平鎖(默認)兩種模式。
  • ReentrantLock 實現了 Lock 接口,支持了 synchronized 所不具有的靈活性
    • synchronized 沒法中斷一個正在等待獲取鎖的線程
    • synchronized 沒法在請求獲取一個鎖時無休止地等待

Lock 的接口定義以下:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
  • lock() - 獲取鎖
  • unlock() - 釋放鎖
  • tryLock() - 嘗試獲取鎖,僅在調用時鎖未被另外一個線程持有的狀況下,才獲取該鎖。
  • tryLock(long time, TimeUnit unit) - 和 tryLock() 相似,區別僅在於限定時間,若是限定時間內未獲取到鎖,視爲失敗。
  • lockInterruptibly() - 鎖未被另外一個線程持有,且線程沒有被中斷的狀況下,才能獲取鎖。
  • newCondition() - 返回一個綁定到 Lock 對象上的 Condition 實例。

ReentrantLock 的用法

前文了解了 ReentrantLock 的特性,接下來,咱們要講述其具體用法。

ReentrantLock 的構造方法

ReentrantLock 有兩個構造方法:

public ReentrantLock() {}
public ReentrantLock(boolean fair) {}
  • ReentrantLock() - 默認構造方法會初始化一個非公平鎖(NonfairSync)
  • ReentrantLock(boolean) - new ReentrantLock(true) 會初始化一個公平鎖(FairSync)

lock 和 unlock 方法

  • lock() - 無條件獲取鎖。若是當前線程沒法獲取鎖,則當前線程進入休眠狀態不可用,直至當前線程獲取到鎖。若是該鎖沒有被另外一個線程持有,則獲取該鎖並當即返回,將鎖的持有計數設置爲 1。
  • unlock() - 用於釋放鎖

🔔 注意:請務必牢記,獲取鎖操做 lock() 必須在 try catch 塊中進行,而且將釋放鎖操做 unlock() 放在 finally 塊中進行,以保證鎖必定被被釋放,防止死鎖的發生

示例:ReentrantLock 的基本操做

public class ReentrantLockDemo {

    public static void main(String[] args) {
        Task task = new Task();
        MyThread tA = new MyThread("Thread-A", task);
        MyThread tB = new MyThread("Thread-B", task);
        MyThread tC = new MyThread("Thread-C", task);
        tA.start();
        tB.start();
        tC.start();
    }

    static class MyThread extends Thread {

        private Task task;

        public MyThread(String name, Task task) {
            super(name);
            this.task = task;
        }

        @Override
        public void run() {
            task.execute();
        }

    }

    static class Task {

        private ReentrantLock lock = new ReentrantLock();

        public void execute() {
            lock.lock();
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println(lock.toString());

                    // 查詢當前線程 hold 住此鎖的次數
                    System.out.println("\t holdCount: " + lock.getHoldCount());

                    // 查詢正等待獲取此鎖的線程數
                    System.out.println("\t queuedLength: " + lock.getQueueLength());

                    // 是否爲公平鎖
                    System.out.println("\t isFair: " + lock.isFair());

                    // 是否被鎖住
                    System.out.println("\t isLocked: " + lock.isLocked());

                    // 是否被當前線程持有鎖
                    System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());

                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

    }

}

輸出結果:

java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
     holdCount: 1
     queuedLength: 2
     isFair: false
     isLocked: true
     isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
     holdCount: 1
     queuedLength: 1
     isFair: false
     isLocked: true
     isHeldByCurrentThread: true
// ...

tryLock 方法

與無條件獲取鎖相比,tryLock 有更完善的容錯機制。

  • tryLock() - 可輪詢獲取鎖。若是成功,則返回 true;若是失敗,則返回 false。也就是說,這個方法不管成敗都會當即返回,獲取不到鎖(鎖已被其餘線程獲取)時不會一直等待。
  • tryLock(long, TimeUnit) - 可定時獲取鎖。和 tryLock() 相似,區別僅在於這個方法在獲取不到鎖時會等待必定的時間,在時間期限以內若是還獲取不到鎖,就返回 false。若是若是一開始拿到鎖或者在等待期間內拿到了鎖,則返回 true。

示例:ReentrantLocktryLock() 操做

修改上個示例中的 execute() 方法

public void execute() {
    if (lock.tryLock()) {
        try {
            for (int i = 0; i < 3; i++) {
               // 略...
            }
        } finally {
            lock.unlock();
        }
    } else {
        System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
    }
}

示例:ReentrantLocktryLock(long, TimeUnit) 操做

修改上個示例中的 execute() 方法

public void execute() {
    try {
        if (lock.tryLock(2, TimeUnit.SECONDS)) {
            try {
                for (int i = 0; i < 3; i++) {
                    // 略...
                }
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
        }
    } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + " 獲取鎖超時");
        e.printStackTrace();
    }
}

lockInterruptibly 方法

  • lockInterruptibly() - 可中斷獲取鎖。可中斷獲取鎖能夠在得到鎖的同時保持對中斷的響應。可中斷獲取鎖比其它獲取鎖的方式稍微複雜一些,須要兩個 try-catch 塊(若是在獲取鎖的操做中拋出了 InterruptedException ,那麼可使用標準的 try-finally 加鎖模式)。
    • 舉例來講:假設有兩個線程同時經過 lock.lockInterruptibly() 獲取某個鎖時,若線程 A 獲取到了鎖,則線程 B 只能等待。若此時對線程 B 調用 threadB.interrupt() 方法可以中斷線程 B 的等待過程。因爲 lockInterruptibly() 的聲明中拋出了異常,因此 lock.lockInterruptibly() 必須放在 try 塊中或者在調用 lockInterruptibly() 的方法外聲明拋出 InterruptedException

🔔 注意:當一個線程獲取了鎖以後,是不會被 interrupt() 方法中斷的。單獨調用 interrupt() 方法不能中斷正在運行狀態中的線程,只能中斷阻塞狀態中的線程。所以當經過 lockInterruptibly() 方法獲取某個鎖時,若是未獲取到鎖,只有在等待的狀態下,才能夠響應中斷。

示例:ReentrantLocklockInterruptibly() 操做

修改上個示例中的 execute() 方法

public void execute() {
    try {
        lock.lockInterruptibly();

        for (int i = 0; i < 3; i++) {
            // 略...
        }
    } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + "被中斷");
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

newCondition 方法

newCondition() - 返回一個綁定到 Lock 對象上的 Condition 實例。Condition 的特性和具體方法請閱讀下文 Condition

ReentrantLock 的原理

ReentrantLock 的數據結構

閱讀 ReentrantLock 的源碼,能夠發現它有一個核心字段:

private final Sync sync;
  • sync - 內部抽象類 ReentrantLock.Sync 對象,Sync 繼承自 AQS。它有兩個子類:
  • ReentrantLock.FairSync - 公平鎖。
  • ReentrantLock.NonfairSync - 非公平鎖。

查看源碼能夠發現,ReentrantLock 實現 Lock 接口實際上是調用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的實現,這裏不一一列舉。

ReentrantLock 的獲取鎖和釋放鎖

ReentrantLock 獲取鎖和釋放鎖的接口,從表象看,是調用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的實現;從本質上看,是基於 AQS 的實現。

仔細閱讀源碼很容易發現:

  • void lock() 調用 Sync 的 lock() 方法。
  • void lockInterruptibly() 直接調用 AQS 的 獲取可中斷的獨佔鎖 方法 lockInterruptibly()

  • boolean tryLock() 調用 Sync 的 nonfairTryAcquire()
  • boolean tryLock(long time, TimeUnit unit) 直接調用 AQS 的 獲取超時等待式的獨佔鎖 方法 tryAcquireNanos(int arg, long nanosTimeout)
  • void unlock() 直接調用 AQS 的 釋放獨佔鎖 方法 release(int arg)

直接調用 AQS 接口的方法就再也不贅述了,其原理在 AQS 的原理 中已經用很大篇幅進行過講解。

nonfairTryAcquire 方法源碼以下:

// 公平鎖和非公平鎖都會用這個方法區嘗試獲取鎖
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
         // 若是同步狀態爲0,將其設爲 acquires,並設置當前線程爲排它線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

處理流程很簡單:

  • 若是同步狀態爲 0,設置同步狀態設爲 acquires,並設置當前線程爲排它線程,而後返回 true,獲取鎖成功。
  • 若是同步狀態不爲 0 且當前線程爲排它線程,設置同步狀態爲當前狀態值+acquires 值,而後返回 true,獲取鎖成功。
  • 不然,返回 false,獲取鎖失敗。

lock 方法在公平鎖和非公平鎖中的實現:

兩者的區別僅在於申請非公平鎖時,若是同步狀態爲 0,嘗試將其設爲 1,若是成功,直接將當前線程置爲排它線程;不然和公平鎖同樣,調用 AQS 獲取獨佔鎖方法 acquire

// 非公平鎖實現
final void lock() {
    if (compareAndSetState(0, 1))
    // 若是同步狀態爲0,將其設爲1,並設置當前線程爲排它線程
        setExclusiveOwnerThread(Thread.currentThread());
    else
    // 調用 AQS 獲取獨佔鎖方法 acquire
        acquire(1);
}

// 公平鎖實現
final void lock() {
    // 調用 AQS 獲取獨佔鎖方法 acquire
    acquire(1);
}

4、ReentrantReadWriteLock

ReentrantReadWriteLock 類是 ReadWriteLock 接口的具體實現,它是一個可重入的讀寫鎖ReentrantReadWriteLock 維護了一對讀寫鎖,將讀寫鎖分開,有利於提升併發效率

ReentrantLock 實現了一種標準的互斥鎖:每次最多隻有一個線程能持有 ReentrantLock。但對於維護數據的完整性來講,互斥一般是一種過於強硬的加鎖策略,所以也就沒必要要地限制了併發性。大多數場景下,讀操做比寫操做頻繁,只要保證每一個線程都能讀取到最新數據,而且在讀數據時不會有其它線程在修改數據,那麼就不會出現線程安全問題。這種策略減小了互斥同步,天然也提高了併發性能,ReentrantReadWriteLock 就是這種策略的具體實現。

ReentrantReadWriteLock 的特性

ReentrantReadWriteLock 的特性以下:

  • ReentrantReadWriteLock 適用於讀多寫少的場景。若是是寫多讀少的場景,因爲 ReentrantReadWriteLock 其內部實現比 ReentrantLock 複雜,性能可能反而要差一些。若是存在這樣的問題,須要具體問題具體分析。因爲 ReentrantReadWriteLock 的讀寫鎖(ReadLockWriteLock)都實現了 Lock 接口,因此要替換爲 ReentrantLock 也較爲容易。
  • ReentrantReadWriteLock 實現了 ReadWriteLock 接口,支持了 ReentrantLock 所不具有的讀寫鎖分離。ReentrantReadWriteLock 維護了一對讀寫鎖(ReadLockWriteLock)。將讀寫鎖分開,有利於提升併發效率。ReentrantReadWriteLock 的加鎖策略是:容許多個讀操做併發執行,但每次只容許一個寫操做
  • ReentrantReadWriteLock 爲讀寫鎖都提供了可重入的加鎖語義。
  • ReentrantReadWriteLock 支持公平鎖和非公平鎖(默認)兩種模式。

ReadWriteLock 接口定義以下:

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}
  • readLock - 返回用於讀操做的鎖(ReadLock)。
  • writeLock - 返回用於寫操做的鎖(WriteLock)。

在讀寫鎖和寫入鎖之間的交互能夠採用多種實現方式,ReadWriteLock 的一些可選實現包括:

  • 釋放優先 - 當一個寫入操做釋放寫鎖,而且隊列中同時存在讀線程和寫線程,那麼應該優先選擇讀線程、寫線程,仍是最早發出請求的線程?
  • 讀線程插隊 - 若是鎖是由讀線程持有,但有寫線程正在等待,那麼新到達的讀線程可否當即得到訪問權,仍是應該在寫線程後面等待?若是容許讀線程插隊到寫線程以前,那麼將提升併發性,但可能形成線程飢餓問題。
  • 重入性 - 讀鎖和寫鎖是不是可重入的?
  • 降級 - 若是一個線程持有寫入鎖,那麼它可否在不釋放該鎖的狀況下得到讀鎖?這可能會使得寫鎖被降級爲讀鎖,同時不容許其餘寫線程修改被保護的資源。
  • 升級 - 讀鎖可否優先於其餘正在等待的讀線程和寫線程而升級爲一個寫鎖?在大多數的讀寫鎖實現中並不支持升級,由於若是沒有顯式的升級操做,那麼很容易形成死鎖。

ReentrantReadWriteLock 的用法

前文了解了 ReentrantReadWriteLock 的特性,接下來,咱們要講述其具體用法。

ReentrantReadWriteLock 的構造方法

ReentrantReadWriteLockReentrantLock 同樣,也有兩個構造方法,且用法類似。

public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}
  • ReentrantReadWriteLock() - 默認構造方法會初始化一個非公平鎖(NonfairSync)。在非公平的鎖中,線程得到鎖的順序是不肯定的。寫線程降級爲讀線程是能夠的,但讀線程升級爲寫線程是不能夠的(這樣會致使死鎖)。
  • ReentrantReadWriteLock(boolean) - new ReentrantLock(true) 會初始化一個公平鎖(FairSync)。對於公平鎖,等待時間最長的線程將優先得到鎖。若是這個鎖是讀線程持有,則另外一個線程請求寫鎖,那麼其餘讀線程都不能得到讀鎖,直到寫線程釋放寫鎖。

ReentrantReadWriteLock 的使用實例

ReentrantReadWriteLock 的特性 中已經介紹過,ReentrantReadWriteLock 的讀寫鎖(ReadLockWriteLock)都實現了 Lock 接口,因此其各自獨立的使用方式與 ReentrantLock 同樣,這裏再也不贅述。

ReentrantReadWriteLockReentrantLock 用法上的差別,主要在於讀寫鎖的配合使用。本文以一個典型使用場景來進行講解。

示例:基於 ReentrantReadWriteLock 實現一個簡單的本地緩存

/**
 * 簡單的無界緩存實現
 * <p>
 * 使用 WeakHashMap 存儲鍵值對。WeakHashMap 中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。
 */
static class UnboundedCache<K, V> {

    private final Map<K, V> cacheMap = new WeakHashMap<>();

    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();

    public V get(K key) {
        cacheLock.readLock().lock();
        V value;
        try {
            value = cacheMap.get(key);
            String log = String.format("%s 讀數據 %s:%s", Thread.currentThread().getName(), key, value);
            System.out.println(log);
        } finally {
            cacheLock.readLock().unlock();
        }
        return value;
    }

    public V put(K key, V value) {
        cacheLock.writeLock().lock();
        try {
            cacheMap.put(key, value);
            String log = String.format("%s 寫入數據 %s:%s", Thread.currentThread().getName(), key, value);
            System.out.println(log);
        } finally {
            cacheLock.writeLock().unlock();
        }
        return value;
    }

    public V remove(K key) {
        cacheLock.writeLock().lock();
        try {
            return cacheMap.remove(key);
        } finally {
            cacheLock.writeLock().unlock();
        }
    }

    public void clear() {
        cacheLock.writeLock().lock();
        try {
            this.cacheMap.clear();
        } finally {
            cacheLock.writeLock().unlock();
        }
    }

}

說明:

  • 使用 WeakHashMap 而不是 HashMap 來存儲鍵值對。WeakHashMap 中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。
  • Map 寫數據前加寫鎖,寫完後,釋放寫鎖。
  • Map 讀數據前加讀鎖,讀完後,釋放讀鎖。

測試其線程安全性:

/**
 * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
 * @since 2020-01-01
 */
public class ReentrantReadWriteLockDemo {

    static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            executorService.execute(new MyThread());
            cache.get(0);
        }
        executorService.shutdown();
    }

    /** 線程任務每次向緩存中寫入 3 個隨機值,key 固定 */
    static class MyThread implements Runnable {

        @Override
        public void run() {
            Random random = new Random();
            for (int i = 0; i < 3; i++) {
                cache.put(i, random.nextInt(100));
            }
        }

    }

}

說明:示例中,經過線程池啓動 20 個併發任務。任務每次向緩存中寫入 3 個隨機值,key 固定;而後主線程每次固定讀取緩存中第一個 key 的值。

輸出結果:

main 讀數據 0:null
pool-1-thread-1 寫入數據 0:16
pool-1-thread-1 寫入數據 1:58
pool-1-thread-1 寫入數據 2:50
main 讀數據 0:16
pool-1-thread-1 寫入數據 0:85
pool-1-thread-1 寫入數據 1:76
pool-1-thread-1 寫入數據 2:46
pool-1-thread-2 寫入數據 0:21
pool-1-thread-2 寫入數據 1:41
pool-1-thread-2 寫入數據 2:63
main 讀數據 0:21
main 讀數據 0:21
// ...

ReentrantReadWriteLock 的原理

前面瞭解了 ReentrantLock 的原理,理解 ReentrantReadWriteLock 就容易多了。

ReentrantReadWriteLock 的數據結構

閱讀 ReentrantReadWriteLock 的源碼,能夠發現它有三個核心字段:

/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
  • sync - 內部類 ReentrantReadWriteLock.Sync 對象。與 ReentrantLock 相似,它有兩個子類:ReentrantReadWriteLock.FairSyncReentrantReadWriteLock.NonfairSync ,分別表示公平鎖和非公平鎖的實現。
  • readerLock - 內部類 ReentrantReadWriteLock.ReadLock 對象,這是一把讀鎖。
  • writerLock - 內部類 ReentrantReadWriteLock.WriteLock 對象,這是一把寫鎖。

ReentrantReadWriteLock 的獲取鎖和釋放鎖

public static class ReadLock implements Lock, java.io.Serializable {

    // 調用 AQS 獲取共享鎖方法
    public void lock() {
        sync.acquireShared(1);
    }

    // 調用 AQS 釋放共享鎖方法
    public void unlock() {
        sync.releaseShared(1);
    }
}

public static class WriteLock implements Lock, java.io.Serializable {

    // 調用 AQS 獲取獨佔鎖方法
    public void lock() {
        sync.acquire(1);
    }

    // 調用 AQS 釋放獨佔鎖方法
    public void unlock() {
        sync.release(1);
    }
}

5、Condition

前文中提過 Lock 接口中 有一個 newCondition() 方法用於返回一個綁定到 Lock 對象上的 Condition 實例。Condition 是什麼?有什麼做用?本節將一一講解。

在單線程中,一段代碼的執行可能依賴於某個狀態,若是不知足狀態條件,代碼就不會被執行(典型的場景,如:if ... else ...)。在併發環境中,當一個線程判斷某個狀態條件時,其狀態多是因爲其餘線程的操做而改變,這時就須要有必定的協調機制來確保在同一時刻,數據只能被一個線程鎖修改,且修改的數據狀態被全部線程所感知。

Java 1.5 以前,主要是利用 Object 類中的 waitnotifynotifyAll 配合 synchronized 來進行線程間通訊(若是不瞭解其特性,能夠參考:Java 線程基礎 - wait/notify/notifyAll)。

waitnotifynotifyAll 須要配合 synchronized 使用,不適用於 Lock。而使用 Lock 的線程,彼此間通訊應該使用 Condition 。這能夠理解爲,什麼樣的鎖配什麼樣的鑰匙。內置鎖(synchronized)配合內置條件隊列(waitnotifynotifyAll ),顯式鎖(Lock)配合顯式條件隊列(Condition

Condition 的特性

Condition 接口定義以下:

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

其中,awaitsignalsignalAllwaitnotifynotifyAll 相對應,功能也類似。除此之外,Condition 相比內置條件隊列( waitnotifynotifyAll ),提供了更爲豐富的功能:

  • 每一個鎖(Lock)上能夠存在多個 Condition,這意味着鎖的狀態條件能夠有多個。
  • 支持公平的或非公平的隊列操做。
  • 支持可中斷的條件等待,相關方法:awaitUninterruptibly()
  • 支持可定時的等待,相關方法:awaitNanos(long)await(long, TimeUnit)awaitUntil(Date)

Condition 的用法

這裏以 Condition 來實現一個消費者、生產者模式。

🔔 注意:事實上,解決此類問題使用 CountDownLatchSemaphore 等工具更爲便捷、安全。想了解詳情,能夠參考 Java 併發工具類

產品類

class Message {

    private final Lock lock = new ReentrantLock();

    private final Condition producedMsg = lock.newCondition();

    private final Condition consumedMsg = lock.newCondition();

    private String message;

    private boolean state;

    private boolean end;

    public void consume() {
        //lock
        lock.lock();
        try {
            // no new message wait for new message
            while (!state) { producedMsg.await(); }

            System.out.println("consume message : " + message);
            state = false;
            // message consumed, notify waiting thread
            consumedMsg.signal();
        } catch (InterruptedException ie) {
            System.out.println("Thread interrupted - viewMessage");
        } finally {
            lock.unlock();
        }
    }

    public void produce(String message) {
        lock.lock();
        try {
            // last message not consumed, wait for it be consumed
            while (state) { consumedMsg.await(); }

            System.out.println("produce msg: " + message);
            this.message = message;
            state = true;
            // new message added, notify waiting thread
            producedMsg.signal();
        } catch (InterruptedException ie) {
            System.out.println("Thread interrupted - publishMessage");
        } finally {
            lock.unlock();
        }
    }

    public boolean isEnd() {
        return end;
    }

    public void setEnd(boolean end) {
        this.end = end;
    }

}

消費者

class MessageConsumer implements Runnable {

    private Message message;

    public MessageConsumer(Message msg) {
        message = msg;
    }

    @Override
    public void run() {
        while (!message.isEnd()) { message.consume(); }
    }

}

生產者

class MessageProducer implements Runnable {

    private Message message;

    public MessageProducer(Message msg) {
        message = msg;
    }

    @Override
    public void run() {
        produce();
    }

    public void produce() {
        List<String> msgs = new ArrayList<>();
        msgs.add("Begin");
        msgs.add("Msg1");
        msgs.add("Msg2");

        for (String msg : msgs) {
            message.produce(msg);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        message.produce("End");
        message.setEnd(true);
    }

}

測試

public class LockConditionDemo {

    public static void main(String[] args) {
        Message msg = new Message();
        Thread producer = new Thread(new MessageProducer(msg));
        Thread consumer = new Thread(new MessageConsumer(msg));
        producer.start();
        consumer.start();
    }
}

參考資料

相關文章
相關標籤/搜索