多線程必看之JAVA線程併發輔助類

前言:多線程併發在咱們的業務場景很常見,可是爲了解決多線程的併發問題並不容易,即使你是中高級程序員亦或是架構師,你都不必定可以吃透並解決多線程併發場景所暴露的問題,今天拋磚引玉來給你們介紹下多程序併發的五種同步輔助類,但願在線程併發上能爲你們之後提供一下思路以及解決問題。html

1.CountDownLatch java

CountDownLatch 是一種很是簡單、但很經常使用的同步輔助類。其做用是在完成一組正在其餘線程中執行的操做以前,容許一個或多個線程一直阻塞。用給定的計數初始化CountDownLatch。因爲調用了countDown()方法,因此在當前計數到達零以前,await方法會一直受阻塞。以後,會釋放全部等待的線程,await的全部後續調用都將當即返回程序員

示例代碼編程

 

  1. //建立時,就須要指定參與的parties個數  數組

  2. int parties = 12;  安全

  3. CountDownLatch latch = new CountDownLatch(parties);  多線程

  4. //線程池中同步task  架構

  5. ExecutorService executor = Executors.newFixedThreadPool(parties);  併發

  6. for(int i = 0; i < parties; i++) {  dom

  7.     executor.execute(new Runnable() {  

  8.         @Override  

  9.         public void run() {  

  10.             try {  

  11.                 //能夠在任務執行開始時執行,表示全部的任務都啓動後,主線程的await便可解除  

  12.                 //latch.countDown();  

  13.                 //run  

  14.                 //..  

  15.                 Thread.sleep(3000);  

  16.   

  17.             } catch (Exception e) {  

  18.   

  19.             }  

  20.             finally {  

  21.                 //任務執行完畢後:到達  

  22.                 //表示全部的任務都結束,主線程才能繼續  

  23.                 latch.countDown();  

  24.             }  

  25.         }  

  26.     });  

  27. }  

  28. latch.await();//主線程阻塞,直到全部的parties到達  

  29. //latch上全部的parties都達到後,再次執行await將不會有效,  

  30. //即barrier是不可重用的  

  31. executor.shutdown(); 

 

2.CyclicBarrier

CyclicBarrier 一種可重置的多路同步點,在某些併發編程場景頗有用。它容許一組線程互相等待,直到到達某個公共的屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier在釋放等待線程後能夠重用,因此稱它爲循環的barrier。

 

下面看看對應的方法。

  • public CyclicBarrier(int parties, Runnable barrierAction) 
    建立一個新的CycleBarrier,它將在給定數量的參與者(線程)處於等待狀態時候啓動,並在啓動barrier時執行給定的屏障操做,該操做由最後一個今日的屏障的線程執行。參數barrierAction是在啓動屏障的時候執行命令,若是不執行任何操做則該參數是null。

  • public int await() 
    在全部參與者都已經在此barrier上調用 await方法以前,將一直等待。若是當前線程部署將要到達的最後一個線程將禁用它。

  • reset 
    將屏障重置爲其初始化狀態,若是全部參與者目前都在屏障處等待,則它們將返回,並且會拋出一個異常。

  • getNumberWaiting 
    返回當前在屏障處等待的參與者數目。

 

 

 

3.Semaphore(信號量)

信號量是一類經典的同步工具。信號量一般用來限制線程能夠同時訪問的(物理或邏輯)資源數量。

 

咱們以一個停車場運做爲例來講明信號量的做用。假設停車場只有三個車位,一開始三個車位都是空的。這時若是同時來了三輛車,看門人容許其中它們進入進入,而後放下車攔。之後來的車必須在入口等待,直到停車場中有車輛離開。這時,若是有一輛車離開停車場,看門人得知後,打開車攔,放入一輛,若是又離開一輛,則又能夠放入一輛,如此往復。

在這個停車場系統中,車位是公共資源,每輛車比如一個線程,看門人起的就是信號量的做用。信號量是一個非負整數,表示了當前公共資源的可用數目(在上面的例子中能夠用空閒的停車位類比信號量),當一個線程要使用公共資源時(在上面的例子中能夠用車輛類比線程),首先要查看信號量,若是信號量的值大於1,則將其減1,而後去佔有公共資源。若是信號量的值爲0,則線程會將本身阻塞,直到有其它線程釋放公共資源。

在信號量上咱們定義兩種操做: acquire(獲取) 和 release(釋放)。當一個線程調用acquire操做時,它要麼經過成功獲取信號量(信號量減1),要麼一直等下去,直到有線程釋放信號量,或超時。release(釋放)實際上會將信號量的值加1,而後喚醒等待的線程。

信號量主要用於兩個目的,一個是用於多個共享資源的互斥使用,另外一個用於併發線程數的控制。

 

