Java集合及concurrent併發包總結(轉)

1.集合包java

    集合包最經常使用的有Collection和Map兩個接口的實現類,Colleciton用於存放多個單對象,Map用於存放Key-Value形式的鍵值對。算法

  Collection中最經常使用的又分爲兩種類型的接口:List和Set,二者最明顯的差異爲List支持放入重複的元素,而Set不支持。數組

List最經常使用的實現類有:ArrayList、LinkedList、Vector及Stack;Set接口經常使用的實現類有:HashSet、TreeSet。緩存

1.1 ArrayList安全

  ArrayList基於數組方式實現,默認構造器經過調用ArrayList(int)來完成建立,傳入的值爲10,實例化了一個Object數組,並將此數組賦給了當前實例的elementData屬性,此Object數組的大小即爲傳入的initialCapacity,所以調用空構造器的狀況下會建立一個大小爲10的Object數組。服務器

插入對象:add(E)數據結構

    基於已有元素數量加1做爲名叫minCapacity的變量,比較此值和Object數組的大小,若大於數組值,那麼先將當前Object數組值賦給一個數組對象,接着產生一個鑫的數組容量值。此值的計算方法爲當前數組值*1.5+1,如得出的容量值仍然小於minCapacity,那麼就以minCapacity做爲新的容量值,調用Arrays.copyOf來生成新的數組對象。多線程

    還提供了add(int,E)這樣的方法將元素直接插入指定的int位置上,將目前index及其後的數據都日後挪一位,而後才能將指定的index位置的賦值爲傳入的對象,這種方式要多付出一次複製數組的代價。還提供了addAll併發

 刪除對象:remove(E)app

   這裏調用了faseRemove方法將index後的對象往前複製一位,並將數組中的最後一個元素的值設置爲null,即釋放了對此對象的引用。 還提供了remove(int)方法來刪除指定位置的對象,remove(int)的實現比remove(E)多了一個數組範圍的檢測,但少了對象位置的查找,所以性能會更好。

獲取單個對象:get(int)

遍歷對象:iterator()

判斷對象是否存在:contains(E)

 總結:

    1,ArrayList基於數組方式實現,無容量的限制;

    2,ArrayList在執行插入元素時可能要擴容,在刪除元素時並不會減少數組的容量(如但願相應的縮小數組容量,能夠調用ArrayList的trimToSize()),在查找元素時要遍歷數組,對於非null的元素採起equals的方式尋找;

    3,ArrayList是非線程安全的。

1.2 LinkedList

    LinkedList基於雙向鏈表機制,所謂雙向鏈表機制,就是集合中的每一個元素都知道其前一個元素及其後一個元素的位置。在LinkedList中,以一個內部的Entry類來表明集合中的元素,元素的值賦給element屬性,Entry中的next屬性指向元素的後一個元素,Entry中的previous屬性指向元素的前一個元素,基於這樣的機制能夠快速實現集合中元素的移動。

總結:

    1,LinkedList基於雙向鏈表機制實現;

    2,LinkedList在插入元素時,須建立一個新的Entry對象,並切換相應元素的先後元素的引用;在查找元素時,須遍歷鏈表;在刪除元素時,要遍歷鏈表,找到要刪除的元素,而後從鏈表上將此元素刪除便可,此時原有的先後元素改變引用連在一塊兒;

    3,LinkedList是非線程安全的。

1.3 Vector

    其add、remove、get(int)方法都加了synchronized關鍵字,默認建立一個大小爲10的Object數組,並將capacityIncrement設置爲0。容量擴充策略:若是capacityIncrement大於0,則將Object數組的大小擴大爲現有size加上capacityIncrement的值;若是capacity等於或小於0,則將Object數組的大小擴大爲現有size的兩倍,這種容量的控制策略比ArrayList更爲可控。

    Vector是基於Synchronized實現的線程安全的ArrayList,但在插入元素時容量擴充的機制和ArrayList稍有不一樣,並可經過傳入capacityIncrement來控制容量的擴充。

1.4 Stack

    Stack繼承於Vector,在其基礎上實現了Stack所要求的後進先出(LIFO)的彈出與壓入操做,其提供了push、pop、peek三個主要的方法:

    push操做經過調用Vector中的addElement來完成;

    pop操做經過調用peek來獲取元素,並同時刪除數組中的最後一個元素;

    peek操做經過獲取當前Object數組的大小,並獲取數組上的最後一個元素。

1.5 HashSet

    默認構造建立一個HashMap對象

add(E):調用HashMap的put方法來完成此操做,將須要增長的元素做爲Map中的key,value則傳入一個以前已建立的Object對象。

remove(E):調用HashMap的remove(E)方法完成此操做。

contains(E):HashMap的containsKey

iterator():調用HashMap的keySet的iterator方法。

HashSet不支持經過get(int)獲取指定位置的元素,只能自行經過iterator方法來獲取。

總結:

    1,HashSet基於HashMap實現,無容量限制;

    2,HashSet是非線程安全的。

