深刻理解JAVA併發鎖

深刻理解 Java 併發鎖

1. 併發鎖簡介

確保線程安全最多見的作法是利用鎖機制(Locksychronized)來對共享數據作互斥同步,這樣在同一個時刻,只有一個線程能夠執行某個方法或者某個代碼塊,那麼操做必然是原子性的,線程安全的。html

在工做、面試中,常常會聽到各類五花八門的鎖,聽的人云裏霧裏。鎖的概念術語不少,它們是針對不一樣的問題所提出的,經過簡單的梳理,也不難理解。java

1.1. 可重入鎖

可重入鎖,顧名思義,指的是線程能夠重複獲取同一把鎖。即同一個線程在外層方法獲取了鎖,在進入內層方法會自動獲取鎖。面試

可重入鎖能夠在必定程度上避免死鎖數據庫

  • ReentrantLockReentrantReadWriteLock 是可重入鎖。這點,從其命名也不難看出。
  • synchronized 也是一個可重入鎖

【示例】synchronized 的可重入示例編程

synchronized void setA() throws Exception{
    Thread.sleep(1000);
    setB();
}

synchronized void setB() throws Exception{
    Thread.sleep(1000);
}

上面的代碼就是一個典型場景:若是使用的鎖不是可重入鎖的話,setB 可能不會被當前線程執行,從而形成死鎖。數組

【示例】ReentrantLock 的可重入示例緩存

class Task {

    private int value;
    private final Lock lock = new ReentrantLock();

    public Task() {
        this.value = 0;
    }

    public int get() {
        // 獲取鎖
        lock.lock();
        try {
            return value;
        } finally {
            // 保證鎖能釋放
            lock.unlock();
        }
    }

    public void addOne() {
        // 獲取鎖
        lock.lock();
        try {
            // 注意:此處已經成功獲取鎖,進入 get 方法後,又嘗試獲取鎖,
            // 若是鎖不是可重入的,會致使死鎖
            value = 1 + get();
        } finally {
            // 保證鎖能釋放
            lock.unlock();
        }
    }

}

1.2. 公平鎖與非公平鎖

  • 公平鎖 - 公平鎖是指 多線程按照申請鎖的順序來獲取鎖
  • 非公平鎖 - 非公平鎖是指 多線程不按照申請鎖的順序來獲取鎖 。這就可能會出現優先級反轉(後來者居上)或者飢餓現象(某線程老是搶不過別的線程,致使始終沒法執行)。

公平鎖爲了保證線程申請順序,勢必要付出必定的性能代價,所以其吞吐量通常低於非公平鎖。安全

公平鎖與非公平鎖 在 Java 中的典型實現:數據結構

  • synchronized 只支持非公平鎖
  • ReentrantLockReentrantReadWriteLock,默認是非公平鎖,但支持公平鎖

1.3. 獨享鎖與共享鎖

獨享鎖與共享鎖是一種廣義上的說法,從實際用途上來看,也常被稱爲互斥鎖與讀寫鎖。多線程

  • 獨享鎖 - 獨享鎖是指 鎖一次只能被一個線程所持有
  • 共享鎖 - 共享鎖是指 鎖可被多個線程所持有

獨享鎖與共享鎖在 Java 中的典型實現:

  • synchronizedReentrantLock 只支持獨享鎖
  • ReentrantReadWriteLock 其寫鎖是獨享鎖,其讀鎖是共享鎖。讀鎖是共享鎖使得併發讀是很是高效的,讀寫,寫讀 ,寫寫的過程是互斥的。

1.4. 悲觀鎖與樂觀鎖

樂觀鎖與悲觀鎖不是指具體的什麼類型的鎖,而是處理併發同步的策略

  • 悲觀鎖 - 悲觀鎖對於併發採起悲觀的態度,認爲:不加鎖的併發操做必定會出問題悲觀鎖適合寫操做頻繁的場景
  • 樂觀鎖 - 樂觀鎖對於併發採起樂觀的態度,認爲:不加鎖的併發操做也沒什麼問題。對於同一個數據的併發操做,是不會發生修改的。在更新數據的時候,會採用不斷嘗試更新的方式更新數據。樂觀鎖適合讀多寫少的場景

悲觀鎖與樂觀鎖在 Java 中的典型實現:

  • 悲觀鎖在 Java 中的應用就是經過使用 synchronizedLock 顯示加鎖來進行互斥同步,這是一種阻塞同步。

  • 樂觀鎖在 Java 中的應用就是採用 CAS 機制(CAS 操做經過 Unsafe 類提供,但這個類不直接暴露爲 API,因此都是間接使用,如各類原子類)。

1.5. 偏向鎖、輕量級鎖、重量級鎖

所謂輕量級鎖與重量級鎖,指的是鎖控制粒度的粗細。顯然,控制粒度越細,阻塞開銷越小,併發性也就越高。

Java 1.6 之前,重量級鎖通常指的是 synchronized ,而輕量級鎖指的是 volatile

Java 1.6 之後,針對 synchronized 作了大量優化,引入 4 種鎖狀態: 無鎖狀態、偏向鎖、輕量級鎖和重量級鎖。鎖能夠單向的從偏向鎖升級到輕量級鎖,再從輕量級鎖升級到重量級鎖 。

  • 偏向鎖 - 偏向鎖是指一段同步代碼一直被一個線程所訪問,那麼該線程會自動獲取鎖。下降獲取鎖的代價。

  • 輕量級鎖 - 是指當鎖是偏向鎖的時候,被另外一個線程所訪問,偏向鎖就會升級爲輕量級鎖,其餘線程會經過自旋的形式嘗試獲取鎖,不會阻塞,提升性能。

  • 重量級鎖 - 是指當鎖爲輕量級鎖的時候,另外一個線程雖然是自旋,但自旋不會一直持續下去,當自旋必定次數的時候,尚未獲取到鎖,就會進入阻塞,該鎖膨脹爲重量級鎖。重量級鎖會讓其餘申請的線程進入阻塞,性能下降。

1.6. 分段鎖

分段鎖實際上是一種鎖的設計,並非具體的一種鎖。所謂分段鎖,就是把鎖的對象分紅多段,每段獨立控制,使得鎖粒度更細,減小阻塞開銷,從而提升併發性。這其實很好理解,就像高速公路上的收費站,若是隻有一個收費口,那全部的車只能排成一條隊繳費;若是有多個收費口,就能夠分流了。

