高併發編程學習(2)——線程通訊詳解

爲得到良好的閱讀體驗,請訪問原文: 傳送門java

前序文章git

- 高併發編程學習(1)——併發基礎 - www.wmyskxz.com/2019/11/26/…github

1、經典的生產者消費者案例

上一篇文章咱們提到一個應用能夠建立多個線程去執行不一樣的任務,若是這些任務之間有着某種關係,那麼線程之間必須可以通訊來協調完成工做。面試

生產者消費者問題(英語:Producer-consumer problem)就是典型的多線程同步案例,它也被稱爲有限緩衝問題(英語:Bounded-buffer problem)。該問題描述了共享固定大小緩衝區的兩個線程——即所謂的「生產者」和「消費者」——在實際運行時會發生的問題。生產者的主要做用是生成必定量的數據放到緩衝區中,而後重複此過程。與此同時,消費者也在緩衝區消耗這些數據。該問題的關鍵就是要保證生產者不會在緩衝區滿時加入數據,消費者也不會在緩衝區中空時消耗數據。(摘自維基百科:生產者消費者問題)編程

  • 注意: 生產者-消費者模式中的內存緩存區的主要功能是數據在多線程間的共享,此外,經過該緩衝區,能夠緩解生產者和消費者的性能差;

準備基礎代碼:無通訊的生產者消費者

咱們來本身編寫一個例子:一個生產者,一個消費者,而且讓他們讓他們使用同一個共享資源,而且咱們指望的是生產者生產一條放到共享資源中,消費者就會對應地消費一條。api

咱們先來模擬一個簡單的共享資源對象:緩存

public class ShareResource {

    private String name;
    private String gender;

    /**
     * 模擬生產者向共享資源對象中存儲數據
     *
     * @param name
     * @param gender
     */
    public void push(String name, String gender) {
        this.name = name;
        this.gender = gender;
    }

    /**
     * 模擬消費者從共享資源中取出數據
     */
    public void popup() {
        System.out.println(this.name + "-" + this.gender);
    }
}複製代碼

而後來編寫咱們的生產者,使用循環來交替地向共享資源中添加不一樣的數據:安全

public class Producer implements Runnable {

    private ShareResource shareResource;

    public Producer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            if (i % 2 == 0) {
                shareResource.push("鳳姐", "女");
            } else {
                shareResource.push("張三", "男");
            }
        }
    }
}複製代碼

接着讓咱們的消費者不停地消費生產者產生的數據:微信

public class Consumer implements Runnable {

    private ShareResource shareResource;

    public Consumer(ShareResource shareResource) {
        this.shareResource = shareResource;
    }

    @Override
    public void run() {
        for (int i = 0; i < 50; i++) {
            shareResource.popup();
        }
    }
}複製代碼

而後咱們寫一段測試代碼,來看看效果:session

public static void main(String[] args) {
    // 建立生產者和消費者的共享資源對象
    ShareResource shareResource = new ShareResource();
    // 啓動生產者線程
    new Thread(new Producer(shareResource)).start();
    // 啓動消費者線程
    new Thread(new Consumer(shareResource)).start();
}複製代碼

咱們運行發現出現了詭異的現象,全部的生產者都彷佛消費到了同一條數據:

張三-男
張三-男
....如下全是張三-男....複製代碼

爲何會出現這樣的狀況呢?照理說,個人生產者在交替地向共享資源中生產數據,消費者也應該交替消費纔對呀..咱們大膽猜想一下,會不會是由於消費者是直接循環了 30 次打印共享資源中的數據,而此時生產者尚未來得及更新共享資源中的數據,消費者就已經連續打印了 30 次了,因此咱們讓消費者消費的時候以及生產者生產的時候都小睡個 10 ms 來緩解消費太快 or 生產太快帶來的影響,也讓現象更明顯一些:

/**
 * 模擬生產者向共享資源對象中存儲數據
 *
 * @param name
 * @param gender
 */
public void push(String name, String gender) {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    this.name = name;
    this.gender = gender;
}

/**
 * 模擬消費者從共享資源中取出數據
 */
public void popup() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    System.out.println(this.name + "-" + this.gender);
}複製代碼

再次運行代碼,發現了出現瞭如下的幾種狀況:

  • 重複消費:消費者連續地出現兩次相同的消費狀況(張三-男/ 張三-男);
  • 性別紊亂:消費者消費到了髒數據(張三-女/ 鳳姐-男);

