java併發之同步輔助類(Semphore、CountDownLatch、CyclicBarrier、Phaser)

線程同步輔助類,主要學習兩點:數組

一、上述幾種同步輔助類的做用以及經常使用的方法
二、適用場景,若是有適當的場景能夠用到,那無疑是最好的

semaphore(seməˌfôr)

含義
信號量就是能夠聲明多把鎖(包括一把鎖:此時爲互斥信號量)。
舉個例子:一個房間若是隻能容納5我的,多出來的人必須在門外面等着。如何去作呢?一個解決辦法就是:房間外面掛着五把鑰匙,每進去一我的就取走一把鑰匙,沒有鑰匙的不能進入該房間而是在外面等待。每出來一我的就把鑰匙放回原處以方便別人再次進入。
經常使用方法
acquire():獲取信號量,信號量內部計數器減1
release():釋放信號量,信號量內部計數器加1
tryAcquire():這個方法試圖獲取信號量,若是可以獲取返回true,不然返回false
信號量控制的線程數量在聲明時肯定。例如:
Semphore s = new Semphore(2);
 
一個例子
實現一個功能:一個打印隊列,被三臺打印機打印
public class PrintQueue {
    private Semaphore semaphore;
    private boolean freePrinters[];
    private Lock lockPrinters;
    
    public PrintQueue(){
        semaphore=new Semaphore(3);
        freePrinters=new boolean[3];
        for (int i=0; i<3; i++){
            freePrinters[i]=true;
        }
        lockPrinters=new ReentrantLock();
    }
    
