深刻理解 Java 多線程系列(1)——一個簡單需求的並行改造 & Java多線程的通訊問題

併發的學習門檻較高,相較單純的羅列併發編程 API 的枯燥被動學習方式,本系列文章試圖用一個簡單的栗子,一步步結合併發編程的相關知識分析舊有實現的不足,再實現邏輯進行分析改進,試圖展現例子背後的併發工具與實現原理。html

本文是本系列的第一篇文章,提出了一個簡單的業務場景,給出了一個簡單的串行實現以及基於原子變量的併發實現,同時詳細分析了 Java多線程通訊、 Java 內存模型、 happy before 等基本概念。java

寫在前面

文中全部的代碼筆者均所有實現了一遍,並上傳到了個人 github 上,多線程這部分源碼位於java-multithread模塊中 ,歡迎感興趣的讀者訪問並給出建議^_^git

倉庫地址: java-learning
git-clone: git@github.com:The-Hope/java-learning.git

串行實現

假定有這樣一個需求,給定一個目錄和一個關鍵字,要求統計指定的目錄中各文件內指定關鍵字出現的總次數。程序員

先來看看串行狀態下該怎麼實現:github

/**
 * Description:
 * 掃描指定目錄下指定關鍵字的出現次數——串行版本實現
 *
 * @author The hope
 * @date 2018/5/20.
 */
public class KeywordCount1 implements KeywordCount {

    private String keyword;
    private File directory;

    public KeywordCount1(File directory, String keyword) {

        this.keyword = keyword;
        this.directory = directory;
    }

    public int search() {
        return search(directory);
    }

    private int search(File directory) {
        int result = 0;
        for (File file : directory.listFiles())
            if (file.isDirectory()) result += search(file);
            else result += count(file);
        return result;
    }