Hashtable 使用 synchronized 修飾方法來保證線程安全性,那麼面對線程的訪問,Hashtable 就會鎖住整個對象,全部的其它線程只能等待,這種阻塞方式的吞吐量顯然很低。

Java 1.7 之前的 ConcurrentHashMap 就是分段鎖的典型案例。ConcurrentHashMap 維護了一個 Segment 數組,通常稱爲分段桶。

final Segment<K,V>[] segments;

當有線程訪問 ConcurrentHashMap 的數據時,ConcurrentHashMap 會先根據 hashCode 計算出數據在哪一個桶(即哪一個 Segment),而後鎖住這個 Segment

1.7. 顯示鎖和內置鎖

Java 1.5 以前,協調對共享對象的訪問時可使用的機制只有 synchronizedvolatile。這兩個都屬於內置鎖,即鎖的申請和釋放都是由 JVM 所控制。

Java 1.5 以後,增長了新的機制:ReentrantLockReentrantReadWriteLock ,這類鎖的申請和釋放均可以由程序所控制,因此常被稱爲顯示鎖。

注意:若是不須要 ReentrantLockReentrantReadWriteLock 所提供的高級同步特性,應該優先考慮使用 synchronized 。理由以下:

  • Java 1.6 之後,synchronized 作了大量的優化,其性能已經與 ReentrantLockReentrantReadWriteLock 基本上持平。
  • 從趨勢來看,Java 將來更可能會優化 synchronized ,而不是 ReentrantLockReentrantReadWriteLock ,由於 synchronized 是 JVM 內置屬性,它能執行一些優化。
  • ReentrantLockReentrantReadWriteLock 申請和釋放鎖都是由程序控制,若是使用不當,可能形成死鎖,這是很危險的。

如下對比一下顯示鎖和內置鎖的差別:

  • 主動獲取鎖和釋放鎖
    • synchronized 不能主動獲取鎖和釋放鎖。獲取鎖和釋放鎖都是 JVM 控制的。
    • ReentrantLock 能夠主動獲取鎖和釋放鎖。(若是忘記釋放鎖,就可能產生死鎖)。
  • 響應中斷
    • synchronized 不能響應中斷。
    • ReentrantLock 能夠響應中斷。
  • 超時機制
    • synchronized 沒有超時機制。
    • ReentrantLock 有超時機制。ReentrantLock 能夠設置超時時間,超時後自動釋放鎖,避免一直等待。
  • 支持公平鎖
    • synchronized 只支持非公平鎖。
    • ReentrantLock 支持非公平鎖和公平鎖。
  • 是否支持共享
    • synchronized 修飾的方法或代碼塊,只能被一個線程訪問(獨享)。若是這個線程被阻塞,其餘線程也只能等待
    • ReentrantLock 能夠基於 Condition 靈活的控制同步條件。
  • 是否支持讀寫分離
    • synchronized 不支持讀寫鎖分離;
    • ReentrantReadWriteLock 支持讀寫鎖,從而使阻塞讀寫的操做分開,有效提升併發性。

2. Lock 和 Condition

2.1. 爲什麼引入 Lock 和 Condition

併發編程領域,有兩大核心問題:一個是互斥,即同一時刻只容許一個線程訪問共享資源;另外一個是同步,即線程之間如何通訊、協做。這兩大問題,管程都是可以解決的。Java SDK 併發包經過 Lock 和 Condition 兩個接口來實現管程,其中 Lock 用於解決互斥問題,Condition 用於解決同步問題

synchronized 是管程的一種實現,既然如此,何須再提供 Lock 和 Condition。

JDK 1.6 之前,synchronized 尚未作優化,性能遠低於 Lock。可是,性能不是引入 Lock 的最重要因素。真正關鍵在於:synchronized 使用不當,可能會出現死鎖。

synchronized 沒法經過破壞不可搶佔條件來避免死鎖。緣由是 synchronized 申請資源的時候,若是申請不到,線程直接進入阻塞狀態了,而線程進入阻塞狀態,啥都幹不了,也釋放不了線程已經佔有的資源。

與內置鎖 synchronized 不一樣的是,Lock 提供了一組無條件的、可輪詢的、定時的以及可中斷的鎖操做,全部獲取鎖、釋放鎖的操做都是顯式的操做。

  • 可以響應中斷。synchronized 的問題是,持有鎖 A 後,若是嘗試獲取鎖 B 失敗,那麼線程就進入阻塞狀態,一旦發生死鎖,就沒有任何機會來喚醒阻塞的線程。但若是阻塞狀態的線程可以響應中斷信號,也就是說當咱們給阻塞的線程發送中斷信號的時候,可以喚醒它,那它就有機會釋放曾經持有的鎖 A。這樣就破壞了不可搶佔條件了。
  • 支持超時。若是線程在一段時間以內沒有獲取到鎖,不是進入阻塞狀態,而是返回一個錯誤,那這個線程也有機會釋放曾經持有的鎖。這樣也能破壞不可搶佔條件。
  • 非阻塞地獲取鎖。若是嘗試獲取鎖失敗,並不進入阻塞狀態,而是直接返回,那這個線程也有機會釋放曾經持有的鎖。這樣也能破壞不可搶佔條件。

2.2. Lock 接口

Lock 的接口定義以下:

public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}
  • lock() - 獲取鎖。
  • unlock() - 釋放鎖。
  • tryLock() - 嘗試獲取鎖,僅在調用時鎖未被另外一個線程持有的狀況下,才獲取該鎖。
  • tryLock(long time, TimeUnit unit) - 和 tryLock() 相似,區別僅在於限定時間,若是限定時間內未獲取到鎖,視爲失敗。
  • lockInterruptibly() - 鎖未被另外一個線程持有,且線程沒有被中斷的狀況下,才能獲取鎖。
  • newCondition() - 返回一個綁定到 Lock 對象上的 Condition 實例。

2.3. Condition

Condition 實現了管程模型裏面的條件變量

前文中提過 Lock 接口中 有一個 newCondition() 方法用於返回一個綁定到 Lock 對象上的 Condition 實例。Condition 是什麼?有什麼做用?本節將一一講解。

在單線程中,一段代碼的執行可能依賴於某個狀態,若是不知足狀態條件,代碼就不會被執行(典型的場景,如:if ... else ...)。在併發環境中,當一個線程判斷某個狀態條件時,其狀態多是因爲其餘線程的操做而改變,這時就須要有必定的協調機制來確保在同一時刻,數據只能被一個線程鎖修改,且修改的數據狀態被全部線程所感知。

