每一個正在系統上運行的程序都是一個進程。每一個進程包含一到多個線程。線程是一組指令的集合,或者是程序的特殊段,它能夠在程序裏獨立執行。也能夠把它理解爲代碼運行的上下文。因此線程基本上是輕量級的進程,它負責在單個程序裏執行多任務。一般由操做系統負責多個線程的調度和執行。html
使用線程能夠把佔據時間長的程序中的任務放到後臺去處理,程序的運行速度可能加快,在一些等待的任務實現上如用戶輸入、文件讀寫和網絡收發數據等,線程就比較有用了。在這種狀況下能夠釋放一些珍貴的資源如內存佔用等等。java
若是有大量的線程,會影響性能,由於操做系統須要在它們之間切換,更多的線程須要更多的內存空間,線程的停止須要考慮其對程序運行的影響。一般塊模型數據是在多個線程間共享的,須要防止線程死鎖狀況的發生。程序員
總結:進程是全部線程的集合,每個線程是進程中的一條執行路徑。算法
1. 初始(NEW):新建立了一個線程對象,但尚未調用start()方法。
2. 運行(RUNNABLE):Java線程中將就緒(ready)和運行中(running)兩種狀態籠統的稱爲「運行」。
線程對象建立後,其餘線程(好比main線程)調用了該對象的start()方法。該狀態的線程位於可運行線程池中,等待被線程調度選中,獲取CPU的使用權,此時處於就緒狀態(ready)。就緒狀態的線程在得到CPU時間片後變爲運行中狀態(running)。
3.阻塞(BLOCKED):表示線程阻塞於鎖。
4.等待(WAITING):進入該狀態的線程須要等待其餘線程作出一些特定動做(通知或中斷)。
5.超時等待(TIMED_WAITING):該狀態不一樣於WAITING,它能夠在指定的時間後自行返回。6. 終止(TERMINATED):表示該線程已經執行完畢。數據庫
實現Runnable接口和繼承Thread能夠獲得一個線程類,new一個實例出來,線程就進入了初始狀態。api
線程調度程序從可運行池中選擇一個線程做爲當前線程時線程所處的狀態。這也是線程進入運行狀態的惟一一種方式。數組
阻塞狀態是線程阻塞在進入synchronized關鍵字修飾的方法或代碼塊(獲取鎖)時的狀態。緩存
處於這種狀態的線程不會被分配CPU執行時間,它們要等待被顯式地喚醒,不然會處於無限期等待的狀態。安全
處於這種狀態的線程不會被分配CPU執行時間,不過無須無限期等待被其餘線程顯示地喚醒,在達到必定時間後它們會自動喚醒。網絡
傳統是相對於JDK1.5而言的
傳統線程技術與JDK1.5的線程併發庫
線程就是程序的一條執行線索/線路。
建立線程的兩種傳統方式
Thread thread = new Thread(){ @Override public void run(){ while (true){ //獲取當前線程對象 獲取線程名字
Thread.currentThread() threadObj.getName() //讓線程暫停,休眠,此方法會拋出中斷異常InterruptedException
Thread.sleep(毫秒值); } } }; thread.start();
Thread thread = new Thread(new Runnable(){ public void run(){ } }); thread.start();
問題:下邊的線程運行的是Thread子類中的方法仍是實現Runnable接口類的方法
new Thread(Runnable.run()){run()}.start();
由Thread類中的run方法源代碼中看出,兩種傳統建立線程的方式都是在調用Thread對象的run方法,若是Thread對象的run方法沒有被覆蓋,而且像上邊的問題那樣爲Thread對象傳遞了一個Runnable對象,就會調用Runnable對象的run方法。
多線程並不必定會提升程序的運行效率。
經常使用線程api方法 |
|
start() |
啓動線程 |
currentThread() |
獲取當前線程對象 |
getID() |
獲取當前線程ID Thread-編號 該編號從0開始 |
getName() |
獲取當前線程名稱 |
sleep(long mill) |
休眠線程 |
Stop() |
中止線程, |
經常使用線程構造函數 |
|
Thread() |
分配一個新的 Thread 對象 |
Thread(String name) |
分配一個新的 Thread對象,具備指定的 name正如其名。 |
Thread(Runable r) |
分配一個新的 Thread對象 |
Thread(Runable r, String name) |
分配一個新的 Thread對象 |
Java中有兩種線程,一種是用戶線程,另外一種是守護線程。用戶線程是指用戶自定義建立的線程,主線程中止,用戶線程不會中止。守護線程當進程不存在或主線程中止,守護線程也會被中止。
使用setDaemon(true)方法設置爲守護線程
/**
* 什麼是守護線程? 守護線程 進程線程(主線程掛了) 守護線程也會被自動銷燬. *
* @classDesc: 功能描述:(守護線程)*/
public class DaemonThread { public static void main(String[] args) { Thread thread = new Thread(new Runnable() { @Override public void run() { while (true) { try { Thread.sleep(100); } catch (Exception e) { // TODO: handle exception
} System.out.println("我是子線程..."); } } }); thread.setDaemon(true); thread.start(); for (int i = 0; i < 10; i++) { try { Thread.sleep(100); } catch (Exception e) { } System.out.println("我是主線程"); } System.out.println("主線程執行完畢!"); } }
傳統定時器的建立:直接使用定時器類Timer
new Timer().schedule(TimerTask定時任務, Date time定的時間);
new Timer().schedule(TimerTask定時任務, Long延遲(第一次執行)時間, Long間隔時間);
TimerTask與Runnable相似,有一個run方法,Timer是定時器對象,到時間後會觸發炸彈(TimerTask)對象
new Timer().schedule( new TimerTask()定時執行的任務 { public void run() { SOP(「bombing」); } 顯示計時信息 while (true) { SOP(new Date().getSeconds()); Thread.sleep(1000); } },10 定好的延遲時間,10秒之後執行任務 );
public class T { static class T2 extends TimerTask{ @Override public void run() { System.out.println("T1~boom~~~"); new Timer().schedule(new T1(), 2000); } } static class T1 extends TimerTask{ @Override public void run() { System.out.println("T2~boom~~~"); new Timer().schedule(new T2(), 5000); } } public static void main(String[] args) { new Timer().schedule(new T1(), 2000); while(true) { System.out.println(new Date().getSeconds()); try { Thread.sleep(1000); } catch (InterruptedException e) {
e.printStackTrace(); } } } }
互斥方法:
同步方法上邊用的鎖就是this對象
靜態同步方法使用的鎖是該方法所在的class文件對象
使用synchronized關鍵字實現互斥,要保證同步的地方使用的是同一個鎖對象
/** * 子線程循環10次,回主線程循環10次, * 再到子線程循環10次,再回主線程循環10次 * 如此循環50次 */
class Business { private boolean bShouleSub = true; public synchronized void sub(int x) throws InterruptedException { while (bShouleSub) { for (int i = 1; i < 11; i++) System.out.println("sub~x:" + x + ",i:" + i); bShouleSub = false; this.notify(); }this.wait(); } public synchronized void main(int x) throws InterruptedException { while (!bShouleSub) { for (int i = 1; i < 11; i++) System.out.println("main~x:" + x + ",i:" + i); bShouleSub = true; this.notify(); }
this.wait(); } }
public class T { public static void main(String[] args) throws InterruptedException { Business b = new Business(); new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 50; i++) { try { b.sub(i); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); new Thread(new Runnable() { @Override public void run() { for (int i = 0; i < 50; i++) { try { b.main(i); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } }
要用到共同數據(包括同步鎖)或相同算法的多個方法要封裝在一個類中
鎖是上在表明要操做的資源類的內部方法中的,而不是上在線程代碼中的。這樣寫出來的類就是自然同步的,只要使用的是同一個new出來的對象,那麼這個對象就具備同步互斥特性
判斷喚醒等待標記時使用while增長程序健壯性,防止僞喚醒
線程範圍內共享數據圖解:
class ThreadScopeShareData{ 三個模塊共享數據,主線程模塊和AB模塊 private static int data = 0; 準備共享的數據 存放各個線程對應的數據 private Map<Thread, Integer> threadData = new HashMap<Thread, Integer>(); public static void main(String[] args){
建立兩個線程 for (int i=0; i<2; i++){ new Thread( new Runnable(){ public void run(){
如今當前線程中修改一下數據,給出修改信息 data = new Random().nextInt(); SOP(Thread.currentThread().getName()+將數據改成+data); 將線程信息和對應數據存儲起來 threadData.put(Thread.currentThread(), data); 使用兩個不一樣的模塊操做這個數據,看結果 new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ data = threadData.get(Thread.currentThread()); SOP(A+Thread.currentThread().getName()+拿到的數據+data); } } static class B{ public void get(){ data = threadData.get(Thread.currentThread()); SOP(B+Thread.currentThread().getName()+拿到的數據+data); } } }
若是每一個線程執行的代碼相同,可使用同一個Runnable對象,這個Runnable對象中有那個共享數據,例如,買票系統就能夠這麼作。
若是每一個線程執行的代碼不一樣,這時候須要用不一樣的Runnable對象,有以下兩種方式來實現這些Runnable對象之間的數據共享:
將共享數據封裝在另一個對象中,而後將這個對象逐一傳遞給各個Runnable對象。每一個線程對共享數據的操做方法也分配到那個對象身上去完成,這樣容易實現針對該數據進行的各個操做的互斥和通訊。
public class Test { public static void main(String[] args) { ShareData shareDate = new ShareData(); new Thread(new MyRunnable1(shareDate)).start(); new Thread(new MyRunnable2(shareDate)).start(); } } class MyRunnable1 implements Runnable{ private ShareData shareData; public MyRunnable1(ShareData shareData) { this.shareData = shareData; } @Override public void run() { while(true) { shareData.increase(); } } } class MyRunnable2 implements Runnable{ private ShareData shareData; public MyRunnable2(ShareData shareData) { this.shareData = shareData; } @Override public void run() { while(true) { shareData.decrease(); } } } class ShareData{ int count = 0; synchronized void increase() { count++; } synchronized void decrease() { count--; } }
將這些Runnable對象做爲某一個類中的內部類,共享數據做爲這個外部類中的成員變量,每一個線程對共享數據的操做方法也分配給外部類,以便實現對共享數據進行的各個操做的互斥和通訊,做爲內部類的各個Runnable對象調用外部類的這些方法。
上面兩種方式的組合:將共享數據封裝在另一個對象中,每一個線程對共享數據的操做方法也分配到那個對象身上去完成,對象做爲這個外部類中的成員變量或方法中的局部變量,每一個線程的Runnable對象做爲外部類中的成員內部類或局部內部類。
總之,要同步互斥的幾段代碼最好是分別放在幾個獨立的方法中,這些方法再放在同一個類中,這樣比較容易實現它們之間的同步互斥和通訊。
極端且簡單的方式,即在任意一個類中定義一個static的變量,這將被全部線程共享。
ThreadLocal的做用和目的:用於實現線程內的數據共享,即對於相同的程序代碼,多個模塊在同一個線程中運行時要共享一份數據,而在另外線程中運行時又共享另一份數據。
每一個線程調用全局ThreadLocal對象的set方法,就至關於往其內部的map中增長一條記錄,key分別是各自的線程,value是各自的set方法傳進去的值。在線程結束時能夠調用ThreadLocal.clear()方法,這樣會更快釋放內存,不調用也能夠,由於線程結束後也能夠自動釋放相關的ThreadLocal變量。
ThreadLocal的應用場景:
訂單處理包含一系列操做:減小庫存量、增長一條流水臺帳、修改總帳,這幾個操做要在同一個事務中完成,一般也即同一個線程中進行處理,若是累加公司應收款的操做失敗了,則應該把前面的操做回滾,不然,提交全部操做,這要求這些操做使用相同的數據庫鏈接對象,而這些操做的代碼分別位於不一樣的模塊類中。
銀行轉帳包含一系列操做: 把轉出賬戶的餘額減小,把轉入賬戶的餘額增長,這兩個操做要在同一個事務中完成,它們必須使用相同的數據庫鏈接對象,轉入和轉出操做的代碼分別是兩個不一樣的賬戶對象的方法。
例如Strut2的ActionContext,同一段代碼被不一樣的線程調用運行時,該代碼操做的數據是每一個線程各自的狀態和數據,對於不一樣的線程來講,getContext方法拿到的對象都不相同,對同一個線程來講,無論調用getContext方法多少次和在哪一個模塊中getContext方法,拿到的都是同一個。
實驗案例:定義一個全局共享的ThreadLocal變量,而後啓動多個線程向該ThreadLocal變量中存儲一個隨機值,接着各個線程調用另外其餘多個類的方法,這多個類的方法中讀取這個ThreadLocal變量的值,就能夠看到多個類在同一個線程中共享同一份數據。
實現對ThreadLocal變量的封裝,讓外界不要直接操做ThreadLocal變量。
對基本類型的數據的封裝,這種應用相對不多見。
對對象類型的數據的封裝,比較常見,即讓某個類針對不一樣線程分別建立一個獨立的實例對象。
一個ThreadLocal對象只能記錄一個線程內部的一個共享變量,須要記錄多個共享數據,能夠建立多個ThreadLocal對象,或者將這些數據進行封裝,將封裝後的數據對象存入ThreadLocal對象中。
將數據對象封裝成單例,同時提供線程範圍內的共享數據的設置和獲取方法,提供已經封裝好了的線程範圍內的對象實例,使用時只需獲取實例對象便可實現數據的線程範圍內的共享,由於該對象已是當前線程範圍內的對象了。
import java.util.Random; public class ThreadLocalShareDataDemo { /** * ThreadLocal類及應用技巧 將線程範圍內共享數據進行封裝,封裝到一個單獨的數據類中,提供設置獲取方法 * 將該類單例化,提供獲取實例對象的方法,獲取到的實例對象是已經封裝好的當前線程範圍內的對象 */
public static void main(String[] args) { for (int i = 0; i < 2; i++) { new Thread(new Runnable() { public void run() { int data = new Random().nextInt(889); System.out.println(Thread.currentThread().getName() + "產生數據:" + data); MyData myData = MyData.getInstance(); myData.setAge(data); myData.setName("Name:" + data); new A().get(); new B().get(); } }).start(); } } static class A { // 能夠直接使用獲取到的線程範圍內的對象實例調用相應方法
String name = MyData.getInstance().getName(); int age = MyData.getInstance().getAge(); public void get() { System.out.println(Thread.currentThread().getName() + "-- AA name:" + name + "...age:" + age); } } static class B { // 能夠直接使用獲取到的線程範圍內的對象實例調用相應方法
String name = MyData.getInstance().getName(); int age = MyData.getInstance().getAge(); public void get() { System.out.println(Thread.currentThread().getName() + "-- BB name:" + name + "...age:" + age); } } static class MyData { private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } // 單例
private MyData() { }; // 提供獲取實例方法
public static MyData getInstance() { // 從當前線程範圍內數據集中獲取實例對象
MyData instance = threadLocal.get(); if (instance == null) { instance = new MyData(); threadLocal.set(instance); } return instance; } // 將實例對象存入當前線程範圍內數據集中
static ThreadLocal<MyData> threadLocal = new ThreadLocal<MyData>(); } }
ExecutorService threadPool = Executors.newFixedThreadPool(3);
建立緩存線程池
ExecutorService threadPool = Executors.newCacheThreadPool();
建立單一線程池
ExecutorService threadPool = Executors.newSingleThreadExector();
threadPool.shutdown() 線程所有空閒,沒有任務就關閉線程池;threadPool.shutdownNow() 無論任務有沒有作完,都關掉
Executors.newScheduledThreadPool(線程數).schedule(Runnable, 延遲時間,時間單位);
Executors.newScheduledThreadPool(線程數). scheduleAtFixedRate (Runnable, 延遲時間,間隔時間,時間單位);
Future取得的結果類型和Callable返回的結果類型必須一致,這是經過泛型來實現的。
Callable要採用ExecutorSevice的submit方法提交,返回的future對象能夠取消任務。
CompletionService用於提交一組Callable任務,其take方法返回已完成的一個Callable任務對應的Future對象。
public static void main(String[] args) { ExecutorService threadPool = Executors.newScheduledThreadPool(5); Future<String> future = threadPool.submit( new Callable<String>() { @Override public String call() throws Exception { return "result"; } }); System.out.println("等待結果~~~"); try { String result = future.get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
CompletionService用於提交一組Callable任務,其take方法返回一個已完成的Callable任務對應的Future對象。比如同時種幾塊麥子等待收割,收割時哪塊先熟先收哪塊。
將生產新的異步任務與使用已完成任務的結果分離開來的服務。生產者 submit 執行的任務。使用者 take 已完成的任務,並按照完成這些任務的順序處理它們的結果。例如,CompletionService 能夠用來管理異步 IO ,執行讀操做的任務做爲程序或系統的一部分提交,而後,當完成讀操做時,會在程序的不一樣部分執行其餘操做,執行操做的順序可能與所請求的順序不一樣。
一般,CompletionService 依賴於一個單獨的 Executor 來實際執行任務,在這種狀況下,CompletionService 只管理一個內部完成隊列。ExecutorCompletionService 類提供了此方法的一個實現。
public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); CompletionService<Integer> completionService = new ExecutorCompletionService<>(service); //向線程池提交一組任務
for (int i = 0; i < 10; i++) { final int j = i; completionService.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { Thread.sleep(new Random().nextInt(5000)); return j; } }); } //哪一個線程先執行完,哪一個就輸出結果
for (int i = 0; i < 10; i++) { try { System.out.println(completionService.take().get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
Lock比傳統線程模型中的synchronized方式更加面向對象,與生活中的鎖相似,鎖自己也應該是一個對象。兩個線程執行的代碼片斷要實現同步互斥的效果,它們必須用同一個Lock對象。
讀寫鎖:分爲讀鎖和寫鎖,多個讀鎖不互斥,讀鎖與寫鎖互斥,這是由jvm本身控制的,你只要上好相應的鎖便可。若是你的代碼只讀數據,能夠不少人同時讀,但不能同時寫,那就上讀鎖;若是你的代碼修改數據,只能有一我的在寫,且不能同時讀取,那就上寫鎖。總之,讀的時候上讀鎖,寫的時候上寫鎖!
在等待 Condition 時,容許發生「虛假喚醒」,這一般做爲對基礎平臺語義的讓步。對於大多數應用程序,這帶來的實際影響很小,由於 Condition 應該老是在一個循環中被等待,並測試正被等待的狀態聲明。某個實現能夠隨意移除可能的虛假喚醒,但建議應用程序程序員老是假定這些虛假喚醒可能發生,所以老是在一個循環中等待。
一個鎖內部能夠有多個Condition,即有多路等待和通知,能夠參看jdk1.5提供的Lock與Condition實現的可阻塞隊列的應用案例,從中除了要體味算法,還要體味面向對象的封裝。在傳統的線程機制中一個監視器對象上只能有一路等待和通知,要想實現多路等待和通知,必須嵌套使用多個同步監視器對象。(若是隻用一個Condition,兩個放的都在等,一旦一個放的進去了,那麼它通知可能會致使另外一個放接着往下走。)
java.util.concurrent.locks 爲鎖和等待條件提供一個框架的接口和類
接口摘要 |
||
Condition |
Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成大相徑庭的對象,以便經過將這些對象與任意 Lock 實現組合使用,爲每一個對象提供多個等待 set(wait-set)。 |
|
Lock |
Lock 實現提供了比使用 synchronized 方法和語句可得到的更普遍的鎖定操做。 |
|
ReadWriteLock |
ReadWriteLock 維護了一對相關的鎖,一個用於只讀操做,另外一個用於寫入操做。 |
|
類摘要 |
||
AbstractOwnableSynchronizer |
能夠由線程以獨佔方式擁有的同步器。 |
|
AbstractQueuedLongSynchronizer |
以 long 形式維護同步狀態的一個 AbstractQueuedSynchronizer 版本。 |
|
AbstractQueuedSynchronizer |
爲實現依賴於先進先出 (FIFO) 等待隊列的阻塞鎖和相關同步器(信號量、事件,等等)提供一個框架。 |
|
LockSupport |
用來建立鎖和其餘同步類的基本線程阻塞原語。 |
|
ReentrantLock |
一個可重入的互斥鎖 Lock,它具備與使用 synchronized 方法和語句所訪問的隱式監視器鎖相同的一些基本行爲和語義,但功能更強大。 |
|
ReentrantReadWriteLock |
支持與 ReentrantLock 相似語義的 ReadWriteLock 實現。 |
|
ReentrantReadWriteLock.ReadLock |
ReentrantReadWriteLock.readLock() 方法返回的鎖。 |
|
ReentrantReadWriteLock.WriteLock |
ReentrantReadWriteLock.writeLock() 方法返回的鎖。 |
Lock比傳統線程模型中的synchronized更加面向對象,鎖自己也是一個對象,兩個線程執行的代碼要實現同步互斥效果,就要使用同一個鎖對象。鎖要上在要操做的資源類的內部方法中,而不是線程代碼中。
public interface Lock
全部已知實現類:
隨着靈活性的增長,也帶來了更多的責任。不使用塊結構鎖就失去了使用 synchronized 方法和語句時會出現的鎖自動釋放功能。在大多數狀況下,應該使用如下語句:
Lock l = ...; l.lock(); try { // access the resource protected by this lock
} finally { l.unlock(); }
鎖定和取消鎖定出如今不一樣做用範圍中時,必須謹慎地確保保持鎖定時所執行的全部代碼用 try-finally 或 try-catch 加以保護,以確保在必要時釋放鎖。
方法摘要 |
|
void |
lock() 獲取鎖。 |
void |
lockInterruptibly() 若是當前線程未被中斷,則獲取鎖。 |
Condition |
newCondition() 返回綁定到此 Lock 實例的新 Condition 實例。 |
boolean |
tryLock() 僅在調用時鎖爲空閒狀態才獲取該鎖。 |
boolean |
tryLock(long time, TimeUnit unit) 若是鎖在給定的等待時間內空閒,而且當前線程未被中斷,則獲取鎖。 |
void |
unlock() 釋放鎖。 |
Lock與synchronized對比,打印字符串例子
class Outputer{ public void outpur(String name){ int len = name.length(); synchronized (Outputer.class){ for (int i=0; i<len; i++){
//不換行逐個打印字符 SO(name.charAt[i]); } SOP()換行 } } }
//使用Lock的代碼
class Outputer {
//Lock是接口,需new實現類對象 Lock lock = new ReentrantLock(); public void outpur(String name){ int len = name.length(); lock.lock() 鎖上 for (int i=0; i<len; i++){
//不換行逐個打印字符 SO(name.charAt[i]); } SOP()換行 lock.unlock(); 解鎖
} }
例子1:
import java.util.Random; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * -----------------------讀寫鎖的例子--------------------------- */
public class ReadWriteLockTest { public static void main(String[] args) { final Queue3 q3 = new Queue3(); for(int i=0;i<3;i++) { new Thread(){ public void run(){ while(true){ q3.get(); } } }.start(); } for(int i=0;i<3;i++) { new Thread(){ public void run(){ while(true){ q3.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue3{ private Object data = null;//共享數據,只能有一個線程能寫該數據,但能夠有多個線程同時讀該數據。
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public void get(){ rwl.readLock().lock(); System.out.println(Thread.currentThread().getName() + " be ready to read data!"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "have read data :" + data); rwl.readLock().unlock(); } public void put(Object data){ rwl.writeLock().lock(); System.out.println(Thread.currentThread().getName() + " be ready to write data!"); try { Thread.sleep((long)(Math.random()*1000)); } catch (InterruptedException e) { e.printStackTrace(); } this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); rwl.writeLock().unlock(); } }
例子2:
class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { // Must release read lock before acquiring write lock
rwl.readLock().unlock(); rwl.writeLock().lock(); try { // Recheck state because another thread might have // acquired write lock and changed state before we did.
if (!cacheValid) { data = ... cacheValid = true; } // Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); // Unlock write, still hold read
} } try { use(data); } finally { rwl.readLock().unlock(); } } }
Condition的例子:實現兩個線程交替執行
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); final Business2 business = new Business2(); service.execute(new Runnable(){ public void run() { for(int i=0;i<50;i++){ business.sub(); } } }); for(int i=0;i<50;i++){ business.main(); } } } class Business2{ Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); boolean bShouldSub = true; public void sub(){ lock.lock(); if(!bShouldSub) try { //等待
condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } try { for(int i=0;i<10;i++){ System.out.println(Thread.currentThread().getName() + " : " + i); } bShouldSub = false; //喚醒
condition.signal(); }finally{ lock.unlock(); } } public void main(){ lock.lock(); if(bShouldSub) try { //等待
condition.await(); } catch (InterruptedException e) { e.printStackTrace(); } try { for(int i=0;i<5;i++){ System.out.println(Thread.currentThread().getName() + " : " + i); } bShouldSub = true; //喚醒
condition.signal(); }finally{ lock.unlock(); } } }
Condition的例子:2個condition實現可阻塞隊列
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
Condition的例子:實現三個線程交替運行的效果
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ThreeCondition { public static void main(String[] args) { new ThreeCondition().init(); } private void init() { final Business b = new Business(); new Thread() { public void run() { for (int i = 0; i < 50; i++) b.sub1(); } }.start(); new Thread() { public void run() { for (int i = 0; i < 50; i++) b.sub2(); } }.start(); new Thread() { public void run() { for (int i = 0; i < 50; i++) b.sub3(); } }.start(); } private class Business { int status = 1; Lock lock = new ReentrantLock(); Condition cond1 = lock.newCondition(); Condition cond2 = lock.newCondition(); Condition cond3 = lock.newCondition(); public void sub1() { lock.lock(); while (status != 1) { try { cond1.await(); } catch (Exception e) { } } for (int i = 1; i <= 5; i++) { try { Thread.sleep(200); } catch (Exception e) { } System.out.println(Thread.currentThread().getName() + ":" + i); } status = 2; cond2.signal(); lock.unlock(); } public void sub2() { lock.lock(); while (status != 2) { try { cond2.await(); } catch (Exception e) { } } for (int i = 1; i <= 10; i++) { try { Thread.sleep(200); } catch (Exception e) { } System.out.println(Thread.currentThread().getName() + ":" + i); } status = 3; cond3.signal(); lock.unlock(); } public void sub3() { lock.lock(); while (status != 3) { try { cond3.await(); } catch (Exception e) { } } for (int i = 1; i <= 10; i++) { try { Thread.sleep(200); } catch (Exception e) { } System.out.println(Thread.currentThread().getName() + ":" + i); } status = 1; cond1.signal(); lock.unlock(); } } }
Semaphore能夠維護當前訪問自身的線程個數,並提供了同步機制。使用Semaphore能夠控制同時訪問資源的線程個數,例如,實現一個文件容許的併發訪問數。
單個信號量的Semaphore對象能夠實現互斥鎖的功能,而且能夠是由一個線程得到了「鎖」,再由另外一個線程釋放「鎖」,這可應用於死鎖恢復的一些場合。
構造方法摘要 |
||||
Semaphore(int permits) 建立具備給定的許可數和非公平的公平設置的 Semaphore。 |
||||
Semaphore(int permits, boolean fair) 建立具備給定的許可數和給定的公平設置的 Semaphore。 |
||||
方法摘要 |
||||
void |
acquire() 今後信號量獲取一個許可,在提供一個許可前一直將線程阻塞,不然線程被中斷。 |
|||
void |
acquire(int permits) 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。 |
|||
void |
acquireUninterruptibly() 今後信號量中獲取許可,在有可用的許可前將其阻塞。 |
|||
void |
acquireUninterruptibly(int permits) 今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。 |
|||
int |
availablePermits() 返回此信號量中當前可用的許可數。 |
|||
int |
drainPermits() 獲取並返回當即可用的全部許可。 |
|||
protected Collection<Thread> |
getQueuedThreads() 返回一個 collection,包含可能等待獲取的線程。 |
|||
int |
getQueueLength() 返回正在等待獲取的線程的估計數目。 |
|||
boolean |
hasQueuedThreads() 查詢是否有線程正在等待獲取。 |
|||
boolean |
isFair() 若是此信號量的公平設置爲 true,則返回 true。 |
|||
protected void |
reducePermits(int reduction) 根據指定的縮減量減少可用許可的數目。 |
|||
void |
release() 釋放一個許可,將其返回給信號量。 |
|||
void |
release(int permits) 釋放給定數目的許可,將其返回到信號量。 |
|||
String |
toString() 返回標識此信號量的字符串,以及信號量的狀態。 |
|||
boolean |
tryAcquire() 僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。 |
|||
boolean |
tryAcquire(int permits) 僅在調用時此信號量中有給定數目的許可時,才今後信號量中獲取這些許可。 |
|||
boolean |
tryAcquire(int permits, long timeout, TimeUnit unit) 若是在給定的等待時間內此信號量有可用的全部許可,而且當前線程未被中斷,則今後信號量獲取給定數目的許可。 |
|||
boolean |
tryAcquire(long timeout, TimeUnit unit) 若是在給定的等待時間內,此信號量有可用的許可而且當前線程未被中斷,則今後信號量獲取一個許可。 |
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Semaphore sp = new Semaphore(3); for(int i=0;i<10;i++){ Runnable runnable = new Runnable(){ public void run(){ try { sp.acquire(); } catch (InterruptedException e1) { e1.printStackTrace(); } System.out.println("線程" + Thread.currentThread().getName() +
"進入,當前已有" + (3-sp.availablePermits()) + "個併發"); try { Thread.sleep((long)(Math.random()*10000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程" + Thread.currentThread().getName() +
"即將離開"); sp.release(); //下面代碼有時候執行不許確,由於其沒有和上面的代碼合成原子單元
System.out.println("線程" + Thread.currentThread().getName() +
"已離開,當前已有" + (3-sp.availablePermits()) + "個併發"); } }; service.execute(runnable); } } }
一個同步輔助類,它容許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 頗有用。由於該 barrier 在釋放等待線程後能夠重用,因此稱它爲循環 的 barrier。
CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最後一個線程到達以後(但在釋放全部線程以前),該命令只在每一個屏障點運行一次。若在繼續全部參與線程以前更新共享狀態,此屏障操做 頗有用。
構造方法摘要 |
|
CyclicBarrier(int parties) 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,但它不會在啓動 barrier 時執行預約義的操做。 |
|
CyclicBarrier(int parties, Runnable barrierAction) 建立一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處於等待狀態時啓動,並在啓動 barrier 時執行給定的屏障操做,該操做由最後一個進入 barrier 的線程執行。 |
|
方法摘要 |
|
int |
await() 在全部參與者都已經在此 barrier 上調用 await 方法以前,將一直等待。 |
int |
await(long timeout, TimeUnit unit) 在全部參與者都已經在此屏障上調用 await 方法以前將一直等待,或者超出了指定的等待時間。 |
int |
getNumberWaiting() 返回當前在屏障處等待的參與者數目。 |
int |
getParties() 返回要求啓動此 barrier 的參與者數目。 |
boolean |
isBroken() 查詢此屏障是否處於損壞狀態。 |
void |
reset() 將屏障重置爲其初始狀態。 |
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() +
"即將到達集合地點1,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() +
"即將到達集合地點2,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await(); Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() +
"即將到達集合地點3,當前已有" + cb.getNumberWaiting() + "個已經到達,正在等候"); cb.await(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } service.shutdown(); } }
一個同步輔助類,在完成一組正在其餘線程中執行的操做以前,它容許一個或多個線程一直等待。 用給定的計數 初始化 CountDownLatch。因爲調用了 countDown() 方法,因此在當前計數到達零以前,await 方法會一直受阻塞。以後,會釋放全部等待的線程,await 的全部後續調用都將當即返回。這種現象只出現一次——計數沒法被重置。若是須要重置計數,請考慮使用 CyclicBarrier。
CountDownLatch 是一個通用同步工具,它有不少用途。將計數 1 初始化的 CountDownLatch 用做一個簡單的開/關鎖存器,或入口:在經過調用 countDown() 的線程打開入口前,全部調用 await 的線程都一直在入口處等待。用 N 初始化的 CountDownLatch 可使一個線程在 N 個線程完成某項操做以前一直等待,或者使其在某項操做完成 N 次以前一直等待。
CountDownLatch 的一個有用特性是,它不要求調用 countDown 方法的線程等到計數到達零時才繼續,而在全部線程都能經過以前,它只是阻止任何線程繼續經過一個 await。
猶如倒計時計數器,調用CountDownLatch對象的countDown方法就將計數器減1,當計數到達0時,則全部等待者或單個等待者開始執行。
構造方法摘要 |
|
CountDownLatch(int count) 構造一個用給定計數初始化的 CountDownLatch。 |
|
方法摘要 |
|
void |
await() 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷。 |
boolean |
await(long timeout, TimeUnit unit) 使當前線程在鎖存器倒計數至零以前一直等待,除非線程被中斷或超出了指定的等待時間。 |
void |
countDown() 遞減鎖存器的計數,若是計數到達零,則釋放全部等待的線程。 |
long |
getCount() 返回當前計數。 |
String |
toString() 返回標識此鎖存器及其狀態的字符串。 |
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CountdownLatchTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CountDownLatch cdOrder = new CountDownLatch(1); final CountDownLatch cdAnswer = new CountDownLatch(3); for(int i=0;i<3;i++){ Runnable runnable = new Runnable(){ public void run(){ try { System.out.println("線程" + Thread.currentThread().getName() + "正準備接受命令"); cdOrder.await(); System.out.println("線程" + Thread.currentThread().getName() + "已接受命令"); Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "迴應命令處理結果"); cdAnswer.countDown(); } catch (Exception e) { e.printStackTrace(); } } }; service.execute(runnable); } try { Thread.sleep((long)(Math.random()*10000)); System.out.println("線程" + Thread.currentThread().getName() + "即將發佈命令"); cdOrder.countDown(); System.out.println("線程" + Thread.currentThread().getName() + "已發送命令,正在等待結果"); cdAnswer.await(); System.out.println("線程" + Thread.currentThread().getName() + "已收到全部響應結果"); } catch (Exception e) { e.printStackTrace(); } service.shutdown(); } }
用於實現兩我的之間的數據交換,每一個人在完成必定的事務後想與對方交換數據,第一個先拿出數據的人將一直等待第二我的拿着數據到來時,才能彼此交換數據。
構造方法摘要 |
|
Exchanger() 建立一個新的 Exchanger。 |
|
方法摘要 |
|
V |
exchange(V x) 等待另外一個線程到達此交換點(除非當前線程被中斷),而後將給定的對象傳送給該線程,並接收該線程的對象。 |
V |
exchange(V x, long timeout, TimeUnit unit) 等待另外一個線程到達此交換點(除非當前線程被中斷,或者超出了指定的等待時間),而後將給定的對象傳送給該線程,同時接收該線程的對象。 |
import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExchangerTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Exchanger exchanger = new Exchanger(); service.execute(new Runnable(){ public void run() { try { Thread.sleep((long)(Math.random()*10000)); String data1 = "zxx"; System.out.println("線程" + Thread.currentThread().getName() + "正在把數據" + data1 +"換出去"); String data2 = (String)exchanger.exchange(data1); System.out.println("線程" + Thread.currentThread().getName() + "換回的數據爲" + data2); }catch(Exception e){ } } }); service.execute(new Runnable(){ public void run() { try { Thread.sleep((long)(Math.random()*10000)); String data1 = "lhm"; System.out.println("線程" + Thread.currentThread().getName() + "正在把數據" + data1 +"換出去"); String data2 = (String)exchanger.exchange(data1); System.out.println("線程" + Thread.currentThread().getName() + "換回的數據爲" + data2); }catch(Exception e){ } } }); } }
在併發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue爲表明的高性能隊列非阻塞隊列,一個是以BlockingQueue接口爲表明的阻塞隊列,不管哪一種都繼承自Queue。
阻塞隊列與普通隊列的區別在於,當隊列是空的時,從隊列中獲取元素的操做將會被阻塞,或者當隊列是滿時,往隊列裏添加元素的操做會被阻塞。試圖從空的阻塞隊列中獲取元素的線程將會被阻塞,直到其餘的線程往空的隊列插入新的元素。一樣,試圖往已滿的阻塞隊列中添加新元素的線程一樣也會被阻塞,直到其餘的線程使隊列從新變得空閒起來,如從隊列中移除一個或者多個元素,或者徹底清空隊列.
1.ArrayDeque, (數組雙端隊列)
2.PriorityQueue, (優先級隊列)
3.ConcurrentLinkedQueue, (基於鏈表的併發隊列)
4.DelayQueue, (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口)
5.ArrayBlockingQueue, (基於數組的併發阻塞隊列)
6.LinkedBlockingQueue, (基於鏈表的FIFO阻塞隊列)
7.LinkedBlockingDeque, (基於鏈表的FIFO雙端阻塞隊列)
8.PriorityBlockingQueue, (帶優先級的無界阻塞隊列)
9.SynchronousQueue (併發同步阻塞隊列)
ConcurrentLinkedQueue : 是一個適用於高併發場景下的隊列,經過無鎖的方式,實現了高併發狀態下的高性能,一般ConcurrentLinkedQueue性能好於BlockingQueue.它是一個基於連接節點的無界線程安全隊列。該隊列的元素遵循先進先出的原則。頭是最早加入的,尾是最近加入的,該隊列不容許null元素。
ConcurrentLinkedQueue重要方法:
add 和offer() 都是加入元素的方法(在ConcurrentLinkedQueue中這倆個方法沒有任何區別)poll() 和peek() 都是取頭元素節點,區別在於前者會刪除元素,後者不會。
ConcurrentLinkedDeque q = new ConcurrentLinkedDeque(); q.offer("a"); q.offer("b"); q.offer("c"); q.offer("d"); q.offer("e"); //從頭獲取元素,刪除該元素
System.out.println(q.poll()); //從頭獲取元素,不刪除該元素
System.out.println(q.peek()); //獲取總長度
System.out.println(q.size());
阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:
在隊列爲空時,獲取元素的線程會等待隊列變爲非空。
當隊列滿時,存儲元素的線程會等待隊列可用。
阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。
BlockingQueue即阻塞隊列,從阻塞這個詞能夠看出,在某些狀況下對阻塞隊列的訪問可能會形成阻塞。被阻塞的狀況主要有以下兩種:
1. 當隊列滿了的時候進行入隊列操做
2. 當隊列空了的時候進行出隊列操做
所以,當一個線程試圖對一個已經滿了的隊列進行入隊列操做時,它將會被阻塞,除非有另外一個線程作了出隊列操做;一樣,當一個線程試圖對一個空隊列進行出隊列操做時,它將會被阻塞,除非有另外一個線程進行了入隊列操做。
在Java中,BlockingQueue的接口位於java.util.concurrent 包中(在Java5版本開始提供),由上面介紹的阻塞隊列的特性可知,阻塞隊列是線程安全的。
在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全「傳輸」數據的問題。經過這些高效而且線程安全的隊列類,爲快速搭建高質量的多線程程序帶來極大的便利。。
經常使用的隊列主要有如下兩種:(固然經過不一樣的實現方式,還能夠延伸出不少不一樣類型的隊列,DelayQueue就是其中的一種)
先進先出(FIFO):先插入的隊列的元素也最早出隊列,相似於排隊的功能。從某種程度上來講這種隊列也體現了一種公平性。
後進先出(LIFO):後插入隊列的元素最早出隊列,這種隊列優先處理最近發生的事件。
多線程環境中,經過隊列能夠很容易實現數據共享,好比經典的「生產者」和「消費者」模型中,經過隊列能夠很便利地實現二者之間的數據共享。假設咱們有若干生產者線程,另外又有若干個消費者線程。若是生產者線程須要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數據共享問題。但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?理想狀況下,若是生產者產出數據的速度大於消費者消費的速度,而且當生產出來的數據累積到必定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發佈之前,在多線程環境下,咱們每一個程序員都必須去本身控制這些細節,尤爲還要兼顧效率和線程安全,而這會給咱們的程序帶來不小的複雜度。好在此時,強大的concurrent包橫空出世了,而他也給咱們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些狀況下會掛起線程(即阻塞),一旦條件知足,被掛起的線程又會自動被喚醒)
ArrayBlockingQueue是一個有邊界的阻塞隊列,它的內部實現是一個數組。有邊界的意思是它的容量是有限的,咱們必須在其初始化的時候指定它的容量大小,容量大小一旦指定就不可改變。
ArrayBlockingQueue是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面是一個初始化和使用ArrayBlockingQueue的例子:
<String> arrays = new ArrayBlockingQueue<String>(3); arrays.add("李四");
arrays.add("張軍"); arrays.add("張軍"); // 添加阻塞隊列
arrays.offer("張三", 1, TimeUnit.SECONDS);
LinkedBlockingQueue阻塞隊列大小的配置是可選的,若是咱們初始化時指定一個大小,它就是有邊界的,若是不指定,它就是無邊界的。說是無邊界,實際上是採用了默認大小爲Integer.MAX_VALUE的容量 。它的內部實現是一個鏈表。
和ArrayBlockingQueue同樣,LinkedBlockingQueue 也是以先進先出的方式存儲數據,最新插入的對象是尾部,最新移出的對象是頭部。下面是一個初始化和使LinkedBlockingQueue的例子:
LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(3); linkedBlockingQueue.add("張三"); linkedBlockingQueue.add("李四"); linkedBlockingQueue.add("李四"); System.out.println(linkedBlockingQueue.size());
PriorityBlockingQueue是一個沒有邊界的隊列,它的排序規則和 java.util.PriorityQueue同樣。須要注意,PriorityBlockingQueue中容許插入null對象。全部插入PriorityBlockingQueue的對象必須實現 java.lang.Comparable接口,隊列優先級的排序規則就 是按照咱們對這個接口的實現來定義的。另外,咱們能夠從PriorityBlockingQueue得到一個迭代器Iterator,但這個迭代器並不保證按照優先級順 序進行迭代。
下面舉個例子來講明一下,首先定義一個對象類型,這個對象須要實現Comparable接口:
SynchronousQueue隊列內部僅容許容納一個元素。當一個線程插入一個元素後會被阻塞,除非這個元素被另外一個線程消費。
class ProducerThread implements Runnable { private BlockingQueue<String> blockingQueue; private AtomicInteger count = new AtomicInteger(); private volatile boolean FLAG = true; public ProducerThread(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "生產者開始啓動...."); while (FLAG) { String data = count.incrementAndGet() + ""; try { boolean offer = blockingQueue.offer(data, 2, TimeUnit.SECONDS); if (offer) { System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "成功.."); } else { System.out.println(Thread.currentThread().getName() + ",生產隊列" + data + "失敗.."); } Thread.sleep(1000); } catch (Exception e) { } } System.out.println(Thread.currentThread().getName() + ",生產者線程中止..."); } public void stop() { this.FLAG = false; } } class ConsumerThread implements Runnable { private volatile boolean FLAG = true; private BlockingQueue<String> blockingQueue; public ConsumerThread(BlockingQueue<String> blockingQueue) { this.blockingQueue = blockingQueue; } @Override public void run() { System.out.println(Thread.currentThread().getName() + "消費者開始啓動...."); while (FLAG) { try { String data = blockingQueue.poll(2, TimeUnit.SECONDS); if (data == null || data == "") { FLAG = false; System.out.println("消費者超過2秒時間未獲取到消息."); return; } System.out.println("消費者獲取到隊列信息成功,data:" + data); } catch (Exception e) { // TODO: handle exception
} } } } public class Test0008 { public static void main(String[] args) { BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(3); ProducerThread producerThread = new ProducerThread(blockingQueue); ConsumerThread consumerThread = new ConsumerThread(blockingQueue); Thread t1 = new Thread(producerThread); Thread t2 = new Thread(consumerThread); t1.start(); t2.start(); //10秒後 中止線程..
try { Thread.sleep(10*1000); producerThread.stop(); } catch (Exception e) { // TODO: handle exception
} } }
拋出異常 |
特殊值 |
阻塞 |
超時 |
|
插入 |
add(e) |
offer(e) |
put(e) |
offer(e, time, unit) |
移除 |
remove() |
poll() |
take() |
poll(time, unit) |
檢查 |
element() |
peek() |
不可用 |
不可用 |
構造方法摘要 |
|
ArrayBlockingQueue(int capacity) 建立一個帶有給定的(固定)容量和默認訪問策略的 ArrayBlockingQueue。 |
|
ArrayBlockingQueue(int capacity, boolean fair) 建立一個具備給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue。 |
|
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) 建立一個具備給定的(固定)容量和指定訪問策略的 ArrayBlockingQueue,它最初包含給定 collection 的元素,並以 collection 迭代器的遍歷順序添加元素。 |
|
方法摘要 |
|
boolean |
add(E e) 將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量),在成功時返回 true,若是此隊列已滿,則拋出 IllegalStateException。 |
void |
clear() 自動移除此隊列中的全部元素。 |
boolean |
contains(Object o) 若是此隊列包含指定的元素,則返回 true。 |
int |
drainTo(Collection<? super E> c) 移除此隊列中全部可用的元素,並將它們添加到給定 collection 中。 |
int |
drainTo(Collection<? super E> c, int maxElements) 最多今後隊列中移除給定數量的可用元素,並將這些元素添加到給定 collection 中。 |
Iterator<E> |
iterator() 返回在此隊列中的元素上按適當順序進行迭代的迭代器。 |
boolean |
offer(E e) 將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量),在成功時返回 true,若是此隊列已滿,則返回 false。 |
boolean |
offer(E e, long timeout, TimeUnit unit) 將指定的元素插入此隊列的尾部,若是該隊列已滿,則在到達指定的等待時間以前等待可用的空間。 |
E |
peek() 獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null。 |
E |
poll() 獲取並移除此隊列的頭,若是此隊列爲空,則返回 null。 |
E |
poll(long timeout, TimeUnit unit) 獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素(若是有必要)。 |
void |
put(E e) 將指定的元素插入此隊列的尾部,若是該隊列已滿,則等待可用的空間。 |
int |
remainingCapacity() 返回在無阻塞的理想狀況下(不存在內存或資源約束)此隊列能接受的其餘元素數量。 |
boolean |
remove(Object o) 今後隊列中移除指定元素的單個實例(若是存在)。 |
int |
size() 返回此隊列中元素的數量。 |
E |
take() 獲取並移除此隊列的頭部,在元素變得可用以前一直等待(若是有必要)。 |
Object[] |
toArray() 返回一個按適當順序包含此隊列中全部元素的數組。 |
<T> T[] |
toArray(T[] a) 返回一個按適當順序包含此隊列中全部元素的數組;返回數組的運行時類型是指定數組的運行時類型。 |
String |
toString() 返回此 collection 的字符串表示形式。 |
阻塞隊列的實現原理(Condition鎖中有提到await signal)
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest { public static void main(String[] args) { final BlockingQueue queue = new ArrayBlockingQueue(3); for(int i=0;i<2;i++){ new Thread(){ public void run(){ while(true){ try { Thread.sleep((long)(Math.random()*1000)); System.out.println(Thread.currentThread().getName() + "準備放數據!"); queue.put(1); System.out.println(Thread.currentThread().getName() + "已經放了數據," + "隊列目前有" + queue.size() + "個數據"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } new Thread(){ public void run(){ while(true){ try { //將此處的睡眠時間分別改成100和1000,觀察運行結果
Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "準備取數據!"); queue.take(); System.out.println(Thread.currentThread().getName() + "已經取走數據," + "隊列目前有" + queue.size() + "個數據"); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); } }
Java中的線程池是運用場景最多的併發框架,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。在開發過程當中,合理地使用線程池可以帶來3個好處。
第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。
第三:提升線程的可管理性。線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,
還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。可是,要作到合理利用線程池,必須對其實現原理了如指掌。
線程池是爲忽然大量爆發的線程設計的,經過有限的幾個固定線程爲大量的操做服務,減小了建立和銷燬線程所需的時間,從而提升效率。
若是一個線程的時間很是長,就不必用線程池了(不是不能做長時間操做,而是不宜。),何況咱們還不能控制線程池中線程的開始、掛起、和停止。
Java是天生就支持併發的語言,支持併發意味着多線程,線程的頻繁建立在高併發及大數據量是很是消耗資源的,由於java提供了線程池。在jdk1.5之前的版本中,線程池的使用是及其簡陋的,可是在JDK1.5後,有了很大的改善。JDK1.5以後加入了java.util.concurrent包,java.util.concurrent包的加入給予開發人員開發併發程序以及解決併發問題很大的幫助。這篇文章主要介紹下併發包下的Executor接口,Executor接口雖然做爲一個很是舊的接口(JDK1.5 2004年發佈),可是不少程序員對於其中的一些原理仍是不熟悉,所以寫這篇文章來介紹下Executor接口,同時鞏固下本身的知識。若是文章中有出現錯誤,歡迎你們指出。
Executor框架的最頂層實現是ThreadPoolExecutor類,Executors工廠類中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool方法其實也只是ThreadPoolExecutor的構造函數參數不一樣而已。經過傳入不一樣的參數,就能夠構造出適用於不一樣應用場景下的線程池,那麼它的底層原理是怎樣實現的呢,這篇就來介紹下ThreadPoolExecutor線程池的運行過程。
corePoolSize: 核心池的大小。 當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中
maximumPoolSize: 線程池最大線程數,它表示在線程池中最多能建立多少個線程;
keepAliveTime: 表示線程沒有任務執行時最多保持多久時間會終止。
unit: 參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
Java經過Executors(jdk1.5併發包)提供四種線程池,分別爲:
newCachedThreadPool建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。
newFixedThreadPool 建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。
newScheduledThreadPool 建立一個定長線程池,支持定時及週期性任務執行。
newSingleThreadExecutor 建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。
建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。示例代碼以下:
// 無限大小線程池 jvm自動回收
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int temp = i; newCachedThreadPool.execute(new Runnable() { @Override public void run() { try { Thread.sleep(100); } catch (Exception e) { // TODO: handle exception
} System.out.println(Thread.currentThread().getName() + ",i:" + temp); } }); }
總結: 線程池爲無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的線程,而不用每次新建線程。
建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。示例代碼以下:
ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int temp = i; newFixedThreadPool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getId() + ",i:" + temp); } }); }
總結:由於線程池大小爲3,每一個任務輸出index後sleep 2秒,因此每兩秒打印3個數字。定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()
建立一個定長線程池,支持定時及週期性任務執行。延遲執行示例代碼以下:
ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(5); for (int i = 0; i < 10; i++) { final int temp = i; newScheduledThreadPool.schedule(new Runnable() { public void run() { System.out.println("i:" + temp); } }, 3, TimeUnit.SECONDS); }
表示延遲3秒執行。
建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。示例代碼以下:
ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; newSingleThreadExecutor.execute(new Runnable() { @Override public void run() { System.out.println("index:" + index); try { Thread.sleep(200); } catch (Exception e) { // TODO: handle exception
} } }); }
注意: 結果依次輸出,至關於順序執行各個任務。
提交一個任務到線程池中,線程池的處理流程以下:
一、判斷線程池裏的核心線程是否都在執行任務,若是不是(核心線程空閒或者還有核心線程沒有被建立)則建立一個新的工做線程來執行任務。若是核心線程都在執行任務,則進入下個流程。
二、線程池判斷工做隊列是否已滿,若是工做隊列沒有滿,則將新提交的任務存儲在這個工做隊列裏。若是工做隊列滿了,則進入下個流程。
三、判斷線程池裏的線程是否都處於工做狀態,若是沒有,則建立一個新的工做線程來執行任務。若是已經滿了,則交給飽和策略來處理這個任務。
若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;
若是隊列已經滿了,則在總線程數不大於maximumPoolSize的前提下,則建立新的線程
若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;
若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。
public class Test0007 { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3)); for (int i = 1; i <= 6; i++) { TaskThred t1 = new TaskThred("任務" + i); executor.execute(t1); } executor.shutdown(); } } class TaskThred implements Runnable { private String taskName; public TaskThred(String taskName) { this.taskName = taskName; } @Override public void run() { System.out.println(Thread.currentThread().getName()+taskName); } }
CPU密集的意思是該任務須要大量的運算,而沒有阻塞,CPU一直全速運行。
CPU密集任務只有在真正的多核CPU上纔可能獲得加速(經過多線程),而在單核CPU上,不管你開幾個模擬的多線程,該任務都不可能獲得加速,由於CPU總的運算能力就那些。
IO密集型,即該任務須要大量的IO,即大量的阻塞。在單線程上運行IO密集型的任務會致使浪費大量的CPU運算能力浪費在等待。因此在IO密集型任務中使用多線程能夠大大的加速程序運行,即時在單核CPU上,這種加速主要就是利用了被浪費掉的阻塞時間。
要想合理的配置線程池的大小,首先得分析任務的特性,能夠從如下幾個角度分析:
1. 任務的性質:CPU密集型任務、IO密集型任務、混合型任務。
2. 任務的優先級:高、中、低。
3. 任務的執行時間:長、中、短。
4. 任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接等。
性質不一樣的任務能夠交給不一樣規模的線程池執行。
對於不一樣性質的任務來講,CPU密集型任務應配置儘量小的線程,如配置CPU個數+1的線程數,IO密集型任務應配置儘量多的線程,由於IO操做不佔用CPU,不要讓CPU閒下來,應加大線程數量,如配置兩倍CPU個數+1,而對於混合型的任務,若是能夠拆分,拆分紅IO密集型和CPU密集型分別處理,前提是二者運行的時間是差很少的,若是處理時間相差很大,則不必拆分了。
若任務對其餘系統資源有依賴,如某個任務依賴數據庫的鏈接返回的結果,這時候等待的時間越長,則CPU空閒的時間越長,那麼線程數量應設置得越大,才能更好的利用CPU。
固然具體合理線程池值大小,須要結合系統實際狀況,在大量的嘗試下比較才能得出,以上只是前人總結的規律。
最佳線程數目 = ((線程等待時間+線程CPU時間)/線程CPU時間 )* CPU數目
好比平均每一個線程CPU運行時間爲0.5s,而線程等待時間(非CPU運行時間,好比IO)爲1.5s,CPU核心數爲8,那麼根據上面這個公式估算獲得:((0.5+1.5)/0.5)*8=32。這個公式進一步轉化爲:
最佳線程數目 = (線程等待時間與線程CPU時間之比 + 1)* CPU數目
能夠得出一個結論:
線程等待時間所佔比例越高,須要越多線程。線程CPU時間所佔比例越高,須要越少線程。以上公式與以前的CPU和IO密集型任務設置線程數基本吻合。
CPU密集型時,任務能夠少配置線程數,大概和機器的cpu核數至關,這樣可使得每一個線程都在執行任務IO密集型時,大部分線程都阻塞,故須要多配置線程數,2*cpu核數
操做系統之名稱解釋:
某些進程花費了絕大多數時間在計算上,而其餘則在等待I/O上花費了大可能是時間,前者稱爲計算密集型(CPU密集型)computer-bound,後者稱爲I/O密集型,I/O-bound。
當多個請求同時操做數據庫時,首先將訂單狀態改成已支付,在金額加上200,在同時併發場景查詢條件下,會形成重複通知。
悲觀鎖:悲觀鎖悲觀的認爲每一次操做都會形成更新丟失問題,在每次查詢時加上排他鎖。
每次去拿數據的時候都認爲別人會修改,因此每次在拿數據的時候都會上鎖,這樣別人想拿這個數據就會block直到它拿到鎖。傳統的關係型數據庫裏邊就用到了不少這種鎖機制,好比行鎖,表鎖等,讀鎖,寫鎖等,都是在作操做以前先上鎖。
Select * from xxx for update;
樂觀鎖:樂觀鎖會樂觀的認爲每次查詢都不會形成更新丟失,利用版本字段控制
鎖做爲併發共享數據,保證一致性的工具,在JAVA平臺有多種實現(如 synchronized 和 ReentrantLock等等 ) 。這些已經寫好提供的鎖爲咱們開發提供了便利。
重入鎖,也叫作遞歸鎖,指的是同一線程 外層函數得到鎖以後 ,內層遞歸函數仍然有獲取該鎖的代碼,但不受影響。
在JAVA環境下 ReentrantLock 和synchronized 都是可重入鎖
public class Test implements Runnable { public synchronized void get() { System.out.println("name:" + Thread.currentThread().getName() + " get();"); set(); } public synchronized void set() { System.out.println("name:" + Thread.currentThread().getName() + " set();"); } @Override public void run() { get(); } public static void main(String[] args) { Test ss = new Test(); new Thread(ss).start(); new Thread(ss).start(); new Thread(ss).start(); new Thread(ss).start(); } }
public class Test02 extends Thread { ReentrantLock lock = new ReentrantLock(); public void get() { lock.lock(); System.out.println(Thread.currentThread().getId()); set(); lock.unlock(); } public void set() { lock.lock(); System.out.println(Thread.currentThread().getId()); lock.unlock(); } @Override public void run() { get(); } public static void main(String[] args) { Test ss = new Test(); new Thread(ss).start(); new Thread(ss).start(); new Thread(ss).start(); } }
(1)與鎖相比,使用比較交換(下文簡稱CAS)會使程序看起來更加複雜一些。但因爲其非阻塞性,它對死鎖問題天生免疫,而且,線程間的相互影響也遠遠比基於鎖的方式要小。更爲重要的是,使用無鎖的方式徹底沒有鎖競爭帶來的系統開銷,也沒有線程間頻繁調度帶來的開銷,所以,它要比基於鎖的方式擁有更優越的性能。
(2)無鎖的好處:
第一,在高併發的狀況下,它比有鎖的程序擁有更好的性能;
第二,它天生就是死鎖免疫的。
就憑藉這兩個優點,就值得咱們冒險嘗試使用無鎖的併發。
(3)CAS算法的過程是這樣:它包含三個參數CAS(V,E,N): V表示要更新的變量,E表示預期值,N表示新值。僅當V值等於E值時,纔會將V的值設爲N,若是V值和E值不一樣,則說明已經有其餘線程作了更新,則當前線程什麼都不作。最後,CAS返回當前V的真實值。
(4)CAS操做是抱着樂觀的態度進行的,它老是認爲本身能夠成功完成操做。當多個線程同時使用CAS操做一個變量時,只有一個會勝出,併成功更新,其他均會失敗。失敗的線程不會被掛起,僅是被告知失敗,而且容許再次嘗試,固然也容許失敗的線程放棄操做。基於這樣的原理,CAS操做即便沒有鎖,也能夠發現其餘線程對當前線程的干擾,並進行恰當的處理。
(5)簡單地說,CAS須要你額外給出一個指望值,也就是你認爲這個變量如今應該是什麼樣子的。若是變量不是你想象的那樣,那說明它已經被別人修改過了。你就從新讀取,再次嘗試修改就行了。
(6)在硬件層面,大部分的現代處理器都已經支持原子化的CAS指令。在JDK 5.0之後,虛擬機即可以使用這個指令來實現併發操做和併發數據結構,而且,這種操做在虛擬機中能夠說是無處不在。
/** * Atomically increments by one the current value. * * @return the updated value */
public final int incrementAndGet() { for (;;) { //獲取當前值
int current = get(); //設置指望值
int next = current + 1; //調用Native方法compareAndSet,執行CAS操做
if (compareAndSet(current, next)) //成功後纔會返回指望值,不然無線循環
return next; } }
自旋鎖是採用讓當前線程不停地的在循環體內執行實現的,當循環的條件被其餘線程改變時 才能進入臨界區。以下
private AtomicReference<Thread> sign =new AtomicReference<>(); public void lock() { Thread current = Thread.currentThread(); while (!sign.compareAndSet(null, current)) { } } public void unlock() { Thread current = Thread.currentThread(); sign.compareAndSet(current, null); }
public class Test implements Runnable { static int sum; private SpinLock lock; public Test(SpinLock lock) { this.lock = lock; } /** * @param args * @throws InterruptedException */
public static void main(String[] args) throws InterruptedException { SpinLock lock = new SpinLock(); for (int i = 0; i < 100; i++) { Test test = new Test(lock); Thread t = new Thread(test); t.start(); } Thread.currentThread().sleep(1000); System.out.println(sum); } @Override public void run() { this.lock.lock(); this.lock.lock(); sum++; this.lock.unlock(); this.lock.unlock(); } }
當一個線程 調用這個不可重入的自旋鎖去加鎖的時候沒問題,當再次調用lock()的時候,由於自旋鎖的持有引用已經不爲空了,該線程對象會誤認爲是別人的線程持有了自旋鎖
使用了CAS原子操做,lock函數將owner設置爲當前線程,而且預測原來的值爲空。unlock函數將owner設置爲null,而且預測值爲當前線程。
當有第二個線程調用lock操做時因爲owner值不爲空,致使循環一直被執行,直至第一個線程調用unlock函數將owner設置爲null,第二個線程才能進入臨界區。
因爲自旋鎖只是將當前線程不停地執行循環體,不進行線程狀態的改變,因此響應速度更快。但當線程數不停增長時,性能降低明顯,由於每一個線程都須要執行,佔用CPU時間。若是線程競爭不激烈,而且保持鎖的時間段。適合使用自旋鎖。
若是想在不一樣的jvm中保證數據同步,使用分佈式鎖技術。
有數據庫實現、緩存實現、Zookeeper分佈式鎖