分析出現問題的緣由

  • 重複消費:咱們先來看看重複消費的問題,當生產者生產出一條數據的時候,消費者正確地消費了一條,可是當消費者再來共享資源中消費的時候,生產者尚未準備好新的一條數據,因此消費者就又消費到老數據了,這其中的根本緣由是生產者和消費者的速率不一致
  • 性別紊亂:再來分析第二種狀況。不一樣於上面的狀況,消費者在消費第二條數據時,生產者也正在生產新的數據,可是尷尬的是,生產者只生產了一半兒(也就是該執行完 this.name = name),也就是尚未來得及給 gender 賦值就被消費者給取走消費了.. 形成這樣狀況的根本緣由是沒有保證生產者生產數據的原子性

解決出現的問題

加鎖解決性別紊亂

咱們先來解決性別紊亂,也就是原子性的問題吧,上一篇文章裏咱們也提到了,對於這樣的原子性操做,解決方法也很簡單:加鎖。稍微改造一下就行了:

/**
 * 模擬生產者向共享資源對象中存儲數據
 *
 * @param name
 * @param gender
 */
synchronized public void push(String name, String gender) {
    this.name = name;
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    this.gender = gender;
}

/**
 * 模擬消費者從共享資源中取出數據
 */
synchronized public void popup() {
    try {
        Thread.sleep(10);
    } catch (InterruptedException ignored) {
    }
    System.out.println(this.name + "-" + this.gender);
}複製代碼

  • 咱們在方法前面都加上了 synchronized 關鍵字,來保證每一次讀取和修改都只能是一個線程,這是由於當 synchronized 修飾在普通同步方法上時,它會自動鎖住當前實例對象,也就是說這樣改造以後讀/ 寫操做同時只能進行其一;
  • 我把 push 方法小睡的代碼改在了賦值 namegender 的中間,以強化驗證原子性操做是否成功,由於若是不是原子性的話,就極可能出現賦值 name 還沒賦值給 gender 就被取走的狀況,小睡一下子是爲了增強這種狀況的出現機率(能夠試着把 synchronized 去掉看看效果);

運行代碼後發現,並無出現性別紊亂的現象了,可是重複消費仍然存在。

等待喚醒機制解決重複消費

咱們指望的是 張三-男鳳姐-女 交替出現,而不是有重複消費的狀況,因此咱們的生產者和消費者之間須要一點溝通,最容易想到的解決方法是,咱們新增長一個標誌位,而後在消費者中使用 while 循環判斷,不知足條件則不消費,條件知足則退出 while 循環,從而完成消費者的工做。

while (value != desire) {
    Thread.sleep(10);
}
doSomething();複製代碼

這樣作的目的就是爲了防止「過快的無效嘗試」,這種方法看似可以實現所需的功能,可是卻存在以下的問題:

  • 1)難以確保及時性。在睡眠時,基本不消耗處理器的資源,可是若是睡得太久,就不能及時發現條件已經變化,也就是及時性難以保證;
  • 2)難以下降開銷。若是下降睡眠的時間,好比休眠 1 毫秒,這樣消費者可以更加迅速地發現條件變化,可是卻可能消耗更多的處理資源,形成了無故的浪費。

以上兩個問題嗎,看似矛盾難以調和,可是 Java 經過內置的等待/ 通知機制可以很好地解決這個矛盾並實現所需的功能。

等待/ 通知機制,是指一個線程 A 調用了對象 O 的 wait() 方法進入等待狀態,而另外一個線程 B 調用了對象 O 的 notifyAll() 方法,線程 A 收到通知後從對象 O 的 wait() 方法返回,進而執行後續操做。上述兩個線程都是經過對象 O 來完成交互的,而對象上的 waitnotify/ notifyAll 的關係就如同開關信號同樣,用來完成等待方和通知方之間的交互工做。

這裏有一個比較奇怪的點是,爲何看起來像是線程之間操做的 waitnotify/ notifyAll 方法會是 Object 類中的方法,而不是 Thread 類中的方法呢?

- 簡單來講:由於 synchronized 中的這把鎖能夠是任意對象,由於要知足任意對象都可以調用,因此屬於 Object 類;

- 專業點說:由於這些方法在操做同步線程時,都必需要標識它們操做線程的鎖,只有同一個鎖上的被等待線程,能夠被同一個鎖上的 notify 喚醒,不能夠對不一樣鎖中的線程進行喚醒。也就是說,等待和喚醒必須是同一個鎖。而鎖能夠是任意對象,因此能夠被任意對象調用的方法是定義在 Object 類中。

好,簡單介紹完等待/ 通知機制,咱們開始改造吧:

public class ShareResource {

    private String name;
    private String gender;
    // 新增長一個標誌位,表示共享資源是否爲空,默認爲 true
    private boolean isEmpty = true;

