Java併發編程學習四:CountDownLatch,CyclicBarrier,Semaphore以及原子類

上篇文章線程同步的關鍵字以及理解中介紹了一下多線程同步協做之間常用的關鍵字,今天這篇文章就介紹一下一些同步類以及原子類的使用吧。Java中提供了很多的同步類,如:CountDownLatch,CyclicBarrier,Semaphore等,下面就對每一個類作個學習的記錄。html


CountDownLatch

CountDownLatch是一個同步工具類,它容許一個或多個線程一直等待,直到其餘線程的操做執行完後再執行。其擁有的方法以下:java

public CountDownLatch(int count);  //參數count爲計數值
public void await() throws InterruptedException;   //調用await()方法的線程會被掛起,它會等待直到count值爲0才繼續執行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;  //等待必定的時間後count值還沒變爲0的話就會繼續執行
public void countDown();  //將count減1

CountDownLatch比較簡單,這裏寫個Demo就應該都懂了:算法

val countDownLatch = CountDownLatch(3)

fun main(args: Array<String>) {
    val thread = Thread(Runnable {
        Thread.sleep(3000)
        countDownLatch.countDown()
        System.out.println(ThreadTest.timeStamp2Date() + " 線程一執行完畢")
    })
    val thread1 = Thread(Runnable {
        Thread.sleep(1000)
        countDownLatch.countDown()
        System.out.println(ThreadTest.timeStamp2Date() + " 線程二執行完畢")
    })
    val thread2 = Thread(Runnable {
        Thread.sleep(2000)
        countDownLatch.countDown()
        System.out.println(ThreadTest.timeStamp2Date() + " 線程三執行完畢")
    })
    thread2.start()
    thread.start()
    thread1.start()
    countDownLatch.await()
    System.out.print(ThreadTest.timeStamp2Date() + " 主線程繼續執行")

}
//--------------------------------------
2018-11-20 14:09:37 線程二執行完畢
2018-11-20 14:09:38 線程三執行完畢
2018-11-20 14:09:39 線程一執行完畢
2018-11-20 14:09:39 主線程繼續執行

因爲CountDownLatch的存在,而且計數值爲3,因此主線程須要等到CountDownLatch內部計數值減爲0了再執行。segmentfault


CyclicBarrier

CyclicBarrier能夠稱做同步屏障,其意義是讓一組線程達到一個屏障時被阻塞,直到最後一個線程達到屏障時,全部被阻塞的線程才能繼續執行。先看下他的主要方法:api

public CyclicBarrier(int parties);//計數值
    public CyclicBarrier(int parties, Runnable barrierAction);//barrierAction表示當最後線程都達到屏障後會執行的Runnable
	public int await();//阻塞直到最後一個線程遇到屏障
	public int await(long timeout, TimeUnit unit);//阻塞直到最後一個線程遇到屏障或者超出timeout時間

首先測試一下第一種構造方法的運行代碼:數組

val barrier = CyclicBarrier(4)

fun main(args: Array<String>) {
    val thread1 = Thread(Runnable {
        System.out.println(ThreadTest.timeStamp2Date() + " 執行thread1代碼")
        Thread.sleep(1000)
        barrier.await()
        System.out.println(ThreadTest.timeStamp2Date() + " 屏障事後執行thread1代碼")
    })
    val thread2 = Thread(Runnable {
        System.out.println(ThreadTest.timeStamp2Date() + " 執行thread2代碼")
        Thread.sleep(3000)
        barrier.await()
        System.out.println(ThreadTest.timeStamp2Date() + " 屏障事後執行thread2代碼")
    })
    val thread3 = Thread(Runnable {
        System.out.println(ThreadTest.timeStamp2Date() + " 執行thread3代碼")
        barrier.await()
        System.out.println(ThreadTest.timeStamp2Date() + " 屏障事後執行thread3代碼")
    })
    val thread4 = Thread(Runnable {
        System.out.println(ThreadTest.timeStamp2Date() + " 執行thread4代碼")
        Thread.sleep(800)
        barrier.await()
        System.out.println(ThreadTest.timeStamp2Date() + " 屏障事後執行thread4代碼")
    })
    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()
}
//--------------------------------------
2018-11-20 14:32:18 執行thread3代碼
2018-11-20 14:32:18 執行thread1代碼
2018-11-20 14:32:18 執行thread4代碼
2018-11-20 14:32:18 執行thread2代碼
2018-11-20 14:32:21 屏障事後執行thread2代碼
2018-11-20 14:32:21 屏障事後執行thread4代碼
2018-11-20 14:32:21 屏障事後執行thread1代碼
2018-11-20 14:32:21 屏障事後執行thread3代碼