使用示例
papublic class SemaphoreDemo {	
       private Semaphore smp = new Semaphore(3); private Random rnd = new Random(); class TaskDemo implements Runnable{
       private String id; TaskDemo(String id){
                   this.id = id; }
                       public void run(){
                           try { smp.acquire(); System.out.println("Thread " + id + " is working"); Thread.sleep(rnd.nextInt(1000)); smp.release(); System.out.println("Thread " + id + " is over"); } catch (InterruptedException e) { } } } public static void main(String[] args){ SemaphoreDemo semaphoreDemo = new SemaphoreDemo(); //注意我建立的線程池類型, ExecutorService se = Executors.newCachedThreadPool(); se.submit(semaphoreDemo.new TaskDemo("a")); se.submit(semaphoreDemo.new TaskDemo("b")); se.submit(semaphoreDemo.new TaskDemo("c")); se.submit(semaphoreDemo.new TaskDemo("d")); se.submit(semaphoreDemo.new TaskDemo("e")); se.submit(semaphoreDemo.new TaskDemo("f")); se.shutdown(); }}

運行結果

Thread c is working

Thread b is working

Thread a is working

Thread c is over

Thread d is working

Thread b is over

Thread e is working

Thread a is over

Thread f is working

Thread d is over

Thread e is over

Thread f is over

能夠看出,最多同時有三個線程併發執行,也能夠認爲有三個公共資源(好比計算機的三個串口)。

參考內容:http://www.cnblogs.com/nullzx/p/5270233.html

 



4.Phaser

 

在JAVA 1.7引入了一個新的併發API:Phaser,一個可重用的同步barrier。在此前,JAVA已經有CyclicBarrier、CountDownLatch這兩種同步barrier,可是Phaser更加靈活,並且側重於「重用」。

API簡述

     一、Phaser():構造函數,建立一個Phaser;默認parties個數爲0。此後咱們能夠經過register()、bulkRegister()方法來註冊新的parties。每一個Phaser實例內部,都持有幾個狀態數據:termination狀態、已經註冊的parties個數(registeredParties)、當前phase下已到達的parties個數(arrivedParties)、當前phase週期數,還有2個同步阻塞隊列Queue。Queue中保存了全部的waiter,即由於advance而等待的線程信息;這兩個Queue分別爲evenQ和oddQ,這兩個Queue在實現上沒有任何區別,Queue的元素爲QNode,每一個QNode保存一個waiter的信息,好比Thread引用、阻塞的phase、超時的deadline、是否支持interrupted響應等。兩個Queue,其中一個保存當前phase中正在使用的waiter,另外一個備用,當phase爲奇數時使用evenQ、oddQ備用,偶數時相反,即兩個Queue輪換使用。當advance事件觸發期間,新register的parties將會被放在備用的Queue中,advance只須要響應另外一個Queue中的waiters便可,避免出現混亂。

 

    二、Phaser(int parties):構造函數,初始必定數量的parties;至關於直接regsiter此數量的parties。

    三、arrive():到達,阻塞,等到當前phase下其餘parties到達。若是沒有register(即已register數量爲0),調用此方法將會拋出異常,此方法返回當前phase週期數,若是Phaser已經終止,則返回負數。

    四、arriveAndDeregister():到達,並註銷一個parties數量,非阻塞方法。註銷,將會致使Phaser內部的parties個數減一(隻影響當前phase),即下一個phase須要等待arrive的parties數量將減一。異常機制和返回值,與arrive方法一致。

    五、arriveAndAwaitAdvance():到達,且阻塞直到其餘parties都到達,且advance。此方法等同於awaitAdvance(arrive())。若是你但願阻塞機制支持timeout、interrupted響應,可使用相似的其餘方法(參見下文)。若是你但願到達後且註銷,並且阻塞等到當前phase下其餘的parties到達,可使用awaitAdvance(arriveAndDeregister())方法組合。此方法的異常機制和返回值同arrive()。

    六、awaitAdvance(int phase):阻塞方法,等待phase週期數下其餘全部的parties都到達。若是指定的phase與Phaser當前的phase不一致,則當即返回。

    七、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支持interrupted響應,即waiter線程若是被外部中斷,則此方法當即返回,並拋出InterrutedException。

    八、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支持timeout類型的interrupted響應,即當前線程阻塞等待約定的時長,超時後以TimeoutException異常方式返回。

    九、forceTermination():強制終止,此後Phaser對象將不可用,即register等將再也不有效。此方法將會致使Queue中全部的waiter線程被喚醒。

    十、register():新註冊一個party,致使Phaser內部registerPaties數量加1;若是此時onAdvance方法正在執行,此方法將會等待它執行完畢後纔會返回。此方法返回當前的phase週期數,若是Phaser已經中斷,將會返回負數。

    十一、bulkRegister(int parties):批量註冊多個parties數組,規則同十、。

    十二、getArrivedParties():獲取已經到達的parties個數。

    1三、getPhase():獲取當前phase週期數。若是Phaser已經中斷,則返回負值。

    1四、getRegisteredParties():獲取已經註冊的parties個數。

    1五、getUnarrivedParties():獲取還沒有到達的parties個數。

代碼示例:

 

  1. //建立時,就須要指定參與的parties個數  

  2. int parties = 12;  

  3. //能夠在建立時不指定parties  

  4. // 而是在運行時,隨時註冊和註銷新的parties  

  5. Phaser phaser = new Phaser();  

  6. //主線程先註冊一個  

  7. //對應下文中,主線程能夠等待全部的parties到達後再解除阻塞(相似與CountDownLatch)  

  8. phaser.register();  

  9. ExecutorService executor = Executors.newFixedThreadPool(parties);  

  10. for(int i = 0; i < parties; i++) {  

  11.     phaser.register();//每建立一個task,咱們就註冊一個party  

  12.     executor.execute(new Runnable() {  

  13.         @Override  

  14.         public void run() {  

  15.             try {  

  16.                 int i = 0;  

  17.                 while (i < 3 && !phaser.isTerminated()) {  

  18.                     System.out.println("Generation:" + phaser.getPhase());  

  19.                     Thread.sleep(3000);  

  20.                     //等待同一週期內,其餘Task到達  

  21.                     //而後進入新的週期,並繼續同步進行  

  22.                     phaser.arriveAndAwaitAdvance();  

  23.                     i++;//咱們假定,運行三個週期便可  

  24.                 }  

  25.             } catch (Exception e) {  

  26.   

  27.             }  

  28.             finally {  

  29.                 phaser.arriveAndDeregister();  

  30.             }  

  31.         }  

  32.     });  

  33. }  

  34. //主線程到達,且註銷本身  

  35. //此後線程池中的線程便可開始按照週期,同步執行。  

  36. phaser.arriveAndDeregister();  

參考內容:http://shift-alt-ctrl.iteye.com/blog/2302923

 

5.Exchanger

 

類java.util.concurrent.Exchanger提供了一個同步點,在這個同步點,一對線程能夠交換數據。每一個線程經過exchange()方法的入口提供數據給他的夥伴線程,並接收他的夥伴線程提供的數據,並返回。

當在運行不對稱的活動時頗有用。好比說,一個線程向buffer中填充數據,另外一個線程從buffer中消費數據;這些線程能夠用Exchange來交換數據。這個交換對於兩個線程來講都是安全的。

 

 

package com.clzhang.sample.thread;import java.util.*;import java.util.concurrent.Exchanger;public class SyncExchanger {    

private static final Exchanger exchanger = new Exchanger();    

class DataProducer implements Runnable {        

private List list = new ArrayList();      public void run() {          
         for (int i = 0; i < 5; i++) {                System.out.println("生產了一個數據,耗時1秒");                list.add(new Date());                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            }            try {                list = (List) exchanger.exchange(list);            } catch (InterruptedException e) {                e.printStackTrace();            }          
         for (Iterator iterator = list.iterator(); iterator.hasNext();) {                System.out.println("Producer " + iterator.next());            }        }    }    

class DataConsumer implements Runnable {        

           private List list = new ArrayList();
           public void run() {            
            for (int i = 0; i < 5; i++) {                list.add("這是一個收條。");            }            

           try {                list = (List) exchanger.exchange(list);            } catch (InterruptedException e) {                e.printStackTrace();            }            

           for (Iterator iterator = list.iterator(); iterator.hasNext();) {                Date d = (Date) iterator.next();                System.out.println("Consumer: " + d);            }        }    }    

       public static void main(String[] args) {         SyncExchanger ins = new SyncExchanger();        
        new Thread(ins.new DataProducer()).start();        
        new Thread(ins.new DataConsumer()).start();    } }

watermark,size_14,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_20,type_ZmFuZ3poZW5naGVpdGk=

輸出
生產了一個數據,耗時1秒
生產了一個數據,耗時1秒
生產了一個數據,耗時1秒
生產了一個數據,耗時1秒
生產了一個數據,耗時1秒
Producer 這是一個收條。
Producer 這是一個收條。
Producer 這是一個收條。
Producer 這是一個收條。
Producer 這是一個收條。
Consumer: Thu Sep 12 17:21:39 CST 2013
Consumer: Thu Sep 12 17:21:40 CST 2013
Consumer: Thu Sep 12 17:21:41 CST 2013
Consumer: Thu Sep 12 17:21:42 CST 2013
Consumer: Thu Sep 12 17:21:43 CST 2013

https://mp.weixin.qq.com/s/JvORQc7fAS7AVIRsO8SdHA

相關文章
相關標籤/搜索