java多線程——併發測試

這是多線程系列第六篇,其餘請關注如下:java

java 多線程—線程怎麼來的?編程

java多線程-內存模型數組

java多線程——volatile緩存

java多線程——鎖安全

java多線程——CAS多線程

 

編寫併發程序時候,能夠採起和串行程序相同的編程方式。惟一的難點在於,併發程序存在不肯定性,這種不肯定性會令程序出錯的地方遠比串行程序多,出現的方式也沒有固定規則。這對程序的應用會形成一些困難,那麼如何在測試中,儘量的暴露出這些問題,而且瞭解其性能瓶頸,這也是對開發者帶來新的挑戰。併發

本篇基於多線程知識,梳理一些多線程測試須要掌握的方法和原則,以指望可能的在開發階段,就暴露出併發程序的安全性和性能問題,爲多線程可以高效安全的運行提供幫助。性能

本篇主要包含如下內容:測試

1. 併發測試分類優化

2. 正確性測試

3. 安全性測試

4. 性能測試

 

併發測試分類

 

測試流程

併發測試和串行測試有相同的部分,好比都須要線測試其在串行狀況下的正確性,這個是保證後續測試的基礎,固然了,正確性測試和咱們的串行測試方沒有什麼不一樣,都是在保證其程序在單線程狀況下執行和串行執行有相同的結果,這個咱們再也不陳述。 

通常的併發測試,咱們按照如下流程來進行。

 

分類

併發測試大體能夠分爲兩類:安全性測試與活躍性測試。

安全性測試咱們能夠定義爲「不發生任何錯誤的行爲」,也能夠理解爲保持一致性。好比i++操做,但單線程狀況下,循環20次,i=20,但是在多線程狀況下,若是總共循環20次,結果不爲20,那麼這個結果就是錯誤的,說明出現了錯誤的線程安全問題。咱們在測試這種問題的時候,必需要增長一個」test point」保證其原子性同時又不影響程序的正確性。以此爲判斷條件執行測試代碼,關於「test point」如何作,咱們後續再討論。

活躍性測試定義爲「某個良好的行爲終究會發生」,也能夠爲理解爲程序運行有必然的結果,不會出現因某個方法阻塞,而運行緩慢,或者是發生了線程死鎖,致使一直等待的狀態等。

與活躍性測試相關的是性能測試。主要有如下幾個方面進行衡量:吞吐量,響應性,可伸縮性。

  • 吞吐量:一組併發任務中已完成任務所佔的比例。或者說是必定時間內完成任務的數量。
  • 響應性:請求從發出到完成之間的時間
  • 可伸縮性:在增長更多資源(CPU,IO,內存),吞吐量的提高狀況。

 

安全性測試

 

安全性測試,如前面所說是「不發生任何錯誤的行爲」,也是要對其數據競爭可能引起的錯誤進行測試。這也是咱們須要找到一個功能中併發的的「test point」,並對其額外的構造一些測試。並且這些測試最好不須要任何同步機制。

咱們經過一個例子來進行說明。

好比ArrayBlockingQueue,咱們知道這個class是採用一個有界的阻塞隊列來實現的生產-消費模式的。若是對其測試併發問題的,重要的就是對put和take方法進行測試,一種有效的方法就是檢查被放入隊列中和出隊列中的各個元素是否相等。若是出現數據安全性的問題,那麼必然入隊列的值和出隊列的值沒有發生對應,結果也不盡相同。好比多線程狀況下,咱們把全部入列元素和出列元素的校檢和進行比較,若是兩者相等,那麼代表測試成功。

爲了保證其可以測試到全部要點,須要對入隊的值進行隨機生成,令每次測試獲得的結果不盡相同。另外爲了保證其公平性,要保證全部的線程一塊兒開始運算,防止先進行的程序進行串行運算。

public class PutTakeTest {

    protected static final ExecutorService pool = Executors.newCachedThreadPool();



    //柵欄,經過它能夠實現讓一組線程等待至某個狀態以後再所有同時執行

    protected CyclicBarrier barrier;

    protected final ArrayBlockingQueue<Integer> bb;

    protected final int nTrials, nPairs;

    //入列總和

    protected final AtomicInteger putSum = new AtomicInteger(0);

    //出列總和

    protected final AtomicInteger takeSum = new AtomicInteger(0);



    public static void main(String[] args) throws Exception {

        new PutTakeTest(10, 10, 100000).test(); // 10個承載因子,10個線程,運行100000

        pool.shutdown();

    }