能夠看到在thread2中調用到barrier.await()後,屏障消失,全部線程開始繼續執行。接着咱們再看下第二種構造方法,傳入的barrierAction會從某個線程中隨機選取一個來執行代碼:安全

val barrier = CyclicBarrier(4, Runnable {
    System.out.println("當前線程=" + Thread.currentThread().name)
})

fun main(args: Array<String>) {
    val thread1 = Thread(Runnable {
        System.out.println(ThreadTest.timeStamp2Date() + " 執行thread1代碼"+" name="+Thread.currentThread().name)
        Thread.sleep(1000)
        barrier.await()
        System.out.println(ThreadTest.timeStamp2Date() + " 屏障事後執行thread1代碼"+" name="+Thread.currentThread().name)
    })
    val thread2 = Thread(Runnable {
        System.out.println(ThreadTest.timeStamp2Date() + " 執行thread2代碼"+" name="+Thread.currentThread().name)
        Thread.sleep(3000)
        barrier.await()
        System.out.println(ThreadTest.timeStamp2Date() + " 屏障事後執行thread2代碼"+" name="+Thread.currentThread().name)
    })
    val thread3 = Thread(Runnable {
        System.out.println(ThreadTest.timeStamp2Date() + " 執行thread3代碼"+" name="+Thread.currentThread().name)
        barrier.await()
        System.out.println(ThreadTest.timeStamp2Date() + " 屏障事後執行thread3代碼"+" name="+Thread.currentThread().name)
    })
    val thread4 = Thread(Runnable {
        System.out.println(ThreadTest.timeStamp2Date() + " 執行thread4代碼"+" name="+Thread.currentThread().name)
        Thread.sleep(800)
        barrier.await()
        System.out.println(ThreadTest.timeStamp2Date() + " 屏障事後執行thread4代碼"+" name="+Thread.currentThread().name)
    })
    thread1.start()
    thread2.start()
    thread3.start()
    thread4.start()
}
//-----------------------------------------------
2018-11-20 14:42:40 執行thread3代碼 name=Thread-2
2018-11-20 14:42:40 執行thread2代碼 name=Thread-1
2018-11-20 14:42:40 執行thread1代碼 name=Thread-0
2018-11-20 14:42:40 執行thread4代碼 name=Thread-3
當前線程=Thread-3
2018-11-20 14:42:40 屏障事後執行thread4代碼 name=Thread-3
2018-11-20 14:42:40 屏障事後執行thread3代碼 name=Thread-2
2018-11-20 14:42:40 屏障事後執行thread2代碼 name=Thread-1
2018-11-20 14:42:40 屏障事後執行thread1代碼 name=Thread-0

Semaphore

Semaphore即信號量的意思,經過Semaphore能夠控同時訪問的線程個數,經過acquire()獲取一個許可,若是沒有就等待,而release()釋放一個許可。Semaphore使用的情景是資源有限,而線程消費又多的情景。多線程

舉個生活例子就是醫院有5名醫生(資源),而看病的有20我的(Thread)。併發

Semaphore主要方法以下:dom

public Semaphore(int permits);//參數爲許可數量
 public Semaphore(int permits, boolean fair);//fair表示是不是公平的,即等待時間越久的越先獲取許可
 public void acquire() throws InterruptedException {  }     //獲取一個許可
 public void acquire(int permits) throws InterruptedException { }    //獲取permits個許可
 public void release() { }          //釋放一個許可
 public void release(int permits) { }    //釋放permits個許可
 public boolean tryAcquire() { };    //嘗試獲取一個許可,若獲取成功,則當即返回true,若獲取失敗,則當即返回false
 public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  //嘗試獲取一個許可,若在指定的時間內獲取成功,則當即返回true,不然則當即返回false
 public boolean tryAcquire(int permits) { }; //嘗試獲取permits個許可,若獲取成功,則當即返回true,若獲取失敗,則當即返回false
 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; //嘗試獲取permits個許可,若在指定的時間內獲取成功,則當即返回true,不然則當即返回false