1.6 TreeSet

    TreeSet和HashSet的主要不一樣在於TreeSet對於排序的支持,TreeSet基於TreeMap實現。

1.7 HashMap

    HashMap空構造,將loadFactor設爲默認的0.75,threshold設置爲12,並建立一個大小爲16的Entry對象數組。

    基於數組+鏈表的結合體(鏈表散列)實現,將key-value當作一個總體,存放於Entity[]數組,put的時候根據key hash後的hashcode和數組length-1按位與的結果值判斷放在數組的哪一個位置,若是該數組位置上若已經存放其餘元素,則在這個位置上的元素以鏈表的形式存放。若是該位置上沒有元素則直接存放。

當系統決定存儲HashMap中的key-value對時,徹底沒有考慮Entry中的value,僅僅只是根據key來計算並決定每一個Entry的存儲位置。咱們徹底能夠把Map集合中的value當成key的附屬,當系統決定了key的存儲位置以後,value隨之保存在那裏便可。get取值也是根據key的hashCode肯定在數組的位置,在根據key的equals肯定在鏈表處的位置。

1 while (capacity < initialCapacity)
2      capacity <<= 1;

 

以上代碼保證了初始化時HashMap的容量老是2的n次方,即底層數組的長度老是爲2的n次方。它經過h & (table.length -1) 來獲得該對象的保存位,若length爲奇數值,則與運算產生相同結果,便會造成鏈表,儘量的少出現鏈表才能提高hashMap的效率,因此這是hashMap速度上的優化。

擴容resize():

當HashMap中的元素愈來愈多的時候,hash衝突的概率也就愈來愈高,由於數組的長度是固定的。因此爲了提升查詢的效率,就要對HashMap的數組進行擴容,而在HashMap數組擴容以後,最消耗性能的點就出現了:原數組中的數據必須從新計算其在新數組中的位置,並放進去,這就是resize。那麼HashMap何時進行擴容呢?當HashMap中的元素個數超過數組大小*loadFactor時,就會進行數組擴容,loadFactor的默認值爲0.75,這是一個折中的取值。

負載因子衡量的是一個散列表的空間的使用程度,負載因子越大表示散列表的裝填程度越高,反之愈小。負載因子越大,對空間的利用更充分,然然後果是查找效率的下降;若是負載因子過小,那麼散列表的數據將過於稀疏,對空間形成嚴重浪費。

HashMap的實現中,經過threshold字段來判斷HashMap的最大容量。threshold就是在此loadFactor和capacity對應下容許的最大元素數目,超過這個數目就從新resize,以下降實際的負載因子。默認的的負載因子0.75是對空間和時間效率的一個平衡選擇。

initialCapacity*2,成倍擴大容量,HashMap(int initialCapacity, float loadFactor):以指定初始容量、指定的負載因子建立一個 HashMap。不設定參數,則初始容量值爲16,默認的負載因子爲0.75,不宜過大也不宜太小,過大影響效率,太小浪費空間。擴容後須要從新計算每一個元素在數組中的位置,是一個很是消耗性能的操做,因此若是咱們已經預知HashMap中元素的個數,那麼預設元素的個數可以有效的提升HashMap的性能。

     HashTable數據結構的原理大體同樣,區別在於put、get時加了同步關鍵字,並且HashTable不可存放null值。

在高併發時可使用ConcurrentHashMap,其內部使用鎖分段技術,維持這鎖Segment的數組,在數組中又存放着Entity[]數組,內部hash算法將數據較均勻分佈在不一樣鎖中。

總結:

    1,HashMap採用數組方式存儲key、value構成的Entry對象,無容量限制;

    2,HashMap基於key hash尋找Entry對象存放到數組的位置,對於hash衝突採用鏈表的方式解決;

    3,HashMap在插入元素時可能會擴大數組的容量,在擴大容量時需要從新計算hash,並複製對象到新的數組中;

    4,HashMap是非線程安全的。

詳細說明:http://zhangshixi.iteye.com/blog/672697 

1.8 TreeMap

    TreeMap基於紅黑樹的實現,所以它要求必定要有key比較的方法,要麼傳入Comparator實現,要麼key對象實現Comparable藉口。在put操做時,基於紅黑樹的方式遍歷,基於comparator來比較key應放在樹的左邊仍是右邊,如找到相等的key,則直接替換掉value。

 

2.併發包

 jdk5.0一很重要的特性就是增長了併發包java.util.concurrent.*,在說具體的實現類或接口以前,這裏先簡要說下Java內存模型、volatile變量及AbstractQueuedSynchronizer(如下簡稱AQS同步器),這些都是併發包衆多實現的基礎。

Java內存模型

    描述了線程內存與主存見的通信關係。定義了線程內的內存改變將怎樣傳遞到其餘線程的規則,一樣也定義了線程內存與主存進行同步的細節,也描述了哪些操做屬於原子操做及操做間的順序。

