j360-jdk調試功能html
https://github.com/xuminwlt/j360-jdkjava
如下內容部分選摘自互聯網及前人總結,若有問題請指正,我會及時更正,謝謝!git
- 繼承Thread
程序員
- 實現Runnable接口github
- 使用ExecutorService、Callable、Future實現有返回結果的多線程數據庫
- ExecutorService、Callable、Future這個對象實際上都是屬於Executor框架中的功能類
編程
public class MyThread extends Thread{ @Override public void run(){ System.out.println("MyThread.run()"); } } MyThread myThread = new MyThread(); myThread.start();
public class MyThreadRun implements Runnable{ @Override public void run() { System.out.println("MyThreadRun.run()"); } } MyThreadRun myThreadRun = new MyThreadRun(); Thread thread = new Thread(myThreadRun); thread.start();
Runnable target參數給Thread後,Thread的run()方法就會調用target.run(),參考JDK源代碼:
public void run() {
if (target != null) {
target.run();
}
}小程序
在JDK1.5以前,Java中要進行業務併發時,一般須要有程序員獨立完成代碼實現,固然也有一些開源的框架提供了這些功能,可是這些依然沒有JDK自帶的功能使用起來方便。而當針對高質量Java多線程併發程序設計時,爲防止死蹦等現象的出現,好比使用java以前的wait()、notify()和synchronized等,往往須要考慮性能、死鎖、公平性、資源管理以及如何避免線程安全性方面帶來的危害等諸多因素,每每會採用一些較爲複雜的安全策略,加劇了程序員的開發負擔.萬幸的是,在JDK1.5出現以後,Sun大神(Doug Lea)終於爲咱們這些可憐的小程序員推出了java.util.concurrent工具包以簡化併發完成。開發者們藉助於此,將有效的減小競爭條件(race conditions)和死鎖線程。concurrent包很好的解決了這些問題,爲咱們提供了更實用的併發程序模型。數組
package me.j360.jdk.thread; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * Created with j360-jdk -> me.j360.jdk.thread. * User: min_xu * Date: 2015/10/17 * Time: 12:22 * 說明: */ public class ExecutorTest { public static void main(String[] args){ System.out.println("----程序開始運行----"); Date date1 = new Date(); int taskSize = 5; // 建立一個線程池 ExecutorService pool = Executors.newFixedThreadPool(taskSize); // 建立多個有返回值的任務 List<Future> list = new ArrayList<Future>(); for (int i = 0; i < taskSize; i++) { Callable c = new MyCallable(i + " "); // 執行任務並獲取Future對象 Future f = pool.submit(c); //System.out.println(">>>" + f.get().toString()); list.add(f); } // 關閉線程池 pool.shutdown(); for (Future f : list) { System.out.println(">>>" + f.isDone()); } Date date2 = new Date(); System.out.println("----程序結束運行----,程序運行時間【" + (date2.getTime() - date1.getTime()) + "毫秒】"); } } class MyCallable implements Callable<Object> { private String taskNum; MyCallable(String taskNum) { this.taskNum = taskNum; } public Object call() throws Exception { System.out.println(">>>" + taskNum + "任務啓動"); Date dateTmp1 = new Date(); Thread.sleep(1000); Date dateTmp2 = new Date(); long time = dateTmp2.getTime() - dateTmp1.getTime(); System.out.println(">>>" + taskNum + "任務終止"); return taskNum + "任務返回運行結果,當前任務時間【" + time + "毫秒】"; } }
可返回值的任務必須實現Callable接口,相似的,無返回值的任務必須Runnable接口。執行Callable任務後,能夠獲取一個Future的對象,在該對象上調用get就能夠獲取到Callable任務返回的Object了,再結合線程池接口ExecutorService就能夠實現傳說中有返回結果的多線程了。緩存
上述代碼中Executors類,提供了一系列工廠方法用於創先線程池,返回的線程池都實現了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads)
建立固定數目線程的線程池。
public static ExecutorService newCachedThreadPool()
建立一個可緩存的線程池,調用execute 將重用之前構造的線程(若是線程可用)。若是現有線程沒有可用的,則建立一個新線程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。
public static ExecutorService newSingleThreadExecutor()
建立一個單線程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
建立一個支持定時及週期性的任務執行的線程池,多數狀況下可用來替代Timer類。
ExecutoreService提供了submit()方法,傳遞一個Callable,或Runnable,返回Future。若是Executor後臺線程池尚未完成Callable的計算,這調用返回Future對象的get()方法,會阻塞直到計算完成。
ExecutorService擴展了Executor並添加了一些生命週期管理的方法。一個Executor的生命週期有三種狀態,運行 ,關閉 ,終止。Executor建立時處於運行狀態。當調用ExecutorService.shutdown()後,處於關閉狀態,isShutdown()方法返回true。這時,不該該再想Executor中添加任務,全部已添加的任務執行完畢後,Executor處於終止狀態,isTerminated()返回true。
若是Executor處於關閉狀態,往Executor提交任務會拋出unchecked exception RejectedExecutionException。
ExecutorService executorService = (ExecutorService) executor; while (!executorService.isShutdown()) { try { executorService.execute(task); } catch (RejectedExecutionException ignored) { } } executorService.shutdown();
Future<V>表明一個異步執行的操做,經過get()方法能夠得到操做的結果,若是異步操做尚未完成,則,get()會使當前線程阻塞。FutureTask<V>實現了Future<V>和Runable<V>。Callable表明一個有返回值得操做。
Callable<Integer> func = new Callable<Integer>(){ public Integer call() throws Exception { System.out.println("inside callable"); Thread.sleep(1000); return new Integer(8); } }; FutureTask<Integer> futureTask = new FutureTask<Integer>(func); Thread newThread = new Thread(futureTask); newThread.start(); try { System.out.println("blocking here"); Integer result = futureTask.get(); System.out.println(result); } catch (InterruptedException ignored) { } catch (ExecutionException ignored) { }
咱們都知道,在JDK1.5以前,Java中要進行業務併發時,一般須要有程序員獨立完成代碼實現,固然也有一些開源的框架提供了這些功能,可是這些依然沒有JDK自帶的功能使用起來方便。而當針對高質量Java多線程併發程序設計時,爲防止死蹦等現象的出現,好比使用java以前的wait()、notify()和synchronized等,往往須要考慮性能、死鎖、公平性、資源管理以及如何避免線程安全性方面帶來的危害等諸多因素,每每會採用一些較爲複雜的安全策略,加劇了程序員的開發負擔.萬幸的是,在JDK1.5出現以後,Sun大神(Doug Lea)終於爲咱們這些可憐的小程序員推出了java.util.concurrent工具包以簡化併發完成。開發者們藉助於此,將有效的減小競爭條件(race conditions)和死鎖線程。concurrent包很好的解決了這些問題,爲咱們提供了更實用的併發程序模型。
Executor :具體Runnable任務的執行者。
ExecutorService :一個線程池管理者,其實現類有多種,我會介紹一部分。咱們能把Runnable,Callable提交到池中讓其調度。
Semaphore :一個計數信號量
ReentrantLock :一個可重入的互斥鎖定 Lock,功能相似synchronized,但要強大的多。
Future :是與Runnable,Callable進行交互的接口,好比一個線程執行結束後取返回的結果等等,還提供了cancel終止線程。
BlockingQueue :阻塞隊列。
CompletionService : ExecutorService的擴展,能夠得到線程執行結果的
CountDownLatch :一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。
CyclicBarrier :一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點
Future :Future 表示異步計算的結果。
ScheduledExecutorService :一個 ExecutorService,可安排在給定的延遲後運行或按期執行的命令。
接下來逐一介紹
Executors主要方法說明
newFixedThreadPool(固定大小線程池)
建立一個可重用固定線程集合的線程池,以共享的無界隊列方式來運行這些線程(只有要請求的過來,就會在一個隊列裏等待執行)。若是在關閉前的執行期間因爲失敗而致使任何線程終止,那麼一個新線程將代替它執行後續的任務(若是須要)。
newCachedThreadPool(無界線程池,能夠進行自動線程回收)
建立一個可根據須要建立新線程的線程池,可是在之前構造的線程可用時將重用它們。對於執行不少短時間異步任務的程序而言,這些線程池一般可提升程序性能。調用 execute 將重用之前構造的線程(若是線程可用)。若是現有線程沒有可用的,則建立一個新線程並添加到池中。終止並從緩存中移除那些已有 60 秒鐘未被使用的線程。所以,長時間保持空閒的線程池不會使用任何資源。注意,可使用 ThreadPoolExecutor 構造方法建立具備相似屬性但細節不一樣(例如超時參數)的線程池。
newSingleThreadExecutor(單個後臺線程)
建立一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。(注意,若是由於在關閉前的執行期間出現失敗而終止了此單個線程,那麼若是須要,一個新線程將代替它執行後續的任務)。可保證順序地執行各個任務,而且在任意給定的時間不會有多個線程是活動的。與其餘等效的 newFixedThreadPool(1) 不一樣,可保證無需從新配置此方法所返回的執行程序便可使用其餘的線程。
這些方法返回的都是ExecutorService對象,這個對象能夠理解爲就是一個線程池。
這個線程池的功能仍是比較完善的。能夠提交任務submit()能夠結束線程池shutdown()。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MyExecutor extends Thread { private int index; public MyExecutor(int i){ this.index=i; } public void run(){ try{ System.out.println("["+this.index+"] start...."); Thread.sleep((int)(Math.random()*1000)); System.out.println("["+this.index+"] end."); } catch(Exception e){ e.printStackTrace(); } } public static void main(String args[]){ ExecutorService service=Executors.newFixedThreadPool(4); for(int i=0;i<10;i++){ service.execute(new MyExecutor(i)); //service.submit(new MyExecutor(i)); } System.out.println("submit finish"); service.shutdown(); } }
雖然打印了一些信息,可是看的不是很是清晰,這個線程池是如何工做的,咱們來將休眠的時間調長10倍。
Thread.sleep((int)(Math.random()*10000));
再來看,會清楚看到只能執行4個線程。當執行完一個線程後,纔會又執行一個新的線程,也就是說,咱們將全部的線程提交後,線程池會等待執行完最後shutdown。咱們也會發現,提交的線程被放到一個「無界隊列裏」。這是一個有序隊列(BlockingQueue,這個下面會說到)。
另外它使用了Executors的靜態函數生成一個固定的線程池,顧名思義,線程池的線程是不會釋放的,即便它是Idle。
這就會產生性能問題,好比若是線程池的大小爲200,當所有使用完畢後,全部的線程會繼續留在池中,相應的內存和線程切換(while(true)+sleep循環)都會增長。
若是要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。能夠像通用的線程池同樣設置「最大線程數」、「最小線程數」和「空閒線程keepAlive的時間」。
這個就是線程池基本用法。
Semaphore
一個計數信號量。從概念上講,信號量維護了一個許可集合。若有必要,在許可可用前會阻塞每個 acquire(),而後再獲取該許可。每一個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。可是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數,並採起相應的行動。
Semaphore 一般用於限制能夠訪問某些資源(物理或邏輯的)的線程數目。例如,下面的類使用信號量控制對內容池的訪問:
這裏是一個實際的狀況,你們排隊上廁所,廁所只有兩個位置,來了10我的須要排隊。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class MySemaphore extends Thread { Semaphore position; private int id; public MySemaphore(int i,Semaphore s){ this.id=i; this.position=s; } public void run(){ try{ if(position.availablePermits()>0){ System.out.println("顧客["+this.id+"]進入廁所,有空位"); } else{ System.out.println("顧客["+this.id+"]進入廁所,沒空位,排隊"); } position.acquire(); System.out.println("顧客["+this.id+"]得到坑位"); Thread.sleep((int)(Math.random()*1000)); System.out.println("顧客["+this.id+"]使用完畢"); position.release(); } catch(Exception e){ e.printStackTrace(); } } public static void main(String args[]){ ExecutorService list=Executors.newCachedThreadPool(); Semaphore position=new Semaphore(2); for(int i=0;i<10;i++){ list.submit(new MySemaphore(i+1,position)); } list.shutdown(); position.acquireUninterruptibly(2); System.out.println("使用完畢,須要清掃了"); position.release(2); } }
ReentrantLock
一個可重入的互斥鎖定 Lock,它具備與使用 synchronized 方法和語句所訪問的隱式監視器鎖定相同的一些基本行爲和語義,但功能更強大。
ReentrantLock 將由最近成功得到鎖定,而且尚未釋放該鎖定的線程所擁有。當鎖定沒有被另外一個線程所擁有時,調用 lock 的線程將成功獲取該鎖定並返回。若是當前線程已經擁有該鎖定,此方法將當即返回。可使用 isHeldByCurrentThread() 和 getHoldCount() 方法來檢查此狀況是否發生。
此類的構造方法接受一個可選的公平參數。
當設置爲 true時,在多個線程的爭用下,這些鎖定傾向於將訪問權授予等待時間最長的線程。不然此鎖定將沒法保證任何特定訪問順序。
與採用默認設置(使用不公平鎖定)相比,使用公平鎖定的程序在許多線程訪問時表現爲很低的整體吞吐量(即速度很慢,經常極其慢),可是在得到鎖定和保證鎖定分配的均衡性時差別較小。不過要注意的是,公平鎖定不能保證線程調度的公平性。所以,使用公平鎖定的衆多線程中的一員可能得到多倍的成功機會,這種狀況發生在其餘活動線程沒有被處理而且目前並未持有鎖定時。還要注意的是,未定時的 tryLock 方法並無使用公平設置。由於即便其餘線程正在等待,只要該鎖定是可用的,此方法就能夠得到成功。
建議老是 當即實踐,使用 try 塊來調用 lock,在以前/以後的構造中,最典型的代碼以下:
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
個人例子:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantLock; public class MyReentrantLock extends Thread{ TestReentrantLock lock; private int id; public MyReentrantLock(int i,TestReentrantLock test){ this.id=i; this.lock=test; } public void run(){ lock.print(id); } public static void main(String args[]){ ExecutorService service=Executors.newCachedThreadPool(); TestReentrantLock lock=new TestReentrantLock(); for(int i=0;i<10;i++){ service.submit(new MyReentrantLock(i,lock)); } service.shutdown(); } } class TestReentrantLock{ private ReentrantLock lock=new ReentrantLock(); public void print(int str){ try{ lock.lock(); System.out.println(str+"得到"); Thread.sleep((int)(Math.random()*1000)); } catch(Exception e){ e.printStackTrace(); } finally{ System.out.println(str+"釋放"); lock.unlock(); } } }
BlockingQueue
支持兩個附加操做的 Queue,這兩個操做是:檢索元素時等待隊列變爲非空,以及存儲元素時等待空間變得可用。
BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用做指示 poll 操做失敗的警惕值。
BlockingQueue 能夠是限定容量的。它在任意給定時間均可以有一個 remainingCapacity,超出此容量,便沒法無阻塞地 put 額外的元素。
沒有任何內部容量約束的 BlockingQueue 老是報告 Integer.MAX_VALUE 的剩餘容量。
BlockingQueue 實現主要用於生產者-使用者隊列,但它另外還支持 Collection 接口。所以,舉例來講,使用 remove(x) 從隊列中移除任意一個元素是有可能的。
然而,這種操做一般不 會有效執行,只能有計劃地偶爾使用,好比在取消排隊信息時。
BlockingQueue 實現是線程安全的。全部排隊方法均可以使用內部鎖定或其餘形式的併發控制來自動達到它們的目的。
然而,大量的 Collection 操做(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執行,除非在實現中特別說明。
所以,舉例來講,在只添加了 c 中的一些元素後,addAll(c) 有可能失敗(拋出一個異常)。
BlockingQueue 實質上不 支持使用任何一種「close」或「shutdown」操做來指示再也不添加任何項。
這種功能的需求和使用有依賴於實現的傾向。例如,一種經常使用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 對象,並根據使用者獲取這些對象的時間來對它們進行解釋。
下面的例子演示了這個阻塞隊列的基本功能。
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class MyBlockingQueue extends Thread { public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3); private int index; public MyBlockingQueue(int i) { this.index = i; } public void run() { try { queue.put(String.valueOf(this.index)); System.out.println("{" + this.index + "} in queue!"); } catch (Exception e) { e.printStackTrace(); } } public static void main(String args[]) { ExecutorService service = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { service.submit(new MyBlockingQueue(i)); } Thread thread = new Thread() { public void run() { try { while (true) { Thread.sleep((int) (Math.random() * 1000)); if(MyBlockingQueue.queue.isEmpty()) break; String str = MyBlockingQueue.queue.take(); System.out.println(str + " has take!"); } } catch (Exception e) { e.printStackTrace(); } } }; service.submit(thread); service.shutdown(); } }
---------------------執行結果-----------------
{0} in queue!
{1} in queue!
{2} in queue!
{3} in queue!
0 has take!
{4} in queue!
1 has take!
{6} in queue!
2 has take!
{7} in queue!
3 has take!
{8} in queue!
4 has take!
{5} in queue!
6 has take!
{9} in queue!
7 has take!
8 has take!
5 has take!
9 has take!
-----------------------------------------
CompletionService
將生產新的異步任務與使用已完成任務的結果分離開來的服務。生產者 submit 執行的任務。使用者 take 已完成的任務,
並按照完成這些任務的順序處理它們的結果。例如,CompletionService 能夠用來管理異步 IO ,執行讀操做的任務做爲程序或系統的一部分提交,
而後,當完成讀操做時,會在程序的不一樣部分執行其餘操做,執行操做的順序可能與所請求的順序不一樣。
一般,CompletionService 依賴於一個單獨的 Executor 來實際執行任務,在這種狀況下,
CompletionService 只管理一個內部完成隊列。ExecutorCompletionService 類提供了此方法的一個實現。
import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class MyCompletionService implements Callable<String> { private int id; public MyCompletionService(int i){ this.id=i; } public static void main(String[] args) throws Exception{ ExecutorService service=Executors.newCachedThreadPool(); CompletionService<String> completion=new ExecutorCompletionService<String>(service); for(int i=0;i<10;i++){ completion.submit(new MyCompletionService(i)); } for(int i=0;i<10;i++){ System.out.println(completion.take().get()); } service.shutdown(); } public String call() throws Exception { Integer time=(int)(Math.random()*1000); try{ System.out.println(this.id+" start"); Thread.sleep(time); System.out.println(this.id+" end"); } catch(Exception e){ e.printStackTrace(); } return this.id+":"+time; } }
CountDownLatch
一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。
用給定的計數 初始化 CountDownLatch。因爲調用了 countDown() 方法,因此在當前計數到達零以前,await 方法會一直受阻塞。
以後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。這種現象只出現一次——計數沒法被重置。若是須要重置計數,請考慮使用 CyclicBarrier。
CountDownLatch 是一個通用同步工具,它有不少用途。將計數 1 初始化的 CountDownLatch 用做一個簡單的開/關鎖存器,
或入口:在經過調用 countDown() 的線程打開入口前,全部調用 await 的線程都一直在入口處等待。
用 N 初始化的 CountDownLatch 可使一個線程在 N 個線程完成某項操做以前一直等待,或者使其在某項操做完成 N 次以前一直等待。
CountDownLatch 的一個有用特性是,它不要求調用 countDown 方法的線程等到計數到達零時才繼續,
而在全部線程都能經過以前,它只是阻止任何線程繼續經過一個 await。
一下的例子是別人寫的,很是形象。
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestCountDownLatch { public static void main(String[] args) throws InterruptedException { // 開始的倒數鎖 final CountDownLatch begin = new CountDownLatch(1); // 結束的倒數鎖 final CountDownLatch end = new CountDownLatch(10); // 十名選手 final ExecutorService exec = Executors.newFixedThreadPool(10); for (int index = 0; index < 10; index++) { final int NO = index + 1; Runnable run = new Runnable() { public void run() { try { begin.await();//一直阻塞 Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + NO + " arrived"); } catch (InterruptedException e) { } finally { end.countDown(); } } }; exec.submit(run); } System.out.println("Game Start"); begin.countDown(); end.await(); System.out.println("Game Over"); exec.shutdown(); } }
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數一次,後者是等待倒數到0,若是沒有到達0,就只有阻塞等待了。
CyclicBarrier
一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。
在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。
CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最後一個線程到達以後(但在釋放全部線程以前),
該命令只在每一個屏障點運行一次。若在繼續全部參與線程以前更新共享狀態,此屏障操做 頗有用。
示例用法:下面是一個在並行分解設計中使用 barrier 的例子,很經典的旅行團例子:
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestCyclicBarrier { // 徒步須要的時間: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan private static int[] timeWalk = { 5, 8, 15, 15, 10 }; // 自駕遊 private static int[] timeSelf = { 1, 3, 4, 4, 5 }; // 旅遊大巴 private static int[] timeBus = { 2, 4, 6, 6, 7 }; static String now() { SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); return sdf.format(new Date()) + ": "; } static class Tour implements Runnable { private int[] times; private CyclicBarrier barrier; private String tourName; public Tour(CyclicBarrier barrier, String tourName, int[] times) { this.times = times; this.tourName = tourName; this.barrier = barrier; } public void run() { try { Thread.sleep(times[0] * 1000); System.out.println(now() + tourName + " Reached Shenzhen"); barrier.await(); Thread.sleep(times[1] * 1000); System.out.println(now() + tourName + " Reached Guangzhou"); barrier.await(); Thread.sleep(times[2] * 1000); System.out.println(now() + tourName + " Reached Shaoguan"); barrier.await(); Thread.sleep(times[3] * 1000); System.out.println(now() + tourName + " Reached Changsha"); barrier.await(); Thread.sleep(times[4] * 1000); System.out.println(now() + tourName + " Reached Wuhan"); barrier.await(); } catch (InterruptedException e) { } catch (BrokenBarrierException e) { } } } public static void main(String[] args) { // 三個旅行團 CyclicBarrier barrier = new CyclicBarrier(3); ExecutorService exec = Executors.newFixedThreadPool(3); exec.submit(new Tour(barrier, "WalkTour", timeWalk)); exec.submit(new Tour(barrier, "SelfTour", timeSelf)); //當咱們把下面的這段代碼註釋後,會發現,程序阻塞了,沒法繼續運行下去。 exec.submit(new Tour(barrier, "BusTour", timeBus)); exec.shutdown(); } }
CyclicBarrier最重要的屬性就是參與者個數,另外最要方法是await()。當全部線程都調用了await()後,就表示這些線程均可以繼續執行,不然就會等待。
Future
Future 表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並檢索計算的結果。
計算完成後只能使用 get 方法來檢索結果,若有必要,計算完成前能夠阻塞此方法。取消則由 cancel 方法來執行。
還提供了其餘方法,以肯定任務是正常完成仍是被取消了。一旦計算完成,就不能再取消計算。
若是爲了可取消性而使用 Future但又不提供可用的結果,則能夠聲明 Future<?> 形式類型、並返回 null 做爲基礎任務的結果。
這個咱們在前面CompletionService已經看到了,這個Future的功能,並且這個能夠在提交線程的時候被指定爲一個返回對象的。
ScheduledExecutorService
一個 ExecutorService,可安排在給定的延遲後運行或按期執行的命令。
schedule 方法使用各類延遲建立任務,並返回一個可用於取消或檢查執行的任務對象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法建立並執行某些在取消前一直按期運行的任務。
用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令,經過所請求的 0 延遲進行安排。
schedule 方法中容許出現 0 和負數延遲(但不是週期),並將這些視爲一種當即執行的請求。
全部的 schedule 方法都接受相對 延遲和週期做爲參數,而不是絕對的時間或日期。將以 Date 所表示的絕對時間轉換成要求的形式很容易。
例如,要安排在某個之後的日期運行,可使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。
可是要注意,因爲網絡時間同步協議、時鐘漂移或其餘因素的存在,所以相對延遲的期滿日期沒必要與啓用任務的當前 Date 相符。
Executors 類爲此包中所提供的 ScheduledExecutorService 實現提供了便捷的工廠方法。
一下的例子也是網上比較流行的。
import static java.util.concurrent.TimeUnit.SECONDS; import java.util.Date; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; public class TestScheduledThread { public static void main(String[] args) { final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2); final Runnable beeper = new Runnable() { int count = 0; public void run() { System.out.println(new Date() + " beep " + (++count)); } }; // 1秒鐘後運行,並每隔2秒運行一次 final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS); // 2秒鐘後運行,並每次在上次任務運行完後等待5秒後從新運行 final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS); // 30秒後結束關閉任務,而且關閉Scheduler scheduler.schedule(new Runnable() { public void run() { beeperHandle.cancel(true); beeperHandle2.cancel(true); scheduler.shutdown(); } }, 30, SECONDS); } }
這樣咱們就把concurrent包下比較重要的功能都已經總結完了,但願對咱們理解能有幫助。
http://www.ibm.com/developerworks/cn/java/j-5things4.html
http://www.ibm.com/developerworks/cn/java/j-5things5.html
Concurrent Collections 是 Java™ 5 的巨大附加產品,可是在關於註釋和泛型的爭執中不少 Java 開發人員忽視了它們。此外(或者更老實地說),許多開發人員避免使用這個數據包,由於他們認爲它必定很複雜,就像它所要解決的問題同樣。
事實上,java.util.concurrent
包含許多類,可以有效解決普通的併發問題,無需複雜工序。閱讀本文,瞭解 java.util.concurrent
類,好比 CopyOnWriteArrayList
和BlockingQueue
如何幫助您解決多線程編程的棘手問題。
儘管本質上 不是 Collections 類,但 java.util.concurrent.TimeUnit
枚舉讓代碼更易讀懂。使用 TimeUnit
將使用您的方法或 API 的開發人員從毫秒的 「暴政」 中解放出來。
TimeUnit
包括全部時間單位,從 MILLISECONDS
和 MICROSECONDS
到 DAYS
和 HOURS
,這就意味着它可以處理一個開發人員所需的幾乎全部的時間範圍類型。同時,由於在列舉上聲明瞭轉換方法,在時間加快時,將 HOURS
轉換回 MILLISECONDS
甚至變得更容易。
建立數組的全新副本是過於昂貴的操做,不管是從時間上,仍是從內存開銷上,所以在一般使用中不多考慮;開發人員每每求助於使用同步的ArrayList
。然而,這也是一個成本較高的選擇,由於每當您跨集合內容進行迭代時,您就不得不一樣步全部操做,包括讀和寫,以此保證一致性。
這又讓成本結構回到這樣一個場景:需多讀者都在讀取 ArrayList
,可是幾乎沒人會去修改它。
CopyOnWriteArrayList
是個巧妙的小寶貝,能解決這一問題。它的 Javadoc 將 CopyOnWriteArrayList
定義爲一個 「ArrayList
的線程安全變體,在這個變體中全部易變操做(添加,設置等)能夠經過複製全新的數組來實現」。
集合從內部將它的內容複製到一個沒有修改的新數組,這樣讀者訪問數組內容時就不會產生同步成本(由於他們歷來不是在易變數據上操做)。
本質上講,CopyOnWriteArrayList
很適合處理 ArrayList
常常讓咱們失敗的這種場景:讀取頻繁,但不多有寫操做的集合,例如 JavaBean 事件的 Listener
s。
BlockingQueue
接口表示它是一個 Queue
,意思是它的項以先入先出(FIFO)順序存儲。在特定順序插入的項以相同的順序檢索 — 可是須要附加保證,從空隊列檢索一個項的任未嘗試都會阻塞調用線程,直到這個項準備好被檢索。同理,想要將一個項插入到滿隊列的嘗試也會致使阻塞調用線程,直到隊列的存儲空間可用。
BlockingQueue
乾淨利落地解決了如何將一個線程收集的項「傳遞」給另外一線程用於處理的問題,無需考慮同步問題。Java Tutorial 的 Guarded Blocks 試用版就是一個很好的例子。它構建一個單插槽綁定的緩存,當新的項可用,並且插槽也準備好接受新的項時,使用手動同步和wait()
/notifyAll()
在線程之間發信。(詳見 Guarded Blocks 實現。)
儘管 Guarded Blocks 教程中的代碼有效,可是它耗時久,混亂,並且也並不是徹底直觀。退回到 Java 平臺較早的時候,沒錯,Java 開發人員不得不糾纏於這種代碼;但如今是 2010 年 — 狀況難道沒有改善?
清單 1 顯示了 Guarded Blocks 代碼的重寫版,其中我使用了一個 ArrayBlockingQueue
,而不是手寫的 Drop
。
import java.util.*; import java.util.concurrent.*; class Producer implements Runnable { private BlockingQueue<String> drop; List<String> messages = Arrays.asList( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "Wouldn't you eat ivy too?"); public Producer(BlockingQueue<String> d) { this.drop = d; } public void run() { try { for (String s : messages) drop.put(s); drop.put("DONE"); } catch (InterruptedException intEx) { System.out.println("Interrupted! " + "Last one out, turn out the lights!"); } } } class Consumer implements Runnable { private BlockingQueue<String> drop; public Consumer(BlockingQueue<String> d) { this.drop = d; } public void run() { try { String msg = null; while (!((msg = drop.take()).equals("DONE"))) System.out.println(msg); } catch (InterruptedException intEx) { System.out.println("Interrupted! " + "Last one out, turn out the lights!"); } } } public class ABQApp { public static void main(String[] args) { BlockingQueue<String> drop = new ArrayBlockingQueue(1, true); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } }
ArrayBlockingQueue
還體現了「公平」 — 意思是它爲讀取器和編寫器提供線程先入先出訪問。這種替代方法是一個更有效,但又冒窮盡部分線程風險的政策。(即,容許一些讀取器在其餘讀取器鎖定時運行效率更高,可是您可能會有讀取器線程的流持續不斷的風險,致使編寫器沒法進行工做。)
順便說一句,若是您注意到 Guarded Blocks 包含一個重大 bug,那麼您是對的 — 若是開發人員在 main()
中的Drop
實例上同步,會出現什麼狀況呢?
BlockingQueue
還支持接收時間參數的方法,時間參數代表線程在返回信號故障以插入或者檢索有關項以前須要阻塞的時間。這麼作會避免非綁定的等待,這對一個生產系統是致命的,由於一個非綁定的等待會很容易致使須要重啓的系統掛起。
Map
有一個微妙的併發 bug,這個 bug 將許多不知情的 Java 開發人員引入歧途。ConcurrentMap
是最容易的解決方案。
當一個 Map
被從多個線程訪問時,一般使用 containsKey()
或者 get()
來查看給定鍵是否在存儲鍵/值對以前出現。可是即便有一個同步的Map
,線程仍是能夠在這個過程當中潛入,而後奪取對 Map
的控制權。問題是,在對 put()
的調用中,鎖在 get()
開始時獲取,而後在能夠再次獲取鎖以前釋放。它的結果是個競爭條件:這是兩個線程之間的競爭,結果也會因誰先運行而不一樣。
若是兩個線程幾乎同時調用一個方法,二者都會進行測試,調用 put,在處理中丟失第一線程的值。幸運的是,ConcurrentMap
接口支持許多附加方法,它們設計用於在一個鎖下進行兩個任務:putIfAbsent()
,例如,首先進行測試,而後僅當鍵沒有存儲在 Map
中時進行 put。
根據 Javadoc,SynchronousQueue
是個有趣的東西:
這是一個阻塞隊列,其中,每一個插入操做必須等待另外一個線程的對應移除操做,反之亦然。一個同步隊列不具備任何內部容量,甚至不具備 1 的容量。
本質上講,SynchronousQueue
是以前提過的 BlockingQueue
的又一實現。它給咱們提供了在線程之間交換單一元素的極輕量級方法,使用 ArrayBlockingQueue
使用的阻塞語義。在清單 2 中,我重寫了 清單 1 的代碼,使用 SynchronousQueue
替代ArrayBlockingQueue
:
import java.util.*; import java.util.concurrent.*; class Producer implements Runnable { private BlockingQueue<String> drop; List<String> messages = Arrays.asList( "Mares eat oats", "Does eat oats", "Little lambs eat ivy", "Wouldn't you eat ivy too?"); public Producer(BlockingQueue<String> d) { this.drop = d; } public void run() { try { for (String s : messages) drop.put(s); drop.put("DONE"); } catch (InterruptedException intEx) { System.out.println("Interrupted! " + "Last one out, turn out the lights!"); } } } class Consumer implements Runnable { private BlockingQueue<String> drop; public Consumer(BlockingQueue<String> d) { this.drop = d; } public void run() { try { String msg = null; while (!((msg = drop.take()).equals("DONE"))) System.out.println(msg); } catch (InterruptedException intEx) { System.out.println("Interrupted! " + "Last one out, turn out the lights!"); } } } public class SynQApp { public static void main(String[] args) { BlockingQueue<String> drop = new SynchronousQueue<String>(); (new Thread(new Producer(drop))).start(); (new Thread(new Consumer(drop))).start(); } }
實現代碼看起來幾乎相同,可是應用程序有額外獲益:SynchronousQueue
容許在隊列進行一個插入,只要有一個線程等着使用它。
在實踐中,SynchronousQueue
相似於 Ada 和 CSP 等語言中可用的 「會合通道」。
併發 Collections 提供了線程安全、通過良好調優的數據結構,簡化了併發編程。然而,在一些情形下,開發人員須要更進一步,思考如何調節和/或限制線程執行。因爲 java.util.concurrent
的整體目標是簡化多線程編程,您可能但願該包包含同步實用程序,而它確實包含。
本文是 第 1 部分 的延續,將介紹幾個比核心語言原語(監視器)更高級的同步結構,但它們還未包含在 Collection 類中。一旦您瞭解了這些鎖和門的用途,使用它們將很是直觀。
您以爲本身懂 Java 編程?事實是,大多數開發人員都只領會到了 Java 平臺的皮毛,所學也只夠應付工做。在本系列 中,Ted Neward 深度挖掘 Java 平臺的核心功能,揭示一些不爲人知的事實,幫助您解決最棘手的編程困難。
在一些企業系統中,開發人員常常須要限制未處理的特定資源請求(線程/操做)數量,事實上,限制有時候可以提升系統的吞吐量,由於它們減小了對特定資源的爭用。儘管徹底能夠手動編寫限制代碼,但使用 Semaphore 類能夠更輕鬆地完成此任務,它將幫您執行限制,如清單 1 所示:
import java.util.*;import java.util.concurrent.*; public class SemApp { public static void main(String[] args) { Runnable limitedCall = new Runnable() { final Random rand = new Random(); final Semaphore available = new Semaphore(3); int count = 0; public void run() { int time = rand.nextInt(15); int num = count++; try { available.acquire(); System.out.println("Executing " + "long-running action for " + time + " seconds... #" + num); Thread.sleep(time * 1000); System.out.println("Done with #" + num + "!"); available.release(); } catch (InterruptedException intEx) { intEx.printStackTrace(); } } }; for (int i=0; i<10; i++) new Thread(limitedCall).start(); } }
即便本例中的 10 個線程都在運行(您能夠對運行 SemApp
的 Java 進程執行 jstack
來驗證),但只有 3 個線程是活躍的。在一個信號計數器釋放以前,其餘 7 個線程都處於空閒狀態。(實際上,Semaphore
類支持一次獲取和釋放多個 permit,但這不適用於本場景。)
若是 Semaphore
是容許一次進入一個(這可能會勾起一些流行夜總會的保安的記憶)線程的併發性類,那麼 CountDownLatch
就像是賽馬場的起跑門柵。此類持有全部空閒線程,直到知足特定條件,這時它將會一次釋放全部這些線程。
import java.util.*; import java.util.concurrent.*; class Race { private Random rand = new Random(); private int distance = rand.nextInt(250); private CountDownLatch start; private CountDownLatch finish; private List<String> horses = new ArrayList<String>(); public Race(String... names) { this.horses.addAll(Arrays.asList(names)); } public void run() throws InterruptedException { System.out.println("And the horses are stepping up to the gate..."); final CountDownLatch start = new CountDownLatch(1); final CountDownLatch finish = new CountDownLatch(horses.size()); final List<String> places = Collections.synchronizedList(new ArrayList<String>()); for (final String h : horses) { new Thread(new Runnable() { public void run() { try { System.out.println(h + " stepping up to the gate..."); start.await(); int traveled = 0; while (traveled < distance) { // In a 0-2 second period of time.... Thread.sleep(rand.nextInt(3) * 1000); // ... a horse travels 0-14 lengths traveled += rand.nextInt(15); System.out.println(h + " advanced to " + traveled + "!"); } finish.countDown(); System.out.println(h + " crossed the finish!"); places.add(h); } catch (InterruptedException intEx) { System.out.println("ABORTING RACE!!!"); intEx.printStackTrace(); } } }).start(); } System.out.println("And... they're off!"); start.countDown(); finish.await(); System.out.println("And we have our winners!"); System.out.println(places.get(0) + " took the gold..."); System.out.println(places.get(1) + " got the silver..."); System.out.println("and " + places.get(2) + " took home the bronze."); } } public class CDLApp { public static void main(String[] args) throws InterruptedException, java.io.IOException { System.out.println("Prepping..."); Race r = new Race( "Beverly Takes a Bath", "RockerHorse", "Phineas", "Ferb", "Tin Cup", "I'm Faster Than a Monkey", "Glue Factory Reject" ); System.out.println("It's a race of " + r.getDistance() + " lengths"); System.out.println("Press Enter to run the race...."); System.in.read(); r.run(); } }
注意,在 清單 2 中,CountDownLatch
有兩個用途:首先,它同時釋放全部線程,模擬馬賽的起點,但隨後會設置一個門閂模擬馬賽的終點。這樣,「主」 線程就能夠輸出結果。 爲了讓馬賽有更多的輸出註釋,能夠在賽場的 「轉彎處」 和 「半程」 點,好比賽馬跨過跑道的四分之1、二分之一和四分之三線時,添加 CountDownLatch
。
清單 1 和 清單 2 中的示例都存在一個重要的缺陷,它們要求您直接建立 Thread
對象。這能夠解決一些問題,由於在一些 JVM 中,建立Thread
是一項重量型的操做,重用現有 Thread
比建立新線程要容易得多。而在另外一些 JVM 中,狀況正好相反:Thread
是輕量型的,能夠在須要時很容易地新建一個線程。固然,若是 Murphy 擁有本身的解決辦法(他一般都會擁有),那麼您不管使用哪一種方法對於您最終將部署的平臺都是不對的。
JSR-166 專家組(參見 參考資料)在必定程度上預測到了這一情形。Java 開發人員無需直接建立 Thread
,他們引入了 Executor
接口,這是對建立新線程的一種抽象。如清單 3 所示,Executor
使您沒必要親自對 Thread
對象執行 new
就可以建立新線程:
Executor exec = getAnExecutorFromSomeplace(); exec.execute(new Runnable() { ... });
使用 Executor
的主要缺陷與咱們在全部工廠中遇到的同樣:工廠必須來自某個位置。不幸的是,與 CLR 不一樣,JVM 沒有附帶一個標準的 VM 級線程池。
Executor
類實際上 充當着一個提供 Executor
實現實例的共同位置,但它只有 new
方法(例如用於建立新線程池);它沒有預先建立實例。因此您能夠自行決定是否但願在代碼中建立和使用 Executor
實例。(或者在某些狀況下,您將可以使用所選的容器/平臺提供的實例。)
儘管沒必要擔憂 Thread
來自何處,但 Executor
接口缺少 Java 開發人員可能指望的某種功能,好比結束一個用於生成結果的線程並以非阻塞方式等待結果可用。(這是桌面應用程序的一個常見需求,用戶將執行須要訪問數據庫的 UI 操做,而後若是該操做花費了很長時間,可能但願在它完成以前取消它。)
對於此問題,JSR-166 專家建立了一個更加有用的抽象(ExecutorService
接口),它將線程啓動工廠建模爲一個可集中控制的服務。例如,無需每執行一項任務就調用一次 execute()
,ExecutorService
能夠接受一組任務並返回一個表示每項任務的將來結果的將來列表。
儘管 ExecutorService
接口很是有用,但某些任務仍須要以計劃方式執行,好比以肯定的時間間隔或在特定時間執行給定的任務。這就是ScheduledExecutorService
的應用範圍,它擴展了 ExecutorService
。
若是您的目標是建立一個每隔 5 秒跳一次的 「心跳」 命令,使用 ScheduledExecutorService
能夠輕鬆實現,如清單 4 所示:
import java.util.concurrent.*; public class Ping { public static void main(String[] args) { ScheduledExecutorService ses = Executors.newScheduledThreadPool(1); Runnable pinger = new Runnable() { public void run() { System.out.println("PING!"); } }; ses.scheduleAtFixedRate(pinger, 5, 5, TimeUnit.SECONDS); } }
這項功能怎麼樣?不用過於擔憂線程,不用過於擔憂用戶但願取消心跳時會發生什麼,也不用明確地將線程標記爲前臺或後臺;只需將全部的計劃細節留給 ScheduledExecutorService
。
順便說一下,若是用戶但願取消心跳,scheduleAtFixedRate
調用將返回一個 ScheduledFuture
實例,它不只封裝告終果(若是有),還擁有一個 cancel
方法來關閉計劃的操做。
爲阻塞操做設置一個具體的超時值(以免死鎖)的能力是 java.util.concurrent
庫相比起早期併發特性的一大進步,好比監控鎖定。
這些方法幾乎老是包含一個 int
/TimeUnit
對,指示這些方法應該等待多長時間才釋放控制權並將其返回給程序。它須要開發人員執行更多工做 — 若是沒有獲取鎖,您將如何從新獲取? — 但結果幾乎老是正確的:更少的死鎖和更加適合生產的代碼。(關於編寫生產就緒代碼的更多信息,請參見 參考資料 中 Michael Nygard 編寫的 Release It!。)
java.util.concurrent
包還包含了其餘許多好用的實用程序,它們很好地擴展到了 Collections 以外,尤爲是在 .locks
和 .atomic
包中。深刻研究,您還將發現一些有用的控制結構,好比 CyclicBarrier
等。