多線程程序加速指南

雖然對於一個計算機程序來講最重要的是正確性,若是一個程序沒辦法產出正確的結果,那麼這個程序的價值就大打折扣了。但程序性能也是很重要的一個方面,若是程序運行得太慢,那也會影響到程序的適用範圍和硬件配置的成本。java

在以前的文章《4.多線程中那些看不到的陷阱》中,咱們瞭解了線程間的同步機制,這主要是爲了保證程序在多線程環境下的正確性。在這篇文章中咱們將會深刻探究多線程程序的性能瓶頸和多種不一樣的優化方式,那麼咱們首先就從對程序性能的測量與分析開始吧。編程

分析多線程程序的性能

咱們先來看一個使用AtomicLong進行多線程計數的程序,下面的程序中會啓動兩個線程,每一個線程會對靜態變量count進行一億次(10的8次方)的累加操做,這段代碼在開始和結束的時候都獲取了當前時間,而後經過這兩個時間值計算程序的運行耗時。數組

public class AtomicIntegerTest {

    private static AtomicLong count = new AtomicLong(0);

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        Runnable task = new Runnable() {
            public void run() {
                for (int i = 0; i < 1e8; ++i) {
                    count.incrementAndGet();
                }
            }
        };

        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("count = " + count);

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3));
    }

}
複製代碼

在個人電腦上運行這段程序最後輸出的結果是2.44s,看起來有點長了,那麼咱們再看一下若是直接在單個線程中對一個整型變量累加兩億次會是什麼結果。下面是一個在單個線程中累加兩億次的程序代碼:緩存

public class SingleThreadTest {

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        long count = 0;
        for (int i = 0; i < 2e8; ++i) {
            count += 1;
        }
        System.out.println("count = " + count);

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3));
    }

}
複製代碼

這段代碼運行耗時只有0.33s,咱們經過兩個線程進行累加的代碼居然比單線程的代碼還要慢得多,這不是就和咱們使用多線程加快程序運行的初衷相違背了嗎?安全

多線程程序的線程間同步是影響多線程程序性能的關鍵所在,一方面程序中必須串行化的部分會使系統總體的耗時顯著增長,另外一方面同步行爲自己的開銷也比較大,特別是在發生衝突的狀況下。在上文的代碼中,多線程累加的程序之因此會比單線程還慢得多就是由於在AtomicLong類型的靜態變量count上有兩個線程同時調用incrementAndGet方法進行累加,這就會致使在這個靜態變量上存在很嚴重的衝突。性能優化

當一個線程成功修改了變量count的值後,另一個正在修改的線程就會修改失敗而且會再次重試累加操做。而且由於AtomicLong類型的對象中是用一個volatile變量來保存實際的整型值的,而咱們在以前的文章《多線程中那些看不到的陷阱》中能夠了解到,對volatile變量的修改操做必定要把修改後的數據從高速緩存寫回內存當中,這也是用AtomicLong進行累加的耗時比單線程累加版本還要多這麼多的主要緣由。bash

那麼咱們有沒有更好的方法能夠解決這個問題呢?數據結構

使用任務拆分進行優化

在上面的例子中,咱們須要的只是最終累加的結果,因此爲了減少線程間同步的開銷,咱們能夠將累加任務拆分到不一樣的線程中執行,到最後再把每一個線程的結果加在一塊兒就能夠獲得最終的結果了。在下面的代碼中咱們就使用了這種方法,t1在count1上累加一億次,t2在count2上累加一億次,最後把count1和count2相加獲得最終的結果,咱們來一塊兒運行一下,看看效果如何。多線程

public class TwoThreadTest {

    private static long count1 = 0;
    private static long count2 = 0;

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        Thread t1 = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 1e8; ++i) {
                    count1 += 1;
                }
            }
        });
        Thread t2 = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 1e8; ++i) {
                    count2 += 1;
                }
            }
        });

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        long count = count1 + count2;
        System.out.println("count = " + count);

        long endTime = System.currentTimeMillis();
        System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3));
    }

}
複製代碼