    /**

     *

     * @param capacity 承載因子(緩存)

     * @param npairs 線程數量

     * @param ntrials 單個線程執行數量(吞吐量)

     */

    public PutTakeTest(int capacity, int npairs, int ntrials) {

        this.bb = new ArrayBlockingQueue<Integer>(capacity);

        this.nTrials = ntrials;

        this.nPairs = npairs;

        this.barrier = new CyclicBarrier(npairs * 2 + 1);

    }



    void test() {

        try {

            for (int i = 0; i < nPairs; i++) {

                pool.execute(new Producer());

                pool.execute(new Consumer());

            }

            barrier.await(); // 等待全部的線程就緒

            barrier.await(); // 等待全部的線程執行完成

            System.out.println("result,put==take :"+(putSum.get()==takeSum.get()));

        } catch (Exception e) {

            throw new RuntimeException(e);

        }

    }



    static int xorShift(int y) {

        y ^= (y << 6);

        y ^= (y >>> 21);

        y ^= (y << 7);

        return y;

    }



    //生產者

    class Producer implements Runnable {

        public void run() {

            try {

                int seed = (this.hashCode() ^ (int) System.nanoTime());

                int sum = 0;

                barrier.await();

                for (int i = nTrials; i > 0; --i) {

                    bb.put(seed);

                    sum += seed;

                    seed = xorShift(seed);

                }

                putSum.getAndAdd(sum);

                barrier.await();

            } catch (Exception e) {

                throw new RuntimeException(e);

            }

        }

    }



    //消費者

    class Consumer implements Runnable {

        public void run() {

            try {

                barrier.await();

                int sum = 0;

                for (int i = nTrials; i > 0; --i) {

                    sum += bb.take();

                }

                takeSum.getAndAdd(sum);

                barrier.await();

            } catch (Exception e) {

                throw new RuntimeException(e);

            }

        }

    }

}

 

以上程序中,咱們增長putSum和takeSum變量,用來統計put和take數據的校檢和。同時採用CyclicBarrier(迴環柵欄)令全部的線程同一時間從相同的位置開始執行。每一個線程的入列數據,爲了保證其惟一性,都生成一個惟一的seed,在下列代碼執行出,必然是多線程競爭的地方,

    for (int i = nTrials; i > 0; --i) {

         bb.put(seed);

                    sum += seed;

                    seed = xorShift(seed);

       }

若是此處出現線程安全問題,那麼最終take出來的數據和put的數據必然是不相同的,最終putSum和takeSum的值必然不一樣,相反則相同。

因爲併發代碼中大多數錯誤都是一些低機率的事件,所以在測試的時候,仍是須要反覆測試屢次,以提升發現錯誤的機率。

 

性能測試

 

性能測試一般是功能測試的延伸。雖然性能測試與功能測試之間會有重疊之處,但它們的目標是不一樣的。

首先性能測試須要反映出被測試對象在應用程序中的實際用法以及它的吞吐量。另外須要根據經驗值來調整各類不一樣的限值,好比線程數,併發數等,從而令程序更好的在系統上運行。

咱們對上述的PutTakeTest進行擴展,增長如下功能:

 

一、增長一個記錄運行一次分組的運行時間,爲了保證時間精確性。

採用BarrierTimer來維護,它implements Runnable,在計數達到柵欄(CyclicBarrier)指定的數量以後,會調用一次該回調,設置結束時間。

咱們用它來記錄,單個測試運行的時間。有總時間了,單次操做的時間就能夠計算出來了。如此咱們就能夠計算出單個測試的吞吐量。

吞吐量=1ms/單次操做的時間=每秒能夠執行的次數。

如下是基於柵欄的計時器。

public class BarrierTimer implements Runnable{

    private boolean started;

    private long startTime, endTime;



    public synchronized void run() {

        long t = System.nanoTime();

        if (!started) {

            started = true;

            startTime = t;

        } else

            endTime = t;

    }



    public synchronized void clear() {

        started = false;

    }



    public synchronized long getTime() {

        return endTime - startTime;

    }


}

 

二、性能測試須要針對不一樣參數組合進行測試。

 

經過不一樣參數來進行組合測試,以此來得到在不一樣參數下的吞吐率,以及不一樣線程數量下的可伸縮性。在putTakeTest裏面,咱們只只針對安全性測試。