    private int count(File file) {
        int result = 0;
        try (Scanner in = new Scanner(file)) {
            while (in.hasNextLine()) {
                String line = in.nextLine();
                if (line.contains(keyword))
                    result++;
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        return result;
    }

    @Override
    public void shutDown() {}
}

代碼很簡單,核心實現是search(File directory) 函數:編程

private int search(File directory) {
    int result = 0;
    for (File file : directory.listFiles())
        if (file.isDirectory()) result += search(file);
        else result += count(file);
    return result;
}

邏輯很簡單,判斷當前 file 對象若是是文件夾就遞歸調用本身,不然統計關鍵字出現次數。(注,爲了方便測試函數的調用,我抽象了接口 KeywordCount 以規範暴露出的方法)segmentfault

爲了看看它的執行效果咱們再來寫個簡單的測試函數:緩存

/**
 * Description:
 *  掃描指定目錄下指定關鍵字的出現次數——測試函數
 * @author The hope
 * @date 2018/5/20.
 */
public class KeywordCountTest {

    public static void main(String... args) throws Exception{
        Scanner in = new Scanner(System.in);
        System.out.println("Enter base directory (e.g. C:\\Program Files\\Java\\jdk1.6.0_45\\src): ");
        String directory = in.nextLine();
        System.out.println("Enter keyword (e.g. java): ");
        String keyword = in.nextLine();
        int execTimes = 5;// 設定執行次數

        long start = System.currentTimeMillis();//開始計時

        int totalCount = 0;
        KeywordCount counter = new KeywordCount1(new File(directory), keyword);
        for (int i = 0; i < execTimes; i++) {
            int count = counter.search();
            totalCount += count;
        }

        long end = System.currentTimeMillis();//結束計時
        System.out.println("Statistics: " + totalCount/ execTimes);
        System.out.println("used time: " + (end-start)/ execTimes);

        counter.shutDown();
    }

}

(爲了消除單次運行的波動影響,這裏故意寫了個循環來作平均)多線程

執行效果以下:併發

Enter base directory (e.g. C:\Program Files\Java\jdk1.6.0_45\src): 
C:\Program Files\Java\jdk1.6.0_45\src
Enter keyword (e.g. java): 
java
Statistics: 43781
used time: 5152

Process finished with exit code 0

能夠看到用時大概在5秒左右

拓展思考

咱們能夠簡單的分析下整個功能的邏輯,大致上能夠分爲兩個部分:

  1. 從給定目錄尋找下級文件
  2. 從給定文件中統計指定關鍵字出現次數

其中第二步明顯是相互獨立、互不依賴且耗時較多的任務,假使咱們可以引入多線程併發的去執行那麼就能合理的提高系統的吞吐量進而提升系統響應時間。


注意,在分析是否值得利用多線程改進一個需求實現時,自什麼維度來進行任務的拆分是一件比較重要的考慮因素。若是任務之間存在執行順序依賴或者數據依賴,那麼就很難簡單的對任務進行拆分,而應該從更高的維度從新思考任務的邊界並設計相應的實現。好比,針對有執行順序依賴的任務,能夠從更高維度來對任務進行分組,並將一組任務放入一個線程中順序執行,並經過 ThreadLocal 來傳遞變量,這樣能夠有效減小數據爭用的競態條件。

引入併發

在開始動筆實現以前,咱們先來思考這麼兩個問題:
1. 線程什麼時候執行不受咱們控制,咱們怎麼知道線程什麼時候可以執行完畢
2. 即使咱們知道線程何時執行完畢,但是 Java 並無提供線程之間顯示的通訊方法,那麼咱們怎麼獲取須要的結果。

其實這兩個問題,都是典型的線程間通訊問題。好比第一個問題,換種角度看就是主線程如何接收子線程執行完畢的信息。第二個問題更是一種典型的主線程如何接受子線程計算結果的問題。

因此接下來,咱們須要簡單的介紹下多線程中的併發通訊模型。

多線程間的併發通訊

對於多線程編程來講,最根本的就是解決兩個問題:

  1. 線程之間如何進行通訊(以何種信息來交換信息)
  2. 線程之間如何進行同步

咱們先來講說如何通訊,大致上有這麼兩種方式:

  1. 基於消息傳遞
  2. 基於共享內存

消息傳遞的併發模型

基於消息傳遞的併發模型中,線程之間沒有公共狀態,通訊基於顯式消息傳遞實現,因爲消息的接收必定存在於消息的發送以後,此時同步是隱式進行的

結合併發模型的介紹,咱們能夠很容易的知道,Thread.join() 方法就是一種很典型的線程間消息傳遞機制。他傳遞的消息就是目標線程什麼時候執行完畢的信息,併兼具阻塞代碼執行的功能。相似的消息傳遞機制還有 wait(),notifyAll() 等方法。

舉個栗子:
針對前文的第一個問題:

線程什麼時候執行不受咱們控制,咱們怎麼知道線程什麼時候可以執行完畢?

以下方代碼所示。經過使用 Thread.join()方法,保證代碼阻塞,直到子線程執行完畢再繼續執行:

class ThreadA{
  public static void main(String... args){
    ThreadB b = new Thread(new Runnable{ void run(){...}});
    b.start();
    b.join(); // join() 方法會等待線程執行完畢。若是不加這一行將會繼續運行下去
    // do something
  }
}

共享內存的併發模型

基於共享內存的併發模型中,線程之間共享程序的公共狀態,通訊是經過線程之間串行的對公共狀態進行讀寫來實現的,所以老是須要(程序員)顯示的指定同步來實現隱式的通訊。

好比 Java 中 volatile,synchronized 以及各類鎖機制,均爲了解決線程間公共狀態的串行訪問問題。

講到這裏,咱們還能夠再宕開一筆,簡單聊聊爲何基於共享內存的併發模型必定要花大力氣保證線程之間的串行執行。

Java 內存模型的抽象(JMM)

相似現代多核處理器會給每一個核心設計本身的 CPU 寄存器緩存主內存中的目標數據,以方便處理器的快速存取。當多個處理器的任務涉及同一塊主內存時,就須要利用 MSI、MESI、MOSI 等緩存一致性協議來協調各個處理器之間的對特定內存或者高速緩存的訪問規則。以下圖:

clipboard.png

針對一個線程對共享變量的寫入什麼時候對另外一個線程可見問題,Java 利用 JMM 抽象了線程與主內存之間的關係。
咱們先來看看Java內存模型(JMM)的示意圖:

clipboard.png

注意:這裏的工做內存並不實際存在,而是涵蓋了緩存,寫緩衝區,寄存器以及其餘的硬件和編譯器優化等概念的一種抽象

從圖中就能夠很清晰的概括出,若是線程A想要和線程B之間想要經過共享內存進行通訊,那麼必須通過如下步驟:

  1. 線程A將工做內存中更新的工做內存副本寫回至主內存中
  2. 線程B從根據主內存中的值從新更新刷新本身的工做內存副本

上述兩步必須有序進行,不然將會致使通訊錯誤。

例如考慮如下時序:

  1. 變量X初始值爲100
  2. 線程 A 將 X 值寫入工做內存中,此時工做內存與主內存 X 值均爲100
  3. 線程 A 給 X + 50 而後寫入工做內存中,此時 A 的時間片用完。 X 在工做內存值爲 150,X在主內存中值爲100
  4. 線程 B 將 X 的值寫入本身的工做內存中。此時線程 B 的工做內存值爲 100,主內存值仍爲 100。
  5. 線程 B 給 X + 30 而後寫入工做內存中,此時 B 的工做內存值爲 130,主內存值爲 100。
  6. 線程 B 將工做內存的值寫回主內存,線程 B 運行結束。此時主內存值爲 130。
  7. 線程 A 從休眠中醒來,將工做內存中的 150 同步回主內存,此時主內存值爲 150。

從上述時序中,咱們能夠看到,因爲線程 A & B 針對共享狀態 X 寫入並非串行的,致使中間出現了數據覆蓋的錯誤狀況。同理,讀者能夠再繼續分析思考下寫讀模型中的同步問題。

重排序

值得注意的是,除了上述例子中,線程間錯誤的時序會致使併發錯誤,重排序也一樣會致使意想不到的併發錯誤。
重排序的緣由大致分爲這三種:

  1. 編譯期優化的重排序(編譯器僅保證不更改單線程運行語義)
  2. 指令級並行的重排序(處理器僅保證不破壞存在數據依賴的指令)
  3. 內存系統的重排序(讀/寫緩衝區到主內存同步機制)

關於這部分的介紹,前人珠玉在前,列舉了大量簡明易懂的例子。這裏援引併發編程網的程曉明在《深刻理解Java內存模型》系列文章中的一個例子來給你們作個簡單介紹:

處理器重排序與內存屏障指令

現代的處理器使用寫緩衝區來臨時保存向內存寫入的數據。寫緩衝區能夠保證指令流水線持續運行,它能夠避免因爲處理器停頓下來等待向內存寫入數據而產生的延遲。同時,經過以批處理的方式刷新寫緩衝區,以及合併寫緩衝區中對同一內存地址的屢次寫,能夠減小對內存總線的佔用。雖然寫緩衝區有這麼多好處,但每一個處理器上的寫緩衝區,僅僅對它所在的處理器可見。這個特性會對內存操做的執行順序產生重要的影響:處理器對內存的讀/寫操做的執行順序,不必定與內存實際發生的讀/寫操做順序一致!爲了具體說明,請看下面示例:

Processor A Processor B
a = 1; //A1
x = b; //A2
b = 2; //B1
y = a; //B2
初始狀態:a = b = 0
處理器容許執行後獲得結果:x = y = 0

假設處理器A和處理器B按程序的順序並行執行內存訪問,最終卻可能獲得x = y = 0的結果。具體的緣由以下圖所示:
重排序問題

這裏處理器A和處理器B能夠同時把共享變量寫入本身的寫緩衝區(A1,B1),而後從內存中讀取另外一個共享變量(A2,B2),最後才把本身寫緩存區中保存的髒數據刷新到內存中(A3,B3)。當以這種時序執行時,程序就能夠獲得x = y = 0的結果。

從內存操做實際發生的順序來看,直處處理器A執行A3來刷新本身的寫緩存區,寫操做A1纔算真正執行了。雖然處理器A執行內存操做的順序爲:A1->A2,但內存操做實際發生的順序倒是:A2->A1。此時,處理器A的內存操做順序被重排序了(處理器B的狀況和處理器A同樣,這裏就不贅述了)。

這裏的關鍵是,因爲寫緩衝區僅對本身的處理器可見,它會致使處理器執行內存操做的順序可能會與內存實際的操做執行順序不一致。因爲現代的處理器都會使用寫緩衝區,所以現代的處理器都會容許對寫-讀操做重排序。

上述引文介紹了一個簡單的小栗子,說明了重排序問題致使的一個併發錯誤。既然重排序問題可能致使程序在併發執行時致使意想不到的錯誤發生,做爲程序員咱們又該怎麼分析定位問題呢?

先行發生(happens before)原則

雖然重排序問題會致使併發程序的可見性錯誤,不過 Java 經過先行發生的概念從新約定了操做之間的可見性。

換句話說若是一個操做的執行結果須要對另外一個線程可見,那麼這兩個操做之間必定要存在 happens before 關係。這裏的兩個操做能夠是在一個線程也能夠是兩個線程。

與咱們平常開發聯繫最緊密的先行發生原則以下:

  1. 程序順序規則:一個線程中的每一個操做,happens- before 於該線程中的任意後續操做。
  2. 監視器鎖規則:對一個監視器鎖的解鎖,happens- before 於隨後對這個監視器鎖的加鎖。
  3. volatile變量規則:對一個volatile域的寫,happens- before 於任意後續對這個volatile域的讀。
  4. 傳遞性:若是A happens- before B,且B happens- before C,那麼A happens- before C。

注:咱們常說的 synchronized,volatile,ReentrantLock 等顯示同步的原理,就是依託於這裏的監視器鎖規則實現的。

小結

這裏咱們介紹了基於共享狀態的併發模型,指出了因爲線程工做內存與主內存的同步,代碼執行的重排序等問題,可能致使線程共享狀態的可見性及原子性錯誤。所以,當線程之間存在公共狀態時,須要利用先行發生原則針對共享狀態的訪問進行合理性分析,確保共享狀態的訪問/修改操做兩兩符合先行發生原則。換句話說,須要保證對多線程之間共享狀態的操做進行合理同步。

拓展思考

學了這麼多,回到咱們最開始的問題:

即使咱們知道線程何時執行完畢,但是 Java 並無提供線程之間顯示的通訊方法,那麼咱們怎麼獲取須要的結果?

在進行分析以前,咱們回過頭來看看以前版本的核心代碼實現:

int totalCount = 0;
KeywordCount counter = new KeywordCount1(new File(directory), keyword);
for (int i = 0; i < execTimes; i++) {
    int count = counter.search();
    totalCount += count;
}

能夠看到,咱們最終的結果是經過 totalCount 變量記錄的,也就是說,若是咱們依舊依賴這個變量做爲咱們的最重結果,由於每一個線程都會統計本身的關鍵詞,累加到該變量。那麼這就是一種典型的共享數據的競態問題,這時依據先行發生原則進行分析,咱們發現:

  1. 由於不是單線程環境,因此程序順序規則失效
  2. 由於沒有用任何鎖,也沒有用 synchronized 關鍵字,因此監視器規則失效
  3. 由於沒有用 volatile 關鍵字,因此volatile規則失效
  4. 由於上述規則都失效,因此傳遞性規則也失效

綜上,經過利用先行發生原則對競態條件進行分析,咱們發現這部分代碼不作改變那麼多線程環境下鐵定會出錯,那麼咱們接下來該怎麼辦呢?

解決方法

咱們能夠新建一個 Counter 類,將這個 Counter 類傳遞給各個線程去運行計算相應的任務。同時在 Counter 類中設置一個原子的計數器域(AtomicInteger),利用 AtomicIntegerincrementAndGet() 來實現原子的自增操做。等主線程判斷計算任務執行完畢時,再從 Counter 類獲取計算結果便可。核心代碼以下:

class Counter{
    private AtomicInteger count = new AtomicInteger(0);
    
    counter(File file){
        ···
        count.incrementAndGet();
        ···
    }
    
    int getCounterNum(){
        count.get();
    }
}

注:這裏因爲計數器的實現須要依賴變量自身的舊狀態,因此不能使用 volatile 變量。反之,若是業務場景只須要共享狀態的單一更新(不依賴舊狀態),那麼使用 volatile 關鍵字效率會更高。
拓展來看,若是業務操做再複雜一些,須要確保多個變量的組合操做的併發原子性時,更建議使用 ReentrantLock 以及 synchronized 關鍵字來對方法或者代碼塊進行鎖定以保證正確性。

基於線程的併發實現

基於上文對併發編程模型的思考,咱們解決了擺在咱們面前的兩尊攔路虎,線程什麼時候結束 & 變量在線程中如何傳遞。

如今咱們終於能夠再來看看併發版本的關鍵字統計功能該如何實現了。代碼實現以下:

/**
 * Description:
 *   掃描指定目錄下指定關鍵字的出現次數——多線程+原子變量版本實現
 * @author The hope
 * @date 2018/5/20.
 */
public class KeywordCount2 implements KeywordCount {


    private final File directory;
    private final String keyword;

    KeywordCount2(File directory, String keyword) {

        this.keyword = keyword;
        this.directory = directory;
    }

    public int search() throws InterruptedException {
        Counter counter = new Counter(keyword);
        FileSearch fileSearch = new FileSearch(directory, counter);
        Thread t = new Thread(fileSearch);
        t.start();
        t.join();
        return counter.getCountNum();
    }

    @Override
    public void shutDown() {}

    private static class FileSearch implements Runnable {
        private File directory;
        private Counter counter;

        FileSearch(File file, Counter counter) {
            this.directory = file;
            this.counter = counter;
        }

        @Override
        public void run() {
            List<Thread> subThreads = new ArrayList<>();

            for (File file : directory.listFiles())
                if (file.isDirectory()) {
                    FileSearch fileSearch = new FileSearch(file, counter);
                    Thread t = new Thread(fileSearch);
                    subThreads.add(t);
                    t.start();
                } else {
                    counter.search(file);
                }

            for (Thread subThread : subThreads)
                try {
                    subThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        }
    }

    private static class Counter {
        String keyword;
        AtomicInteger count = new AtomicInteger(0);

        Counter(String keyword) {
            this.keyword = keyword;
        }

        int getCountNum() {
            return count.get();
        }

        void search(File file) {
            try (Scanner in = new Scanner(file)) {
                while (in.hasNextLine()) {
                    String line = in.nextLine();
                    if (line.contains(keyword))
                        count.incrementAndGet();
                }
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            }
        }
    }
}

這裏咱們新建立了兩個類 FileSearchCounter
利用FileSearch來進行線程的建立與子計算的分發問題:

@Override
public void run() {
    List<Thread> subThreads = new ArrayList<>();

    for (File file : directory.listFiles())
        if (file.isDirectory()) {
            FileSearch fileSearch = new FileSearch(file, counter);
            Thread t = new Thread(fileSearch);
            subThreads.add(t);
            t.start();
        } else {
            counter.search(file);
        }

    for (Thread subThread : subThreads)
        try {
            subThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

利用Counter來解決計算結果在線程間的傳遞問題:

···
AtomicInteger count = new AtomicInteger(0);
···
void search(File file) {
    try (Scanner in = new Scanner(file)) {
        while (in.hasNextLine()) {
            String line = in.nextLine();
            if (line.contains(keyword))
                count.incrementAndGet();
        }
    } catch (FileNotFoundException e) {
        e.printStackTrace();
    }
}

執行結果以下:

Enter base directory (e.g. C:\Program Files\Java\jdk1.6.0_45\src): 
C:\Program Files\Java\jdk1.6.0_45\src
Enter keyword (e.g. java): 
java
Statistics: 43781
used time: 2418

Process finished with exit code 0

能夠看到時間下降至2秒半左右,提升了50%,的確是極大的提升了響應速度

小結

本文經過提出一個簡單的業務場景(統計指定目錄下關鍵字出現數量),並設計了一個簡單的串行實現。

針對串行版本響應緩慢的問題,筆者以提出問題-解決問題的模式,引入Java多線程通訊以及 Java 內存模型的相關知識,一步步解決改造過程當中的痛點並最終完成了一個基於原子變量的併發版本實現。

經過測試驗證,本輪改形成功解決了串行版本的業務痛點 :)

拓展思考

雖然上述實現極大的提升了程序的執行速度,將執行時間縮短了一半。可是仍然存在下面幾個問題。

  1. 代碼變得更爲複雜: 串行版本50行不到解決問題併發版本,卻暴增至100行,客觀上增長了複雜度。
  2. 建立線程的數量不可肯定: 本版本的實現中,線程的建立數量僅取決於文件數目,衍生出執行效率問題。
  3. 多了些額外的對象,好比 Counter:本問題其實是問題 1 的具體版本,爲了併發而引入新的類本就客觀增長了複雜度。
  4. Counter 面臨多個線程的競態條件,必須進行同步:因爲使用Counter來解決線程間的通訊問題,於是勢必引出同步問題。

上述問題該如何解決與避免,請看下文:深刻理解 Java多線程系列(2)——執行器框架

未完待續~

參考文獻

Java 併發編程實戰
Java 核心技術——卷Ⅰ
深刻理解 Jvm 虛擬機——周志明
深刻理解 Java 內存模型——程曉明

聯繫做者

zhihu.com
segmentfault.com
oschina.net

相關文章
相關標籤/搜索