代碼順序規則:

    一個線程內的每一個動做happens-before同一個線程內在代碼順序上在其後的全部動做.

volatile變量規則:

    對一個volatile變量的讀,老是能看到(任意線程)對這個volatile變量最後的寫入.

傳遞性:

    若是A happens-before B, B happens-before C, 那麼A happens-before C.    

 

volatile

當咱們聲明共享變量爲volatile後,對這個變量的讀/寫將會很特別。理解volatile特性的一個好方法是:把對volatile變量的單個讀/寫,當作是使用同一個監視器鎖對這些單個讀/寫操做作了同步。

監視器鎖的happens-before規則保證釋放監視器和獲取監視器的兩個線程之間的內存可見性,這意味着對一個volatile變量的讀,老是能看到(任意線程)對這個volatile變量最後的寫入。

 

簡而言之,volatile變量自身具備下列特性:

  • 可見性。對一個volatile變量的讀,老是能看到(任意線程)對這個volatile變量最後的寫入。

  • 原子性:對任意單個volatile變量的讀/寫具備原子性,但相似於volatile++這種複合操做不具備原子性。

 

volatile寫的內存語義以下:

  • 當寫一個volatile變量時,JMM會把該線程對應的本地內存中的共享變量刷新到主內存。

 

volatile讀的內存語義以下:

  • 當讀一個volatile變量時,JMM會把該線程對應的本地內存置爲無效。線程接下來將從主內存中讀取共享變量。

 

下面對volatile寫和volatile讀的內存語義作個總結:

  • 線程A寫一個volatile變量,實質上是線程A向接下來將要讀這個volatile變量的某個線程發出了(其對共享變量所在修改的)消息。

  • 線程B讀一個volatile變量,實質上是線程B接收了以前某個線程發出的(在寫這個volatile變量以前對共享變量所作修改的)消息。

  • 線程A寫一個volatile變量,隨後線程B讀這個volatile變量,這個過程實質上是線程A經過主內存向線程B發送消息。

鎖釋放-獲取與volatile的讀寫具備相同的內存語義,

鎖釋放的內存語義以下:

    當線程釋放鎖時,JMM會把該線程對應的本地內存中的共享變量刷新到主內存。

鎖獲取的內存語義以下:

    當線程獲取鎖時,JMM會把該線程對應的本地內存置爲無效,從而使得被監視器保護的臨界區代碼必需要從主內存中讀取共享變量。

 

下面對鎖釋放和鎖獲取的內存語義作個總結:

  • 線程A釋放一個鎖,實質上是線程A向接下來將要獲取這個鎖的某個線程發出了(線程A對共享變量所作修改的)消息。

  • 線程B獲取一個鎖,實質上是線程B接收了以前某個線程發出的(在釋放這個鎖以前對共享變量所作修改的)消息。

  • 線程A釋放鎖,隨後線程B獲取這個鎖,這個過程實質上是線程A經過主內存向線程B發送消息。

示例:

複製代碼

1 class VolatileExample {
 2     int x = 0;
 3     volatile int b = 0;
 4 
 5     private void write() {
 6         x = 5;
 7         b = 1;
 8     }
 9 
10     private void read() {
11         int dummy = b;
12         while (x != 5) {
13         }
14     }
15 
16     public static void main(String[] args) throws Exception {
17         final VolatileExample example = new VolatileExample();
18         Thread thread1 = new Thread(new Runnable() {
19             public void run() {
20                 example.write();
21             }
22         });
23         Thread thread2 = new Thread(new Runnable() {
24             public void run() {
25                 example.read();
26             }
27         });
28         thread1.start();
29         thread2.start();
30         thread1.join();
31         thread2.join();
32     }
33 }

複製代碼

 

 

 

若thread1先於thread2執行,則程序執行流程分析如上圖所示,thread2讀的結果是dummy=1,x=5因此不會進入死循環。

但並不能保證兩線程的執行順序,若thread2先於thread1執行,則程序在兩線程join中斷以前的結果爲:由於b變量的類型是volatile,故thread1寫以後,thread2便可讀到b變量的值發生變化,

而x是普通變量,故最後狀況是dummy=1,但thread2的讀操做由於x=0而進入死循環中。

    在JSR-133以前的舊Java內存模型中,雖然不容許volatile變量之間重排序,但舊的Java內存模型仍然會容許volatile變量與普通變量之間重排序。JSR-133則加強了volatile的內存語義:嚴格限制編譯器(在編譯期)和處理器(在運行期)對volatile變量與普通變量的重排序,確保volatile的寫-讀和監視器的釋放-獲取同樣,具備相同的內存語義。限制重排序是經過內存屏障實現的,具體可見JMM的描述。

 

    因爲volatile僅僅保證對單個volatile變量的讀/寫具備原子性,而監視器鎖的互斥執行的特性能夠確保對整個臨界區代碼的執行具備原子性。在功能上,監視器鎖比volatile更強大;在可伸縮性和執行性能上,volatile更有優點。若是讀者想在程序中用volatile代替監視器鎖,請必定謹慎。

 