    public void printJob (Object document){
        try {
            semaphore.acquire();
            
            int assignedPrinter=getPrinter();
            
            Long duration=(long)(Math.random()*10);
            System.out.printf("%s: PrintQueue: Printing a Job in Printer %d during %d seconds\n",Thread.currentThread().getName(),assignedPrinter,duration);
            TimeUnit.SECONDS.sleep(duration);
            
            freePrinters[assignedPrinter]=true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Free the semaphore
            semaphore.release();            
        }
    }
    private int getPrinter() {
        int ret=-1;
        
        try {
            lockPrinters.lock();
            for (int i=0; i<freePrinters.length; i++) {
                if (freePrinters[i]){
                    ret=i;
                    freePrinters[i]=false;
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lockPrinters.unlock();
        }
        return ret;
    }
}
聲明一個Job類,使用打印隊列
 1 public class Job implements Runnable {
 2     private PrintQueue printQueue;
 3     
 4     public Job(PrintQueue printQueue){
 5         this.printQueue=printQueue;
 6     }
 7     
 8     @Override
 9     public void run() {
10         System.out.printf("%s: Going to print a job\n",Thread.currentThread().getName());
11         printQueue.printJob(new Object());
12         System.out.printf("%s: The document has been printed\n",Thread.currentThread().getName());        
13     }
14 }
Main方法
public static void main (String args[]){
        PrintQueue printQueue=new PrintQueue();
        Thread thread[]=new Thread[12];
        for (int i=0; i<12; i++){
            thread[i]=new Thread(new Job(printQueue),"Thread "+i);
        }
        for (int i=0; i<12; i++){
            thread[i].start();
        }
    }
 
須要注意的地方
一、對於信號量聲明的臨界區,雖然能夠控制線程訪問的數量,可是不能保證代碼塊之間是線程安全的。因此上面的例子在方法printJob()方法裏面使用了鎖保證數據安全性。
二、信號量也涉及到公平性問題。和鎖公平性同樣,這裏默認是非公平的。能夠經過構造器顯示聲明鎖的公平性。
public Semaphore(int permits, boolean fair)
 
應用場景
流量控制,即控制可以訪問的最大線程數。

CountDownLatch

含義
CountDownLatch能夠理解爲一個計數器在初始化時設置初始值,當一個線程須要等待某些操做先完成時,須要調用await()方法。這個方法讓線程進入休眠狀態直到等待的全部線程都執行完成。每調用一次countDown()方法內部計數器減1,直到計數器爲0時喚醒。這個能夠理解爲特殊的CyclicBarrier。線程同步點比較特殊,爲內部計數器值爲0時開始。
 
方法
核心方法兩個:countDown()和await()
countDown():使CountDownLatch維護的內部計數器減1,每一個被等待的線程完成的時候調用
await():線程在執行到CountDownLatch的時候會將此線程置於休眠
 
例子
開會的例子:會議室裏等與會人員到齊了會議才能開始。
 1 public class VideoConference implements Runnable{
 2     private final CountDownLatch controller;
 3     
 4     public VideoConference(int number) {
 5         controller=new CountDownLatch(number);
 6     }
 7     public void arrive(String name){
 8         System.out.printf("%s has arrived.\n",name);
 9 
10         controller.countDown();//調用countDown()方法,使內部計數器減1
11         System.out.printf("VideoConference: Waiting for %d participants.\n",controller.getCount());
12     }
13     
14     @Override
15     public void run() {
16         System.out.printf("VideoConference: Initialization: %d participants.\n",controller.getCount());
17         try {
18 
19             controller.await();//等待,直到CoutDownLatch計數器爲0
20 
21             System.out.printf("VideoConference: All the participants have come\n");
22             System.out.printf("VideoConference: Let's start...\n");
23         } catch (InterruptedException e) {
24             e.printStackTrace();
25         }
26     }
27 }
參加會議人員類
 1 public class Participant implements Runnable {
 2     private VideoConference conference;
 3     
 4     private String name;
 5 
 6     public Participant(VideoConference conference, String name) {
 7         this.conference=conference;
 8         this.name=name;
 9     }
10     @Override
11     public void run() {
12         Long duration=(long)(Math.random()*10);
13         try {
14             TimeUnit.SECONDS.sleep(duration);
15         } catch (InterruptedException e) {
16             e.printStackTrace();
17         }    
18         conference.arrive(name);//每到一我的員,CountDownLatch計數器就減小1
19     }
20 }
主函數
 1 public static void main(String[] args) {
 2         VideoConference conference = new VideoConference(10);
 3         Thread threadConference = new Thread(conference);
 4         threadConference.start();//開啓await()方法,在內部計數器爲0以前線程處於等待狀態
 5         for (int i = 0; i < 10; i++) {
 6             Participant p = new Participant(conference, "Participant " + i);
 7             Thread t = new Thread(p);
 8             t.start();
 9         }
10     }
 
須要注意的地方
CountDownLatch比較容易記憶的是他的功能,是一個線程計數器。等計數器爲0時那些先前因調用await()方法休眠的線程被喚醒。
CountDownLatch可以控制的線程是哪些?是那些調用了CountDownLatch的await()方法的線程
具體使用方式,容易忘記:先運行await()方法的線程,例子中是視頻會議的線程。而後是執行與會者 線程,這裏的處理是每到一位(每建立一個線程並運行run()方法時就使計數器減1)就讓計數器減1,等計數器減爲0時喚醒因調用await()方法進入休眠的線程。這裏的這些與會者就是要等待的線程。
 
應用場景
等人到齊了才能開始開會;

CyclicBarrier

含義
柵欄容許兩個或者多個線程在某個集合點同步。當一個線程到達集合點時,它將調用await()方法等待其它的線程。線程調用await()方法後,CyclicBarrier將阻塞這個線程並將它置入休眠狀態等待其它線程的到來。等最後一個線程調用await()方法時,CyclicBarrier將喚醒全部等待的線程而後 這些線程將繼續執行CyclicBarrier能夠傳入另外一個Runnable對象做爲初始化參數。當全部的線程都到達集合點後,CyclicBarrier類將Runnable對象做爲線程執行。
 
方法
await():使線程置入休眠直到最後一個線程的到來以後喚醒全部休眠的線程
 
例子
在矩陣(二維數組)中查找一個指定的數字。矩陣將被分爲多個子集,每一個子集交給一個線程去查找。當全部線程查找完畢後交給最後的線程彙總結果。
查找類:在一個子集中查找指定數字,找到以後把結果存儲後調用await()方法置入休眠等待最後一個線程的到來喚醒
 1 public class Searcher implements Runnable {
 2     private final CyclicBarrier barrier;
 3     @Override
 4     public void run() {
 5         int counter;
 6         System.out.printf("%s: Processing lines from %d to %d.\n",Thread.currentThread().getName(),firstRow,lastRow);
 7         for (int i=firstRow; i<lastRow; i++){
 8             int row[]=mock.getRow(i);
 9             counter=0;
10             for (int j=0; j<row.length; j++){
11                 if (row[j]==number){
12                     counter++;
13                 }
14             }
15             results.setData(i, counter);
16         }
17         System.out.printf("%s: Lines processed.\n",Thread.currentThread().getName());        
18         try {
19             barrier.await();
20         } catch (InterruptedException e) {
21             e.printStackTrace();
22         } catch (BrokenBarrierException e) {
23             e.printStackTrace();
24         }
25     }
26 }
彙總類:彙總每一個Searcher找到的結果
 1 public class Grouper implements Runnable {
 2     private Results results;
 3     
 4     public Grouper(Results results){
 5         this.results=results;
 6     }
 7     @Override
 8     public void run() {
 9         int finalResult=0;
10         System.out.printf("Grouper: Processing results...\n");
11         int data[]=results.getData();
12         for (int number:data){
13             finalResult+=number;
14         }
15         System.out.printf("Grouper: Total result: %d.\n",finalResult);
16     }
17 }
主函數,如何把Searcher和Grouper類配合起來呢??
 1 public static void main(String[] args) {
 2         final int ROWS=10000;
 3         final int NUMBERS=1000;
 4         final int SEARCH=5; 
 5         final int PARTICIPANTS=5;
 6         final int LINES_PARTICIPANT=2000;
 7         MatrixMock mock=new MatrixMock(ROWS, NUMBERS,SEARCH);//矩陣的聲明
 8         
 9         Results results=new Results(ROWS);//結果集
10         
11         Grouper grouper=new Grouper(results);//彙總線程
12         
13         CyclicBarrier barrier=new CyclicBarrier(PARTICIPANTS,grouper);//柵欄,傳入參數含義:線程同步個數,彙總線程
14         
15         Searcher searchers[]=new Searcher[PARTICIPANTS];
16         for (int i=0; i<PARTICIPANTS; i++){
17             searchers[i]=new Searcher(i*LINES_PARTICIPANT, (i*LINES_PARTICIPANT)+LINES_PARTICIPANT, mock, results, 5,barrier);
18             Thread thread=new Thread(searchers[i]);
19             thread.start();
20         }
21         System.out.printf("Main: The main thread has finished.\n");
22     }
運行結果:
Mock: There are 999286 ocurrences of number in generated data.
Thread-0: Processing lines from 0 to 2000.
Main: The main thread has finished.
Thread-0: Lines processed.
Thread-1: Processing lines from 2000 to 4000.
Thread-1: Lines processed.
Thread-3: Processing lines from 6000 to 8000.
Thread-3: Lines processed.
Thread-2: Processing lines from 4000 to 6000.
Thread-2: Lines processed.
Thread-4: Processing lines from 8000 to 10000.
Thread-4: Lines processed.
Grouper: Processing results...
Grouper: Total result: 999286.
 
須要注意的地方
線程完成任務後調用CyclicBarrier的await()方法休眠等待。在全部線程在集合點均到達時,柵欄調用傳入的Runnable對象進行最後的執行。
與CountDownLatch的區別:
  • 在全部線程到達集合點後接受一個Runnable類型的對象做爲後續的執行
  • 沒有顯示調用CountDown()方法
  • CountDownLatch通常只能使用一次,CyclicBarrier能夠屢次使用
應用場景
多個線程作任務,等到達集合點同步後交給後面的線程作彙總

Phaser

含義
更加複雜和強大的同步輔助類。它容許併發執行多階段任務。當咱們有併發任務而且須要分解成幾步執行時,(CyclicBarrier是分紅兩步),就能夠選擇使用Phaser。Phaser類機制是在每一步結束的位置對線程進行同步,當全部的線程都完成了這一步,才容許執行下一步。
跟其餘同步工具同樣,必須對Phaser類中參與同步操做的任務數進行初始化,不一樣的是, 能夠動態的增長或者減小任務數。
 
函數
arriveAndAwaitAdvance():相似於CyclicBarrier的await()方法,等待其它線程都到來以後同步繼續執行
arriveAndDeregister():把執行到此的線程從Phaser中註銷掉
isTerminated():判斷Phaser是否終止
register():將一個新的參與者註冊到Phaser中,這個新的參與者將被當成沒有執行完本階段的線程
forceTermination():強制Phaser進入終止態
... ...
 
例子
使用Phaser類同步三個併發任務。這三個任務將在三個不一樣的文件夾及其子文件夾中查找過去24小時內修改過擴展爲爲.log的文件。這個任務分紅如下三個步驟:
一、在執行的文件夾及其子文件夾中獲取擴展名爲.log的文件
二、對每一步的結果進行過濾,刪除修改時間超過24小時的文件
三、將結果打印到控制檯
在第一步和第二步結束的時候,都會檢查所查找到的結果列表是否是有元素存在。若是結果列表是空的,對應的線程將結束執行,並從Phaser中刪除。(也就是動態減小任務數)
文件查找類
  1 public class FileSearch implements Runnable {
  2     private String initPath;
  3 
  4     private String end;
  5     
  6     private List<String> results;
  7 
  8     private Phaser phaser;
  9 
 10     public FileSearch(String initPath, String end, Phaser phaser) {
 11         this.initPath = initPath;
 12         this.end = end;
 13         this.phaser=phaser;
 14         results=new ArrayList<>();
 15     }
 16     @Override
 17     public void run() {
 18 
 19         phaser.arriveAndAwaitAdvance();//等待全部的線程建立完成,確保在進行文件查找的時候全部的線程都已經建立完成了
 20         
 21         System.out.printf("%s: Starting.\n",Thread.currentThread().getName());
 22         
 23         // 1st Phase: 查找文件
 24         File file = new File(initPath);
 25         if (file.isDirectory()) {
 26             directoryProcess(file);
 27         }
 28         
 29         // 若是查找結果爲false,那麼就把該線程從Phaser中移除掉而且結束該線程的運行
 30         if (!checkResults()){
 31             return;
 32         }
 33         
 34         // 2nd Phase: 過濾結果,過濾出符合條件的(一天內的)結果集
 35         filterResults();
 36         
 37         // 若是過濾結果集結果是空的,那麼把該線程從Phaser中移除,不讓它進入下一階段的執行
 38         if (!checkResults()){
 39             return;
 40         }
 41         
 42         // 3rd Phase: 顯示結果
 43         showInfo();
 44         phaser.arriveAndDeregister();//任務完成,註銷掉全部的線程
 45         System.out.printf("%s: Work completed.\n",Thread.currentThread().getName());
 46     }
 47     private void showInfo() {
 48         for (int i=0; i<results.size(); i++){
 49             File file=new File(results.get(i));
 50             System.out.printf("%s: %s\n",Thread.currentThread().getName(),file.getAbsolutePath());
 51         }
 52         // Waits for the end of all the FileSearch threads that are registered in the phaser
 53         phaser.arriveAndAwaitAdvance();
 54     }
 55     private boolean checkResults() {
 56         if (results.isEmpty()) {
 57             System.out.printf("%s: Phase %d: 0 results.\n",Thread.currentThread().getName(),phaser.getPhase());
 58             System.out.printf("%s: Phase %d: End.\n",Thread.currentThread().getName(),phaser.getPhase());
 59             //結果爲空,Phaser完成並把該線程從Phaser中移除掉
 60             phaser.arriveAndDeregister();
 61             return false;
 62         } else {
 63             // 等待全部線程查找完成
 64             System.out.printf("%s: Phase %d: %d results.\n",Thread.currentThread().getName(),phaser.getPhase(),results.size());
 65             phaser.arriveAndAwaitAdvance();
 66             return true;
 67         }        
 68     }
 69     private void filterResults() {
 70         List<String> newResults=new ArrayList<>();
 71         long actualDate=new Date().getTime();
 72         for (int i=0; i<results.size(); i++){
 73             File file=new File(results.get(i));
 74             long fileDate=file.lastModified();
 75             
 76             if (actualDate-fileDate<TimeUnit.MILLISECONDS.convert(1,TimeUnit.DAYS)){
 77                 newResults.add(results.get(i));
 78             }
 79         }
 80         results=newResults;
 81     }
 82     private void directoryProcess(File file) {
 83         // Get the content of the directory
 84         File list[] = file.listFiles();
 85         if (list != null) {
 86             for (int i = 0; i < list.length; i++) {
 87                 if (list[i].isDirectory()) {
 88                     // If is a directory, process it
 89                     directoryProcess(list[i]);
 90                 } else {
 91                     // If is a file, process it
 92                     fileProcess(list[i]);
 93                 }
 94             }
 95         }
 96     }
 97     private void fileProcess(File file) {
 98         if (file.getName().endsWith(end)) {
 99             results.add(file.getAbsolutePath());
100         }
101     }
102 }
主函數:
 1 public static void main(String[] args) {
 2         Phaser phaser = new Phaser(3);
 3 
 4         FileSearch system = new FileSearch("C:\\Windows", "log", phaser);
 5         FileSearch apps = new FileSearch("C:\\Program Files", "log", phaser);
 6         FileSearch documents = new FileSearch("C:\\Documents And Settings", "log", phaser);
 7 
 8         Thread systemThread = new Thread(system, "System");
 9         systemThread.start();
10         Thread appsThread = new Thread(apps, "Apps");
11         appsThread.start();        
12         Thread documentsThread = new Thread(documents, "Documents");
13         documentsThread.start();
14         try {
15             systemThread.join();
16             appsThread.join();
17             documentsThread.join();
18         } catch (InterruptedException e) {
19             e.printStackTrace();
20         }
21         System.out.printf("Terminated: %s\n", phaser.isTerminated());
22     }
 
注意的地方
例子中Phaser分了三個步驟:查找文件、過濾文件、打印結果。而且在查找文件和過濾文件結束後對結果進行分析,若是是空的,將此線程從Phaser中註銷掉。也就是說,下一階段,該線程將不參與運行。
在run()方法中,開頭調用了phaser的arriveAndAwaitAdvance()方法來保證全部線程都啓動了以後再開始查找文件。在查找文件和過濾文件階段結束以後,都對結果進行了處理。即:若是結果是空的,那麼就把該條線程移除,若是不空,那麼等待該階段全部線程都執行完該步驟以後在統一執行下一步。最後,任務執行完後,把Phaser中的線程均註銷掉。
Phaser其實有兩個狀態:活躍態和終止態。當存在參與同步的線程時,Phaser就是活躍的。而且在每一個階段結束的時候同步。當全部參與同步的線程都取消註冊的時候,Phase就處於終止狀態。在這種狀態下,Phaser沒有任務參與者。
Phaser主要功能就是執行多階段任務,並保證每一個階段點的線程同步。在每一個階段點還能夠條件或者移除參與者。主要涉及方法arriveAndAwaitAdvance()和register()和arriveAndDeregister()
 
使用場景
多階段任務
相關文章
相關標籤/搜索