Java 1.5 以前,主要是利用 Object 類中的 waitnotifynotifyAll 配合 synchronized 來進行線程間通訊 。

waitnotifynotifyAll 須要配合 synchronized 使用,不適用於 Lock。而使用 Lock 的線程,彼此間通訊應該使用 Condition 。這能夠理解爲,什麼樣的鎖配什麼樣的鑰匙。內置鎖(synchronized)配合內置條件隊列(waitnotifynotifyAll ),顯式鎖(Lock)配合顯式條件隊列(Condition

Condition 的特性

Condition 接口定義以下:

public interface Condition {
    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

其中,awaitsignalsignalAllwaitnotifynotifyAll 相對應,功能也類似。除此之外,Condition 相比內置條件隊列( waitnotifynotifyAll ),提供了更爲豐富的功能:

  • 每一個鎖(Lock)上能夠存在多個 Condition,這意味着鎖的狀態條件能夠有多個。
  • 支持公平的或非公平的隊列操做。
  • 支持可中斷的條件等待,相關方法:awaitUninterruptibly()
  • 支持可定時的等待,相關方法:awaitNanos(long)await(long, TimeUnit)awaitUntil(Date)

Condition 的用法

這裏以 Condition 來實現一個消費者、生產者模式。

產品類

class Message {

    private final Lock lock = new ReentrantLock();

    private final Condition producedMsg = lock.newCondition();

    private final Condition consumedMsg = lock.newCondition();

    private String message;

    private boolean state;

    private boolean end;

    public void consume() {
        //lock
        lock.lock();
        try {
            // no new message wait for new message
            while (!state) { producedMsg.await(); }

            System.out.println("consume message : " + message);
            state = false;
            // message consumed, notify waiting thread
            consumedMsg.signal();
        } catch (InterruptedException ie) {
            System.out.println("Thread interrupted - viewMessage");
        } finally {
            lock.unlock();
        }
    }

    public void produce(String message) {
        lock.lock();
        try {
            // last message not consumed, wait for it be consumed
            while (state) { consumedMsg.await(); }

            System.out.println("produce msg: " + message);
            this.message = message;
            state = true;
            // new message added, notify waiting thread
            producedMsg.signal();
        } catch (InterruptedException ie) {
            System.out.println("Thread interrupted - publishMessage");
        } finally {
            lock.unlock();
        }
    }

    public boolean isEnd() {
        return end;
    }

    public void setEnd(boolean end) {
        this.end = end;
    }

}

消費者

class MessageConsumer implements Runnable {

    private Message message;

    public MessageConsumer(Message msg) {
        message = msg;
    }

    @Override
    public void run() {
        while (!message.isEnd()) { message.consume(); }
    }

}

生產者

class MessageProducer implements Runnable {

    private Message message;

    public MessageProducer(Message msg) {
        message = msg;
    }

    @Override
    public void run() {
        produce();
    }

    public void produce() {
        List<String> msgs = new ArrayList<>();
        msgs.add("Begin");
        msgs.add("Msg1");
        msgs.add("Msg2");

        for (String msg : msgs) {
            message.produce(msg);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        message.produce("End");
        message.setEnd(true);
    }

}

測試

public class LockConditionDemo {

    public static void main(String[] args) {
        Message msg = new Message();
        Thread producer = new Thread(new MessageProducer(msg));
        Thread consumer = new Thread(new MessageConsumer(msg));
        producer.start();
        consumer.start();
    }
}

3. ReentrantLock

ReentrantLock 類是 Lock 接口的具體實現,與內置鎖 synchronized 相同的是,它是一個可重入鎖

3.1. ReentrantLock 的特性

ReentrantLock 的特性以下:

  • ReentrantLock 提供了與 synchronized 相同的互斥性、內存可見性和可重入性
  • ReentrantLock 支持公平鎖和非公平鎖(默認)兩種模式。
  • ReentrantLock 實現了 Lock 接口,支持了 synchronized 所不具有的靈活性
    • synchronized 沒法中斷一個正在等待獲取鎖的線程
    • synchronized 沒法在請求獲取一個鎖時無休止地等待

3.2. ReentrantLock 的用法

前文了解了 ReentrantLock 的特性,接下來,咱們要講述其具體用法。

ReentrantLock 的構造方法

ReentrantLock 有兩個構造方法:

public ReentrantLock() {}
public ReentrantLock(boolean fair) {}
  • ReentrantLock() - 默認構造方法會初始化一個非公平鎖(NonfairSync)
  • ReentrantLock(boolean) - new ReentrantLock(true) 會初始化一個公平鎖(FairSync)

lock 和 unlock 方法

  • lock() - 無條件獲取鎖。若是當前線程沒法獲取鎖,則當前線程進入休眠狀態不可用,直至當前線程獲取到鎖。若是該鎖沒有被另外一個線程持有,則獲取該鎖並當即返回,將鎖的持有計數設置爲 1。
  • unlock() - 用於釋放鎖

:bell: 注意:請務必牢記,獲取鎖操做 lock() 必須在 try catch 塊外進行,而且將釋放鎖操做 unlock() 放在 finally 塊中進行,以保證鎖必定被被釋放,防止死鎖的發生

示例:ReentrantLock 的基本操做

public class ReentrantLockDemo {

    public static void main(String[] args) {
        Task task = new Task();
        MyThread tA = new MyThread("Thread-A", task);
        MyThread tB = new MyThread("Thread-B", task);
        MyThread tC = new MyThread("Thread-C", task);
        tA.start();
        tB.start();
        tC.start();
    }

    static class MyThread extends Thread {

        private Task task;

        public MyThread(String name, Task task) {
            super(name);
            this.task = task;
        }

        @Override
        public void run() {
            task.execute();
        }

    }

    static class Task {

        private ReentrantLock lock = new ReentrantLock();

        public void execute() {
            lock.lock();
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println(lock.toString());

                    // 查詢當前線程 hold 住此鎖的次數
                    System.out.println("\t holdCount: " + lock.getHoldCount());

                    // 查詢正等待獲取此鎖的線程數
                    System.out.println("\t queuedLength: " + lock.getQueueLength());

                    // 是否爲公平鎖
                    System.out.println("\t isFair: " + lock.isFair());

                    // 是否被鎖住
                    System.out.println("\t isLocked: " + lock.isLocked());

                    // 是否被當前線程持有鎖
                    System.out.println("\t isHeldByCurrentThread: " + lock.isHeldByCurrentThread());

                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

    }

}

輸出結果:

java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-A]
	 holdCount: 1
	 queuedLength: 2
	 isFair: false
	 isLocked: true
	 isHeldByCurrentThread: true
java.util.concurrent.locks.ReentrantLock@64fcd88a[Locked by thread Thread-C]
	 holdCount: 1
	 queuedLength: 1
	 isFair: false
	 isLocked: true
	 isHeldByCurrentThread: true
// ...

tryLock 方法

與無條件獲取鎖相比,tryLock 有更完善的容錯機制。

  • tryLock() - 可輪詢獲取鎖。若是成功,則返回 true;若是失敗,則返回 false。也就是說,這個方法不管成敗都會當即返回,獲取不到鎖(鎖已被其餘線程獲取)時不會一直等待。
  • tryLock(long, TimeUnit) - 可定時獲取鎖。和 tryLock() 相似,區別僅在於這個方法在獲取不到鎖時會等待必定的時間,在時間期限以內若是還獲取不到鎖,就返回 false。若是若是一開始拿到鎖或者在等待期間內拿到了鎖,則返回 true。

示例:ReentrantLocktryLock() 操做

修改上個示例中的 execute() 方法

public void execute() {
    if (lock.tryLock()) {
        try {
            for (int i = 0; i < 3; i++) {
               // 略...
            }
        } finally {
            lock.unlock();
        }
    } else {
        System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
    }
}

示例:ReentrantLocktryLock(long, TimeUnit) 操做

修改上個示例中的 execute() 方法

public void execute() {
    try {
        if (lock.tryLock(2, TimeUnit.SECONDS)) {
            try {
                for (int i = 0; i < 3; i++) {
                    // 略...
                }
            } finally {
                lock.unlock();
            }
        } else {
            System.out.println(Thread.currentThread().getName() + " 獲取鎖失敗");
        }
    } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + " 獲取鎖超時");
        e.printStackTrace();
    }
}

lockInterruptibly 方法

  • lockInterruptibly() - 可中斷獲取鎖。可中斷獲取鎖能夠在得到鎖的同時保持對中斷的響應。可中斷獲取鎖比其它獲取鎖的方式稍微複雜一些,須要兩個 try-catch 塊(若是在獲取鎖的操做中拋出了 InterruptedException ,那麼可使用標準的 try-finally 加鎖模式)。
    • 舉例來講:假設有兩個線程同時經過 lock.lockInterruptibly() 獲取某個鎖時,若線程 A 獲取到了鎖,則線程 B 只能等待。若此時對線程 B 調用 threadB.interrupt() 方法可以中斷線程 B 的等待過程。因爲 lockInterruptibly() 的聲明中拋出了異常,因此 lock.lockInterruptibly() 必須放在 try 塊中或者在調用 lockInterruptibly() 的方法外聲明拋出 InterruptedException

:bell: 注意:當一個線程獲取了鎖以後,是不會被 interrupt() 方法中斷的。單獨調用 interrupt() 方法不能中斷正在運行狀態中的線程,只能中斷阻塞狀態中的線程。所以當經過 lockInterruptibly() 方法獲取某個鎖時,若是未獲取到鎖,只有在等待的狀態下,才能夠響應中斷。

示例:ReentrantLocklockInterruptibly() 操做

修改上個示例中的 execute() 方法

public void execute() {
    try {
        lock.lockInterruptibly();

        for (int i = 0; i < 3; i++) {
            // 略...
        }
    } catch (InterruptedException e) {
        System.out.println(Thread.currentThread().getName() + "被中斷");
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
}

newCondition 方法

newCondition() - 返回一個綁定到 Lock 對象上的 Condition 實例。

3.3. ReentrantLock 的原理

ReentrantLock 的可見性

class X {
  private final Lock rtl =
  new ReentrantLock();
  int value;
  public void addOne() {
    // 獲取鎖
    rtl.lock();
    try {
      value+=1;
    } finally {
      // 保證鎖能釋放
      rtl.unlock();
    }
  }
}

ReentrantLock,內部持有一個 volatile 的成員變量 state,獲取鎖的時候,會讀寫 state 的值;解鎖的時候,也會讀寫 state 的值(簡化後的代碼以下面所示)。也就是說,在執行 value+=1 以前,程序先讀寫了一次 volatile 變量 state,在執行 value+=1 以後,又讀寫了一次 volatile 變量 state。根據相關的 Happens-Before 規則:

  1. 順序性規則:對於線程 T1,value+=1 Happens-Before 釋放鎖的操做 unlock();
  2. volatile 變量規則:因爲 state = 1 會先讀取 state,因此線程 T1 的 unlock() 操做 Happens-Before 線程 T2 的 lock() 操做;
  3. 傳遞性規則:線程 T1 的 value+=1 Happens-Before 線程 T2 的 lock() 操做。

ReentrantLock 的數據結構

閱讀 ReentrantLock 的源碼,能夠發現它有一個核心字段:

private final Sync sync;
  • sync - 內部抽象類 ReentrantLock.Sync 對象,Sync 繼承自 AQS。它有兩個子類:
  • ReentrantLock.FairSync - 公平鎖。
  • ReentrantLock.NonfairSync - 非公平鎖。

查看源碼能夠發現,ReentrantLock 實現 Lock 接口實際上是調用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的實現,這裏不一一列舉。

ReentrantLock 的獲取鎖和釋放鎖

ReentrantLock 獲取鎖和釋放鎖的接口,從表象看,是調用 ReentrantLock.FairSyncReentrantLock.NonfairSync 中各自的實現;從本質上看,是基於 AQS 的實現。

仔細閱讀源碼很容易發現:

  • void lock() 調用 Sync 的 lock() 方法。

  • void lockInterruptibly() 直接調用 AQS 的 獲取可中斷的獨佔鎖 方法 lockInterruptibly()

  • boolean tryLock() 調用 Sync 的 nonfairTryAcquire()

  • boolean tryLock(long time, TimeUnit unit) 直接調用 AQS 的 獲取超時等待式的獨佔鎖 方法 tryAcquireNanos(int arg, long nanosTimeout)

  • void unlock() 直接調用 AQS 的 釋放獨佔鎖 方法 release(int arg)

直接調用 AQS 接口的方法就再也不贅述了,其原理在 [AQS 的原理](#AQS 的原理) 中已經用很大篇幅進行過講解。

nonfairTryAcquire 方法源碼以下:

// 公平鎖和非公平鎖都會用這個方法區嘗試獲取鎖
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
         // 若是同步狀態爲0,將其設爲 acquires,並設置當前線程爲排它線程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

處理流程很簡單:

  • 若是同步狀態爲 0,設置同步狀態設爲 acquires,並設置當前線程爲排它線程,而後返回 true,獲取鎖成功。
  • 若是同步狀態不爲 0 且當前線程爲排它線程,設置同步狀態爲當前狀態值+acquires 值,而後返回 true,獲取鎖成功。
  • 不然,返回 false,獲取鎖失敗。

公平鎖和非公平鎖

ReentrantLock 這個類有兩個構造函數,一個是無參構造函數,一個是傳入 fair 參數的構造函數。fair 參數表明的是鎖的公平策略,若是傳入 true 就表示須要構造一個公平鎖,反之則表示要構造一個非公平鎖。

鎖都對應着一個等待隊列,若是一個線程沒有得到鎖,就會進入等待隊列,當有線程釋放鎖的時候,就須要從等待隊列中喚醒一個等待的線程。若是是公平鎖,喚醒的策略就是誰等待的時間長,就喚醒誰,很公平;若是是非公平鎖,則不提供這個公平保證,有可能等待時間短的線程反而先被喚醒。

lock 方法在公平鎖和非公平鎖中的實現:

兩者的區別僅在於申請非公平鎖時,若是同步狀態爲 0,嘗試將其設爲 1,若是成功,直接將當前線程置爲排它線程;不然和公平鎖同樣,調用 AQS 獲取獨佔鎖方法 acquire

// 非公平鎖實現
final void lock() {
    if (compareAndSetState(0, 1))
    // 若是同步狀態爲0,將其設爲1,並設置當前線程爲排它線程
        setExclusiveOwnerThread(Thread.currentThread());
    else
    // 調用 AQS 獲取獨佔鎖方法 acquire
        acquire(1);
}

// 公平鎖實現
final void lock() {
    // 調用 AQS 獲取獨佔鎖方法 acquire
    acquire(1);
}

4. ReentrantReadWriteLock

ReadWriteLock 適用於讀多寫少的場景

ReentrantReadWriteLock 類是 ReadWriteLock 接口的具體實現,它是一個可重入的讀寫鎖。ReentrantReadWriteLock 維護了一對讀寫鎖,將讀寫鎖分開,有利於提升併發效率。

讀寫鎖,並非 Java 語言特有的,而是一個廣爲使用的通用技術,全部的讀寫鎖都遵照如下三條基本原則:

  • 容許多個線程同時讀共享變量;
  • 只容許一個線程寫共享變量;
  • 若是一個寫線程正在執行寫操做,此時禁止讀線程讀共享變量。

讀寫鎖與互斥鎖的一個重要區別就是讀寫鎖容許多個線程同時讀共享變量,而互斥鎖是不容許的,這是讀寫鎖在讀多寫少場景下性能優於互斥鎖的關鍵。但讀寫鎖的寫操做是互斥的,當一個線程在寫共享變量的時候,是不容許其餘線程執行寫操做和讀操做。

4.1. ReentrantReadWriteLock 的特性

ReentrantReadWriteLock 的特性以下:

  • ReentrantReadWriteLock 適用於讀多寫少的場景。若是是寫多讀少的場景,因爲 ReentrantReadWriteLock 其內部實現比 ReentrantLock 複雜,性能可能反而要差一些。若是存在這樣的問題,須要具體問題具體分析。因爲 ReentrantReadWriteLock 的讀寫鎖(ReadLockWriteLock)都實現了 Lock 接口,因此要替換爲 ReentrantLock 也較爲容易。
  • ReentrantReadWriteLock 實現了 ReadWriteLock 接口,支持了 ReentrantLock 所不具有的讀寫鎖分離。ReentrantReadWriteLock 維護了一對讀寫鎖(ReadLockWriteLock)。將讀寫鎖分開,有利於提升併發效率。ReentrantReadWriteLock 的加鎖策略是:容許多個讀操做併發執行,但每次只容許一個寫操做
  • ReentrantReadWriteLock 爲讀寫鎖都提供了可重入的加鎖語義。
  • ReentrantReadWriteLock 支持公平鎖和非公平鎖(默認)兩種模式。

ReadWriteLock 接口定義以下:

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}
  • readLock - 返回用於讀操做的鎖(ReadLock)。
  • writeLock - 返回用於寫操做的鎖(WriteLock)。

在讀寫鎖和寫入鎖之間的交互能夠採用多種實現方式,ReadWriteLock 的一些可選實現包括:

  • 釋放優先 - 當一個寫入操做釋放寫鎖,而且隊列中同時存在讀線程和寫線程,那麼應該優先選擇讀線程、寫線程,仍是最早發出請求的線程?
  • 讀線程插隊 - 若是鎖是由讀線程持有,但有寫線程正在等待,那麼新到達的讀線程可否當即得到訪問權,仍是應該在寫線程後面等待?若是容許讀線程插隊到寫線程以前,那麼將提升併發性,但可能形成線程飢餓問題。
  • 重入性 - 讀鎖和寫鎖是不是可重入的?
  • 降級 - 若是一個線程持有寫入鎖,那麼它可否在不釋放該鎖的狀況下得到讀鎖?這可能會使得寫鎖被降級爲讀鎖,同時不容許其餘寫線程修改被保護的資源。
  • 升級 - 讀鎖可否優先於其餘正在等待的讀線程和寫線程而升級爲一個寫鎖?在大多數的讀寫鎖實現中並不支持升級,由於若是沒有顯式的升級操做,那麼很容易形成死鎖。

4.2. ReentrantReadWriteLock 的用法

前文了解了 ReentrantReadWriteLock 的特性,接下來,咱們要講述其具體用法。

ReentrantReadWriteLock 的構造方法

ReentrantReadWriteLockReentrantLock 同樣,也有兩個構造方法,且用法類似。

public ReentrantReadWriteLock() {}
public ReentrantReadWriteLock(boolean fair) {}
  • ReentrantReadWriteLock() - 默認構造方法會初始化一個非公平鎖(NonfairSync)。在非公平的鎖中,線程得到鎖的順序是不肯定的。寫線程降級爲讀線程是能夠的,但讀線程升級爲寫線程是不能夠的(這樣會致使死鎖)。
  • ReentrantReadWriteLock(boolean) - new ReentrantLock(true) 會初始化一個公平鎖(FairSync)。對於公平鎖,等待時間最長的線程將優先得到鎖。若是這個鎖是讀線程持有,則另外一個線程請求寫鎖,那麼其餘讀線程都不能得到讀鎖,直到寫線程釋放寫鎖。

ReentrantReadWriteLock 的使用實例

ReentrantReadWriteLock 的特性 中已經介紹過,ReentrantReadWriteLock 的讀寫鎖(ReadLockWriteLock)都實現了 Lock 接口,因此其各自獨立的使用方式與 ReentrantLock 同樣,這裏再也不贅述。

ReentrantReadWriteLockReentrantLock 用法上的差別,主要在於讀寫鎖的配合使用。本文以一個典型使用場景來進行講解。

【示例】基於 ReadWriteLock 實現一個簡單的泛型無界緩存

/**
 * 簡單的無界緩存實現
 * <p>
 * 使用 WeakHashMap 存儲鍵值對。WeakHashMap 中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。
 */
static class UnboundedCache<K, V> {

    private final Map<K, V> cacheMap = new WeakHashMap<>();

    private final ReadWriteLock cacheLock = new ReentrantReadWriteLock();

    public V get(K key) {
        cacheLock.readLock().lock();
        V value;
        try {
            value = cacheMap.get(key);
            String log = String.format("%s 讀數據 %s:%s", Thread.currentThread().getName(), key, value);
            System.out.println(log);
        } finally {
            cacheLock.readLock().unlock();
        }
        return value;
    }

    public V put(K key, V value) {
        cacheLock.writeLock().lock();
        try {
            cacheMap.put(key, value);
            String log = String.format("%s 寫入數據 %s:%s", Thread.currentThread().getName(), key, value);
            System.out.println(log);
        } finally {
            cacheLock.writeLock().unlock();
        }
        return value;
    }

    public V remove(K key) {
        cacheLock.writeLock().lock();
        try {
            return cacheMap.remove(key);
        } finally {
            cacheLock.writeLock().unlock();
        }
    }

    public void clear() {
        cacheLock.writeLock().lock();
        try {
            this.cacheMap.clear();
        } finally {
            cacheLock.writeLock().unlock();
        }
    }

}

說明:

  • 使用 WeakHashMap 而不是 HashMap 來存儲鍵值對。WeakHashMap 中存儲的對象是弱引用,JVM GC 時會自動清除沒有被引用的弱引用對象。
  • Map 寫數據前加寫鎖,寫完後,釋放寫鎖。
  • Map 讀數據前加讀鎖,讀完後,釋放讀鎖。

測試其線程安全性:

/**
 * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
 * @since 2020-01-01
 */
public class ReentrantReadWriteLockDemo {

    static UnboundedCache<Integer, Integer> cache = new UnboundedCache<>();

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
            executorService.execute(new MyThread());
            cache.get(0);
        }
        executorService.shutdown();
    }

    /** 線程任務每次向緩存中寫入 3 個隨機值,key 固定 */
    static class MyThread implements Runnable {

        @Override
        public void run() {
            Random random = new Random();
            for (int i = 0; i < 3; i++) {
                cache.put(i, random.nextInt(100));
            }
        }

    }

}