這段程序在個人電腦上的耗時是0.20s,比以前單線程的0.33s有了不小的提升,更是遙遙領先原先用兩個線程累加AtomicLong類型變量版本的2.44s。這說明咱們以前的分析是正確的,CAS重試和volatile寫回內存兩個操做所引發的開銷是AtomicLong版本程序性能低下的罪魁禍首。併發

可是這個版本的結構仍是略顯原始了,在應付累加這種簡單的需求時可能還比較容易,可是一旦面臨複雜的併發任務,那可能就要寫不少複雜的代碼,而且很容易出現錯誤了。例如咱們若是想把任務拆分到10個線程中運行,那麼咱們就要首先把兩億次的累加任務拆分爲10份,而後還要建立一個包含10個Thread對象的數組讓他們分別的對不一樣範圍進行累加,最後還要經過join方法等待這10個線程都執行完成,這個任務聽起來就不太容易。不要緊,下面咱們將會介紹一種目前比較經常使用的任務拆分與運行框架來解決這個問題,經過這個框架咱們能夠很容易地寫出易於編寫和擴展的任務拆分式程序。

使用ForkJoinPool進行任務

JDK 1.7中引入了一個新的多線程任務執行框架,被稱爲ForkJoinPoolForkJoinPool是一個Java類,它實現了表明線程池功能的ExecutorService接口,因此它在使用方法上和經常使用的線程池類ThreadPoolExecutor類似,但在本節中咱們並不須要瞭解線程池的詳細用法,不過感興趣的讀者能夠參考這篇文章從0到1玩轉線程池來了解一下。

線程池就是一個線程的集合,其中的線程會一直等待執行任務,因此咱們能夠把任務以任務對象的形式提交到線程池,而後線程池就會利用其中的線程來執行任務。在ForkJoinPool的使用中,線程池指的就是ForkJoinPool類型的對象,而任務對象指的就是繼承自ForkJoinTask的類的對象。在下面的示例代碼中,咱們使用了自定義的RecursiveTask的子類來做爲任務類,RecursiveTask類就繼承自ForkJoinTask類。

Recursive的意思是遞歸,也就是說咱們在這個任務類的執行過程當中可能會建立新的任務類對象來表明當前任務的子任務,而後經過結合多個子任務的結果來返回當前任務的結果。好比一開始的任務是累加兩億次,但那麼咱們就能夠把它分爲兩個分別累加一億次的子任務的結果之和,一樣的道理,累加一億次的子任務也能夠再被分爲兩個累加五千萬次的子子任務。這樣的拆分會一直持續到咱們認爲任務規模已經足夠小的時候,這時子任務的結果就會被計算,而後再返回給上層任務進行處理以後就獲得上層任務的結果了。

若是前面這一段文字描述看不明白也不要緊,咱們在代碼中找一找答案:

public class ForkJoinTest {

    private static class AccumulateTask extends RecursiveTask<Long> {

        private long start;
        private long end;
        private long threshold;

        /**
         * 任務的構造函數
         *
         * @param start         任務處理範圍的起始點(包含)
         * @param end           任務處理範圍的結束點(不包含)
         * @param threshold     任務拆分的閾值
         */
        public AccumulateTask(long start, long end, long threshold) {
            this.start = start;
            this.end = end;
            this.threshold = threshold;
        }

        @Override
        protected Long compute() {
            long left = start;
            long right = end;

            // 終止條件:若是當前處理的範圍小於等於閾值(threshold),
            //                    那麼就直接經過循環執行累加操做
            if (right - left <= (int) threshold) {
                long result = 0;
                for (long i = left; i < right; ++i) {
                    result += 1;
                }
                return result;
            }

            // 獲取當前處理範圍的中心點
            long mid = (start + end) / 2;

            // 拆分出兩個子任務,一個從start到mid,一個從mid到end
            ForkJoinTask<Long> leftTask = new AccumulateTask(start, mid, threshold);
            ForkJoinTask<Long> rightTask = new AccumulateTask(mid, end, threshold);

            // 經過當前線程池運行兩個子任務
            leftTask.fork();
            rightTask.fork();

            try {
                // 獲取兩個子任務的結果並返回
                return leftTask.get() + rightTask.get();
            } catch (Exception e) {
                return 0L;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();

        // 建立總任務,範圍是從1到兩億(包含),閾值爲10的7次方,因此最終至少會有10個任務進行for循環的累加
        AccumulateTask forkJoinTask = new AccumulateTask(1, (int) 2e8+1, (long) 1e7);
        // 使用一個新建立的ForkJoinPool任務池運行ForkJoin任務
        new ForkJoinPool().submit(forkJoinTask);

        // 打印任務結果
        System.out.println("count = " + forkJoinTask.get());

        // 計算程序耗時並打印
        long endTime = System.currentTimeMillis();
        System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3));
    }

}
複製代碼