咱們看增強版本的TimedPutTakeTest,這裏咱們把ArrayBlockingQueue的容量分別設置爲一、十、100、1000,令其在線程數量分別爲一、二、四、八、1六、3二、6四、128的狀況下,看其鏈表的吞吐率。

public class TimedPutTakeTest extends PutTakeTest {

    private BarrierTimer timer = new BarrierTimer();



    public TimedPutTakeTest(int cap, int pairs, int trials) {

        super(cap, pairs, trials);

        barrier = new CyclicBarrier(nPairs * 2 + 1, timer);

    }



    public void test() {

        try {

            timer.clear();

            for (int i = 0; i < nPairs; i++) {

                pool.execute(new PutTakeTest.Producer());

                pool.execute(new PutTakeTest.Consumer());

            }

            barrier.await();

            barrier.await();

            long nsPerItem = timer.getTime() / (nPairs * (long) nTrials);

            System.out.print("Throughput: " + nsPerItem + " ns/item");

            System.out.println("result:"+(putSum.get()==takeSum.get()));

        } catch (Exception e) {

            throw new RuntimeException(e);

        }

    }



    public static void main(String[] args) throws Exception {

        int tpt = 100000; // trials per thread

        for (int cap = 1; cap <= 1000; cap *= 10) {

            System.out.println("Capacity: " + cap);

            for (int pairs = 1; pairs <= 128; pairs *= 2) {

                TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt);

                System.out.print("Pairs: " + pairs + "\t");

                t.test();

                System.out.print("\t");

                Thread.sleep(1000);

                t.test();

                System.out.println();

                Thread.sleep(1000);

            }

        }

        PutTakeTest.pool.shutdown();

    }

}

 

如下是咱們針對ArrayBlockingQueue的性能測試結果,個人電腦硬件環境是:

cpu i7 4核8線程

memory  16G

硬盤 SSD110G

 

jdk 環境

java version 「1.8.0_45"

Java HotSpot(TM) 64-Bit Server VM (build 25.45-b02, mixed mode)

 

從上面能夠看到如下幾點狀況

一、在ArrayBlockingQueue的緩存容量在1的狀況下,不管線性併發數爲多少,都不能顯著的提高其吞吐率。這是由於每一個線程在阻塞等待另外線程執行任務。

二、當嘗試把緩存容量提高至十、100、1000的時候,吞吐率都獲得了極大的提升,特別是在1000的時候,最高可達到900w次/s。

三、當線程增長到16個的時候,吞吐率會達到頂峯,而後再增長線程吞吐率不生反而降低,固然沒有很大的降低,這是由於,當線程增多的時候,大部分時間耗費在阻塞和解除阻塞上面了。

其餘阻塞隊列的比較

 

如下是針對ArrayBlockingQueue、LinkedBlockingQueue、LinkedBlockingDeQue、PriorityBlockingQueue幾種阻塞隊列進行的橫向測評。硬件環境仍是和上述相同。jdk仍是採用1.8的API。

每一個隊列的緩存容量是1000。而後分別在一、二、四、八、1六、3二、6四、128的線程併發下,查看其吞吐率。

從上述數據中,咱們能夠看到:

一、ArrayBlockingQueue在jdk1.8的優化下性能高於LinkedBlockingQueue,雖然二者差異不是太大,這個是1.6以前,LinkedBlockingQueue是要優於ArrayBlockingQueue的。

二、PriorityBlockingQueue在達到290w的吞吐高峯以後,性能開始持續的降低,這是由於優先隊列須要不斷的優化優先列表,而須要必定的排序時間。

以上測試的主要目的是,測試生產者和消費者在經過有界put和take傳送數據時,那些約束條件將對整個吞吐量產生影響。因此會忽略了許多實際的因素。另外因爲jit的動態編譯,會直接將編譯後的代碼直接編譯爲機器代碼。因此以上測試須要通過預熱處理,運行更多的次數,以保證全部的代碼都是編譯完成以後,才統計測試的運行時間。

 

最後

測試併發程序的正確性可能會特別困難的,由於併發程序的許多故障都是一些低機率的事情,而且它們對執行時序、負載狀況以及其餘難以重現的條件比較敏感。要想盡量的發現這些錯誤,就須要咱們作更多的工做來進行分析測試,期待今天的介紹可以幫助你們開闊一些思路。

 

引用:

以上測試代碼,引用自《java Concurrency in Practice》

相關文章
相關標籤/搜索