AbstractQueuedSynchronizer (AQS)

    AQS使用一個整型的volatile變量(命名爲state)來維護同步狀態,這是接下來實現大部分同步需求的基礎。提供了一個基於FIFO隊列,能夠用於構建鎖或者其餘相關同步裝置的基礎框架。使用的方法是繼承,子類經過繼承同步器並須要實現它的方法來管理其狀態,管理的方式就是經過相似acquire和release的方式來操縱狀態。然而多線程環境中對狀態的操縱必須確保原子性,所以子類對於狀態的把握,須要使用這個同步器提供的如下三個方法對狀態進行操做:

  • java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()

  • java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)

  • java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)

子類推薦被定義爲自定義同步裝置的內部類,同步器自身沒有實現任何同步接口,它僅僅是定義了若干acquire之類的方法來供使用。該同步器便可以做爲排他模式也能夠做爲共享模式,當它被定義爲一個排他模式時,其餘線程對其的獲取就被阻止,而共享模式對於多個線程獲取均可以成功。

    同步器是實現鎖的關鍵,利用同步器將鎖的語義實現,而後在鎖的實現中聚合同步器。能夠這樣理解:鎖的API是面向使用者的,它定義了與鎖交互的公共行爲,而每一個鎖須要完成特定的操做也是透過這些行爲來完成的(好比:能夠容許兩個線程進行加鎖,排除兩個以上的線程),可是實現是依託給同步器來完成;同步器面向的是線程訪問和資源控制,它定義了線程對資源是否可以獲取以及線程的排隊等操做。鎖和同步器很好的隔離了兩者所須要關注的領域,嚴格意義上講,同步器能夠適用於除了鎖之外的其餘同步設施上(包括鎖)。
同步器的開始提到了其實現依賴於一個FIFO隊列,那麼隊列中的元素Node就是保存着線程引用和線程狀態的容器,每一個線程對同步器的訪問,均可以看作是隊列中的一個節點。

對於一個獨佔鎖的獲取和釋放有以下僞碼能夠表示:

獲取一個排他鎖

複製代碼

1 while(獲取鎖) {
 2     if (獲取到) {
 3         退出while循環
 4     } else {
 5         if(當前線程沒有入隊列) {
 6             那麼入隊列
 7         }
 8         阻塞當前線程
 9     }
10 }

複製代碼

釋放一個排他鎖

1 if (釋放成功) {
2     刪除頭結點
3     激活原頭結點的後繼節點
4 }

示例:

下面經過一個排它鎖的例子來深刻理解一下同步器的工做原理,而只有掌握同步器的工做原理才能更加深刻了解其餘的併發組件。

排他鎖的實現,一次只能一個線程獲取到鎖:

複製代碼

1 public class Mutex implements Lock, java.io.Serializable {
 2     // 內部類,自定義同步器
 3     private static class Sync extends AbstractQueuedSynchronizer {
 4       // 是否處於佔用狀態
 5       protected boolean isHeldExclusively() {
 6         return getState() == 1;
 7       }
 8       // 當狀態爲0的時候獲取鎖
 9       public boolean tryAcquire(int acquires) {
10         assert acquires == 1; // Otherwise unused
11         if (compareAndSetState(0, 1)) {
12           setExclusiveOwnerThread(Thread.currentThread());
13           return true;
14         }
15         return false;
16       }
17       // 釋放鎖,將狀態設置爲0
18       protected boolean tryRelease(int releases) {
19         assert releases == 1; // Otherwise unused
20         if (getState() == 0) throw new IllegalMonitorStateException();
21         setExclusiveOwnerThread(null);
22         setState(0);
23         return true;
24       }
25       // 返回一個Condition,每一個condition都包含了一個condition隊列
26       Condition newCondition() { return new ConditionObject(); }
27     }
28     // 僅須要將操做代理到Sync上便可
29     private final Sync sync = new Sync();
30     public void lock()                { sync.acquire(1); }
31     public boolean tryLock()          { return sync.tryAcquire(1); }
32     public void unlock()              { sync.release(1); }
33     public Condition newCondition()   { return sync.newCondition(); }
34     public boolean isLocked()         { return sync.isHeldExclusively(); }
35     public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
36     public void lockInterruptibly() throws InterruptedException {
37       sync.acquireInterruptibly(1);
38     }
39     public boolean tryLock(long timeout, TimeUnit unit)
40         throws InterruptedException {
41       return sync.tryAcquireNanos(1, unit.toNanos(timeout));
42     }
43   }

複製代碼

 

能夠看到Mutex將Lock接口均代理給了同步器的實現。使用方將Mutex構造出來後,調用lock獲取鎖,調用unlock將鎖釋放。

獲取鎖,acquire(int arg)的主要邏輯包括:

1. 嘗試獲取(調用tryAcquire更改狀態,須要保證原子性);

    在tryAcquire方法中適用了同步器提供的對state操做的方法,利用compareAndSet保證只有一個線程可以對狀態進行成功修改,而沒有成功修改的線程將進入sync隊列排隊。

2. 若是獲取不到,將當前線程構形成節點Node並加入sync隊列;

    進入隊列的每一個線程都是一個節點Node,從而造成了一個雙向隊列,相似CLH隊列,這樣作的目的是線程間的通訊會被限制在較小規模(也就是兩個節點左右)。

3. 再次嘗試獲取,若是沒有獲取到那麼將當前線程從線程調度器上摘下,進入等待狀態。

釋放鎖,release(int arg)的主要邏輯包括:

1. 嘗試釋放狀態;

    tryRelease可以保證原子化的將狀態設置回去,固然須要使用compareAndSet來保證。若是釋放狀態成功以後,就會進入後繼節點的喚醒過程。

2. 喚醒當前節點的後繼節點所包含的線程。

    經過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續acquire狀態。

回顧整個資源的獲取和釋放過程:

在獲取時,維護了一個sync隊列,每一個節點都是一個線程在進行自旋,而依據就是本身是不是首節點的後繼而且可以獲取資源;

在釋放時,僅僅須要將資源還回去,而後通知一下後繼節點並將其喚醒。

這裏須要注意,隊列的維護(首節點的更換)是依靠消費者(獲取時)來完成的,也就是說在知足了自旋退出的條件時的一刻,這個節點就會被設置成爲首節點。

 

隊列裏的節點線程的禁用和喚醒是經過LockSupport的park()及unpark(),調用的unsafe、底層也是native的實現。

關於java lock的淺析可見:http://jm-blog.aliapp.com/?p=414 

 

 

共享模式和以上的獨佔模式有所區別,分別調用acquireShared(int arg)和releaseShared(int arg)獲取共享模式的狀態。

以文件的查看爲例,若是一個程序在對其進行讀取操做,那麼這一時刻,對這個文件的寫操做就被阻塞,相反,這一時刻另外一個程序對其進行一樣的讀操做是能夠進行的。若是一個程序在對其進行寫操做,

那麼全部的讀與寫操做在這一時刻就被阻塞,直到這個程序完成寫操做。

以讀寫場景爲例,描述共享和獨佔的訪問模式,以下圖所示:

 

上圖中,紅色表明被阻塞,綠色表明能夠經過。

 

在上述對同步器AbstractQueuedSynchronizer進行了實現層面的分析以後,咱們經過一個例子來加深對同步器的理解:

設計一個同步工具,該工具在同一時刻,只能有兩個線程可以並行訪問,超過限制的其餘線程進入阻塞狀態。

對於這個需求,能夠利用同步器完成一個這樣的設定,定義一個初始狀態,爲2,一個線程進行獲取那麼減1,一個線程釋放那麼加1,狀態正確的範圍在[0,1,2]三個之間,當在0時,表明再有新的線程對資源進行獲取時只能進入阻塞狀態(注意在任什麼時候候進行狀態變動的時候均須要以CAS做爲原子性保障)。因爲資源的數量多於1個,同時能夠有兩個線程佔有資源,所以須要實現tryAcquireShared和tryReleaseShared方法。

複製代碼

1 public class TwinsLock implements Lock {
 2     private final Sync  sync    = new Sync(2);
 3 
 4     private static final class Sync extends AbstractQueuedSynchronizer {
 5         private static final long   serialVersionUID    = -7889272986162341211L;
 6 
 7         Sync(int count) {
 8             if (count <= 0) {
 9                 throw new IllegalArgumentException("count must large than zero.");
10             }
11             setState(count);
12         }
13 
14         public int tryAcquireShared(int reduceCount) {
15             for (;;) {
16                 int current = getState();
17                 int newCount = current - reduceCount;
18                 if (newCount < 0 || compareAndSetState(current, newCount)) {
19                     return newCount;
20                 }
21             }
22         }
23 
24         public boolean tryReleaseShared(int returnCount) {
25             for (;;) {
26                 int current = getState();
27                 int newCount = current + returnCount;
28                 if (compareAndSetState(current, newCount)) {
29                     return true;
30                 }
31             }
32         }
33     }
34 
35     public void lock() {
36         sync.acquireShared(1);
37     }
38 
39     public void lockInterruptibly() throws InterruptedException {
40         sync.acquireSharedInterruptibly(1);
41     }
42 
43     public boolean tryLock() {
44         return sync.tryAcquireShared(1) >= 0;
45     }
46 
47     public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
48         return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
49     }
50 
51     public void unlock() {
52         sync.releaseShared(1);
53     }
54 
55     public Condition newCondition() {
56         return null;
57     }
58 }

複製代碼

 

這裏咱們編寫一個測試來驗證TwinsLock是否可以正常工做並達到預期。

複製代碼