    /**
     * 模擬生產者向共享資源對象中存儲數據
     *
     * @param name
     * @param gender
     */
    synchronized public void push(String name, String gender) {
        try {
            while (!isEmpty) {
                // 當前共享資源不爲空的時,則等待消費者來消費
                // 使用同步鎖對象來調用,表示當前線程釋放同步鎖,進入等待池,只能被其餘線程所喚醒
                this.wait();
            }
            // 開始生產
            this.name = name;
            Thread.sleep(10);
            this.gender = gender;
            // 生產結束
            isEmpty = false;
            // 生產結束喚醒一個消費者來消費
            this.notify();
        } catch (Exception ignored) {
        }
    }

    /**
     * 模擬消費者從共享資源中取出數據
     */
    synchronized public void popup() {
        try {
            while (isEmpty) {
                // 爲空則等着生產者進行生產
                // 使用同步鎖對象來調用,表示當前線程釋放同步鎖,進入等待池,只能被其餘線程所喚醒
                this.wait();
            }
            // 消費開始
            Thread.sleep(10);
            System.out.println(this.name + "-" + this.gender);
            // 消費結束
            isEmpty = true;
            // 消費結束喚醒一個生產者去生產
            this.notify();
        } catch (InterruptedException ignored) {
        }
    }
}複製代碼

  • 咱們指望生產者生產一條,而後就去通知消費者消費一條,那麼在生產和消費以前,都須要考慮當前是否須要生產 or 消費,因此咱們新增了一個標誌位來判斷,若是不知足則等待;
  • 被通知後仍然要檢查條件,條件知足,則執行咱們相應的生產 or 消費的邏輯,而後改變條件(這裏是 isEmpty),而且通知全部等待在對象上的線程;
  • 注意:上面的代碼中通知使用的 notify() 方法,這是由於例子中寫死了只有一個消費者和生產者,在實際狀況中建議仍是使用 notifyAll() 方法,這樣多個消費和生產者邏輯也可以保證(能夠本身試一下);

小結

經過初始版本一步步地分析問題和解決問題,咱們就差很少寫出了咱們經典生產者消費者的經典代碼,但一般消費和生產的邏輯是寫在各自的消費者和生產者代碼裏的,這裏我爲了方便閱讀,把他們都抽離到了共享資源上,咱們能夠簡單地再來回顧一下這個消費生產和等待通知的整個過程:

以上就是關於生產者生產一條數據,消費者消費一次的過程了,涉及的一些具體細節咱們下面來講。

2、線程間的通訊方式

等待喚醒機制的替代:Lock 和 Condition

咱們從上面的中看到了 wait()notify() 方法,只能被同步監聽鎖對象來調用,不然就會報出 IllegalMonitorZStateException 的異常,那麼如今問題來了,咱們在上一篇提到的 Lock 機制根本就沒有同步鎖了,也就是沒有自動獲取鎖和自動釋放鎖的概念,由於沒有同步鎖,也就意味着 Lock 機制不能調用 waitnotify 方法,咱們怎麼辦呢?

好在 Java 5 中提供了 Lock 機制的同時也提供了用於 Lock 機制控制通訊的 Condition 接口,若是你們理解了上面說到的 Object.wait()Object.notify() 方法的話,那麼就能很容易地理解 Condition 對象了。

它和 wait()notify() 方法的做用是大體相同的,只不事後者是配合 synchronized 關鍵字使用的,而 Condition 是與重入鎖相關聯的。經過 Lock 接口(重入鎖就實現了這一接口)的 newCondition() 方法能夠生成一個與當前重入鎖綁定的 Condition 實例。利用 Condition 對象,咱們就可讓線程在合適的時間等待,或者在某一個特定的時刻獲得通知,繼續執行。

咱們拿上面的生產者消費者來舉例,修改爲 Lock 和 Condition 代碼以下:

public class ShareResource {

    private String name;
    private String gender;
    // 新增長一個標誌位,表示共享資源是否爲空,默認爲 true
    private boolean isEmpty = true;
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    /**
     * 模擬生產者向共享資源對象中存儲數據
     *
     * @param name
     * @param gender
     */
    public void push(String name, String gender) {
        lock.lock();
        try {
            while (!isEmpty) {
                // 當前共享資源不爲空的時,則等待消費者來消費
                condition.await();
            }
            // 開始生產
            this.name = name;
            Thread.sleep(10);
            this.gender = gender;
            // 生產結束
            isEmpty = false;
            // 生產結束喚醒消費者來消費
            condition.signalAll();
        } catch (Exception ignored) {
        } finally {
            lock.unlock();
        }
    }