在上面的代碼中,咱們會不斷建立在指定範圍內累加的子任務,直到任務範圍小於閾值threshold(在代碼中是10的7次方)時纔再也不拆分子任務,而是經過循環來獲得累加結果。以後子任務的返回結果在上層任務中相加並做爲上層任務的結果返回。到最後咱們就能夠獲得累加兩億次的結果了。

在這個程序中,最重要的是對任務對象的三個操做:

  1. 建立任務對象,代碼中使用的是new AccumulateTask(start, mid, threshold)new AccumulateTask(mid, end, threshold),這兩段代碼會建立除了範圍不一樣其餘邏輯與父任務徹底一致的子任務,每一個子任務會負責執行父任務範圍中的一半;
  2. 執行子任務,經過調用任務對象的fork()方法可讓子任務被提交到當前ForkJoinPool中執行;
  3. 等待子任務返回結果,經過調用子任務任務對象的get()方法,父任務將會等待子任務執行完成並返回結果,而後將兩個子任務的結果相加獲得父任務的執行結果。

爲何一樣都是線程池,可是ThreadPoolExecutor類就很難實現這樣的執行方式呢?細心的讀者可能已經發現了,咱們在一個任務中會拆分出兩個子任務,而且要等待這兩個子任務都執行完成才能返回父任務的結果。若是是在ThreadPoolExecutor中,在等待子任務運行完成獲得結果時,父任務會一直阻塞而且佔用一個線程,這樣的話若是父任務太多就會致使子任務沒有線程可供使用了,這個運行流程就沒辦法繼續執行下去了。而ForkJoinPool這個特殊的線程池就解決了這個問題,父任務在等待子任務執行時可讓出線程給其餘任務,這樣就不會致使線程都被阻塞狀態的父任務所阻塞了。

這種將任務拆分爲互不依賴的子任務,而後分別在不一樣的線程上執行,最後再將結果進行逐步合併的方法就被稱爲Map-Reduce。這種方法在離線大數據技術中被普遍應用,甚至能夠說大數據相關技術就是在Map-Reduce思想基礎上發展起來的也不爲過。

線程內變量

經過上面的幾個例子,咱們能夠看到,對於多線程程序來講,共享數據就是最大的問題。共享數據不但可能引發數據競爭問題,致使程序出現問題;並且隨着引入的線程同步操做又會拉低程序的性能,甚至可能使多線程程序的執行時間比單線程程序還長得多。在上面的例子中,咱們經過使用ForkJoinPool拆分並執行了一個累加任務,各個子任務之間基本徹底獨立,作到了最大程度的並行化。可是在一些狀況下,咱們可能沒辦法作到如此理想的方案,在一些狀況下仍是會留有必定的線程同步操做和對應的代碼臨界區。

那麼在這些狀況下咱們如何處理能讓程序的性能儘量高呢?

假設咱們如今要統計一個方法的調用次數,若是可能有多個線程同時調用該方法,那麼就須要對多個線程的調用同時計數。這種狀況下咱們能夠考慮在每一個線程裏各自保留一個整型變量用於保存每一個線程內的調用次數,而後在獲取總數時只須要把每一個線程中的數量加在一塊兒就能計算出來了。而在累加計數時咱們只須要修改當前線程對應的變量就能夠了,天然就沒有了數據競爭問題。java.util.concurrent.atomic包中的累加器LongAdder類採用的就是這樣的思路,這種思路也有本身專門的專業術語,被稱爲**「線程封閉」**,線程封閉指的就是這種經過線程內變量來避免線程間共享數據的優化方式。