1 public class TwinsLockTest {
 2 
 3     @Test
 4     public void test() {
 5         final Lock lock = new TwinsLock();
 6 
 7         class Worker extends Thread {
 8             public void run() {
 9                 while (true) {
10                     lock.lock();
11 
12                     try {
13                         Thread.sleep(1000L);
14                 System.out.println(Thread.currentThread());
15                         Thread.sleep(1000L);
16                     } catch (Exception ex) {
17 
18                     } finally {
19                         lock.unlock();
20                     }
21                 }
22             }
23         }
24 
25         for (int i = 0; i < 10; i++) {
26             Worker w = new Worker();
27             w.start();
28         }
29 
30         new Thread() {
31             public void run() {
32                 while (true) {
33 
34                     try {
35                         Thread.sleep(200L);
36                         System.out.println();
37                     } catch (Exception ex) {
38 
39                     }
40                 }
41             }
42         }.start();
43 
44         try {
45             Thread.sleep(20000L);
46         } catch (InterruptedException e) {
47             e.printStackTrace();
48         }
49     }
50 }

複製代碼

 

上述測試用例的邏輯主要包括:

1. 打印線程

Worker在兩次睡眠之間打印自身線程,若是一個時刻只能有兩個線程同時訪問,那麼打印出來的內容將是成對出現。

2. 分隔線程

不停的打印換行,能讓Worker的輸出看起來更加直觀。

該測試的結果是在一個時刻,僅有兩個線程可以得到到鎖,並完成打印,而表象就是打印的內容成對出現。

利用CAS(compare and set)是不會進行阻塞的,只會一個返回成功,一個返回失敗,保證了一致性。

CAS操做同時具備volatile讀和volatile寫的內存語義。

AQS這部分轉載於http://ifeve.com/introduce-abstractqueuedsynchronizer/ 

 

 2.1 ConcurrentHashMap

    ConcurrentHashMap是線程安全的HashMap的實現,默認構造一樣有initialCapacity和loadFactor屬性,不過還多了一個concurrencyLevel屬性,三屬性默認值分別爲1六、0.75及16。其內部使用鎖分段技術,維持這鎖Segment的數組,在Segment數組中又存放着Entity[]數組,內部hash算法將數據較均勻分佈在不一樣鎖中。

put操做:並無在此方法上加上synchronized,首先對key.hashcode進行hash操做,獲得key的hash值。hash操做的算法和map也不一樣,根據此hash值計算並獲取其對應的數組中的Segment對象(繼承自ReentrantLock),接着調用此Segment對象的put方法來完成當前操做。

ConcurrentHashMap基於concurrencyLevel劃分出了多個Segment來對key-value進行存儲,從而避免每次put操做都得鎖住整個數組。在默認的狀況下,最佳狀況下可容許16個線程併發無阻塞的操做集合對象,儘量地減小併發時的阻塞現象。

get(key)

    首先對key.hashCode進行hash操做,基於其值找到對應的Segment對象,調用其get方法完成當前操做。而Segment的get操做首先經過hash值和對象數組大小減1的值進行按位與操做來獲取數組上對應位置的HashEntry。在這個步驟中,可能會由於對象數組大小的改變,以及數組上對應位置的HashEntry產生不一致性,那麼ConcurrentHashMap是如何保證的?

    對象數組大小的改變只有在put操做時有可能發生,因爲HashEntry對象數組對應的變量是volatile類型的,所以能夠保證如HashEntry對象數組大小發生改變,讀操做可看到最新的對象數組大小。

    在獲取到了HashEntry對象後,怎麼能保證它及其next屬性構成的鏈表上的對象不會改變呢?這點ConcurrentHashMap採用了一個簡單的方式,即HashEntry對象中的hash、key、next屬性都是final的,這也就意味着沒辦法插入一個HashEntry對象到基於next屬性構成的鏈表中間或末尾。這樣就能夠保證當獲取到HashEntry對象後,其基於next屬性構建的鏈表是不會發生變化的。

    ConcurrentHashMap默認狀況下采用將數據分爲16個段進行存儲,而且16個段分別持有各自不一樣的鎖Segment,鎖僅用於put和remove等改變集合對象的操做,基於volatile及HashEntry鏈表的不變性實現了讀取的不加鎖。這些方式使得ConcurrentHashMap可以保持極好的併發支持,尤爲是對於讀遠比插入和刪除頻繁的Map而言,而它採用的這些方法也可謂是對於Java內存模型、併發機制深入掌握的體現。

 

2.2 ReentrantLock

    在併發包的開始部分介紹了volatile特性及AQS同步器,而這兩部分正是ReentrantLock實現的基礎。經過上面AQS的介紹及原理分析,可知道是以volatile維持的int類型的state值,來判斷線程是執行仍是在syn隊列中等待。