說明:示例中,經過線程池啓動 20 個併發任務。任務每次向緩存中寫入 3 個隨機值,key 固定;而後主線程每次固定讀取緩存中第一個 key 的值。

輸出結果:

main 讀數據 0:null
pool-1-thread-1 寫入數據 0:16
pool-1-thread-1 寫入數據 1:58
pool-1-thread-1 寫入數據 2:50
main 讀數據 0:16
pool-1-thread-1 寫入數據 0:85
pool-1-thread-1 寫入數據 1:76
pool-1-thread-1 寫入數據 2:46
pool-1-thread-2 寫入數據 0:21
pool-1-thread-2 寫入數據 1:41
pool-1-thread-2 寫入數據 2:63
main 讀數據 0:21
main 讀數據 0:21
// ...

4.3. ReentrantReadWriteLock 的原理

前面瞭解了 ReentrantLock 的原理,理解 ReentrantReadWriteLock 就容易多了。

ReentrantReadWriteLock 的數據結構

閱讀 ReentrantReadWriteLock 的源碼,能夠發現它有三個核心字段:

/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
  • sync - 內部類 ReentrantReadWriteLock.Sync 對象。與 ReentrantLock 相似,它有兩個子類:ReentrantReadWriteLock.FairSyncReentrantReadWriteLock.NonfairSync ,分別表示公平鎖和非公平鎖的實現。
  • readerLock - 內部類 ReentrantReadWriteLock.ReadLock 對象,這是一把讀鎖。
  • writerLock - 內部類 ReentrantReadWriteLock.WriteLock 對象,這是一把寫鎖。

