JDK 1.5 以前, 主要包括:數組
Vector
和 Hashtable
)Collections.synchronizedXxx
)這些類的共同特徵是, 公共方法都是由 synchronized
來修飾的, 以限制一次只能有一個線程能訪問容器.安全
老的容器自身並不支持複合操做, 包括:併發
好在老的容器類遵循一個支持 客戶端加鎖 的同步策略. 來解決複合運算的問題:函數
解決迭代和導航:ui
synchronized(list) { // 確保調用 size() 後, list 大小不會改變 for (int i = 0; i < list.size(); ++i) { doSomething(list[i]); } }
解決條件運算:this
synchronized(list) { // 確保調用 size() 後, list 大小不會改變 int lastIndex = list.size() - 1; list.remove(lastIndex); }
這樣作的弊端是:線程
作任何操做都要鎖住整個容器, 效率低, 容易出錯.設計
ConcurrentModificationException
Collection
進行迭代的標準方法是使用 Iterator
, 不管是顯式使用仍是 經過 JDK 1.5 以後的 for-each
語法. code
在 迭代 的時候, 仍有其餘線程在併發修改容器的可能性, 使用迭代器仍不可避免地須要在迭代期間對容器加鎖.xml
迭代器在併發修改的時候, 策略是 及時失敗(fail-fast) 的: 當發現迭代器被修改後(如: add
和 remove
), 會拋出一個未檢查的 ConcurrentModificationException
.
以 ArrayList
爲例子, 其父類 AbstractList
內部有一個字段名爲 modCount
的計數器. 任何改變 List
大小的操做都須要改變 modCount
這個值.
這個值會被用來在迭代或者時, 檢查有沒有修改容器, 套路是這樣的:
修改時:
if (modCount != expectedModCount) throw new ConcurrentModificationException(); } // Add or Remove // ....... expectedModCount = modCount;
迭代:
public E prev/next() { if (modCount != expectedModCount) throw new ConcurrentModificationException(); } // Other..... }
Note: ConcurrentModificationException
也能夠出現單線程的代碼中, 好比當在迭代期間調用 remove
方法
有時候, 一些操做會隱含的調用迭代器, 好比:
調用 toString()
方法, 尤爲是寫 log 時, 有
log("Set:" + set);
這樣的語句.
hashCode
和 equals
方法, 如下是 HashTable
的 hashCode
和 equals
方法:
public synchronized boolean equals(Object o) { if (o == this) return true; if (!(o instanceof Map)) return false; Map<?,?> t = (Map<?,?>) o; if (t.size() != size()) return false; try { Iterator<Map.Entry<K,V>> i = entrySet().iterator(); while (i.hasNext()) { Map.Entry<K,V> e = i.next(); K key = e.getKey(); V value = e.getValue(); if (value == null) { if (!(t.get(key)==null && t.containsKey(key))) return false; } else { if (!value.equals(t.get(key))) return false; } } } catch (ClassCastException unused) { return false; } catch (NullPointerException unused) { return false; } return true; } public synchronized int hashCode() { int h = 0; if (count == 0 || loadFactor < 0) return h; // Returns zero loadFactor = -loadFactor; // Mark hashCode computation in progress Entry<?,?>[] tab = table; for (Entry<?,?> entry : tab) { while (entry != null) { h += entry.hashCode(); entry = entry.next; } } loadFactor = -loadFactor; // Mark hashCode computation complete return h; }
另外 containAll
, removeAll
和 retainAll
也會產生迭代.
JDK 1.5 後, 新增長了:
ConcurrentHashMap
, 來替代同步的 Map
實現, 增長了 put-if-absent
, 替換和條件刪除CopyOnWriteArrayList
, 是 List
相應的同步實現Queue
, 用來臨時保存正在等待進一步處理的一系列元素, 實現包括ConcurrentLinkedQueue
, 一個傳統的 FIFO 隊列PriorityQueue
, 一個(非併發)居右優先級順序的隊列BlockingQueue
, 拓展自 Queue
, 增長了可阻塞的插入和獲取操做. JDK 1.6 後, 新增長了
Deque
和 BlockingDeque
, 分別擴展了 Queue
和 BlockingQueue
:
Deque
接口, 實現類是 ArrayDeque
, 不阻塞BlockingDeque
接口, 實現類是 LinkedBlockingDeque
, 阻塞.ConcurrentSkipListMap
和 ConcurrentSkipListSet
, 做爲 SortedMap
和 SortedSet
的併發替代品
Note: 從一個空的Queue
中取元素, 並不會阻塞, 而是返回 null
ConcurrentHashMap
在 ConcurrentHashMap
以前, HashTable
和 SynchronizedMap
都是經過給整個方法加 synchronized
來達到同步的, 這樣限制某一時刻只有一個線程能夠訪問容器.
ConcurrentHashMap
使用一個更加細化的鎖機制, 名叫分離鎖. 這個機制容許更深層次的共享訪問:
因爲併發環境中, Map 的大小一般是動態的, size
和 isEmpty
返回的只是個估算值(可能返回後接着過時).
支持的複合操做:
CopyOnWriteArrayList
寫入時複製(COW)
容器的線程安全原理:
只要不可邊對象被正確發佈, 那麼訪問它將不須要更多的同步.
所以, 每次添加/修改一個元素, 容器內就會新建立一個新的數組, 容器底層的數組會指向這個新數組. 舊數組仍然被使用, 直到沒有引用後被 GC 回收.
因爲 COW 複製數組有開銷, 因此 COW 適用於容器迭代操做遠遠高於對容器修改的頻率.
FAQ: Arrays.copyOf
和 System.arraycopy
區別?
Arrays.copyOf
不只會複製元素, 還會建立新的數組. System.arrayCopy
拷貝到一個現有數組, Arrays.copyOf
實現中用了 System.arrayCopy
;
生產者-消費者設計分離了 "識別須要完成的工做" 和 "執行工做". 該模式不會發現一個工做便當即處理, 而是把工做置入一個任務清單中:
最多見的生產者-消費者設計是: 線程池和工做隊列的結合
在設計初期就使用阻塞隊列創建對資源的管理, 提前作這件事情會比往後再修復容易的多.
Blocking queue 提供了可阻塞的 put
和 take
方法. 常見的實現有:
LinkedBlockedQueue
, FIFO, 鏈表實現, 隊列首 take, 隊列尾 put.ArrayBlockingQueue
, FIFO, 數組實現, 能夠在 putIndex(隊列尾) 插入, 從 takeIndex(隊列首) 取出.PriorityBlockingQueue
, 根據 Comparator 排序順序取出SynchronousQueue
, 生產線程直接和消費線程對接, 若是生產線程找不到消費者或反之, 則, put 和 take 會一直阻止. 只有在消費者充足的時候比較適合, 他們總能爲下一個任務作好準備.雙端隊列用來實現 竊取工做(work stealing) 模式.
在傳統的 生產者-消費者 設計中, 全部的消費者只共享一個工做隊列.
而在 竊取工做 設計中, 每個消費者都有一個本身的雙端隊列. 若是一個消費者完成了本身雙端隊列中的所有工做, 它能夠偷取其餘消費者的雙端隊列的 末尾 任務(其餘消費者仍然從隊列 首 取任務).
由於工做者線程並不會競爭一個共享的任務隊列, 因此 竊取工做 模式比傳統的 生產者-消費者 設計有更好的伸縮性.
阻塞: 線程被掛起, 狀態變爲BLOCKED
, WAITING
或是 TIMED_WAITING
, 等待直到一個事件發生才能繼續進行.
BlockingQueue
的 put
和 take
方法會拋出一個受檢查的 InterruptedException
, 這個異常說明這是個阻塞方法, 能夠被中斷來提早結束阻塞.
處理中斷的方法:
InterruptedException
. 傳遞給調用者, 能夠對其中特定活動進行簡潔地清理後, 再拋出.恢復中斷. 當代碼是 Runnable
的一部分時, 必須捕獲 InterruptedException
. 而且, 在當前線程中調用 interrupt
從新設置中斷狀態(拋出異常會清理中斷標誌位), 這樣調用棧中更高層代碼能夠發現中斷已經發生.
try { processTask(queue.take()); } catch (InterruptedException e) { // 恢復中斷狀態 Thread.currentThread().interrupt(); }
Synchronizer 是一個對象, 它根據自己的狀態調節線程的控制流. 主要類型有:
他們的特性: 封裝狀態, 這些狀態絕對着線程執行到某一點時是經過仍是被迫等待.
直到 閉鎖 到達 終點狀態 以前, 門一直是關閉的, 沒有線程可以經過, 在 終點狀態 到來的時候, 門開了, 容許全部線程經過. 一旦到了終點狀態, 他就 不能 再改變狀態了.
用例:
FutureTask 描述了一個抽象的可攜帶結果的計算. FutureTask的計算經過 Callable
實現.
Callable
等價於一個可攜帶結果的 Runnable
. Callable
有三種狀態:
要獲取 FutureTask
的結果, 能夠調用 get()
方法. 調用 get()
時, 有兩種狀況:
FutureTask
保證了計算結果將計算線程安全的傳遞到當前線程.
假如FutureTask
執行的任務有異常拋出, 則異常會被封裝在 ExecutionException
裏. 如下代碼能夠從 ExecutionException
中取出異常:
try { futureTask.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceOf XXXException) { // 本身想要捕獲的異常 } else { throw launderThrowable(cause); } } public static RuntimeException launderThrowable(Throwable cause) { if (t instanceOf RuntimeException) { return (RuntimeException)t; } else if (t instanceOf Error) { } else { throw new IllegalStateException("Not unchecked", t); } }
計數信號量用來控制可以同時訪問某種資源的活動的數量, 或者同時執行某一操做的數量.
使用計數信號量以前須要先構造一個, 構造時能夠將許可集(permit)總數傳遞進去. 在使用計數信號量時, 要先嚐試獲取(acquire)一個許可, 假如此時有剩餘許可則繼續執行, 若沒有, 則 阻塞. 使用完以後, 要手動釋放(release)一個許可.
用處:
關卡用來阻塞一組線程, 直到 全部線程 達到一個條件. 就像一些家庭成員指定商場的一個集合地點:"咱們每一個人6:00在麥當勞見, 到了之後不見不散, 以後咱們再決定接下來作什麼".
關卡 與 閉鎖 的不一樣:
關卡: 等待的是其餘線程, 能夠重複被使用 閉鎖: 等待的是事件, 只能使用一次
當一個線程到達關卡點時, 調用 await
, await
會被阻塞, 直到全部線程都到達關卡點.
若是對 await
的調用超時, 或者阻塞中的線程被中斷, 那麼關卡就被認爲是 失敗 的.
await
, 那麼當這個線程 await
超時, 這個線程會拋出 TimeoutException
異常, 其餘調用 barrior.await()
的線程會拋出 BrokenBarrierException
;若是成功地經過關卡, await
爲每個線程返回一個惟一的到達索引號, 能夠用來 "選舉" 產生一個領導, 在下一次迭代中承擔一些特殊工做.
CyclicBarrier
也容許你向構造函數傳遞一個 關卡行爲(Barrier action), 這是一個 Runnable, 當成功經過關卡的時候, 會(在 某一個 子任務線程中) 執行, 可是在阻塞線程被釋放以前是不能執行的.
Exchanger 是關卡的另外一種形式, 它是一種兩步關卡, 在關卡點會交換數據.