(三)線程同步工具集_4---在一個約定點同步任務

在一個約定點同步任務(Synchronizing tasks in a  common point)

Java 併發API中提供了一個同步工具CyclicBarrier類能夠使多個線程在約定點進行任務同步,該類和CountDownLatch類有點相似,CountDownLatch是等待多個併發事件,在上節有解釋;java

CyclicBarrier類初始化有兩個參數,第一個是要同步的線程個數,第二個要同步的任務(該參數實現了Runnable接口);當這些要同步的線程到達到了這個約定的同步點,它將調用await()方法進入睡眠狀態,當全部要同步的線程都到達了這個約定的點後,CyclicBarrier將會喚醒全部睡眠的這些線程,而後執行要同步的任務(傳入的第二個參數);算法

CyclicBarrier最好的就是它能夠傳入一個實現了Runnable接口的對象,在全部線程到達約定點後,在執行這個對象;這個特性很是適合分治算法 的思想;數組

再接下來的例子中,簡單的使用CyclicBarrier模擬一個簡單的分治算法;併發

問題:即模擬一個矩陣,從該矩陣中查找一個給定的數,並統計該數值在該矩陣中出現的次數;dom

解決思路:ide

開啓多個線程,每一個線程負責查找若干行,並把每一行該數值出現的次數放置到一個數組中,做爲結果,每行對應與該數組的索引;在全部線程都查找完成後,執行同步任務,這裏僅僅簡單的輸出最終的結果;工具

動手實現

1.輔助類,用來模擬矩陣this

public class MatrixMock {
    private int data[][];
    public MatrixMock(int rows,int cols,int number){
        int counter=0;
        data=new int[rows][cols];
        Random random=new Random();
        for (int i = 0; i < rows; i++) {
            for(int j=0;j<cols;j++){
                data[i][j]=random.nextInt(10);
                if (data[i][j] == number) {
                    counter++;
                }
            }

        }
        System.out.printf("Mock: There are %d occurrence of %d in generated data.\n",counter,number);
    }

    public int[] getRow(int row) {
        if ((row >= 0) && (row < data.length)) {
            return data[row];
        }
        return null;
    }
}
2.輔助類,用來放置每一個線程處理結果,索引對應矩陣行數,列對應每行查找到指定數值的次數

public class Result {
    private int data[];

    public Result(int size) {
        this.data = new int[size];
    }

    public void setData(int position,int value){
        data[position]=value;
    }

    public int[] getData(){
        return data;
    }
}

3.用來執行計算的線程

public class Searcher implements Runnable {
    private int firstRow;
    private int lastRow;
    private MatrixMock mock;
    private Result result;
    private int number;
    private final CyclicBarrier barrier;

    public Searcher(int firstRow, int lastRow, MatrixMock mock, Result result,
                    int number, CyclicBarrier barrier) {
        this.firstRow = firstRow;
        this.lastRow = lastRow;
        this.mock = mock;
        this.result = result;
        this.number = number;
        this.barrier = barrier;
    }

    @Override
    public void run() {
        int counter;
        System.out.printf("%s: Processing lines from %d to %d.\n",
                Thread.currentThread().getName(), firstRow, lastRow);
        for (int i = firstRow; i < lastRow; i++) {
            int row[] = mock.getRow(i);
            counter = 0;
            for (int j = 0; j < row.length; j++) {
                if (row[j] == number) {
                    counter++;
                }
            }
            result.setData(i, counter);
        }
        System.out.printf("%s: Lines processed.\n", Thread.currentThread().getName());

        try {
            barrier.await();
        } catch (BrokenBarrierException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
4.同步任務,當全部線程完成以後執行,相似於分治算法中的 結果合併

public class Grouper implements Runnable {
    private Result result;

    public Grouper(Result result) {
        this.result = result;
    }


    @Override
    public void run() {
        int finalResult=0;
        System.out.printf("Grouper: Processing results...\n");
        int data[]=result.getData();
        for(int number:data){
            finalResult+=number;
        }
        System.out.printf("Grouper: Total result: %d.\n",finalResult);
    }
}

5.Main

public class Main {
    public static void main(String[] args) {
        final int rows=10000;
        final int cols=1000;
        final int search=5;
        final int participants=5;
        final int linesParticipant=2000;
        MatrixMock mock=new MatrixMock(rows, cols,search);
        Result result=new Result(rows);
        Grouper grouper=new Grouper(result);

        CyclicBarrier barrier=new CyclicBarrier(participants,grouper);

        Searcher searchers[]=new Searcher[participants];

        for (int i=0; i<participants; i++){
            // Every searching thread searches 2000 rows
            searchers[i]=new Searcher(i*linesParticipant,
                    (i*linesParticipant) + linesParticipant, mock, result, 5,barrier);
            Thread thread=new Thread(searchers[i]);
            thread.start();
        }
        System.out.printf("Main: The main thread has finished.\n");

    }
}
一次運行結果:

Mock: There are 1001252 occurrence of 5 in generated data.
Main: The main thread has finished.
Thread-0: Processing lines from 0 to 2000.
Thread-4: Processing lines from 8000 to 10000.
Thread-1: Processing lines from 2000 to 4000.
Thread-3: Processing lines from 6000 to 8000.
Thread-2: Processing lines from 4000 to 6000.
Thread-1: Lines processed.
Thread-3: Lines processed.
Thread-2: Lines processed.
Thread-0: Lines processed.
Thread-4: Lines processed.
Grouper: Processing results...
Grouper: Total result: 1001252.spa

要點

1.CyclicBarrier還提供了getNumberWaiting()方法,用來獲取當前被阻塞的線程個數;線程

2.利用該類執行分治任務是一個很不錯的選擇;

相關文章
相關標籤/搜索