ReentrantReadWriteLock 的獲取鎖和釋放鎖

public static class ReadLock implements Lock, java.io.Serializable {

    // 調用 AQS 獲取共享鎖方法
    public void lock() {
        sync.acquireShared(1);
    }

    // 調用 AQS 釋放共享鎖方法
    public void unlock() {
        sync.releaseShared(1);
    }
}

public static class WriteLock implements Lock, java.io.Serializable {

    // 調用 AQS 獲取獨佔鎖方法
    public void lock() {
        sync.acquire(1);
    }

    // 調用 AQS 釋放獨佔鎖方法
    public void unlock() {
        sync.release(1);
    }
}

5. StampedLock

ReadWriteLock 支持兩種模式:一種是讀鎖,一種是寫鎖。而 StampedLock 支持三種模式,分別是:寫鎖悲觀讀鎖樂觀讀。其中,寫鎖、悲觀讀鎖的語義和 ReadWriteLock 的寫鎖、讀鎖的語義很是相似,容許多個線程同時獲取悲觀讀鎖,可是隻容許一個線程獲取寫鎖,寫鎖和悲觀讀鎖是互斥的。不一樣的是:StampedLock 裏的寫鎖和悲觀讀鎖加鎖成功以後,都會返回一個 stamp;而後解鎖的時候,須要傳入這個 stamp。