ReentrantLock的實現不只能夠替代隱式的synchronized關鍵字,並且可以提供超過關鍵字自己的多種功能。

    這裏提到一個鎖獲取的公平性問題,若是在絕對時間上,先對鎖進行獲取的請求必定被先知足,那麼這個鎖是公平的,反之,是不公平的,也就是說等待時間最長的線程最有機會獲取鎖,也能夠說鎖的獲取是有序的。ReentrantLock這個鎖提供了一個構造函數,可以控制這個鎖是不是公平的。

    對於公平和非公平的定義是經過對同步器AbstractQueuedSynchronizer的擴展加以實現的,也就是tryAcquire的實現上作了語義的控制。

    公平和非公平性的更多原理分析見於http://ifeve.com/reentrantlock-and-fairness/ 

 

2.3 Condition

    Condition是併發包中提供的一個接口,典型的實現有ReentrantLock,ReentrantLock提供了一個mewCondition的方法,以便用戶在同一個鎖的狀況下能夠根據不一樣的狀況執行等待或喚醒動做。典型的用法可參考ArrayBlockingQueue的實現,下面來看ReentrantLock中

newCondition的實現。

ReentrantLock.newCondition()

    建立一個AbstractQueuedSynchronizer的內部類ConditionObject的對象實例。

ReentrantLock.newCondition().await()

    將當前線程加入此condition的等待隊列中,並將線程置爲等待狀態。

ReentrantLock.newCondition().signal()

    今後condition的等待隊列中獲取一個等待節點,並將節點上的線程喚醒,若是要喚醒所有等待節點的線程,則調用signalAll方法。

 

2.4 CopyOnWriteArrayList

    CopyOnWriteArrayList是一個線程安全、而且在讀操做時無鎖的ArrayList,其具體實現方法以下。

CopyOnWriteArrayList()

    和ArrayList不一樣,此步的作法爲建立一個大小爲0的數組。

add(E)

    add方法並無加上synchronized關鍵字,它經過使用ReentrantLock來保證線程安全。此處和ArrayList的不一樣是每次都會建立一個新的Object數組,此數組的大小爲當前數組大小加1,將以前數組中的內容複製到新的數組中,並將

新增長的對象放入數組末尾,最後作引用切換將新建立的數組對象賦值給全局的數組對象。

remove(E)

    和add方法同樣,此方法也經過ReentrantLock來保證其線程安全,但它和ArrayList刪除元素採用的方式並不同。

    首先建立一個比當前數組小1的數組,遍歷新數組,如找到equals或均爲null的元素,則將以後的元素所有賦值給新的數組對象,並作引用切換,返回true;如未找到,則將當前的元素賦值給新的數組對象,最後特殊處理數組中的最後

一個元素,如最後一個元素等於要刪除的元素,即將當前數組對象賦值爲新建立的數組對象,完成刪除操做,如最後一個元素也不等於要刪除的元素,那麼返回false。

    此方法和ArrayList除了鎖不一樣外,最大的不一樣在於其複製過程並無調用System的arrayCopy來完成,理論上來講會致使性能有必定降低。

get(int)    

    此方法很是簡單,直接獲取當前數組對應位置的元素,這種方法是沒有加鎖保護的,所以可能會出現讀到髒數據的現象。但相對而言,性能會很是高,對於寫少讀多且髒數據影響不大的場景而言是不錯的選擇。

iterator()

    調用iterator方法後建立一個新的COWIterator對象實例,並保存了一個當前數組的快照,在調用next遍歷時則僅對此快照數組進行遍歷,所以遍歷此list時不會拋出ConcurrentModificatiedException。

    與ArrayList的性能對比,在讀多寫少的併發場景中,較之ArrayList是更好的選擇,單線程以及多線程下增長元素及刪除元素的性能不比ArrayList好

 

2.5 CopyOnWriteArraySet

    CopyOnWriteArraySet基於CopyOnWriteArrayList實現,其惟一的不一樣是在add時調用的是CopyOnWriteArrayList的addIfAbsent方法。保證了無重複元素,但在add時每次都要進行數組的遍歷,所以性能會略低於上個。

 

2.6 ArrayBlockingQueue

 

 

2.7 ThreadPoolExecutor

與每次須要時都建立線程相比,線程池能夠下降建立線程的開銷,在線程執行結束後進行的是回收操做,提升對線程的複用。Java中主要使用的線程池是ThreadPoolExecutor,此外還有定時的線程池ScheduledThreadPoolExecutor。

Java裏面線程池的頂級接口是Executor,可是嚴格意義上講Executor並非一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。

比較重要的幾個類:

ExecutorService 真正的線程池接口
ScheduledExecutorService 和Time/TimeTask相似,解決須要任務重複執行的問題
ThreadPoolExecutor ExecutorService的默認實現
SchedulesThreadPoolExecutor 繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,週期性任務調度的類實現

要配置一個線程池是比較複雜的,尤爲是對於線程池的原理不是很清楚的狀況下,頗有可能配置的線程池不是較優的,所以在Executors類裏面提供了一些靜態工廠,生成一些經常使用的線程池。

1. newSingleThreadExecutor

建立一個單線程的線程池。這個線程池只有一個線程在工做,也就是至關於單線程串行執行全部任務。若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它。此線程池保證全部任務的執行順序按照任務的提交順序執行。