    /**
     * 模擬消費者從共享資源中取出數據
     */
    public void popup() {
        lock.lock();
        try {
            while (isEmpty) {
                // 爲空則等着生產者進行生產
                condition.await();
            }
            // 消費開始
            Thread.sleep(10);
            System.out.println(this.name + "-" + this.gender);
            // 消費結束
            isEmpty = true;
            // 消費結束喚醒生產者去生產
            condition.signalAll();
        } catch (InterruptedException ignored) {
        } finally {
            lock.unlock();
        }
    }
}複製代碼

在 JDK 內部,重入鎖和 Condition 對象被普遍地使用,以 ArrayBlockingQueue 爲例,它的 put() 方法實現以下:

/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

// 構造函數,初始化鎖以及對應的 Condition 對象
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            // 等待隊列有足夠的空間
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    // 通知須要 take() 的線程,隊列已有數據
    notEmpty.signal();
}複製代碼

同理,對應的 take() 方法實現以下:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            // 若是隊列爲空,則消費者隊列要等待一個非空的信號
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}複製代碼

容許多個線程同時訪問:信號量(Semaphore)

如下內容摘錄 or 改編自 《實戰 Java 高併發程序設計》 3.1.3 節的內容

信號量爲多線程協做提供了更爲強大的控制方法。廣義上說,信號量是對鎖的擴展,不管是內部鎖 synchronized 仍是重入鎖 ReentrantLock,一次都只容許一個線程訪問一個資源,而信號量卻能夠指定多個線程,同時訪問某一個資源。信號量主要提供瞭如下構造函數:

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)        // 第二個參數能夠指定是否公平複製代碼

在構造信號量對象時,必需要指定信號量的准入數,即同時能申請多少個許可。當每一個線程每次只申請一個許可時,這就至關於指定了同時有多少個線程能夠訪問某一個資源。信號量的主要邏輯以下:

public void acquire()
public void acquireUninterruptibly()
public boolean tryAcquire()
public boolean tryAcquire(long timeout, TimeUnit unit)
public void release()複製代碼

  • acquire() 方法嘗試得到一個准入的許可。若沒法得到,則線程會等待,直到有線程釋放一個許可或者當前線程被中斷。
  • acquireUninterruptibly() 方法和 acquire() 方法相似,可是不響應中斷。
  • tryAcquire() 嘗試得到一個許可,若是成功則返回 true,失敗則返回 false,它不會進行等待,當即返回。
  • release() 用於在線程訪問資源結束後,釋放一個許可,以使其餘等待許可的線程能夠進行資源訪問。

在 JDK 的官方 Javadoc 中,就有一個有關信號量使用的簡單實例,有興趣的讀者能夠自行去翻閱一下,這裏給出一個更傻瓜化的例子:

public class SemapDemo implements Runnable {

    final Semaphore semaphore = new Semaphore(5);

    @Override
    public void run() {
        try {
            semaphore.acquire();
            // 模擬耗時操做
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getId() + ":done!");
            semaphore.release();
        } catch (InterruptedException ignore) {
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        final SemapDemo demo = new SemapDemo();
        for (int i = 0; i < 20; i++) {
            executorService.submit(demo);
        }
    }
}複製代碼

執行程序,就會發現系統以 5 個線程爲單位,依次輸出帶有線程 ID 的提示文本。

在實現上,Semaphore 藉助了線程同步框架 AQS(AbstractQueuedSynchornizer),一樣藉助了 AQS 來實現的是 Java 中可重入鎖的實現。AQS 的強大之處在於,你僅僅須要繼承它,而後使用它提供的 api 就能夠實現任意複雜的線程同步方案,AQS 爲咱們作了大部分的同步工做,因此這裏不細說,以後再來詳細探究一下...

我等着你:Thread.join()

若是一個線程 A 執行了 thread.join() 方法,其含義是:當前線程 A 等待 thread 線程終止以後才從 thread.join() 返回。線程 Thread 除了提供 join() 方法以外,還提供了 join(long millis)join(long millis, int nanos) 兩個具有超時特性的方法。這兩個超時方法表示,若是線程 Thread 在給定的超時時間裏沒有終止,那麼將會從該超時方法中返回。

在下面的代碼中,咱們建立了 10 個線程,編號 0 ~ 9,每一個線程調用前一個線程的 join() 方法,也就是線程 0 結束了,線程 1 才能從 join() 方法中返回,而線程 0 須要等待 main 線程結束。

public class Join {

    public static void main(String[] args) throws InterruptedException {
        Thread previous = Thread.currentThread();
        for (int i = 0; i < 10; i++) {
            // 每一個線程擁有前一個線程的引用,須要等待前一個線程終止,才能從等待中返回
            Thread thread = new Thread(new Domino(previous), String.valueOf(i));
            thread.start();
            previous = thread;
        }
        TimeUnit.SECONDS.sleep(5);
        System.out.println(Thread.currentThread().getName() + " terminate. ");
    }