能夠看到Semaphore的方法跟ReetrantLock的方法很像,其做用也是類似的,也容易理解,下面就簡單寫個例子測試一下部分api,有興趣的同窗能夠自行測試其他的。

val semaphore = Semaphore(3)

fun main(args: Array<String>) {

    var count = 0
    val random = Random()
    while (count < 20) {
        count++
        val thread = Thread(Runnable {
            semaphore.acquire()
            System.out.println(Thread.currentThread().name + " 獲取許可,正在看病中")
            synchronized(random) {
                Thread.sleep((random.nextInt(2000) + 1000).toLong())
            }
            System.out.println(Thread.currentThread().name + " 看病完畢,釋放許可")
            semaphore.release()
        })
        thread.start()
    }
}

//-------------------------------------------------------
Thread-0 獲取許可,正在看病中
Thread-1 獲取許可,正在看病中
Thread-2 獲取許可,正在看病中
Thread-0 看病完畢,釋放許可
Thread-3 獲取許可,正在看病中
Thread-2 看病完畢,釋放許可
Thread-4 獲取許可,正在看病中
Thread-1 看病完畢,釋放許可
Thread-5 獲取許可,正在看病中
Thread-4 看病完畢,釋放許可
Thread-6 獲取許可,正在看病中
Thread-3 看病完畢,釋放許可
Thread-7 獲取許可,正在看病中
Thread-6 看病完畢,釋放許可
Thread-8 獲取許可,正在看病中
Thread-5 看病完畢,釋放許可
Thread-9 獲取許可,正在看病中
Thread-8 看病完畢,釋放許可
Thread-10 獲取許可,正在看病中
Thread-7 看病完畢,釋放許可
Thread-11 獲取許可,正在看病中
Thread-10 看病完畢,釋放許可
Thread-12 獲取許可,正在看病中
Thread-9 看病完畢,釋放許可
Thread-13 獲取許可,正在看病中
Thread-12 看病完畢,釋放許可
Thread-14 獲取許可,正在看病中
Thread-11 看病完畢,釋放許可
Thread-15 獲取許可,正在看病中
Thread-14 看病完畢,釋放許可
Thread-16 獲取許可,正在看病中
Thread-13 看病完畢,釋放許可
Thread-17 獲取許可,正在看病中
Thread-16 看病完畢,釋放許可
Thread-18 獲取許可,正在看病中
Thread-15 看病完畢,釋放許可
Thread-19 獲取許可,正在看病中
Thread-18 看病完畢,釋放許可
Thread-17 看病完畢,釋放許可
Thread-19 看病完畢,釋放許可

Ok,關於同步類的介紹就講那麼多,除了這些Java中還提供了更多其餘的同步類,像Phaser這個類,這裏就不介紹了,詳細的可仔細查閱博客,也能夠看我推薦的這兩篇博客:

http://www.javashuo.com/article/p-gholfgmo-de.html

http://www.javashuo.com/article/p-ogizybjv-ep.html


原子類

原子類具備原子性,也就是意味着在執行的過程當中是不能被打斷的,那麼更新原子類的值在多線程環境下就是安全的。Java中提供了許多的原子類:

  1. 基本類型原子類:AtomicBoolean,AtomicInteger,AtomicLong
  2. 數組類型原子類:AtomicLongArray,AtomicIntegerArray,AtomicReferenceArray
  3. 引用類型原子類:AtomicReference,AtomicReferenceUpdater,AtomicMarkReference

基本類型的原子類分別對應基本類型的boolean,int以及long類型,使用方式基本同樣,在Java線程的帶來的問題與內存模型(JMM)說過基本變量的i++其實並非一個原子操做,那麼經過使用AtomicInteger咱們就能保證原子操做了,主要方法以下:

public final int get();//獲取當前值
  public final void set(int newValue);//設置一個新值
  public final int getAndDecrement();//先獲取當前值,而後執行減一
   public final int getAndIncrement();//獲取當前值,而後加一
   ...

AtomicInteger的api很是容易理解,還有一些別的api可自行查閱,接下來就是測試的事了,下面是使用的Demo:

val atomicInteger = AtomicInteger(0)
var value = 0;
fun main(args: Array<String>) {
    var count = 0
    while (count < 100) {
        count++
        val thread = Thread(Runnable {
            atomicInteger.incrementAndGet()
            atomicInteger.incrementAndGet()
            atomicInteger.incrementAndGet()
            atomicInteger.incrementAndGet()
            atomicInteger.incrementAndGet()
            atomicInteger.incrementAndGet()
        })
        thread.start()
    }
    count = 0
    Thread.sleep(3000)
    System.out.println("原子類型值=$atomicInteger")
    while (count < 100) {
        count++
        val thread = Thread(Runnable {
            value++
            value++
            value++
            value++
            value++
            value++
        })
        thread.start()
    }
    Thread.sleep(3000)
    System.out.println("基本類型值=$value")
}
//輸出結果
原子類型值=600
基本類型值=561

數組類型原子類可以在多線程環境下正確的更新對應數組中的值,其主要的方法以下:

public final int length();//返回當前數組長度
 public final void set(int i, int newValue);//設置對應索引的值
 public final int getAndSet(int i, int newValue);//先獲取值,而後設置新值
 public final int getAndIncrement(int i);//獲取對應索引當前值,而後加一
 ...

主要是的方法就這些,下面看下測試效果:

private int[] values = {1, 2, 0};
   private AtomicIntegerArray mIntegerArray = new AtomicIntegerArray(values);

   public void testArray() {
       mIntegerArray.getAndIncrement(2);
       mIntegerArray.getAndIncrement(1);
       mIntegerArray.getAndIncrement(0);
   }

   public String getFinalValues() {
       return mIntegerArray.get(0) + "  " + mIntegerArray.get(1) + "  " +  mIntegerArray.get(2);
   }
	//-------
	fun main(args: Array<String>) {
		var count = 0
		val threadTest = ThreadTest()
		while (count < 100) {
			count++
			val thread = Thread(Runnable {
				threadTest.testArray()
			})
			thread.start()
		}
		Thread.sleep(3000)
		System.out.println(threadTest.finalValues)
	}
//測試結果
101  102  100

引用類型原子類提供了一種讀和寫都是原子性的對象引用變量,在大多數狀況下,咱們須要處理的狀態變量不止一個,那麼咱們能夠把這些變量臨時放在一個原子的引用裏面。以AtomicReference爲例,其中提供的主要方法有:

public final V getAndSet(V newValue);//獲取值再設置值
public final void set(V newValue);//設置值
public final V get() ;//獲取值
public final boolean compareAndSet(V expect, V update);//比較設置,設置成功返回true,不然返回false,這裏使用的是CAS
...

簡單例子以下:

public static AtomicReference<User> atomicUserRef = new AtomicReference<>();


   public static void main(String[] args) {

       User user = new User("conan", 15);

       atomicUserRef.set(user);

       User updateUser = new User("Shinichi", 17);

       boolean flag = atomicUserRef.compareAndSet(user, user);

       System.out.println(atomicUserRef.get().getName() + "  " + flag);

       System.out.println(atomicUserRef.get().getOld());

   }


   static class User {

       private String name;

       private int old;


       public User(String name, int old) {

           this.name = name;

           this.old = old;

       }


       public String getName() {

           return name;

       }


       public int getOld() {

           return old;

       }

   }

相比於使用鎖機制來處理併發的問題,使用原子類可以在代碼的可伸縮性以及活躍性上擁有很是的優點,原因就是原子類由非阻塞算法實現,而鎖機制由阻塞算法實現,非阻塞算法可使得多個線程在競爭相同的數據的時候不會發生阻塞的狀況,於是能在粒度更細的層次上進行協調,而且極大的減小調度的開銷。原子類的內部實現使用了CAS的操做,關於CAS的介紹,放在下一章進行學習總結,這裏就不介紹了。


參考資料

相關文章
相關標籤/搜索