Java併發編程中級篇(四):使用CyclicBarrier實現併發線程在集合點同步

上一節講到了CountDownLatch這個併發輔助類,它可以讓一個線程等待其餘併發線程執行完一組任務後再繼續執行,也能夠說是實現了併發線程在集合點同步。可是Java又給出了一個更強大的併發輔助類CyclicBarrier。java

CyclicBarrier也使用一個整形參數進行初始化,這個參數是須要再某點同步的線程數。當線程調用await()方法後CyclicBarrier類將把這個線程編爲WAITING狀態,並等待直到其餘線程都到達集合點。當最後一個線程到達集合點後,調用CyclicBarrier類的await()方法時,CyclicBarrieer對象將喚醒全部經過await()方法進入等待的線程。算法

CyclicBarrier與CountDownLatch不一樣的地方在於,CountDownLatch經過countDown()方法對計數器減1來標記一個線程已經到達集合點,而且這個線程不會阻塞會繼續執行。而CyclicBarrier類則經過await()方法標記線程到達集合點,而且這個到達集合點的線程會被阻塞。另外CyclicBarrier還支持把一個Runnable對象做爲一個初始化參數,當全部的線程都到達集合點的時候,這個線程會被啓動。這很是相似以一個分治算法的實現,把一個大任務拆分紅若干個子任務,並等待全部子任務結束後,輸出執行結果。多線程

下面咱們用一個實例來演示如何使用CyclicBarrier來模擬使用分治算法在一個矩陣中查找一個數字出現的次數。併發

首先咱們建立一個矩陣類,構造函數接受矩陣的維度以及須要查找的數字,並採用隨機數的方式構建這個矩陣而後記錄下來構建矩陣過程當中這個待查找數字出現的次數。dom

public class MatrixMock {
    private int[][] data;

    public MatrixMock(int size, int length, int number) {
        int count = 0;
        data = new int[size][length];
        Random random = new Random();
        for (int i = 0; i < size; i++) {
            for (int j = 0; j < length; j++) {
                int temp = random.nextInt(10);
                data[i][j] = temp;
                if (temp == number) {
                    count++;
                }
            }
        }
        System.out.printf("Mock: There are %d occurences of %d in generate\n", count, number);
    }

    public int[] getRow(int row) {
        if(row >= 0 && row < data.length) {
            return data[row];
        }
        return null;
    }
}

查找結果類,定義了每行中待查找數字出現的次數。ide

public class Results {
    int data[];

    public Results(int size) {
        data = new int[size];
    }

    public void setData(int position, int value) {
        data[position] = value;
    }

    public int[] getData() {
        return data;
    }
}

接下來咱們建立查找任務線程,這個類接受幾個參數做爲構造方法。分別是MatrixMock(待查找矩陣),Results(查找結果),firstRow(此任務分配的開始查找位置),lastRow(此任務分配的結束查找位置),number(待查找數字),CyclicBarrier(併發輔助類)。而後咱們從firstRow開始查找,直到lastRow結束查找每行中待查找數字出現的次數並保存到Result中。函數

public class Searcher implements Runnable{
    private MatrixMock matrixMock;
    private Results results;
    private int firstRow;
    private int lastRow;
    private int number;
    private final CyclicBarrier cyclicBarrier;

    public Searcher(CyclicBarrier cyclicBarrier, MatrixMock matrixMock, Results results, int firstRow, int lastRow, int number) {
        this.cyclicBarrier = cyclicBarrier;
        this.matrixMock = matrixMock;
        this.results = results;
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.number = number;
    }

    @Override
    public void run() {
        System.out.printf("%s: Processing lines from %d to %d.\n",
                Thread.currentThread().getName(), firstRow, lastRow);

        for (int i = firstRow; i < lastRow; i++) {
            int row[] = matrixMock.getRow(i);
            int counter = 0;
            for (int j = 0; j < row.length; j++) {
                if(row[j] == number) {
                    counter++;
                }
            }
            results.setData(i, counter);
        }

        System.out.printf("%s: Line processed.\n", Thread.currentThread().getName());

        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

查找任務合併線程,也就是等待全部全部查找子任務完成後,把全部子任務查找的結果作一個合計,並打印出一共找到多少個待查找數字。this

public class Grouper implements Runnable{
    private Results results;

    public Grouper(Results results) {
        this.results = results;
    }

    @Override
    public void run() {
        System.out.printf("Grouper: Processing results...\n");
        int count = 0;
        for (int i = 0; i < results.getData().length; i++) {
            count += results.getData()[i];
        }
        System.out.printf("Grouper: Total result: %d\n", count);
    }
}

在主方法類中建立一個10000*10000的矩陣,並啓動5個Searcher線程去查找數字5在每行中出現的次數。而後使用Grouper線程做爲CyclicBarrier的初始化參數,等待全部Searcher任務執行完畢後執行Grouper線程來合計每一個Searcher線程的查找結果。線程

public class Main {
    public static void main(String[] args) {
        int size = 10000;
        int length = 10000;
        int search = 5;
        int participants = 5;
        int lines_participants = size / participants;

        MatrixMock matrixMock = new MatrixMock(size, length, search);
        Results results = new Results(size);
        Grouper grouper = new Grouper(results);

        CyclicBarrier cyclicBarrier = new CyclicBarrier(participants, grouper);
        Searcher[] searchers = new Searcher[participants];
        Thread[] searcherThreads = new Thread[participants];
        for (int i = 0; i < participants; i++) {
            int firstRow = i * lines_participants;
            int lastRow = (i + 1) * lines_participants;
            searchers[i] = new Searcher(cyclicBarrier, matrixMock, results, firstRow, lastRow, search);
            searcherThreads[i] = new Thread(searchers[i]);
            searcherThreads[i].start();
        }
    }
}

查看任務執行日誌,咱們發現每一個Searcher任務執行2000行數據的查找工做,最終找到9997834個帶查找數字。日誌

Mock: There are 9997834 occurences of 5 in generate
Thread-0: Processing lines from 0 to 2000.
Thread-1: Processing lines from 2000 to 4000.
Thread-2: Processing lines from 4000 to 6000.
Thread-3: Processing lines from 6000 to 8000.
Thread-0: Line processed.
Thread-2: Line processed.
Thread-3: Line processed.
Thread-4: Processing lines from 8000 to 10000.
Thread-1: Line processed.
Thread-4: Line processed.
Grouper: Processing results...
Grouper: Total result: 9997834

CyclicBarrier類提供了兩個方法用來查看在CyclicBarrier上面等待的線程數和同步的任務數。

System.out.printf("CyclicBarrier: %d, %d\n", cyclicBarrier.getNumberWaiting(), cyclicBarrier.getParties());

CyclicBarrier類還支持重置,經過reset()方法完成操做。當重置發生後,await()方法將接受一個BrokenBarrierException異常,你捕獲這個異常後能夠用來執行一些複雜的操做,好比回滾數據或者從新執行。

CyclicBarrier類有一個特有的狀態爲Broken,當多線程併發等待的時候,有一個線程被中斷,這個線程拋出InterruptedException異常,其餘等待的線程將拋出BrokenBarrierException異常,因而CyclicBarrier對象屬於Broken狀態。你可使用isBroken()方法來判斷CyclicBarrier對象是否處於損壞狀態。

相關文章
相關標籤/搜索