在jdk中,爲併發編程提供了CyclicBarrier(柵欄),CountDownLatch(閉鎖),Semaphore(信號量),Exchanger(數據交換)等工具類,咱們在前面的學習中已經學習並分析了CountDownLatch工具類的使用方式和源碼實現,接下來咱們繼續學習CyclicBarrier,Semaphore,Exchanger的源碼實現。java
在實際的併發中,咱們可能須要等待全部線程到達一個數量點以後,纔會去繼續執行線程,這個點就是柵欄,好比開會的場景,咱們應該要作的是應該等到全部與會人員都到齊以後,咱們纔開始開會,使用CyclicBarrier就能夠很容易的實現這麼一個場景,示例源碼:node
package com.company.thread.t4; import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; public class CyclicBarrierDemo { public void meeting(CyclicBarrier cyclicBarrier) { Random random = new Random(); try { Thread.sleep(random.nextInt(4000)); } catch (InterruptedException e) { e.printStackTrace(); } // if (Thread.currentThread().getName().equals("Thread-1")) { // throw new RuntimeException("異常了"); // } System.out.println(Thread.currentThread().getName() + "進入會議室"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "開始彙報...."); } public static void main(String[] args) { final CyclicBarrierDemo cyclicBarrierDemo = new CyclicBarrierDemo(); final CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + "開始開會..."); } }); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { cyclicBarrierDemo.meeting(cyclicBarrier); } }).start(); } } }
meeting()方法傳入CyclicBarrier對象,調用await()方法等待,直到達到初始化CyclicBarrier對象是傳入的值時纔會喚醒全部線程併發執行剩下的任務。算法
main方法中構建等待的線程數爲10個,並傳入一個任務,該任務由最後進入的線程來執行,將cyclicBarrier傳入到meetting中,此時執行結果爲:編程
Thread-9進入會議室 Thread-6進入會議室 Thread-5進入會議室 Thread-4進入會議室 Thread-3進入會議室 Thread-1進入會議室 Thread-8進入會議室 Thread-7進入會議室 Thread-2進入會議室 Thread-0進入會議室 Thread-0開始開會... Thread-0開始彙報.... Thread-9開始彙報.... Thread-6開始彙報.... Thread-4開始彙報.... Thread-5開始彙報.... Thread-3開始彙報.... Thread-1開始彙報.... Thread-7開始彙報.... Thread-2開始彙報.... Thread-8開始彙報....
執行結果顯示,只有當是個線程都進入等待時,纔會執行咱們在構造CyclicBarrier對象時傳入的任務,此任務就至關因而一個柵欄點,達到線程等待數量後,喚醒全部線程去繼續執行,CyclicBarrier的使用方式就如上代碼因此,須要注意的是,若是執行任務期間有一個線程出現異常,致使沒法達到柵欄的點,那麼全部線程都加會一直阻塞。併發
首先看CyclicBarrier的構造方法:app
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
構造函數就是執行了一個賦值的操做,將咱們須要等待的線程個數進行初始化,而後看核心的方法await(),dom
await():ide
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } }
直接放回了dowait()方法,繼續進入dowait()方法,函數
dowait():工具
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
它是CyclicBarrier的核心方法,咱們來一步步分析:
第一步:首先獲取可重入鎖,判斷線程是否被中斷,若是被中斷,調用beakBarrier()方法,generation.broken = true,還原count爲初始值,並喚醒全部等待的線程。並拋出中斷異常
beakBarrier():
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
不然將count減減,count減到最後一個線程進入時。使用ranAction判斷是否執行,判斷初始化時是否傳入任務,傳入則執行該任務的run方法,這裏就是咱們前面說得柵欄任務由最後一個進入的線程執行的實現,將ranAction置爲true,nextGeneration()調用喚醒全部線程,並將全部參數重置爲初始化狀態。表示這一組任務已經等待完成並開始執行,
nextGeneration()方法:
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); }
當index不等於0,表示不是最後一個線程進入時,自旋使線程等待,當g.broken = true時,表示線程出現過錯誤,執行breakBarrier();方法,不然就中斷線程,接着就是校驗線程是否正確執行和是否超時等的一些校驗,dowait()方法源碼說完了。
主要的就是初始化一個狀態,每次調用自減,跟直到狀態爲0時,喚醒全部的線程,這點於CountDownLatch()相似,可是CyclicBarrier是可重用的,當到達柵欄的全部線程被喚醒以後,它會將CyclicBarrier實例還原成初始狀態,下一次能夠繼續使用。
信號量指當前空閒有三個資源,可是有一個欄杆同樣的東西在攔截線程,當前面三個線程進來獲取資源時,由於有資源因此容許三個線程進入,當資源被前面三個線程佔用時,其餘的線程就只能等待着正在使用資源的線程釋放資源,欄杆纔會隨機的放一個線程進入,這樣就達到了控制併發線程數量的一個目的;
直接看代碼:
package com.company.thread.t4; import java.util.concurrent.Semaphore; public class SemaphoreDemo { public void method (Semaphore semaphore) { try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "is running "); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } semaphore.release(); } public static void main(String[] args) { final SemaphoreDemo semaphoreDemo = new SemaphoreDemo(); final Semaphore semaphore = new Semaphore(10); for (int i = 0; i < 20; i++){ new Thread(new Runnable() { @Override public void run() { semaphoreDemo.method(semaphore); } }).start(); } } }
method()方法中調用 semaphore.acquire();獲取資源,當能獲取到資源後繼續執行,執行完成後調用 semaphore.release()方法釋放資源,main方法中能夠建立不少個線程,可是咱們經過觀察輸出能夠看到,每次都只能有是個線程在執行,這就是在初始化的時候控制了線程併發執行的數量只能是是個,全部多餘的將會等待前面執行的線程釋放資源,而後拿到資源,使用都是很是簡單的,看示例源碼來理解徹底沒有問題,接下來咱們看看信號量的實現原理
首先仍是看構造方法:
public Semaphore(int permits) { sync = new NonfairSync(permits); }
能夠看到它是實例化非公平內部同步器,並將信號量個數初始化,這裏就再也不深刻,主要的方法是acquire()/release()方法,
acquire()方法:
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
這裏調用的AQS的共享模式的acquireSharedInterruptibly(1)對線程獲取資源,因爲AQS的模式主要是看本身實現的tryAcquireShared()方法,因此咱們主要看tryAcquireShared()源碼:
protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
返回nonfairTryAcquireShared()的執行結果,
nonfairTryAcquireShared()方法是操做AQS狀態。
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
返回資源數量,咱們知道在AQS共享模式下,若是返回大於0時,它就能夠去等待隊列中喚醒線程,這裏是自旋直到返回狀態減acquires後的能拿到資源的狀態,便是自旋一直等待其餘線程釋放資源,此時當前線程就能夠拿到資源執行。
release()方法:
public void release() { sync.releaseShared(1); }
也是使用AQS的releaseShared()方法,因此咱們這裏依然只看本身實現的tryReleaseShared()方法:
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
它的實現就是,當釋放資源時,不斷修改AQS的stats增長releases資源,這樣當釋放成功後,等待中的線程就有機會獲取線程去執行了。
理解了AQS的狀況下,CyclicBarrier和信號量的實現其實也就不難理解了,都是繼續AQS提供的模式重寫tryXX方法來實現本身想要的功能。
Exchanger是能夠線程之間通訊的,它可使數據在兩個線程直接交換,也就是A線程能夠獲取到B線程中的某些數據,B線程也可能能夠獲取到B線程的數據。
直接上代碼:
package com.company.thread.t5; import java.util.concurrent.Exchanger; public class Demo { public void a (Exchanger<String> exchanger) { System.out.println("a 方法執行"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } String res = "12345"; try { String value = exchanger.exchange(res); } catch (InterruptedException e) { e.printStackTrace(); } } public void b(Exchanger<String> exchanger) { System.out.println("b 方法開始執行"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } String res = "123456"; try { String value = exchanger.exchange(res); System.out.println("開始進行比對"); System.out.println("比對結果爲:" + value.equals(res)); } catch (InterruptedException e) { e.printStackTrace(); } } }
Exchanger是一個泛型類,他管理的是傳入的類型的變量,示例中使用一個Exchanger實例,在a方法中調用exchange()將須要交換的數據出入,b也同樣,建立兩個線程,分別調用a,b方法,傳入同一個Exchanger實例,這樣當兩個線程執行到一個零界點時,就會將兩個線程中的傳入的數據交換的兩外的線程中,這樣就能夠在另外的方法或線程中拿到其餘線程的某些咱們須要的數據。jdk文檔中介紹,它最經常使用的就是緩衝這一塊,有興趣能夠去了解一下。Exchanger的使用也是很是簡單的。
exchange()核心方法:
public V exchange(V x) throws InterruptedException { if (!Thread.interrupted()) { Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0); if (v == NULL_ITEM) return null; if (v != CANCEL) return (V)v; Thread.interrupted(); // Clear interrupt status on IE throw } throw new InterruptedException(); }
簡單明瞭的看到,它調用了doExchange()方法去執行了具體的改變操做,因此進入doExchange():
private Object doExchange(Object item, boolean timed, long nanos) { Node me = new Node(item); // Create in case occupying int index = hashIndex(); // Index of current slot int fails = 0; // Number of CAS failures for (;;) { Object y; // Contents of current slot Slot slot = arena[index]; if (slot == null) // Lazily initialize slots createSlot(index); // Continue loop to reread else if ((y = slot.get()) != null && // Try to fulfill slot.compareAndSet(y, null)) { Node you = (Node)y; // Transfer item if (you.compareAndSet(null, item)) { LockSupport.unpark(you.waiter); return you.item; } // Else cancelled; continue } else if (y == null && // Try to occupy slot.compareAndSet(null, me)) { if (index == 0) // Blocking wait for slot 0 return timed ? awaitNanos(me, slot, nanos) : await(me, slot); Object v = spinWait(me, slot); // Spin wait for non-0 if (v != CANCEL) return v; me = new Node(item); // Throw away cancelled node int m = max.get(); if (m > (index >>>= 1)) // Decrease index max.compareAndSet(m, m - 1); // Maybe shrink table } else if (++fails > 1) { // Allow 2 fails on 1st slot int m = max.get(); if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) index = m + 1; // Grow on 3rd failed slot else if (--index < 0) index = m; // Circularly traverse } } }
它的核心算法是for(;;)中的內容,當slot爲空時,建立一個Slot,不爲空,則獲取slot的value不爲空,則表示已經有線程進來了,使用cas將當前線程的value與前一個進入的value進行交換,成功後喚醒前面進入以後等待的線程。當前y爲空,則表示當前線程是第一個進來的線程,設置建立一個節點,將當前須要保存的value設置到slot中,以node的形式保存,當被喚醒時,則執行await()方法返回修改後的值,由於在上面代碼中第二個進來的線程已經交換了兩個須要交換的數據,因此當等待線程被叫醒後返回的被後一個線程交換後的數據。
Java的併發工具類,到這裏就算是學習了,理解不是很深入,望各位各位多多指點,共同進步,謝謝!