注意這裏,用的是「樂觀讀」這個詞,而不是「樂觀讀鎖」,是要提醒你,樂觀讀這個操做是無鎖的,因此相比較 ReadWriteLock 的讀鎖,樂觀讀的性能更好一些。

StampedLock 的性能之因此比 ReadWriteLock 還要好,其關鍵是 StampedLock 支持樂觀讀的方式。

  • ReadWriteLock 支持多個線程同時讀,可是當多個線程同時讀的時候,全部的寫操做會被阻塞;
  • 而 StampedLock 提供的樂觀讀,是容許一個線程獲取寫鎖的,也就是說不是全部的寫操做都被阻塞。

對於讀多寫少的場景 StampedLock 性能很好,簡單的應用場景基本上能夠替代 ReadWriteLock,可是StampedLock 的功能僅僅是 ReadWriteLock 的子集,在使用的時候,仍是有幾個地方須要注意一下。

  • StampedLock 不支持重入
  • StampedLock 的悲觀讀鎖、寫鎖都不支持條件變量。
  • 若是線程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上時,此時調用該阻塞線程的 interrupt() 方法,會致使 CPU 飆升。使用 StampedLock 必定不要調用中斷操做,若是須要支持中斷功能,必定使用可中斷的悲觀讀鎖 readLockInterruptibly() 和寫鎖 writeLockInterruptibly()