    static class Domino implements Runnable {

        private Thread thread;

        public Domino(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            try {
                thread.join();
            } catch (InterruptedException ignore) {
            }
            System.out.println(Thread.currentThread().getName() + " terminate. ");
        }
    }
}複製代碼

運行程序,能夠看到下列輸出:

main terminate. 
0 terminate. 
1 terminate. 
2 terminate. 
3 terminate. 
4 terminate. 
5 terminate. 
6 terminate. 
7 terminate. 
8 terminate. 
9 terminate. 複製代碼

說明每一個線程終止的前提都是前驅線程的終止,每一個線程等待前驅線程結束後,才從 join() 方法中返回,這裏涉及了等待/ 通知機制,在 JDK 的源碼中,咱們能夠看到 join() 的方法以下:

public final synchronized void join(long millis)
    throws InterruptedException {
    long base = System.currentTimeMillis();
    long now = 0;

    if (millis < 0) {
        throw new IllegalArgumentException("timeout value is negative");
    }

    if (millis == 0) {
        // 條件不知足則繼續等待
        while (isAlive()) {
            wait(0);
        }
        // 條件符合則返回
    } else {
        while (isAlive()) {
            long delay = millis - now;
            if (delay <= 0) {
                break;
            }
            wait(delay);
            now = System.currentTimeMillis() - base;
        }
    }
}複製代碼

當線程終止時,會調用線程自身的 notifyAll() 方法,會通知全部等待在該線程對象上的線程。能夠看到 join() 方法的邏輯結構跟咱們上面寫的生產者消費者相似,即加鎖、循環和處理邏輯三個步驟。

3、線程之間的數據交互

保證可見性:volatile 關鍵字

咱們先從一個有趣的例子入手:

private static boolean isOver = false;

public static void main(String[] args) throws InterruptedException {
    Thread thread = new Thread(() -> {
        while (!isOver) {
        }
        System.out.println("線程已感知到 isOver 置爲 true,線程正常返回!");
    });
    thread.start();
    Thread.sleep(500);
    isOver = true;
    System.out.println("isOver 已置爲 true");
}複製代碼

咱們開啓了一個主線程和一個子線程,咱們指望子線程可以感知到 isOver 變量的變化以結束掉死循環正常返回,可是運行程序卻發現並非像咱們指望的那樣發生,子線程一直處在了死循環的狀態!

爲何會這樣呢?

Java 內存模型

關於這一點,咱們有幾點須要說明,首先須要搞懂 Java 的內存模型:

Java 虛擬機規範中試圖定義一種 Java 內存模型(Java Memory Model, JMM)來屏蔽掉各層硬件和操做系統的內存訪問差別,以實現讓 Java 程序在各類平臺下都能達到一致的內存訪問效果。

Java 內存模型規定了全部的變量都存儲在主內存(Main Memory)中。每條線程還有本身的工做內存(Working Memory),線程的工做內存中保存了被該線程使用到的變量的主內存副本拷貝,線程對變量的全部操做(讀取、賦值等)都必須在主內存中進行,而不能直接讀寫主內存中的變量。不一樣的線程之間也沒法直接訪問對方工做內存中的變量,線程間的變量值的傳遞均須要經過主內存來完成,線程、主內存、工做內存三者的關係如上圖。

那麼不一樣的線程之間是如何通訊的呢?

共享內存的併發模型裏,線程之間共享程序的公共狀態,線程之間經過寫-讀內存中的公共狀態來隱式進行通訊,典型的共享內存通訊方式就是經過共享對象進行通訊。

例如上圖線程 A 與 線程 B 之間若是要通訊的話,那麼就必須經歷下面兩個步驟:

  1. 首先,線程 A 把本地內存 A 更新過的共享變量刷新到主內存中去
  2. 而後,線程 B 到主內存中去讀取線程 A 以前更新過的共享變量

在消息傳遞的併發模型裏,線程之間沒有公共狀態,線程之間必須經過明確的發送消息來顯式進行通訊,在 Java 中典型的消息傳遞方式就是 wait()notify()

說回剛纔出現的問題,就很容易理解了:每一個線程都有獨佔的內存區域,如操做棧、本地變量表等。線程本地保存了引用變量在堆內存中的副本,線程對變量的全部操做都在本地內存區域中進行,執行結束後再同步到堆內存中去。也就是說,咱們在主線程中修改的 isOver 的值並無被子線程讀取到(沒有被刷入主內存),也就形成了子線程對於 isOver 變量不可見。