2.newFixedThreadPool

建立固定大小的線程池。每次提交一個任務就建立一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,若是某個線程由於執行異常而結束,那麼線程池會補充一個新線程。

3. newCachedThreadPool

建立一個可緩存的線程池。若是線程池的大小超過了處理任務所須要的線程,

那麼就會回收部分空閒(60秒不執行任務)的線程,當任務數增長時,此線程池又能夠智能的添加新線程來處理任務。此線程池不會對線程池大小作限制,線程池大小徹底依賴於操做系統(或者說JVM)可以建立的最大線程大小。

4.newScheduledThreadPool

建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。

PS:但須要注意使用,newSingleThreadExecutor和newFixedThreadPool將超過處理的線程放在隊列中,但工做線程較多時,會引發過多內存被佔用,然後二者返回的線程池是沒有線程上線的,因此在使用時須要小心,建立過多的線程容易引發服務器的宕機。

使用ThreadPoolExecutor自定義線程池,具體使用時需根據系統及JVM的配置設置適當的參數,下面是一示例:

1 int corePoolSize = Runtime.getRuntime().availableProcessors();
2 threadsPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, 10l, TimeUnit.SECONDS,
3                new LinkedBlockingQueue<Runnable>(2000));

2.8 Future和FutureTask

Future是一個接口,FutureTask是一個具體實現類。這裏先經過兩個場景看看其處理方式及優勢。

場景1,

如今經過調用一個方法從遠程獲取一些計算結果,假設有這樣一個方法:

1 HashMap data = getDataFromRemote();

若是是最傳統的同步方式的使用,咱們將一直等待getDataFromRemote()的返回,而後才能繼續後面的工做。這個函數是從遠程獲取數據的計算結果的,若是須要的時間很長,而且後面的那部分代碼與這些數據沒有關係的話,阻塞在這裏等待結果就會比較浪費時間。如何改進呢?

可以想到的辦法就是調用函數後立刻返回,而後繼續向下執行,等須要用數據時再來用或者再來等待這個數據。具體實現有兩種方式:一個是用Future,另外一個使用回調。

Future的用法

1 Future<HashMap> future = getDataFromRemote2();
2 //do something
3 HashMap data = future.get();

能夠看到,咱們調用的方法返回一個Future對象,而後接着進行本身的處理,後面經過future.get()來獲取真正的返回值。也即,在調用了getDataFromRemote2後,就已經啓動了對遠程計算結果的獲取,同時本身的線程還在繼續處理,直到須要時再獲取數據。來看一下getDataFromRemote2的實現:

複製代碼

1 privete Future<HashMap> getDataFromRemote2(){
2     return threadPool.submit(new Callable<HashMap>(){
3         public HashMap call() throws Exception{
4             return getDataFromRemote();
5         }
6     });
7 }

複製代碼

能夠看到,在getDataFromRemote2中仍是使用了getDataFromRemote來完成具體操做,而且用到了線程池:把任務加入到線程池中,把Future對象返回出去。咱們調用了getDataFromRemote2的線程,而後返回來繼續下面的執行,而背後是另外的線程在進行遠程調用及等待的工做。get方法也可設置超時時間參數,而不是一直等下去。

場景2,

key-value的形式存儲鏈接,若key存在則獲取,若不存在這個key,則建立新鏈接並存儲。

傳統的方式會使用HashMap來存儲並判斷key是否存在而實現鏈接的管理。而這在高併發的時候會出現屢次建立鏈接的現象。那麼新的處理方式又是怎樣呢?

經過ConcurrentHashMap及FutureTask實現高併發狀況的正確性,ConcurrentHashMap的分段鎖存儲知足數據的安全性又不影響性能,FutureTask的run方法調用Sync.innerRun方法只會執行Runnable的run方法一次(即便是高併發狀況)。

 

 

2.9 併發容器

在JDK中,有一些線程不安全的容器,也有一些線程安全的容器。併發容器是線程安全容器的一種,可是併發容器強調的是容器的併發性,也就是說不只追求線程安全,還要考慮併發性,提高在容器併發環境下的性能。

加鎖互斥的方式確實可以方便地完成線程安全,不過代價是下降了併發性,或者說是串行了。而併發容器的思路是儘可能不用鎖,比較有表明性的是以CopyOnWrite和Concurrent開頭的幾個容器。CopyOnWrite容器的思路是在更改容器的時候,把容器寫一份進行修改,保證正在讀的線程不受影響,這種方式用在讀多寫少的場景中會很是好,由於實質上是在寫的時候重建了一次容器。而以Concurrent開頭的容器的具體實現方式則不徹底相同,整體來講是儘可能保證讀不加鎖,而且修改時不影響讀,因此達到比使用讀寫鎖更高的併發性能。好比上面所說的ConcurrentHashMap,其餘的併發容器的具體實現,可直接分析JDK中的源碼。

相關文章
相關標籤/搜索