【示例】StampedLock 阻塞時,調用 interrupt() 致使 CPU 飆升

final StampedLock lock
  = new StampedLock();
Thread T1 = new Thread(()->{
  // 獲取寫鎖
  lock.writeLock();
  // 永遠阻塞在此處,不釋放寫鎖
  LockSupport.park();
});
T1.start();
// 保證 T1 獲取寫鎖
Thread.sleep(100);
Thread T2 = new Thread(()->
  // 阻塞在悲觀讀鎖
  lock.readLock()
);
T2.start();
// 保證 T2 阻塞在讀鎖
Thread.sleep(100);
// 中斷線程 T2
// 會致使線程 T2 所在 CPU 飆升
T2.interrupt();
T2.join();

【示例】StampedLock 讀模板:

final StampedLock sl =
  new StampedLock();

// 樂觀讀
long stamp =
  sl.tryOptimisticRead();
// 讀入方法局部變量
......
// 校驗 stamp
if (!sl.validate(stamp)){
  // 升級爲悲觀讀鎖
  stamp = sl.readLock();
  try {
    // 讀入方法局部變量
    .....
  } finally {
    // 釋放悲觀讀鎖
    sl.unlockRead(stamp);
  }
}
// 使用方法局部變量執行業務操做
......

【示例】StampedLock 寫模板:

long stamp = sl.writeLock();
try {
  // 寫共享變量
  ......
} finally {
  sl.unlockWrite(stamp);
}

6. AQS

AbstractQueuedSynchronizer(簡稱 AQS)是隊列同步器,顧名思義,其主要做用是處理同步。它是併發鎖和不少同步工具類的實現基石(如 ReentrantLockReentrantReadWriteLockCountDownLatchSemaphoreFutureTask 等)。

6.1. AQS 的要點

AQS 提供了對獨享鎖與共享鎖的支持

java.util.concurrent.locks 包中的相關鎖(經常使用的有 ReentrantLockReadWriteLock)都是基於 AQS 來實現。這些鎖都沒有直接繼承 AQS,而是定義了一個 Sync 類去繼承 AQS。爲何要這樣呢?由於鎖面向的是使用用戶,而同步器面向的則是線程控制,那麼在鎖的實現中聚合同步器而不是直接繼承 AQS 就能夠很好的隔離兩者所關注的事情。

6.2. AQS 的應用

AQS 提供了對獨享鎖與共享鎖的支持

獨享鎖 API

獲取、釋放獨享鎖的主要 API 以下:

public final void acquire(int arg)
public final void acquireInterruptibly(int arg)
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
public final boolean release(int arg)
  • acquire - 獲取獨佔鎖。
  • acquireInterruptibly - 獲取可中斷的獨佔鎖。
  • tryAcquireNanos - 嘗試在指定時間內獲取可中斷的獨佔鎖。在如下三種狀況下回返回:
    • 在超時時間內,當前線程成功獲取了鎖;
    • 當前線程在超時時間內被中斷;
    • 超時時間結束,仍未得到鎖返回 false。
  • release - 釋放獨佔鎖。

共享鎖 API

獲取、釋放共享鎖的主要 API 以下:

public final void acquireShared(int arg)
public final void acquireSharedInterruptibly(int arg)
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
public final boolean releaseShared(int arg)
  • acquireShared - 獲取共享鎖。
  • acquireSharedInterruptibly - 獲取可中斷的共享鎖。
  • tryAcquireSharedNanos - 嘗試在指定時間內獲取可中斷的共享鎖。
  • release - 釋放共享鎖。

6.3. AQS 的原理

ASQ 原理要點:

  • AQS 使用一個整型的 volatile 變量來 維護同步狀態。狀態的意義由子類賦予。
  • AQS 維護了一個 FIFO 的雙鏈表,用來存儲獲取鎖失敗的線程。

AQS 圍繞同步狀態提供兩種基本操做「獲取」和「釋放」,並提供一系列判斷和處理方法,簡單說幾點:

  • state 是獨佔的,仍是共享的;
  • state 被獲取後,其餘線程須要等待;
  • state 被釋放後,喚醒等待線程;
  • 線程等不及時,如何退出等待。

至於線程是否能夠得到 state,如何釋放 state,就不是 AQS 關心的了,要由子類具體實現。

AQS 的數據結構

閱讀 AQS 的源碼,能夠發現:AQS 繼承自 AbstractOwnableSynchronize

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** 等待隊列的隊頭,懶加載。只能經過 setHead 方法修改。 */
    private transient volatile Node head;
    /** 等待隊列的隊尾,懶加載。只能經過 enq 方法添加新的等待節點。*/
    private transient volatile Node tail;
    /** 同步狀態 */
    private volatile int state;
}
  • state - AQS 使用一個整型的 volatile 變量來 維護同步狀態
    • 這個整數狀態的意義由子類來賦予,如ReentrantLock 中該狀態值表示全部者線程已經重複獲取該鎖的次數,Semaphore 中該狀態值表示剩餘的許可數量。
  • headtail - AQS 維護了一個 Node 類型(AQS 的內部類)的雙鏈表來完成同步狀態的管理。這個雙鏈表是一個雙向的 FIFO 隊列,經過 headtail 指針進行訪問。當 有線程獲取鎖失敗後,就被添加到隊列末尾

img

再來看一下 Node 的源碼

static final class Node {
    /** 該等待同步的節點處於共享模式 */
    static final Node SHARED = new Node();
    /** 該等待同步的節點處於獨佔模式 */
    static final Node EXCLUSIVE = null;