解決方法也很簡單,只須要在 isOver 變量前加入 volatile 關鍵字就能夠了,這是由於加入了 volatile 修飾的變量容許直接與主內存交互,進行讀寫操做,保證可見性。

指令重排/ happen-before 原則

再從另外一個有趣的例子中入手,這是在高併發場景下會存在的問題:

class LazyInitDemo {
    private static TransationService service = null;
    
    public static TransationService getTransationService(){
        if (service == null) {
            synchronized (this) {
                if (service == null) {
                    service = new TransationService();
                }
            }
        }
    }
}複製代碼

這是一個典型的雙重檢查鎖定思想,這段代碼也是一個典型的雙重檢查鎖定(Double-checked Locking)問題。在高併發的狀況下,該對象引用在沒有同步的狀況下進行讀寫操做,致使用戶可能會獲取未構造完成的對象

這是由於指令優化的結果。計算機不會根據代碼順序循序漸進地執行相關指令,咱們來舉一個借書的例子:假如你要去還書而且想要借一個《高併發編程學習》系列叢書,而你的室友剛好也要還書,而且還想讓你幫忙借一本《Java 從入門到放棄》。

這件事乍一看有兩件事:你的事和你室友的事。先辦完你的事,再開始處理你室友的事情是屬於單線程的死板行爲,此時你會潛意識地進行「優化」,例如你能夠把你要還的書和你室友須要還的書一塊兒還了,再一塊兒把想要借的書借出來,這其實就至關於合併數據進行存取的操做過程了。

咱們知道一條指令的執行是能夠分紅不少步驟的,簡單地說,能夠分爲:

  • 取值 IF
  • 譯碼和去寄存器操做數 ID
  • 執行或者有效地址計算 EX
  • 存儲器訪問 MEM
  • 寫回 WB

因爲每個步驟可能使用不一樣的硬件完成,所以,聰明的工程師就發明了流水線技術來執行指令,以下圖所示:

能夠看到,當第 2 條指令執行時,第 1 條執行其實並無執行完,確切地說第一條指令尚未開始執行,只是剛剛完成了取值操做而已。這樣的好處很是明顯,假如這裏每個步驟都須要花費 1 毫秒,那麼指令 2 等待指令 1 徹底執行後再執行,則須要等待 5 毫秒,而使用流水線指令,指令 2 只須要等待 1 毫秒就能夠執行了。如此大的性能提高,固然讓人眼紅。

回到最初的問題,咱們分析一下:對於 Java 編譯器來講,初始化 TransactionService 實例和將對象地址寫到 service 字段並不是原子操做,且這兩個階段的執行順序是未定義的。加入某個線程執行 new TransactionService() 時,構造方法還未被調用,編譯器僅僅爲該對象分配了內存空間並設爲默認值,此時若另外一個線程調用 getTransactionService() 方法,因爲 service != null,可是此時 service 對象尚未被賦予真正的有效值,從而沒法取到正確的 service 單例對象。

對於此問題,一種較爲簡單的解決方案就是用 volatile 關鍵字修飾目標屬性(適用於 JDK5 及以上版本),這樣 service 就限制了編譯器對它的相關讀寫操做,對它的讀寫操做進行指令重排,肯定對象實例化以後才返回引用。

另外指令重排也有本身的規則,並不是全部的指令均可以隨意改變執行位置,下面列舉一下基本的原則:

  • 程序次序規則:一個線程內,按照代碼順序,書寫在前面的操做先行發生於書寫在後面的操做;
  • 鎖定規則:一個 unLock 操做先行發生於後面對同一個鎖的 lock 操做;
  • volatile 變量規則:對一個變量的寫操做先行發生於後面對這個變量的讀操做;
  • 傳遞規則:若是操做 A 先行發生於操做 B,而操做 B 又先行發生於操做 C,則能夠得出操做 A 先行發生於操做 C;
  • 線程啓動規則:Thread 對象的 start() 方法先行發生於此線程的每一個一個動做;
  • 線程中斷規則:對線程 interrupt() 方法的調用先行發生於被中斷線程的代碼檢測到中斷事件的發生;
  • 線程終結規則:線程中全部的操做都先行發生於線程的終止檢測,咱們能夠經過 Thread.join() 方法結束、Thread.isAlive() 的返回值手段檢測到線程已經終止執行;
  • 對象終結規則:一個對象的初始化完成先行發生於他的 finalize() 方法的開始;

volatile 不保證原子性

volatile 解決的是多線程共享變量的可見性問題,相似於 synchronized,但不具有 synchronized 的互斥性。因此對 volatile 變量的操做並不是都具備原子性,例如咱們用下面的例子來講明:

public class VolatileNotAtomic {

    private static volatile long count = 0L;
    private static final int NUMBER = 10000;

    public static void main(String[] args) {
        Thread subtractThread = new SubstractThread();
        subtractThread.start();

        for (int i = 0; i < NUMBER; i++) {
            count++;
        }

        // 等待減法線程結束
        while (subtractThread.isAlive()) {
        }

        System.out.println("count 最後的值爲: " + count);
    }

    private static class SubstractThread extends Thread {

        @Override
        public void run() {
            for (int i = 0; i < NUMBER; i++) {
                count--;
            }
        }
    }
}複製代碼

屢次執行後,發現結果基本都不爲 0。只有在 count++count-- 兩處都進行加鎖時,才能正確的返回 0,瞭解 Java 的童鞋都應該知道這 count++count-- 都不是一個原子操做,這裏就不做說明了。

volatile 的使用優化

在瞭解一點吧,註明的併發編程大師 Doug lea 在 JDK 7 的併發包裏新增一個隊列集合類 LinkedTransferQueue,它在使用 volatile 變量時,用一種追加字節的方式來優化對列出隊和入隊的性能,具體的能夠看一下下列的連接,這裏就不具體說明了。

保證原子性:synchronized

Java 中任何一個對象都有一個惟一與之關聯的鎖,這樣的鎖做爲該對象的一系列標誌位存儲在對象信息的頭部。Java 對象頭裏的 Mark Word 裏默認的存放的對象的 Hashcode/ 分代年齡和鎖標記位。32 爲JVM Mark Word 默認存儲結構以下:

Java SE 1.6中,鎖一共有 4 種狀態,級別從低到高依次是:無鎖狀態、偏向鎖狀態、輕量級鎖狀態和重量級鎖狀態,這幾個狀態會隨着競爭狀況逐漸升級。鎖能夠升級但不能降級,意味着偏向鎖升級成輕量級鎖後不能降級成偏向鎖。這種鎖升級卻不能降級的策略,目的是爲了提升得到鎖和釋放鎖的效率。

偏向鎖

HotSpot 的做者通過研究發現,大多數狀況下,鎖不只不存在多線程競爭,並且老是由同一線程屢次得到,爲了讓線程得到鎖的代價更低而引入了偏向鎖。

  • 偏向鎖的獲取:當一個線程訪問同步塊並獲取鎖時,會在對象頭和棧幀中的鎖記錄裏存儲鎖偏向的線程 ID,之後該線程在進入和退出同步塊時不須要進行 CAS 操做來加鎖和解鎖,只需簡單地測試一下對象頭的 Mark Word 裏是否存儲着指向當前線程的偏向鎖。若是測試成功,表示線程已經得到了鎖。若是測試失敗,則須要再測試一下 Mark Word 中偏向鎖的標識是否設置成 1(表示當前是偏向鎖),若是沒有設置,則使用CAS競爭鎖;若是設置了,則嘗試使用CAS將對象頭的偏向鎖指向當前線程。
  • 偏向鎖的撤銷:偏向鎖使用了一種等到競爭出現才釋放鎖的機制,因此當其餘線程嘗試競爭偏向鎖時,持有偏向鎖的線程纔會釋放鎖。

下圖線程 1 展現了偏向鎖獲取的過程,線程 2 展現了偏向鎖撤銷的過程。

輕量級鎖和自旋鎖

若是偏向鎖失敗,虛擬機並不會當即掛起線程。它還會使用一種稱爲輕量級鎖的優化手段。

線程在執行同步塊以前,JVM 會先在當前線程的棧楨中建立用於存儲鎖記錄的空間,並將對象頭中的 Mark Word 複製到鎖記錄中,官方稱爲 Displaced Mark Word。而後線程嘗試使用 CAS 將對象頭中的 Mark Word 替換爲指向鎖記錄的指針。若是成功,當前線程得到鎖,若是失敗,表示其餘線程競爭鎖,當前線程便嘗試使用自旋(本身執行幾個空循環再進行嘗試)來獲取鎖。

輕量級解鎖時,會使用原子的 CAS 操做將 Displaced Mark Word 替換回到對象頭,若是成功,則表示沒有競爭發生。若是失敗,表示當前鎖存在競爭,鎖就會膨脹成重量級鎖。下圖是兩個線程同時爭奪鎖,致使鎖膨脹的流程圖。

幾種鎖的比較

下圖就簡單歸納了一下幾種鎖的比較:

每人一支筆:ThreadLocal