Java中也有專門的ThreadLocal類能夠處理線程內變量,只是由於性能和線程銷燬時的數據保存之類的緣由通常不會用於多線程累加這樣的數據聚合場景,可是在保存和獲取數據方面很是的便利,有興趣的讀者能夠了解一下。

ConcurrentHashMap

java.util.concurrent包爲咱們提供了鎖、原子類、線程池、ForkJoinPool等一大批併發編程工具。最後,咱們來了解一下java.util.concurrent包中爲咱們提供的一種線程安全數據結構。

在Java中,咱們經常使用的Map類是HashMap,可是這個類並非線程安全的,若是咱們在多個線程中同時對HashMap對象進行讀寫,那麼就有可能引起一些程序問題。還有一個從JDK 1.0起就已經存在的Hashtable類能夠保證線程安全,可是咱們打開這個類的源代碼能夠看到,這個類中的大部分方法上都加上了synchronized標記,其中包括了最經常使用的getput方法,這意味着Hashtable類的對象同一時間幾乎只能被一個線程所使用,這樣的效率相對是比較低的。

可是其實熟悉HashMap結構的朋友可能會知道,HashMap內部的結構是分爲不少的桶的,每一個鍵值對都會根據key值的hashCode值被放到不一樣的桶中。其實在作修改操做時咱們只須要對對應的一個桶加鎖就能夠了,而在執行讀操做時,在大多數狀況下是不用加鎖的。JDK 1.5中引入的ConcurrentHashMap基本能達到這兩點。

在這個類中,咱們經過兩種方式來優化了併發的性能:

  1. 經過限制鎖保護的代碼範圍來減小了鎖衝突發生的可能性,並且也減小了須要的鎖數量,減小了同步產生的開銷;
  2. 另外一方面由於讀取的時候並不須要加鎖,而只有寫操做才須要加鎖,在一些讀操做較多可是寫操做較少的狀況下,咱們就能大大下降讀操做的成本,從而提升了程序的性能。

並且ConcurrentHashMap中的大多數方法不只優化了同步機制的效率,並且提供了不少原子性的相似於CAS的操做方法,下面是ConcurrentHashMap類經常使用的操做:

  • V putIfAbsent(K key, V value),原子性操做,若是map中不包含key,則執行map.put(key, value)並將put方法的返回值返回,不然直接返回map.get(key)的值,即當前值;
  • boolean remove(Object key, Object value),原子性操做,若是知足條件 (map中包含key對應的鍵值對 && value參數等於鍵值對中當前的value值) 則移除key對應的鍵值對並返回true,不然返回false;
  • boolean replace(K key, V oldValue, V newValue),原子性操做,當知足 (map中包含key對應的鍵值對 && oldValue參數等於鍵值對中當前的value值)時,將key對應的值改成newValue並返回true,不然返回false。

總結

咱們在這篇文章中從對一個使用AtomicLong進行多線程累加的程序的性能測試開始,經過Map-Reduce思想大大優化了這個程序的性能,在此過程當中還涉及到了ForkJoinPool類的使用。以後咱們經過線程內變量正式提出了**「線程封閉」的概念,若是咱們能作到線程封閉**,那麼由於減小了線程間同步的開銷,因此線程的性能必定會有很大的提升。最後咱們介紹了java.util.concurrent包中爲咱們提供的併發安全數據類ConcurrentHashMap。相信經過這篇文章你們可以瞭解到多種多線程性能優化方法,但最重要的仍是要找出多線程程序性能的瓶頸所在,這樣才能在實際的實踐場景中根據不一樣的狀況因時制宜,使用合適的方法解決不一樣的瓶頸問題。本文中的觀點是多線程程序的瓶頸主要在於共享數據引發的數據競爭問題,若是可以讓不一樣的線程間不存在或者儘量少地存在共享數據與臨界區代碼,就可以對多線程程序的性能起到正面的影響。

相關文章
相關標籤/搜索