    /** 線程等待狀態,狀態值有: 0、一、-一、-二、-3 */
    volatile int waitStatus;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /** 前驅節點 */
    volatile Node prev;
    /** 後繼節點 */
    volatile Node next;
    /** 等待鎖的線程 */
    volatile Thread thread;

  	/** 和節點是否共享有關 */
    Node nextWaiter;
}

很顯然,Node 是一個雙鏈表結構。

  • waitStatus - Node 使用一個整型的 volatile 變量來 維護 AQS 同步隊列中線程節點的狀態。waitStatus 有五個狀態值:
    • CANCELLED(1) - 此狀態表示:該節點的線程可能因爲超時或被中斷而 處於被取消(做廢)狀態,一旦處於這個狀態,表示這個節點應該從等待隊列中移除。
    • SIGNAL(-1) - 此狀態表示:後繼節點會被掛起,所以在當前節點釋放鎖或被取消以後,必須喚醒(unparking)其後繼結點。
    • CONDITION(-2) - 此狀態表示:該節點的線程 處於等待條件狀態,不會被看成是同步隊列上的節點,直到被喚醒(signal),設置其值爲 0,再從新進入阻塞狀態。
    • PROPAGATE(-3) - 此狀態表示:下一個 acquireShared 應無條件傳播。
    • 0 - 非以上狀態。

獨佔鎖的獲取和釋放

獲取獨佔鎖

AQS 中使用 acquire(int arg) 方法獲取獨佔鎖,其大體流程以下:

  1. 先嚐試獲取同步狀態,若是獲取同步狀態成功,則結束方法,直接返回。
  2. 若是獲取同步狀態不成功,AQS 會不斷嘗試利用 CAS 操做將當前線程插入等待同步隊列的隊尾,直到成功爲止。
  3. 接着,不斷嘗試爲等待隊列中的線程節點獲取獨佔鎖。

img

img

詳細流程能夠用下圖來表示,請結合源碼來理解(一圖勝千言):

img

釋放獨佔鎖

AQS 中使用 release(int arg) 方法釋放獨佔鎖,其大體流程以下:

  1. 先嚐試獲取解鎖線程的同步狀態,若是獲取同步狀態不成功,則結束方法,直接返回。
  2. 若是獲取同步狀態成功,AQS 會嘗試喚醒當前線程節點的後繼節點。
獲取可中斷的獨佔鎖

AQS 中使用 acquireInterruptibly(int arg) 方法獲取可中斷的獨佔鎖。

acquireInterruptibly(int arg) 實現方式相較於獲取獨佔鎖方法( acquire)很是類似,區別僅在於它會經過 Thread.interrupted 檢測當前線程是否被中斷,若是是,則當即拋出中斷異常(InterruptedException)。

獲取超時等待式的獨佔鎖

AQS 中使用 tryAcquireNanos(int arg) 方法獲取超時等待的獨佔鎖。

doAcquireNanos 的實現方式 相較於獲取獨佔鎖方法( acquire)很是類似,區別在於它會根據超時時間和當前時間計算出截止時間。在獲取鎖的流程中,會不斷判斷是否超時,若是超時,直接返回 false;若是沒超時,則用 LockSupport.parkNanos 來阻塞當前線程。

共享鎖的獲取和釋放

獲取共享鎖

AQS 中使用 acquireShared(int arg) 方法獲取共享鎖。

acquireShared 方法和 acquire 方法的邏輯很類似,區別僅在於自旋的條件以及節點出隊的操做有所不一樣。

成功得到共享鎖的條件以下:

  • tryAcquireShared(arg) 返回值大於等於 0 (這意味着共享鎖的 permit 尚未用完)。
  • 當前節點的前驅節點是頭結點。
釋放共享鎖

AQS 中使用 releaseShared(int arg) 方法釋放共享鎖。

releaseShared 首先會嘗試釋放同步狀態,若是成功,則解鎖一個或多個後繼線程節點。釋放共享鎖和釋放獨享鎖流程大致類似,區別在於:

對於獨享模式,若是須要 SIGNAL,釋放僅至關於調用頭節點的 unparkSuccessor

獲取可中斷的共享鎖

AQS 中使用 acquireSharedInterruptibly(int arg) 方法獲取可中斷的共享鎖。

acquireSharedInterruptibly 方法與 acquireInterruptibly 幾乎一致,再也不贅述。

獲取超時等待式的共享鎖

AQS 中使用 tryAcquireSharedNanos(int arg) 方法獲取超時等待式的共享鎖。

tryAcquireSharedNanos 方法與 tryAcquireNanos 幾乎一致,再也不贅述。

7. 死鎖

7.1. 什麼是死鎖

死鎖是一種特定的程序狀態,在實體之間,因爲循環依賴致使彼此一直處於等待之中,沒有任何個體能夠繼續前進。死鎖不只僅是在線程之間會發生,存在資源獨佔的進程之間一樣也 可能出現死鎖。一般來講,咱們大可能是聚焦在多線程場景中的死鎖,指兩個或多個線程之間,因爲互相持有對方須要的鎖,而永久處於阻塞的狀態。

7.2. 如何定位死鎖

定位死鎖最多見的方式就是利用 jstack 等工具獲取線程棧,而後定位互相之間的依賴關係,進而找到死鎖。若是是比較明顯的死鎖,每每 jstack 等就能直接定位,相似 JConsole 甚至能夠在圖形界面進行有限的死鎖檢測。

若是咱們是開發本身的管理工具,須要用更加程序化的方式掃描服務進程、定位死鎖,能夠考慮使用 Java 提供的標準管理 API,ThreadMXBean,其直接就提供了 findDeadlockedThreads() 方法用於定位。

7.3. 如何避免死鎖

基本上死鎖的發生是由於:

  • 互斥,相似 Java 中 Monitor 都是獨佔的。
  • 長期保持互斥,在使用結束以前,不會釋放,也不能被其餘線程搶佔。
  • 循環依賴,多個個體之間出現了鎖的循環依賴,彼此依賴上一環釋放鎖。

由此,咱們能夠分析出避免死鎖的思路和方法。

(1)避免一個線程同時獲取多個鎖。

避免一個線程在鎖內同時佔用多個資源,儘可能保證每一個鎖只佔用一個資源。

嘗試使用定時鎖 lock.tryLock(timeout),避免鎖一直不能釋放。

對於數據庫鎖,加鎖和解鎖必須在一個數據庫鏈接中裏,不然會出現解鎖失敗的狀況。

8. 參考資料

關注公衆號:java寶典 a

相關文章
相關標籤/搜索