點擊藍色「程序員cxuan 」關注我喲java
加個「星標」,歡迎來撩程序員
本文的組織形式以下,主要會介紹到同步容器類,操做系統的併發工具,Java 開發工具包(只是簡單介紹一下,後面會有源碼分析)。同步工具類有哪些。
web
下面咱們就來介紹一下 Java 併發中都涉及哪些模塊,這些併發模塊都是 Java 併發類庫所提供的。
面試
同步容器類
同步容器主要包括兩類,一種是原本就是線程安全實現的容器,這類容器有 Vector、Hashtable、Stack,這類容器的方法上都加了 synchronized
鎖,是線程安全的實現。數據庫
「Vector、Hashtable、Stack 這些容器咱們如今幾乎都不在使用,由於這些容器在多線程環境下的效率不高。編程
還有一類是由 Collections.synchronizedxxx
實現的非線程安全的容器,使用 Collections.synchronized 會把它們封裝起來編程線程安全的容器,舉出兩個例子數組
-
Collections.synchronizedList -
Collections.synchronizedMap
咱們能夠經過 Collections 源碼能夠看出這些線程安全的實現安全
要不爲啥要稱 Collections 爲集合工具類呢?Collections 會把這些容器類的狀態封裝起來,並對每一個同步方法進行同步,使得每次只有一個線程可以訪問容器的狀態。
服務器
其中每一個 synchronized xxx
都是至關於建立了一個靜態內部類。微信
雖然同步容器類都是線程安全的,可是在某些狀況下須要額外的客戶端加鎖來保證一些複合操做的安全性,複合操做就是有兩個及以上的方法組成的操做,好比最典型的就是 若沒有則添加
,用僞代碼表示則是
if(a == null){
a = get();
}
好比能夠用來判斷 Map 中是否有某個 key,若是沒有則添加進 Map 中。這些複合操做在沒有客戶端加鎖的狀況下是線程安全的,可是當多個線程併發修改容器時,可能會表現出意料以外的行爲。例以下面這段代碼
public class TestVector implements Runnable{
static Vector vector = new Vector();
static void addVector(){
for(int i = 0;i < 10000;i++){
vector.add(i);
}
}
static Object getVector(){
int index = vector.size() - 1;
return vector.get(index);
}
static void removeVector(){
int index = vector.size() - 1;
vector.remove(index);
}
@Override
public void run() {
getVector();
}
public static void main(String[] args) {
TestVector testVector = new TestVector();
testVector.addVector();
Thread t1 = new Thread(() -> {
for(int i = 0;i < vector.size();i++){
getVector();
}
});
Thread t2 = new Thread(() -> {
for(int i = 0;i < vector.size();i++){
removeVector();
}
});
t1.start();
t2.start();
}
}
這些方法看似沒有問題,由於 Vector 可以保證線程安全性,不管多少個線程訪問 Vector 也不會形成 Vector 的內部產生破壞,可是從整個系統來講,是存在線程安全性的,事實上你運行一下,也會發現報錯。
會出現
若是線程 A 在包含這麼多元素的基礎上調用 getVector
方法,會獲得一個數值,getVector 只是取得該元素,而並非從 vector 中移除,removeVector
方法是獲得一個元素進行移除,這段代碼的不安全因素就是,由於線程的時間片是亂序的,並且 getVector 和 removeVector 並不會保證互斥,因此在 removeVector 方法把某個值好比 6666 移除後,vector 中就不存在這個 6666 的元素,此時 getVector 方法取得 6666 ,就會拋出數組越界異常。爲何是數組越界異常呢?能夠看一下 vector 的源碼
若是用圖表示的話,則會是下面這樣。
因此,從系統的層面來看,上面這段代碼也要保證線程安全性才能夠,也就是在客戶端加鎖
實現,只要咱們讓複合操做使用一把鎖,那麼這些操做就和其餘單獨的操做同樣都是原子性的。以下面例子所示
static Object getVector(){
synchronized (vector){
int index = vector.size() - 1;
return vector.get(index);
}
}
static void removeVector(){
synchronized (vector) {
int index = vector.size() - 1;
vector.remove(index);
}
}
也能夠經過鎖住 .class
來保證原子性操做,也能達到一樣的效果。
static Object getVector(){
synchronized (TestVector.class){
int index = vector.size() - 1;
return vector.get(index);
}
}
static void removeVector(){
synchronized (TestVector.class) {
int index = vector.size() - 1;
vector.remove(index);
}
}
在調用 size 和 get 之間,Vector 的長度可能會發生變化,這種變化在對 Vector 進行排序時出現,以下所示
for(int i = 0;i< vector.size();i++){
doSomething(vector.get(i));
}
這種迭代的操做正確性取決於運氣,即在調用 size 和 get 之間會修改 Vector,在單線程環境中,這種假設徹底成立,可是再有其餘線程併發修改 Vector 時,則可能會致使麻煩。
咱們仍舊能夠經過客戶端加鎖的方式來避免這種狀況
synchronized(vector){
for(int i = 0;i< vector.size();i++){
doSomething(vector.get(i));
}
}
這種方式爲客戶端的可靠性提供了保證,可是犧牲了伸縮性,並且這種在遍歷過程當中進行加鎖,也不是咱們所但願看到的。
fail-fast
針對上面這種狀況,不少集合類都提供了一種 fail-fast
機制,由於大部分集合內部都是使用 Iterator 進行遍歷,在循環中使用同步鎖的開銷會很大,而 Iterator 的建立是輕量級的,因此在集合內部若是有併發修改的操做,集合會進行快速失敗
,也就是 fail-fast
。當他們發現容器在迭代過程當中被修改時,會拋出 ConcurrentModificationException
異常,這種快速失敗不是一種完備的處理機制,而只是 善意
的捕獲併發錯誤。
若是查看過 ConcurrentModificationException 的註解,你會發現,ConcurrentModificationException 拋出的原則由兩種,以下
形成這種異常的緣由是因爲多個線程在遍歷集合的同時對集合類內部進行了修改,這也就是 fail-fast 機制。
該註解還聲明瞭另一種方式
這個問題也是很經典的一個問題,咱們使用 ArrayList 來舉例子。以下代碼所示
public static void main(String[] args) {
List<String> list = new ArrayList<>();
for (int i = 0 ; i < 10 ; i++ ) {
list.add(i + "");
}
Iterator<String> iterator = list.iterator();
int i = 0 ;
while(iterator.hasNext()) {
if (i == 3) {
list.remove(3);
}
System.out.println(iterator.next());
i ++;
}
}
該段代碼會發生異常,由於在 ArrayList 內部,有兩個屬性,一個是 modCount
,一個是 expectedModCount
,ArrayList 在 remove 等對集合結構的元素形成數量上的操做會有 checkForComodification
的判斷,以下所示,這也是這段代碼的錯誤緣由。
fail-safe
fail-safe
是 Java 中的一種 安全失敗
機制,它表示的是在遍歷時不是直接在原集合上進行訪問,而是先複製原有集合內容,在拷貝的集合上進行遍歷。因爲迭代時是對原集合的拷貝進行遍歷,因此在遍歷過程當中對原集合所做的修改並不能被迭代器檢測到,因此不會觸發 ConcurrentModificationException。java.util.concurrent
包下的容器都是安全失敗的,能夠在多線程條件下使用,併發修改。
好比 CopyOnWriteArrayList
, 它就是一種 fail-safe 機制的集合,它就不會出現異常,例如以下操做
List<Integer> integers = new CopyOnWriteArrayList<>();
integers.add(1);
integers.add(2);
integers.add(3);
Iterator<Integer> itr = integers.iterator();
while (itr.hasNext()) {
Integer a = itr.next();
integers.remove(a);
}
CopyOnWriteArrayList 就是 ArrayList 的一種線程安全的變體,CopyOnWriteArrayList 中的全部可變操做好比 add 和 set 等等都是經過對數組進行全新複製來實現的。
操做系統中的併發工具
講到併發容器,就不得不提操做系統級別實現了哪些進程/線程間的併發容器,說白了其實就是數據結構的設計。下面咱們就來一塊兒看一下操做系統級別的併發工具
信號量
信號量是 E.W.Dijkstra 在 1965 年提出的一種方法,它使用一個整形變量來累計喚醒次數,以供以後使用。在他的觀點中,有一個新的變量類型稱做 信號量(semaphore)
。一個信號量的取值能夠是 0 ,或任意正數。0 表示的是不須要任何喚醒,任意的正數表示的就是喚醒次數。
Dijkstra 提出了信號量有兩個操做,如今一般使用 down
和 up
(分別能夠用 sleep 和 wakeup 來表示)。down 這個指令的操做會檢查值是否大於 0 。若是大於 0 ,則將其值減 1 ;若該值爲 0 ,則進程將睡眠,並且此時 down 操做將會繼續執行。檢查數值、修改變量值以及可能發生的睡眠操做均爲一個單一的、不可分割的 原子操做(atomic action)
完成。
互斥量
若是不須要信號量的計數能力時,可使用信號量的一個簡單版本,稱爲 mutex(互斥量)
。互斥量的優點就在於在一些共享資源和一段代碼中保持互斥。因爲互斥的實現既簡單又有效,這使得互斥量在實現用戶空間線程包時很是有用。
互斥量是一個處於兩種狀態之一的共享變量:解鎖(unlocked)
和 加鎖(locked)
。這樣,只須要一個二進制位來表示它,不過通常狀況下,一般會用一個 整型(integer)
來表示。0 表示解鎖,其餘全部的值表示加鎖,比 1 大的值表示加鎖的次數。
mutex 使用兩個過程,當一個線程(或者進程)須要訪問關鍵區域時,會調用 mutex_lock
進行加鎖。若是互斥鎖當前處於解鎖狀態(表示關鍵區域可用),則調用成功,而且調用線程能夠自由進入關鍵區域。
另外一方面,若是 mutex 互斥量已經鎖定的話,調用線程會阻塞直到關鍵區域內的線程執行完畢而且調用了 mutex_unlock
。若是多個線程在 mutex 互斥量上阻塞,將隨機選擇一個線程並容許它得到鎖。
Futexes
隨着並行的增長,有效的同步(synchronization)
和鎖定(locking)
對於性能來講是很是重要的。若是進程等待時間很短,那麼自旋鎖(Spin lock)
是很是有效;可是若是等待時間比較長,那麼這會浪費 CPU 週期。若是進程不少,那麼阻塞此進程,並僅當鎖被釋放的時候讓內核解除阻塞是更有效的方式。不幸的是,這種方式也會致使另外的問題:它能夠在進程競爭頻繁的時候運行良好,可是在競爭不是很激烈的狀況下內核切換的消耗會很是大,並且更困難的是,預測鎖的競爭數量更不容易。
有一種有趣的解決方案是把二者的優勢結合起來,提出一種新的思想,稱爲 futex
,或者是 快速用戶空間互斥(fast user space mutex)
,是否是聽起來頗有意思?
futex 是 Linux
中的特性實現了基本的鎖定(很像是互斥鎖)並且避免了陷入內核中,由於內核的切換的開銷很是大,這樣作能夠大大提升性能。futex 由兩部分組成:內核服務和用戶庫。內核服務提供了了一個 等待隊列(wait queue)
容許多個進程在鎖上排隊等待。除非內核明確的對他們解除阻塞,不然它們不會運行。
Pthreads 中的互斥量
Pthreads 提供了一些功能用來同步線程。最基本的機制是使用互斥量變量,能夠鎖定和解鎖,用來保護每一個關鍵區域。但願進入關鍵區域的線程首先要嘗試獲取 mutex。若是 mutex 沒有加鎖,線程可以立刻進入而且互斥量可以自動鎖定,從而阻止其餘線程進入。若是 mutex 已經加鎖,調用線程會阻塞,直到 mutex 解鎖。若是多個線程在相同的互斥量上等待,當互斥量解鎖時,只有一個線程可以進入而且從新加鎖。這些鎖並非必須的,程序員須要正確使用它們。
下面是與互斥量有關的函數調用
和咱們想象中的同樣,mutex 可以被建立和銷燬,扮演這兩個角色的分別是 Phread_mutex_init
和 Pthread_mutex_destroy
。mutex 也能夠經過 Pthread_mutex_lock
來進行加鎖,若是互斥量已經加鎖,則會阻塞調用者。還有一個調用Pthread_mutex_trylock
用來嘗試對線程加鎖,當 mutex 已經被加鎖時,會返回一個錯誤代碼而不是阻塞調用者。這個調用容許線程有效的進行忙等。最後,Pthread_mutex_unlock
會對 mutex 解鎖而且釋放一個正在等待的線程。
除了互斥量之外,Pthreads
還提供了第二種同步機制:條件變量(condition variables)
。mutex 能夠很好的容許或阻止對關鍵區域的訪問。條件變量容許線程因爲未知足某些條件而阻塞。絕大多數狀況下這兩種方法是一塊兒使用的。下面咱們進一步來研究線程、互斥量、條件變量之間的關聯。
下面再來從新認識一下生產者和消費者問題:一個線程將東西放在一個緩衝區內,由另外一個線程將它們取出。若是生產者發現緩衝區沒有空槽可使用了,生產者線程會阻塞起來直到有一個線程可使用。生產者使用 mutex 來進行原子性檢查從而不受其餘線程干擾。可是當發現緩衝區已經滿了之後,生產者須要一種方法來阻塞本身並在之後被喚醒。這即是條件變量作的工做。
下面是一些與條件變量有關的最重要的 pthread 調用
上表中給出了一些調用用來建立和銷燬條件變量。條件變量上的主要屬性是 Pthread_cond_wait
和 Pthread_cond_signal
。前者阻塞調用線程,直到其餘線程發出信號爲止(使用後者調用)。阻塞的線程一般須要等待喚醒的信號以此來釋放資源或者執行某些其餘活動。只有這樣阻塞的線程才能繼續工做。條件變量容許等待與阻塞原子性的進程。Pthread_cond_broadcast
用來喚醒多個阻塞的、須要等待信號喚醒的線程。
「須要注意的是,條件變量(不像是信號量)不會存在於內存中。若是將一個信號量傳遞給一個沒有線程等待的條件變量,那麼這個信號就會丟失,這個須要注意
管程
爲了可以編寫更加準確無誤的程序,Brinch Hansen 和 Hoare 提出了一個更高級的同步原語叫作 管程(monitor)
。管程有一個很重要的特性,即在任什麼時候候管程中只能有一個活躍的進程,這一特性使管程可以很方便的實現互斥操做。管程是編程語言的特性,因此編譯器知道它們的特殊性,所以能夠採用與其餘過程調用不一樣的方法來處理對管程的調用。一般狀況下,當進程調用管程中的程序時,該程序的前幾條指令會檢查管程中是否有其餘活躍的進程。若是有的話,調用進程將被掛起,直到另外一個進程離開管程纔將其喚醒。若是沒有活躍進程在使用管程,那麼該調用進程才能夠進入。
進入管程中的互斥由編譯器負責,可是一種通用作法是使用 互斥量(mutex)
和 二進制信號量(binary semaphore)
。因爲編譯器而不是程序員在操做,所以出錯的概率會大大下降。在任什麼時候候,編寫管程的程序員都無需關心編譯器是如何處理的。他只須要知道將全部的臨界區轉換成爲管程過程便可。毫不會有兩個進程同時執行臨界區中的代碼。
即便管程提供了一種簡單的方式來實現互斥,但在咱們看來,這還不夠。由於咱們還須要一種在進程沒法執行被阻塞。在生產者-消費者問題中,很容易將針對緩衝區滿和緩衝區空的測試放在管程程序中,可是生產者在發現緩衝區滿的時候該如何阻塞呢?
解決的辦法是引入條件變量(condition variables)
以及相關的兩個操做 wait
和 signal
。當一個管程程序發現它不能運行時(例如,生產者發現緩衝區已滿),它會在某個條件變量(如 full)上執行 wait
操做。這個操做形成調用進程阻塞,而且還將另外一個之前等在管程以外的進程調入管程。在前面的 pthread 中咱們已經探討過條件變量的實現細節了。另外一個進程,好比消費者能夠經過執行 signal
來喚醒阻塞的調用進程。
經過臨界區自動的互斥,管程比信號量更容易保證並行編程的正確性。可是管程也有缺點,咱們前面說到過管程是一個編程語言的概念,編譯器必需要識別管程並用某種方式對其互斥做出保證。C、Pascal 以及大多數其餘編程語言都沒有管程,因此不能依靠編譯器來遵照互斥規則。
與管程和信號量有關的另外一個問題是,這些機制都是設計用來解決訪問共享內存的一個或多個 CPU 上的互斥問題的。經過將信號量放在共享內存中並用 TSL
或 XCHG
指令來保護它們,能夠避免競爭。可是若是是在分佈式系統中,可能同時具備多個 CPU 的狀況,而且每一個 CPU 都有本身的私有內存呢,它們經過網絡相連,那麼這些原語將會失效。由於信號量過低級了,而管程在少數幾種編程語言以外沒法使用,因此還須要其餘方法。
消息傳遞
上面提到的其餘方法就是 消息傳遞(messaage passing)
。這種進程間通訊的方法使用兩個原語 send
和 receive
,它們像信號量而不像管程,是系統調用而不是語言級別。示例以下
send(destination, &message);
receive(source, &message);
send 方法用於向一個給定的目標發送一條消息,receive 從一個給定的源接收一條消息。若是沒有消息,接受者可能被阻塞,直到接收一條消息或者帶着錯誤碼返回。
消息傳遞系統如今面臨着許多信號量和管程所未涉及的問題和設計難點,尤爲對那些在網絡中不一樣機器上的通訊情況。例如,消息有可能被網絡丟失。爲了防止消息丟失,發送方和接收方能夠達成一致:一旦接受到消息後,接收方立刻回送一條特殊的 確認(acknowledgement)
消息。若是發送方在一段時間間隔內未收到確認,則重發消息。
如今考慮消息自己被正確接收,而返回給發送着的確認消息丟失的狀況。發送者將重發消息,這樣接受者將收到兩次相同的消息。
對於接收者來講,如何區分新的消息和一條重發的老消息是很是重要的。一般採用在每條原始消息中嵌入一個連續的序號來解決此問題。若是接受者收到一條消息,它具備與前面某一條消息同樣的序號,就知道這條消息是重複的,能夠忽略。
消息系統還必須處理如何命名進程的問題,以便在發送或接收調用中清晰的指明進程。身份驗證(authentication)
也是一個問題,好比客戶端怎麼知道它是在與一個真正的文件服務器通訊,從發送方到接收方的信息有可能被中間人所篡改。
屏障
最後一個同步機制是準備用於進程組而不是進程間的生產者-消費者狀況的。在某些應用中劃分了若干階段,而且規定,除非全部的進程都就緒準備着手下一個階段,不然任何進程都不能進入下一個階段,能夠經過在每一個階段的結尾安裝一個 屏障(barrier)
來實現這種行爲。當一個進程到達屏障時,它會被屏障所攔截,直到全部的屏障都到達爲止。屏障可用於一組進程同步,以下圖所示
在上圖中咱們能夠看到,有四個進程接近屏障,這意味着每一個進程都在進行運算,可是尚未到達每一個階段的結尾。過了一段時間後,A、B、D 三個進程都到達了屏障,各自的進程被掛起,但此時還不能進入下一個階段呢,由於進程 B 尚未執行完畢。結果,當最後一個 C 到達屏障後,這個進程組纔可以進入下一個階段。
避免鎖:讀-複製-更新
最快的鎖是根本沒有鎖。問題在於沒有鎖的狀況下,咱們是否容許對共享數據結構的併發讀寫進行訪問。答案固然是不能夠。假設進程 A 正在對一個數字數組進行排序,而進程 B 正在計算其平均值,而此時你進行 A 的移動,會致使 B 會屢次讀到重複值,而某些值根本沒有遇到過。
然而,在某些狀況下,咱們能夠容許寫操做來更新數據結構,即使還有其餘的進程正在使用。竅門在於確保每一個讀操做要麼讀取舊的版本,要麼讀取新的版本,例以下面的樹
上面的樹中,讀操做從根部到葉子遍歷整個樹。加入一個新節點 X 後,爲了實現這一操做,咱們要讓這個節點在樹中可見以前使它"剛好正確":咱們對節點 X 中的全部值進行初始化,包括它的子節點指針。而後經過原子寫操做,使 X 稱爲 A 的子節點。全部的讀操做都不會讀到先後不一致的版本
在上面的圖中,咱們接着移除 B 和 D。首先,將 A 的左子節點指針指向 C 。全部本來在 A 中的讀操做將會後續讀到節點 C ,而永遠不會讀到 B 和 D。也就是說,它們將只會讀取到新版數據。一樣,全部當前在 B 和 D 中的讀操做將繼續按照原始的數據結構指針而且讀取舊版數據。全部操做均能正確運行,咱們不須要鎖住任何東西。而不須要鎖住數據就可以移除 B 和 D 的主要緣由就是 讀-複製-更新(Ready-Copy-Update,RCU)
,將更新過程當中的移除和再分配過程分離開。
Java 併發工具包
JDK 1.5 提供了許多種併發容器來改進同步容器的性能,同步容器將全部對容器狀態的訪問都串行化,以實現他們之間的線程安全性。這種方法的代價是嚴重下降了併發性能,當多個線程爭搶容器鎖的同時,嚴重下降吞吐量。
下面咱們就來一塊兒認識一下 Java 中都用了哪些併發工具
Java 併發工具綜述
在 Java 5.0 中新增長了 ConcurrentHashMap
用來替代基於散列的 Map 容器;新增長了 CopyOnWriteArrayList
和 CopyOnWriteArraySet
來分別替代 ArrayList 和 Set 接口實現類;還新增長了兩種容器類型,分別是 Queue
和 BlockingQueue
, Queue 是隊列的意思,它有一些實現分別是傳統的先進先出隊列 ConcurrentLinkedQueue
以及併發優先級隊列 PriorityQueue
。Queue 是一個先入先出的隊列,它的操做不會阻塞,若是隊列爲空那麼獲取元素的操做會返回空值。PriorityQueue 擴展了 Queue,增長了可阻塞的插入和獲取等操做。若是隊列爲空,那麼獲取元素的操做將一直阻塞,直到隊列中出現一個可用的元素爲止。若是隊列已滿,那麼插入操做則一直阻塞,直到隊列中有可用的空間爲止。
Java 6.0 還引入了 ConcurrentSkipListMap
和 ConcurrentSkipListSet
分別做爲同步的 SortedMap 和 SortedSet 的併發替代品。下面咱們就展開探討了,設計不到底層源碼,由於本篇文章主要目的就是爲了描述一下有哪些東西以及用了哪些東西。
ConcurrentHashMap
咱們先來看一下 ConcurrentHashMap 在併發集合中的位置
能夠看到,ConcurrentHashMap 繼承了 AbstractMap
接口並實現了 ConcurrentMap 和 Serializable 接口,AbstractMap 和 ConcurrentMap 都是 Map 的實現類,只不過 AbstractMap 是抽象實現。
ConcurrentHashMap 和 Hashtable 的構造很是類似,只不過 Hashtable 容器在激烈競爭的場景中會表現出效率低下的現象,這是由於全部訪問 Hashtable 的線程都想獲取同一把鎖,若是容器裏面有多把鎖,而且每一把鎖都只用來鎖定一段數據,那麼當多個線程訪問不一樣的數據段時,就不存在競爭關係。這就是 ConcurreentHashMap 採用的 分段鎖
實現。在這種鎖實現中,任意數量的讀取線程能夠併發的訪問 Map,執行讀取操做的線程和執行寫入的線程能夠併發的訪問 Map,而且在讀取的同時也能夠併發修改 Map。
ConcurrentHashMap 分段鎖實現帶來的結果是,在併發環境下能夠實現更高的吞吐量,在單線程環境下只損失很是小的性能。
你知道 HashMap 是具備 fail-fast 機制的,也就是說它是一種強一致性的集合,在數據不一致的狀況下會拋出 ConcurrentModificationException
異常,而 ConcurrentHashMap 是一種 弱一致性
的集合,在併發修改其內部結構時,它不會拋出 ConcurrentModificationException 異常,弱一致性可以容忍併發修改。
在 HashMap 中,咱們通常使用的 size、empty、containsKey 等方法都是標準方法,其返回的結果是必定的,包含就是包含,不包含就是不包含,能夠做爲判斷條件;而 ConcurrentHashMap 中的這些方法只是參考方法,它不是一個 精確值
,像是 size、empty 這些方法在併發場景下用處很小,由於他們的返回值老是在不斷變化,因此這些操做的需求就被弱化了。
在 ConcurrentHashMap 中沒有實現對 Map 加鎖從而實現獨佔訪問。在線程安全的 Map 實現 Hashtable
和 Collections.synchronizedMap
中都實現了獨佔訪問,所以只能單個線程修改 Map 。ConcurrentHashMap 與這些 Map 容器相比,具備更多的優點和更少的劣勢,只有當須要獨佔訪問的需求時纔會使用 Hashtable 或者是 Collections.synchronizedMap ,不然其餘併發場景下,應該使用 ConcurrentHashMap。
ConcurrentMap
ConcurrentMap 是一個接口,它繼承了 Map 接口並提供了 Map 接口中四個新的方法,這四個方法都是 原子性
方法,進一步擴展了 Map 的功能。
public interface ConcurrentMap<K, V> extends Map<K, V> {
// 僅當 key 沒有相應的映射值時才插入
V putIfAbsent(K key, V value);
// 僅當 key 被映射到 value 時才移除
boolean remove(Object key, Object value);
// 僅當 key 被映射到 value 時才移除
V replace(K key, V value);
// 僅當 key 被映射到 oldValue 時才替換爲 newValue
boolean replace(K key, V oldValue, V newValue);
}
ConcurrentNavigableMap
java.util.concurrent.ConcurrentNavigableMap
類是 java.util.NavigableMap
的子類,它支持併發訪問,而且容許其視圖的併發訪問。
什麼是視圖呢?視圖就是集合中的一段數據序列,ConcurrentNavigableMap 中支持使用 headMap
、subMap
、tailMap
返回的視圖。與其從新解釋一下 NavigableMap 中找到的全部方法,不如看一下 ConcurrentNavigableMap 中添加的方法
-
headMap 方法:headMap 方法返回一個嚴格小於給定鍵的視圖 -
tailMap 方法:tailMap 方法返回包含大於或等於給定鍵的視圖。 -
subMap 方法:subMap 方法返回給定兩個參數的視圖
ConcurrentNavigableMap 接口包含一些可能有用的其餘方法
-
descendingKeySet() -
descendingMap() -
navigableKeySet()
更多關於方法的描述這裏就再也不贅述了,讀者朋友們可自行查閱 javadoc
ConcurrentSkipListMap
ConcurrentSkipListMap
是線程安全的有序的哈希表,適用於高併發的場景。
ConcurrentSkipListMap 的底層數據結構是基於跳錶
實現的。ConcurrentSkipListMap 能夠提供 Comparable 內部排序或者是 Comparator 外部排序,具體取決於使用哪一個構造函數。
ConcurrentSkipListSet
ConcurrentSkipListSet
是線程安全的有序的集合,適用於高併發的場景。ConcurrentSkipListSet 底層是經過 ConcurrentNavigableMap 來實現的,它是一個有序的線程安全的集合。
ConcurrentSkipListSet有序的,基於元素的天然排序或者經過比較器肯定的順序;
ConcurrentSkipListSet是線程安全的;
CopyOnWriteArrayList
CopyOnWriteArrayList 是 ArrayList 的變體,在 CopyOnWriteArrayList 中,全部可變操做好比 add、set 其實都是從新建立了一個副本,經過對數組進行復制而實現的。
CopyOnWriteArrayList 其內部有一個指向數組的引用,數組是不會被修改的,每次併發修改 CopyOnWriteArrayList 都至關於從新建立副本,CopyOnWriteArrayList 是一種 fail-safe
機制的,它不會拋出 ConcurrentModificationException 異常,而且返回元素與迭代器建立時的元素相同。
每次併發寫操做都會建立新的副本,這個過程存在必定的開銷,因此,通常在規模很大時,讀操做要遠遠多於寫操做時,爲了保證線程安全性,會使用 CopyOnWriteArrayList。
相似的,CopyOnWriteArraySet 的做用也至關於替代了 Set 接口。
BlockingQueue
BlockingQueue 譯爲 阻塞隊列
,它是 JDK 1.5 添加的新的工具類,它繼承於 Queue 隊列
,並擴展了 Queue 的功能。
BlockingQueue 在檢索元素時會等待隊列變成非空,並在存儲元素時會等待隊列變爲可用。BlockingQueue 的方法有四種實現形式,以不一樣的方式來處理。
-
第一種是 拋出異常
-
特殊值
:第二種是根據狀況會返回 null 或者 false -
阻塞
:第三種是無限期的阻塞當前線程直到操做變爲可用後 -
超時
:第四種是給定一個最大的超時時間,超事後纔會放棄
BlockingQueue 不容許添加 null 元素,在其實現類的方法 add、put 或者 offer 後時添加 null 會拋出空指針異常。BlockingQueue 會有容量限制。在任意時間內,它都會有一個 remainCapacity,超過該值以前,任意 put 元素都會阻塞。
BlockingQueue 通常用於實現生產者 - 消費者
隊列,以下圖所示
BlockingQueue 有多種實現,下面咱們一塊兒來認識一下這些容器。
其中 LinkedBlockingQueue
和 ArrayBlockingQueue
是 FIFO 先入先出隊列,兩者分別和 LinkedList
和 ArrayList
對應,比同步 List 具備更好的併發性能。PriorityBlockingQueue
是一個優先級排序的阻塞隊列,若是你但願按照某種順序而不是 FIFO 處理元素時這個隊列將很是有用。正如其餘有序的容器同樣,PriorityBlockingQueue 既能夠按照天然順序來比較元素,也可使用 Comparator
比較器進行外部元素比較。SynchronousQueue
它維護的是一組線程而不是一組隊列,實際上它不是一個隊列,它的每一個 insert 操做必須等待其餘相關線程的 remove 方法後才能執行,反之亦然。
LinkedBlockingQueue
LinkedBlockingQueue
是一種 BlockingQueue 的實現。
它是一種基於鏈表的構造、先入先出的有界阻塞隊列。隊列的 head
也就是頭元素是在隊列中等待時間最長的元素;隊列的 tail
也就是隊尾元素是隊列中等待時間最短的元素。新的元素會被插入到隊尾中,檢索操做將獲取隊列中的頭部元素。鏈表隊列一般比基於數組的隊列具備更高的吞吐量,可是在大多數併發應用程序中,可預測的性能較差。
ArrayBlockingQueue
ArrayBlockingQueue
是一個用數組實現的有界隊列,此隊列順序按照先入先出
的原則對元素進行排序。
默認狀況下不保證線程公平的訪問隊列,所謂公平訪問隊列指的是阻塞的線程,能夠按照阻塞的前後順序訪問,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的。有可能先阻塞的線程最後才訪問隊列。
PriorityBlockingQueue
PriorityBlockingQueue
是一個支持優先級的阻塞隊列,默認狀況下的元素採起天然順序生序或者降序,也能夠本身定義 Comparator 進行外部排序。但須要注意的是不能保證同優先級元素的順序。
DelayQueue
DelayQueue
是一個支持延時獲取元素的無阻塞隊列,其中的元素只能在延遲到期後才能使用,DelayQueue 中的隊列頭是延遲最長時間的元素,若是沒有延遲,則沒有 head 頭元素,poll 方法會返回 null。判斷的依據就是 getDelay(TimeUnit.NANOSECONDS)
方法返回一個值小於或者等於 0 就會發生過時。
TransferQueue
TransferQueue 繼承於 BlockingQueue,它是一個接口,一個 BlockingQueue 是一個生產者可能等待消費者接受元素,TransferQueue 則更進一步,生產者會一直阻塞直到所添加到隊列的元素被某一個消費者所消費,新添加的transfer 方法用來實現這種約束。
TransferQueue 有下面這些方法:兩個 tryTransfer
方法,一個是非阻塞的,另外一個是帶有 timeout 參數設置超時時間的。還有兩個輔助方法 hasWaitingConsumer
和 getWaitingConcusmerCount
。
LinkedTransferQueue
一個無界的基於鏈表的 TransferQueue。這個隊列對任何給定的生產者進行 FIFO 排序,head 是隊列中存在時間最長的元素。tail 是隊列中存在時間最短的元素。
BlockingDeque
與 BlockingQueue 相對的還有 BlockingDeque 和 Deque,它們是 JDK1.6 被提出的,分別對 Queue 和 BlockingQueue 作了擴展。
Deque
是一個雙端隊列,分別實現了在隊列頭和隊列尾的插入。Deque 的實現有 ArrayDeque
、ConcurrentLinkedDeque
,BlockingDeque 的實現有 LinkedBlockingDeque
。
阻塞模式通常用於生產者 - 消費者隊列,而雙端隊列適用於工做密取。在工做密取的設計中,每一個消費者都有各自的雙端隊列,若是一個消費者完成了本身雙端隊列的任務,就會去其餘雙端隊列的末尾進行消費。密取方式要比傳統的生產者 - 消費者隊列具備更高的可伸縮性,這是由於每一個工做密取的工做者都有本身的雙端隊列,不存在競爭的狀況。
ArrayDeque
ArrayDeque 是 Deque 的可動態調整大小的數組實現,其內部沒有容量限制,他們會根據須要進行增加。ArrayDeque 不是線程安全的,若是沒有外部加鎖的狀況下,不支持多線程訪問。ArrayDeque 禁止空元素,這個類做爲棧使用時要比 Stack 快,做爲 queue 使用時要比 LinkedList 快。
除了 remove、removeFirstOccurrence、removeLastOccurrence、contains、interator.remove 外,大部分的 ArrayDeque 都以恆定的開銷運行。
「注意:ArrayDeque 是 fail-fast 的,若是建立了迭代器以後,卻使用了迭代器外部的 remove 等修改方法,那麼這個類將會拋出 ConcurrentModificationException 異常。
ConcurrentLinkedDeque
ConcurrentLinkedDeque
是 JDK1.7 引入的雙向鏈表的無界併發隊列。它與 ConcurrentLinkedQueue 的區別是 ConcurrentLinkedDeque 同時支持 FIFO 和 FILO 兩種操做方式,便可以從隊列的頭和尾同時操做(插入/刪除)。ConcurrentLinkedDeque 也支持 happen-before
原則。ConcurrentLinkedDeque 不容許空元素。
LinkedBlockingDeque
LinkedBlockingDeque
是一個由鏈表結構組成的雙向阻塞隊列,便可以從隊列的兩端插入和移除元素。雙向隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。LinkedBlockingDeque 把初始容量和構造函數綁定,這樣可以有效過分拓展。初始容量若是沒有指定,就取的是 Integer.MAX_VALUE
,這也是 LinkedBlockingDeque 的默認構造函數。
同步工具類
同步工具類能夠是任何一個對象,只要它根據自身狀態來協調線程的控制流。阻塞隊列能夠做爲同步控制類,其餘類型的同步工具類還包括 信號量(Semaphore)
、柵欄(Barrier)
和 閉鎖(Latch)
。下面咱們就來一塊兒認識一下這些工具類
Semaphore
Semaphore 翻譯過來就是 信號量
,信號量是什麼?它其實就是一種信號,在操做系統中,也有信號量的這個概念,在進程間通訊的時候,咱們就會談到信號量進行通訊。還有在 Linux 操做系統採起中斷時,也會向進程發出中斷信號,根據進程的種類和信號的類型判斷是否應該結束進程。
在 Java 中,Semaphore(信號量)
是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。
Semaphore 管理着一組許可(permit)
,許可的初始數量由構造函數來指定。在獲取某個資源以前,應該先從信號量獲取許可(permit)
,以確保資源是否可用。當線程完成對資源的操做後,會把它放在池中並向信號量返回一個許可,從而容許其餘線程訪問資源,這叫作釋放許可。若是沒有許可的話,那麼 acquire
將會阻塞直到有許可(中斷或者操做超時)爲止。release
方法將返回一個許可信號量。
Semaphore 能夠用來實現流量控制,例如經常使用的數據庫鏈接池,線程請求資源時,若是數據庫鏈接池爲空則阻塞線程,直接返回失敗,若是鏈接池不爲空時解除阻塞。
CountDownLatch
閉鎖(Latch)
是一種同步工具類,它能夠延遲線程的進度以直到其到達終止狀態。閉鎖的做用至關因而一扇門,在閉鎖達到結束狀態前,門是一直關着的,沒有任何線程可以經過。當閉鎖到達結束狀態後,這扇門會打開而且容許任何線程經過,而後就一直保持打開狀態。
CountDownLatch
就是閉鎖的一種實現。它可使一個或者多個線程等待一組事件的發生。閉鎖有一個計數器,閉鎖須要對計數器進行初始化,表示須要等待的次數,閉鎖在調用 await
處進行等待,其餘線程在調用 countDown 把閉鎖 count 次數進行遞減,直到遞減爲 0 ,喚醒 await。以下代碼所示
public class TCountDownLatch {
public static void main(String[] args) {
CountDownLatch latch = new CountDownLatch(5);
Increment increment = new Increment(latch);
Decrement decrement = new Decrement(latch);
new Thread(increment).start();
new Thread(decrement).start();
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Decrement implements Runnable {
CountDownLatch countDownLatch;
public Decrement(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
for(long i = countDownLatch.getCount();i > 0;i--){
Thread.sleep(1000);
System.out.println("countdown");
this.countDownLatch.countDown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Increment implements Runnable {
CountDownLatch countDownLatch;
public Increment(CountDownLatch countDownLatch){
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
System.out.println("await");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Waiter Released");
}
}
Future
咱們常見的建立多線程的方式有兩種,一種是繼承 Thread 類,一種是實現 Runnable 接口。這兩種方式都沒有返回值。相對的,建立多線程還有其餘三種方式,那就是使用 Callable
接口、 Future
接口和 FutureTask
類。Callable 咱們以前聊過,這裏就再也不描述了,咱們主要來描述一下 Future 和 FutureTask 接口。
Future 就是對具體的 Runnable 或者 Callable 任務的執行結果進行一系列的操做,必要時可經過 get
方法獲取執行結果,這個方法會阻塞直到執行結束。Future 中的主要方法有
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
-
cancel(boolean mayInterruptIfRunning)
: 嘗試取消任務的執行。若是任務已經完成、已經被取消或者因爲某些緣由而沒法取消,那麼這個嘗試會失敗。若是取消成功,或者在調用 cancel 時此任務還沒有開始,那麼此任務永遠不會執行。若是任務已經開始,那麼 mayInterruptIfRunning 參數會肯定是否中斷執行任務以便於嘗試中止該任務。這個方法返回後,會對isDone
的後續調用也返回 true,若是 cancel 返回 true,那麼後續的調用isCancelled
也會返回 true。 -
boolean isCancelled()
:若是此任務在正常完成以前被取消,則返回 true。 -
boolean isDone()
:若是任務完成,返回 true。 -
V get() throws InterruptedException, ExecutionException
:等待必要的計算完成,而後檢索其結果 -
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
:必要時最多等待給定時間以完成計算,而後檢索其結果。
由於Future只是一個接口,因此是沒法直接用來建立對象使用的,所以就有了下面的FutureTask。
FutureTask
FutureTask 實現了 RunnableFuture
接口,RunnableFuture 接口是什麼呢?
RunnableFuture 接口又繼承了 Runnable
接口和 Future
接口。納尼?在 Java 中不是隻容許單繼承麼,是的,單繼承更多的是說的類與類之間的繼承關係,子類繼承父類,擴展父類的接口,這個過程是單向的,就是爲了解決多繼承引發的過渡引用問題。而接口之間的繼承是接口的擴展,在 Java 編程思想中也印證了這一點
對 RunnableFuture 接口的解釋是:成功執行的 run 方法會使 Future 接口的完成並容許訪問其結果。因此它既能夠做爲 Runnable 被線程執行,又能夠做爲 Future 獲得 Callable 的返回值。
FutureTask 也能夠用做閉鎖,它能夠處於如下三種狀態
-
等待運行 -
正在運行 -
運行完成
FutureTask 在 Executor
框架中表示異步任務,此外還能夠表示一些時間較長的計算,這些計算能夠在使用計算結果以前啓動。
FutureTask 具體的源碼我後面會單獨出文章進行描述。
Barrier
咱們上面聊到了經過閉鎖來啓動一組相關的操做,使用閉鎖來等待一組事件的執行。閉鎖是一種一次性對象,一旦進入終止狀態後,就不能被 重置
。
Barrier
的特色和閉鎖也很相似,它也是阻塞一組線程直到某個事件發生。柵欄與閉鎖的區別在於,全部線程必須同時到達柵欄的位置,才能繼續執行,就像咱們上面操做系統給出的這幅圖同樣。
ABCD 四條線程,必須同時到達 Barrier,而後 手牽手
一塊兒走過幸福的殿堂。
當線程到達 Barrier 的位置時會調用 await
方法,這個方法會阻塞直到全部線程都到達 Barrier 的位置,若是全部線程都到達 Barrier 的位置,那麼 Barrier 將會打開使全部線程都被釋放,而 Barrier 將被重置以等待下次使用。若是調用 await 方法致使超時,或者 await 阻塞的線程被中斷,那麼 Barrier 就被認爲被打破,全部阻塞的 await 都會拋出 BrokenBarrierException
。若是成功經過柵欄後,await 方法返回一個惟一索引號,能夠利用這些索引號選舉一個新的 leader,來處理一下其餘工做。
public class TCyclicBarrier {
public static void main(String[] args) {
Runnable runnable = () -> System.out.println("Barrier 1 開始...");
Runnable runnable2 = () -> System.out.println("Barrier 2 開始...");
CyclicBarrier barrier1 = new CyclicBarrier(2,runnable);
CyclicBarrier barrier2 = new CyclicBarrier(2,runnable2);
CyclicBarrierRunnable b1 = new CyclicBarrierRunnable(barrier1,barrier2);
CyclicBarrierRunnable b2 = new CyclicBarrierRunnable(barrier1,barrier2);
new Thread(b1).start();
new Thread(b2).start();
}
}
class CyclicBarrierRunnable implements Runnable {
CyclicBarrier barrier1;
CyclicBarrier barrier2;
public CyclicBarrierRunnable(CyclicBarrier barrier1,CyclicBarrier barrier2){
this.barrier1 = barrier1;
this.barrier2 = barrier2;
}
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "等待 barrier1" );
barrier1.await();
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "等待 barrier2" );
barrier2.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() +
" 作完了!");
}
}
Exchanger
與 Barrier 相關聯的還有一個工具類就是 Exchanger
, Exchanger 是一個用於線程間協做的工具類。Exchanger用於進行線程間的數據交換。
它提供一個同步點,在這個同步點兩個線程能夠交換彼此的數據。這兩個線程經過exchange 方法交換數據, 若是第一個線程先執行 exchange方法,它會一直等待第二個線程也執行 exchange,當兩個線程都到達同步點時,這兩個線程就能夠交換數據,將本線程生產出來的數據傳遞給對方。所以使用Exchanger 的重點是成對的線程使用 exchange() 方法,當有一對線程達到了同步點,就會進行交換數據。所以該工具類的線程對象是成對的。
下面經過一段例子代碼來說解一下
public class TExchanger {
public static void main(String[] args) {
Exchanger exchanger = new Exchanger();
ExchangerRunnable exchangerRunnable = new ExchangerRunnable(exchanger,"A");
ExchangerRunnable exchangerRunnable2 = new ExchangerRunnable(exchanger,"B");
new Thread(exchangerRunnable).start();
new Thread(exchangerRunnable2).start();
}
}
class ExchangerRunnable implements Runnable {
Exchanger exchanger;
Object object;
public ExchangerRunnable(Exchanger exchanger,Object object){
this.exchanger = exchanger;
this.object = object;
}
@Override
public void run() {
Object previous = object;
try {
object = this.exchanger.exchange(object);
System.out.println(
Thread.currentThread().getName() + "改變前是" + previous + "改變後是" + object);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
總結
本篇文章咱們從同步容器類入手,主要講了 fail-fast
和 fail-safe
機制,這兩個機制在併發編程中很是重要。而後咱們從操做系統的角度,聊了聊操做系統層面實現安全性的幾種方式,而後從操做系統 -> 併發咱們聊了聊 Java 中的併發工具包有哪些,以及構建併發的幾種工具類。
若是這篇文章幫助到你,小夥伴們不妨點贊、再看、轉發,三連是讓我繼續更文的最大動力!
往期精選
本文分享自微信公衆號 - 程序員cxuan(cxuangoodjob)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。