同步工具類能夠是任何一個對象。阻塞隊列能夠做爲同步工具類,其餘類型的同步工具類還包括信號量(Semaphore)、柵欄(Barrier)、以及閉鎖(Latch)。java
全部的同步工具類都包含一些特定的結構化屬性:它們封裝了一些狀態,這些狀態將決定執行同步工具類的線程是繼續執行仍是等待,此外還提供了一些方法對狀態進行操做,以及另外一些方法用於高效地等待同步工具類進入到預期狀態。算法
閉鎖是一種同步工具類,能夠延遲線程進度直到其到達終止狀態。閉鎖的做用至關於一扇門:在閉鎖到達結束狀態以前,這扇門一直是關閉的,而且沒有任何線程能經過,當到達結束狀態時容許全部的線程經過。當閉鎖到達結束狀態後,將不會再改變狀態,所以這扇門將永遠打開。閉鎖能夠用來確保某些活動直到其餘活動都完成才繼續執行。安全
CountDownLatch是一種靈活的閉鎖實現,它可使一個或多個線程等待一組線程。閉鎖狀態包括一個計數器,該計數器被初始化爲一個正數,表示須要等待的事件數量。countDown遞減計數器,表示一個事件已經發生,而await方法等待計數器達到零,這表示全部須要等待的事件都已經發生。若是計數器的值非零,那麼await會一直阻塞直到計數器爲零,或者等待中的線程中斷,或者等待超時。併發
查看源碼發現:咱們傳進去的參數至關於內部Sync的狀態,每次調用countDown的時候將狀態值減一,狀態值爲0表示結束狀態(await會解除阻塞)ide
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void countDown() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
查看sync的源碼:函數
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } ... }
例如:實現一個統計多個線程併發執行任務的用時功能:工具
當線程執行run中代碼的時候會阻塞到startLatch.await(); 直到主線程調用startLatch.countDown(); 將計數器減一。這時全部線程開始執行任務。測試
當線程執行完的時候endLatch.countDown();將結束必鎖的計數器減一,此時主線程阻塞在endLatch.await();,直到5個線程都執行完主線程也解除阻塞。ui
package cn.qlq.thread.tone; import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Administrator * */ public class Demo4 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo4.class); public static void main(String[] args) throws InterruptedException { final CountDownLatch startLatch = new CountDownLatch(1); final CountDownLatch endLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { Thread.sleep(1 * 1000); new Thread(new Runnable() { @Override public void run() { try { startLatch.await();// 起始閉鎖的計數器阻塞等到計數器減到零(標記第一個線程開始執行) Thread.sleep(1 * 1000); endLatch.countDown(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } // 實現計時 long startTime = System.nanoTime(); startLatch.countDown();// 將起始閉鎖的計數器減一 endLatch.await();// 結束閉鎖阻塞直到計數器爲零 long endTime = System.nanoTime(); LOGGER.error("結束,用時{}", endTime - startTime); } }
FutureTask也能夠用作閉鎖。(Futuretask實現了Future的語義,表示一種抽象的可生成計算結果的計算)。FutureTask的計算是經過Callable實現的,至關於一種可生產運算結果的Runnable,而且能夠處於如下三種狀態:等待運行、正在運行和運行完成。執行完成表示計算的全部可能結束方式,包括正常結束、異常取消和運行完成。當FutureTask進入完成狀態後,它會永遠中止在這個狀態。this
Future.get取決於任務的狀態。若是任務已經完成,那麼get會當即返回結果;不然get將阻塞直到任務進入完成狀態,而後返回結果或者拋出異常。FutureTask將計算結果從執行計算的線程傳遞到獲取這個結果的線程,而FutureTask的規範確保了這種傳遞過程能實現結果的安全發佈。
package threadTest; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 實現callable接口,實現Callable接口 * * */ public class MyCallable implements Callable<String> { /** * 實現call方法,接口中拋出異常。由於子類不能夠比父類幹更多的壞事,因此子類能夠不拋出異常 */ @Override public String call() { System.out.println(Thread.currentThread().getName() + " 執行callable的call方法"); return "result"; } public static void main(String[] args) { test1(); } /** * 單個線程 */ public static void test1() { // 1.建立固定大小的線程池 ExecutorService es = Executors.newFixedThreadPool(1); // 2.提交線程任務,用Future接口接受返回的實現類 Future<String> future = es.submit(new MyCallable()); // 3.關閉線程池 es.shutdown(); // 4.調用future.get()獲取callable執行完成的返回結果 String result; try { result = future.get(); System.out.println(Thread.currentThread().getName() + "\t" + result); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
結果:
pool-1-thread-1 執行callable的call方法
main result
查看源碼:
public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } ...}
計數信號量(counting Semaphore)用來控制同時訪問某個資源的數量,或者同時執行某個操做的數量。計數信號量還能夠實現某種資源池,或者對容器實施邊界。
信號量是1個的Semaphore意味着只能被1個線程佔用,能夠用來設計同步(至關於互斥鎖)。信號量大於1的Semaphore能夠用來設計控制併發數,或者設計有界容器。
Semaphore中管理着一組虛擬的許可(permit),許可的初始數量可由構造函數指定。在執行操做時首先得到許可(只要還有剩餘的許可),並在使用後釋放。若是沒有許可,那麼acquire將會一直阻塞直到有許可(或者直到中斷或者操做超時)。release方法將返回一個許可給信號量。計算信號量的一種簡化形式是二值信號量,即初始值爲1的Semaphore。二值信號量能夠用做互斥體(mutex),並具有不可重入的加鎖語義:誰擁有了這個惟一的許可誰就擁有了互斥鎖。
例如:例如信號量構造一個有界阻塞容器:
信號量的計數值初始化爲容器的最大值。add操做在向底層容器添加一個元素以前,首先要獲取一個許可。若是add沒有添加任何元素,那麼會馬上釋放信號量。一樣,remove操做釋放一個許可,使更多的元素能加到容器中。
class BoundedHashSet<T> { private Set<T> set; private Semaphore semaphore; public BoundedHashSet(int bound) { set = Collections.synchronizedSet(new HashSet()); semaphore = new Semaphore(bound); } public boolean add(T o) throws InterruptedException { semaphore.acquire();// 嘗試獲取信號量 boolean wasAdded = false; try { wasAdded = set.add(o); return wasAdded; } finally { if (!wasAdded) {// 若是添加失敗就釋放信號量,添加成功就佔用一個信號量 semaphore.release(); } } } public boolean remove(T o) throws InterruptedException { boolean remove = set.remove(o); if (remove)// 若是刪除成功以後就釋放一個信號量 semaphore.release(); return remove; } }
測試代碼:
BoundedHashSet<String> boundedHashSet = new BoundedHashSet<String>(3); System.out.println(boundedHashSet.add("1")); System.out.println(boundedHashSet.add("2")); System.out.println(boundedHashSet.add("2")); System.out.println(boundedHashSet.add("3")); System.out.println(boundedHashSet.add("4"));// 將會一直阻塞到這裏 System.out.println("=========");
結果:(JVM不會關閉)
注意:
1.Semaphore能夠指定公平鎖仍是非公平鎖,默認是非公平鎖
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
2.acquire方法和release方法是能夠有參數的,表示獲取/返還的信號量個數
public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
柵欄(Barrier)相似於閉鎖(一種同步工具,能夠延遲線程直到其達到其終止狀態),它能阻塞一組線程直到某個事件發生。柵欄與閉鎖的區別在於全部線程必須同時到達柵欄位置,才能繼續執行。閉鎖等於等待事件,而柵欄用於等待其餘線程。柵欄能夠用於實現一些協議,例如幾個家庭成員決定在某個地方集合:"全部人6:00到達目的地,而後討論下一步的事情"。
CyclicBarrier可使必定數量的參與方反覆地在柵欄位置聚集,它在並行迭代算法中很是有用:這種算法一般將一個問題劃分紅一系列相互獨立的子問題。當線程到達柵欄位置時將調用await方法,這個方法將阻塞到全部線程到達柵欄位置。若是全部線程都到達柵欄,那麼柵欄將打開全部線程被釋放,而柵欄將被重置以便下次使用。若是對await的調用超時,或者await阻塞的線程被中斷,那麼柵欄就被認爲是打破了,全部阻塞的await調用都將終止並拋出BrokenBarrierException。若是成功的經過柵欄,那麼await將爲每一個線程返回一個惟一的到達索引號,咱們能夠用這些索引號"選舉"產生一個領導線程,並在下一次迭代中由該領導線程執行一些特殊的工做。CyclicBarrier還可使你將一個柵欄操做傳遞給構造函數,這是一個Runnable,當成功的經過柵欄時會(在一個子線程)執行它,但在阻塞過程被釋放以前是不能執行的。
CyclicBarrier的構造方法能夠傳入參與的數量(也就是被柵欄攔截的線程的數量),也能夠傳入一個Runnable對象。
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
例如:
package cn.qlq.thread.tone; import java.util.concurrent.CyclicBarrier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Administrator * */ public class Demo2 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class); public static void main(String[] args) throws InterruptedException { final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); for (int i = 0; i < 4; i++) { Thread.sleep(2 * 1000); new Thread(new Runnable() { @Override public void run() { LOGGER.info("threadName -> {}", Thread.currentThread().getName()); try { cyclicBarrier.await(); } catch (Exception e) { e.printStackTrace(); } LOGGER.info("threadName -> {}", Thread.currentThread().getName()); } }).start(); } } }
結果:
18:08:00 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:04 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2
00的時候0線程到達柵欄進入阻塞,02的時候1線程到達柵欄,因爲柵欄的參與者是2因此此時至關於全部線程到達柵欄,柵欄放開,而後柵欄被重置。
04的時候2線程到達柵欄進入阻塞,06的時候3線程到達柵欄,因爲柵欄的參與者是2因此此時至關於全部參與者線程到達柵欄,而後柵欄放開。
咱們將柵欄的參與者改成5查看結果:
final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
結果:4個線程會阻塞到await方法處,並且JVM不會關閉,由於柵欄的參與者不夠5個因此被一直阻塞。
Exchanger至關於一個兩方(Two-party)柵欄,各方在柵欄位置上交換數據。當兩方執行不對稱的操做時,Exchanger很是有用。例如:一個線程向緩衝區寫東西,另外一個線程從緩衝區讀數據。Exchanger至關於參與者只有兩個的CyclicBarrier。
兩個線程會阻塞在exchanger.exchange方法上,泛型能夠指定其交換的數據類型。
例如:兩個線程交換本身的線程名稱
package cn.qlq.thread.tone; import java.util.concurrent.Exchanger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * * @author Administrator * */ public class Demo3 { private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class); public static void main(String[] args) throws InterruptedException { final Exchanger<String> exchanger = new Exchanger<String>();// 泛型指定交換的數據 for (int i = 0; i < 4; i++) { Thread.sleep(2 * 1000); new Thread(new Runnable() { @Override public void run() { LOGGER.info("threadName -> {}", Thread.currentThread().getName()); try { String exchange = exchanger.exchange(Thread.currentThread().getName()); LOGGER.error("threadName -> {},exchange->{}", Thread.currentThread().getName(), exchange); } catch (Exception e) { e.printStackTrace(); } LOGGER.info("threadName -> {}", Thread.currentThread().getName()); } }).start(); } } }
結果:
18:28:33 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-018:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-118:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-1,exchange->Thread-018:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-0,exchange->Thread-118:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-118:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-018:28:37 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-218:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-318:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-3,exchange->Thread-218:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-2,exchange->Thread-318:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-318:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-2