除了控制資源的訪問外,咱們還能夠經過增長資源來保證全部對象的線程安全。好比,讓 100 我的填寫我的信息表,若是隻有一支筆,那麼你們就得挨個寫,對於管理人員來講,必須保證你們不會去哄搶這僅存的一支筆,不然,誰也填不完。從另一個角度出發,咱們能夠乾脆就準備 100 支筆,那麼全部人均可以各自爲營,很快就能完成表格的填寫工做。

若是說鎖是使用第一種思路,那麼 ThreadLocal 就是使用第二種思路了。

當使用 ThreadLocal 維護變量時,其爲每一個使用該變量的線程提供獨立的變量副本,因此每個線程均可以獨立的改變本身的副本,而不會影響其餘線程對應的副本。

ThreadLocal 內部實現機制

  1. 每一個線程內部都會維護一個相似 HashMap 的對象,稱爲 ThreadLocalMap,裏邊會包含若干了 Entry(K-V 鍵值對),相應的線程被稱爲這些 Entry 的屬主線程;
  2. Entry 的 Key 是一個 ThreadLocal 實例,Value 是一個線程特有對象。Entry 的做用便是:爲其屬主線程創建起一個 ThreadLocal 實例與一個線程特有對象之間的對應關係;
  3. Entry 對 Key 的引用是弱引用;Entry 對 Value 的引用是強引用。

ThreadLodal 的反作用

爲了讓線程安全地共享某個變量,JDK 開出了 ThreadLocal 這副藥方,但「是藥三分毒」,ThreadLocal 也有必定的反作用。主要問題是「產生髒數據」和「內存泄漏」。這兩個問題一般是在線程池中使用 ThreadLocal 引起的,由於線程池有 「線程複用」「內存常駐」 兩個特色。

髒數據

線程複用會產生髒數據。因爲線程池會重用 Thread 對象,那麼與 Thread 綁定的類的靜態屬性 ThreadLocal 變量也會被重用。若是在實現的線程 run() 方法中不顯式地 remove() 清理與線程相關的 ThreadLocal 信息,那麼假若下一個線程不調用 set() 設置初始值,就可能 get() 到重用的線程信息,包括 ThreadLocal 所關聯的線程對象的 value 值。

爲了方便理解,用一段簡要代碼來模擬,以下所示:

public class DirtyDataInThreadLocal {

    public static ThreadLocal<String> threadLocal = new ThreadLocal<>();

    public static void main(String[] args) {
        // 使用固定大小爲 1 的線程池,說明上一個的線程屬性會被下一個線程屬性複用
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 2; i++) {
            Mythread mythread = new Mythread();
            pool.execute(mythread);
        }
    }

    private static class Mythread extends Thread {

        private static boolean flag = true;

        @Override
        public void run() {
            if (flag) {
                // 第 1 個線程 set 後,並無進行 remove
                // 而第二個線程因爲某種緣由沒有進行 set 操做
                threadLocal.set(this.getName() + ", session info.");
                flag = false;
            }
            System.out.println(this.getName() + " 線程是 " + threadLocal.get());
        }
    }
}複製代碼

執行結果:

Thread-0 線程是 Thread-0, session info.
Thread-1 線程是 Thread-0, session info.複製代碼

內存泄漏

在源碼註釋中提示使用 static 關鍵字來修飾 ThreadLocal。在此場景下,寄但願於 ThreadLocal 對象失去引用後,觸發弱引用機制來回收 Entry 的 Value 就變得不現實了。在上面的例子中,若是不進行 remove() 操做,那麼這個線程執行完成後,經過 ThreadLocal 對象持有的 String 對象是不會被釋放的。

以上兩個問題的解決辦法很簡單,就是在每次使用完 ThreadLocal 時,必需要及時調用 remove() 方法清理。

參考資料

  1. 《Java 零基礎入門教程》 - study.163.com/course/cour…
  2. 《Java 併發編程的藝術》
  3. 《碼出高效 Java 開發手冊》 - 楊冠寶(孤盡) 高海慧(鳴莎)著
  4. Java面試知識點解析(二)——高併發編程篇 - www.wmyskxz.com/2018/05/10/…
  5. 讓你完全理解Synchronized - www.jianshu.com/p/d53bf830f…
  6. 《Offer來了 - Java面試核心知識點精講》 - 王磊 編著
  7. 《實戰Java高併發程序設計》 - 葛一鳴 郭超 編著

---

按照慣例黏一個尾巴:

歡迎轉載,轉載請註明出處!

獨立域名博客:wmyskxz.com

簡書 ID:@我沒有三顆心臟

github:wmyskxz

歡迎關注公衆微信號:wmyskxz

分享本身的學習 & 學習資料 & 生活

想要交流的朋友也能夠加 qq 羣:3382693

相關文章
相關標籤/搜索