在介紹併發容器以前,先分析下普通的容器,以及相應的實現,方便後續的對比。html
Hashtable、HashMap、TreeMap 都是最多見的一些 Map 實現,是以鍵值對的形式存儲和操做數據的容器類型。java
Hashtable 是早期 Java 類庫提供的一個哈希表實現,自己是同步的,不支持 null 鍵和值,因爲同步致使的性能開銷,因此已經不多被推薦使用。node
HashMap 是應用更加普遍的哈希表實現,行爲上大體上與 HashTable 一致,主要區別在於 HashMap 不是同步的,支持 null 鍵和值等。一般狀況下,HashMap 進行 put 或者 get 操做,能夠達到常數時間的性能,因此它是絕大部分利用鍵值對存取場景的首選,好比,實現一個用戶 ID 和用戶信息對應的運行時存儲結構。git
HashMap 明確聲明不是線程安全的數據結構,若是忽略這一點,簡單用在多線程場景裏,不免會出現問題,如 HashMap 在併發環境可能出現無限循環佔用 CPU、size 不許確等詭異的問題。github
TreeMap 則是基於紅黑樹的一種提供順序訪問的 Map,和 HashMap 不一樣,它的 get、put、remove 之類操做都是 O(log(n))的時間複雜度,具體順序能夠由指定的 Comparator 來決定,或者根據鍵的天然順序來判斷。面試
Hashtable是經過"拉鍊法"實現的哈希表,結構以下圖所示:算法
1. 定義後端
public class Hashtable<K,V>
extends Dictionary<K,V>
implements Map<K,V>, Cloneable, java.io.Serializable{}
複製代碼
Hashtable 繼承於 Dictionary 類,實現了 Map, Cloneable, java.io.Serializable接口。數組
2. 構造方法緩存
Hashtable 一共提供了 4 個構造方法:
public Hashtable(int initialCapacity, float loadFactor): 用指定初始容量和指定負載因子構造一個新的空哈希表。
public Hashtable(int initialCapacity):用指定初始容量和默認的負載因子 (0.75) 構造一個新的空哈希表。
public Hashtable():默認構造函數,容量爲 11,負載因子爲 0.75。
- public Hashtable(Map<? extends K, ? extends V> t):構造一個與給定的 Map 具備相同映射關係的新哈希表。
複製代碼
它包括幾個重要的成員變量:table, count, threshold, loadFactor, modCount。
fail-fast機制舉例:有兩個線程(線程A,線程B),其中線程A負責遍歷list、線程B修改list。線程A在遍歷list過程的某個時候(此時expectedModCount = modCount=N),線程啓動,同時線程B增長一個元素,這是modCount的值發生改變(modCount + 1 = N + 1)。線程A繼續遍歷執行next方法時,通告checkForComodification方法發現expectedModCount = N ,而modCount = N + 1,二者不等,這時就拋出ConcurrentModificationException 異常,從而產生fail-fast機制。
3. PUT操做
put 方法的整個流程爲:
public synchronized V put(K key, V value) {
// Make sure the value is not null確保value不爲null
if (value == null) {
throw new NullPointerException();
}
// Makes sure the key is not already in the hashtable.
//確保key不在hashtable中
//首先,經過hash方法計算key的哈希值,並計算得出index值,肯定其在table[]中的位置
//其次,迭代index索引位置的鏈表,若是該位置處的鏈表存在相同的key,則替換value,返回舊的value
Entry tab[] = table;
int hash = hash(key);
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
V old = e.value;
e.value = value;
return old;
}
}
modCount++;
if (count >= threshold) {
// Rehash the table if the threshold is exceeded
//若是超過閥值,就進行rehash操做
rehash();
tab = table;
hash = hash(key);
index = (hash & 0x7FFFFFFF) % tab.length;
}
// Creates the new entry.
//將值插入,返回的爲null
Entry<K,V> e = tab[index];
// 建立新的Entry節點,並將新的Entry插入Hashtable的index位置,並設置e爲新的Entry的下一個元素
tab[index] = new Entry<>(hash, key, value, e);
count++;
return null;
}
複製代碼
4. Get操做
首先經過 hash()方法求得 key 的哈希值,而後根據 hash 值獲得 index 索引。而後迭代鏈表,返回匹配的 key 的對應的 value;找不到則返回 null。
public synchronized V get(Object key) {
Entry tab[] = table;
int hash = hash(key);
int index = (hash & 0x7FFFFFFF) % tab.length;
for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
if ((e.hash == hash) && e.key.equals(key)) {
return e.value;
}
}
return null;
}
複製代碼
5. rehash擴容
protected void rehash() {
int oldCapacity = table.length;
Entry<?,?>[] oldMap = table;
// overflow-conscious code
int newCapacity = (oldCapacity << 1) + 1;
if (newCapacity - MAX_ARRAY_SIZE > 0) {
if (oldCapacity == MAX_ARRAY_SIZE)
// Keep running with MAX_ARRAY_SIZE buckets
return;
newCapacity = MAX_ARRAY_SIZE;
}
Entry<?,?>[] newMap = new Entry<?,?>[newCapacity];
modCount++;
threshold = (int)Math.min(newCapacity * loadFactor, MAX_ARRAY_SIZE + 1);
table = newMap;
for (int i = oldCapacity ; i-- > 0 ;) {
for (Entry<K,V> old = (Entry<K,V>)oldMap[i] ; old != null ; ) {
Entry<K,V> e = old;
old = old.next;
int index = (e.hash & 0x7FFFFFFF) % newCapacity;
e.next = (Entry<K,V>)newMap[index];
newMap[index] = e;
}
}
}
複製代碼
6. Remove方法
remove方法主要邏輯以下:
Hash值的不一樣實現:JDK7 Vs JDK8
以上給出的代碼均爲jdk7中的實現,注意到在jdk7和8裏面,關於元素hash值的計算方法是不同的。
//利用異或,移位等運算,對key的hashcode進一步進行計算以及二進制位的調整等來保證最終獲取的存儲位置儘可能分佈均勻
final int hash(Object k) {
int h = hashSeed;
if (0 != h && k instanceof String) {
return sun.misc.Hashing.stringHash32((String) k);
}
h ^= k.hashCode();
h ^= (h >>> 20) ^ (h >>> 12);
return h ^ (h >>> 7) ^ (h >>> 4);
}
複製代碼
以上hash函數計算出的值,經過indexFor進一步處理來獲取實際的存儲位置
//返回數組下標
static int indexFor(int h, int length) {
return h & (length-1);
}
複製代碼
int hash = key.hashCode();
int index = (hash & 0x7FFFFFFF) % tab.length;
複製代碼
注意到都使用到了hashCode,這個方法是在Object方法中定義的,
@HotSpotIntrinsicCandidate
public native int hashCode();
複製代碼
能夠看到是Object裏沒有給出hashCode的實現,只是聲明爲一個native方法,說明Java會去調用本地C/C++對hashcode的具體實現。
在JDK8及之後,能夠經過以下指令來獲取到全部的hash算法,
java -XX:+PrintFlagsFinal | grep hashCode
複製代碼
具體大概有以下幾種,第5個算法是默認使用的,用到了異或操做和一些偏移算法來生成hash值。
0 == Lehmer random number generator, 1 == "somehow" based on memory address 2 == always 1 3 == increment counter 4 == memory based again ("somehow") 5 == Marsaglia XOR-Shift algorithm, that has nothing to do with memory.
HashTable相對於HashMap的最大特色就是線程安全,全部的操做都是被synchronized鎖保護的
參考:
HashMap是java中使用最爲頻繁的map類型,其讀寫效率較高,可是由於其是非同步的,即讀寫等操做都是沒有鎖保護的,因此在多線程場景下是不安全的,容易出現數據不一致的問題。
HashMap的結構和HashTable一致,都是使用是由數組和鏈表兩種數據結構組合而成的,不一樣的是在JDK8裏面引入了紅黑樹,當鏈表長度大於8時,會將鏈表轉換爲紅黑樹。
HashMap的成員變量和HashTable同樣,在進行初始化的時候,都會設置一個容量值(capacity)和加載因子(loadFactor)。
HashMap的核心構造函數以下,主要是設置負載因子,以及根據用戶的設定容量,找到一個不小於該容量的閾值。
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
複製代碼
因爲HashMap和HashTable有實現上有諸多類似之處,這裏會重點介紹hashMap在jdk7和8中的不一樣實現。
Hash運算
無論增長、刪除、查找鍵值對,定位到哈希桶數組的位置都是很關鍵的第一步。都須要用到hash算法,jdk7和8中的算法基本一致,具體實現以下:
static final int hash(Object key) { //jdk1.8 & jdk1.7
int h;
// h = key.hashCode() 爲第一步 取hashCode值
// h ^ (h >>> 16) 爲第二步 高位參與運算
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
複製代碼
而後利用獲得的hash值與數組長度取模,獲得相應的index。
如下圖示實例,給出了計算過程,
Get操做
public V get(Object key) {
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}
/**
* Implements Map.get and related methods
*
* @param hash hash for key
* @param key the key
* @return the node, or null if none
*/
final Node<K,V> getNode(int hash, Object key) {
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
if ((e = first.next) != null) {
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do {
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}
複製代碼
Get操做比較簡單:
PUT操做
PUT操做的執行過程以下:
①.判斷鍵值對數組table[i]是否爲空或爲null,不然執行resize()進行擴容;
②.根據鍵值key計算hash值獲得插入的數組索引i,若是table[i]==null,直接新建節點添加,轉向⑥,若是table[i]不爲空,轉向③;
③.判斷table[i]的首個元素是否和key同樣,若是相同直接覆蓋value,不然轉向④,這裏的相同指的是hashCode以及equals;
④.判斷table[i] 是否爲treeNode,即table[i] 是不是紅黑樹,若是是紅黑樹,則直接在樹中插入鍵值對,不然轉向⑤;
⑤.遍歷table[i],判斷鏈表長度是否大於8,大於8的話把鏈表轉換爲紅黑樹,在紅黑樹中執行插入操做,不然進行鏈表的插入操做;遍歷過程當中若發現key已經存在直接覆蓋value便可;
⑥.插入成功後,判斷實際存在的鍵值對數量size是否超多了最大容量threshold,若是超過,進行擴容。
1 public V put(K key, V value) {
2 // 對key的hashCode()作hash
3 return putVal(hash(key), key, value, false, true);
4 }
5
6 final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
7 boolean evict) {
8 Node<K,V>[] tab; Node<K,V> p; int n, i;
9 // 步驟①:tab爲空則建立
10 if ((tab = table) == null || (n = tab.length) == 0)
11 n = (tab = resize()).length;
12 // 步驟②:計算index,並對null作處理
13 if ((p = tab[i = (n - 1) & hash]) == null)
14 tab[i] = newNode(hash, key, value, null);
15 else {
16 Node<K,V> e; K k;
17 // 步驟③:節點key存在,直接覆蓋value
18 if (p.hash == hash &&
19 ((k = p.key) == key || (key != null && key.equals(k))))
20 e = p;
21 // 步驟④:判斷該鏈爲紅黑樹
22 else if (p instanceof TreeNode)
23 e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
24 // 步驟⑤:該鏈爲鏈表
25 else {
26 for (int binCount = 0; ; ++binCount) {
27 if ((e = p.next) == null) {
28 p.next = newNode(hash, key,value,null);
//鏈表長度大於8轉換爲紅黑樹進行處理
29 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
30 treeifyBin(tab, hash);
31 break;
32 }
// key已經存在直接覆蓋value
33 if (e.hash == hash &&
34 ((k = e.key) == key || (key != null && key.equals(k))))
35 break;
36 p = e;
37 }
38 }
39
40 if (e != null) { // existing mapping for key
41 V oldValue = e.value;
42 if (!onlyIfAbsent || oldValue == null)
43 e.value = value;
44 afterNodeAccess(e);
45 return oldValue;
46 }
47 }
48 ++modCount;
49 // 步驟⑥:超過最大容量 就擴容
50 if (++size > threshold)
51 resize();
52 afterNodeInsertion(evict);
53 return null;
54 }
複製代碼
Resize擴容操做
因爲JDK8引入了紅黑樹,因此在實現上JDK7和8的resize過程不太一致。
首先是JDK7的實現,
1 void resize(int newCapacity) { //傳入新的容量
2 Entry[] oldTable = table; //引用擴容前的Entry數組
3 int oldCapacity = oldTable.length;
4 if (oldCapacity == MAXIMUM_CAPACITY) { //擴容前的數組大小若是已經達到最大(2^30)了
5 threshold = Integer.MAX_VALUE; //修改閾值爲int的最大值(2^31-1),這樣之後就不會擴容了
6 return;
7 }
8
9 Entry[] newTable = new Entry[newCapacity]; //初始化一個新的Entry數組
10 transfer(newTable); //!!將數據轉移到新的Entry數組裏
11 table = newTable; //HashMap的table屬性引用新的Entry數組
12 threshold = (int)(newCapacity * loadFactor);//修改閾值
13 }
複製代碼
這裏就是使用一個容量更大的數組來代替已有的容量小的數組,transfer()方法將原有Entry數組的元素拷貝到新的Entry數組裏。
1 void transfer(Entry[] newTable) {
2 Entry[] src = table; //src引用了舊的Entry數組
3 int newCapacity = newTable.length;
4 for (int j = 0; j < src.length; j++) { //遍歷舊的Entry數組
5 Entry<K,V> e = src[j]; //取得舊Entry數組的每一個元素
6 if (e != null) {
7 src[j] = null;//釋放舊Entry數組的對象引用(for循環後,舊的Entry數組再也不引用任何對象)
8 do {
9 Entry<K,V> next = e.next;
10 int i = indexFor(e.hash, newCapacity); //!!從新計算每一個元素在數組中的位置
11 e.next = newTable[i]; //標記[1]
12 newTable[i] = e; //將元素放在數組上
13 e = next; //訪問下一個Entry鏈上的元素
14 } while (e != null);
15 }
16 }
17 }
複製代碼
newTable[i]的引用賦給了e.next,也就是使用了單鏈表的頭插入方式,同一位置上新元素總會被放在鏈表的頭部位置;這樣先放在一個索引上的元素終會被放到Entry鏈的尾部(若是發生了hash衝突的話),這一點和Jdk1.8有區別。
具體舉例以下圖所示:
接下來是JDK8中的實現,
/**
* Initializes or doubles table size. If null, allocates in
* accord with initial capacity target held in field threshold.
* Otherwise, because we are using power-of-two expansion, the
* elements from each bin must either stay at same index, or move
* with a power of two offset in the new table.
*
* @return the table
*/
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
複製代碼
因爲Size會進行2次冪的擴展(指長度擴爲原來2倍),因此,元素的位置要麼是在原位置,要麼是在原位置再移動2次冪的位置。經過下面的例子,能夠清楚的看到,21和5在原來的數組中都處於相同的位置,可是在新的數組中,21到了新的位置,位置爲原來的位置加上16,也就是舊的Capacity;可是5還在原來的位置。
假定咱們在Size變爲2倍之後,從新計算hash,由於n變爲2倍,相應的n-1的mask範圍在高位多1bit(紅色),也就是與上面示意圖中紅色部分對應的那一位,若是那位是1,則須要移動到新的位置,不然不變。
回到代碼實現中,直接用舊的hash值與上oldCapacity,由於舊的capacity是2的倍數(二進制爲00000...1000),並且獲取舊index的時候採用hash&(oldCap-1),因此直接e.hash & oldCap
就是判斷新增長的高位是否爲1,爲1則須要移動,不然保持不變。
if ((e.hash & oldCap) == 0)
複製代碼
這種巧妙的方法,同時因爲高位的1和0隨機出現,保證了resize以後元素分佈的離散性。
下圖是這一過程的模擬,
JDK8中的紅黑樹
引入紅黑樹主要是爲了保證在hash分佈極不均勻的狀況下的性能,當一個鏈表太長(大於8)的時候,經過動態的將它替換成一個紅黑樹,這話的話會將時間複雜度從O(n)降爲O(logn)。
爲何HashMap的數組長度必定保持2的次冪?
HashMap Vs HashTable
參考:
TreeMap繼承於AbstractMap,實現了Map, Cloneable, NavigableMap, Serializable接口。
TreeMap 是一個有序的key-value集合,它是經過紅黑樹實現的。該映射根據其鍵的天然順序進行排序,或者根據建立映射時提供的Comparator進行排序,具體取決於使用的構造方法。 TreeMap的基本操做 containsKey、get、put 和 remove 的時間複雜度是 log(n) 。
對於SortedMap來講,該類是TreeMap體系中的父接口,也是區別於HashMap體系最關鍵的一個接口。SortedMap接口中定義的第一個方法Comparator<? super K> comparator();
該方法決定了TreeMap體系的走向,有了比較器,就能夠對插入的元素進行排序了。
TreeMap的查找、插入、更新元素等操做,主要是對紅黑樹的節點進行相應的更新,和數據結構中相似。
TreeSet基於TreeMap實現,底層也是紅黑樹。只是每次插入元素時,value爲一個默認的dummy數據。
HashSet的實現很簡單,內部有一個HashMap的成員變量,全部的Set相關的操做都轉換爲了對HashMapde操做。
public class HashSet<E>
extends AbstractSet<E>
implements Set<E>, Cloneable, java.io.Serializable
{
static final long serialVersionUID = -5024744406713321676L;
private transient HashMap<E,Object> map;
// Dummy value to associate with an Object in the backing Map
private static final Object PRESENT = new Object();
//其餘操做省略
}
複製代碼
從上面的code能夠看到,內部還定義了一個PRESENT的dummy對象,當添加元素時,直接添加一對鍵值對,key爲元素值,value爲PRESENT。
/**
* Adds the specified element to this set if it is not already present.
* More formally, adds the specified element <tt>e</tt> to this set if
* this set contains no element <tt>e2</tt> such that
* <tt>(e==null ? e2==null : e.equals(e2))</tt>.
* If this set already contains the element, the call leaves the set
* unchanged and returns <tt>false</tt>.
*
* @param e element to be added to this set
* @return <tt>true</tt> if this set did not already contain the specified
* element
*/
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
複製代碼
其餘的操做相似,就是把PRESENT當作value。
首先是定義,
public class LinkedHashMap<K,V>
extends HashMap<K,V>
implements Map<K,V>
{
...
}
複製代碼
能夠看到,LinkedHashMap是HashMap的子類,但和HashMap的無序性不同,LinkedHashMap經過維護一個運行於全部條目的雙向鏈表,保證了元素迭代的順序。該迭代順序能夠是插入順序或者是訪問順序,這個能夠在初始化的時候肯定,默認採用插入順序來維持取出鍵值對的次序。
在成員變量上,與HashMap不一樣的是,引入了before和after兩個變量來記錄先後的元素。
一、K key
二、V value
三、Entry<K, V> next
四、int hash
五、Entry<K, V> before
六、Entry<K, V> after
複製代碼
1-4是從HashMap.Entry中繼承過來的;5-6是LinkedHashMap獨有的。注意next是用於維護HashMap指定table位置上鍊接的Entry的順序的,before、After是用於維護Entry插入的前後順序的。
能夠把LinkedHashMap的結構當作以下圖所示:
接下來主要介紹LinkedHashMap的排序操做,
在構造函數中,須要指定accessOrder,有兩種狀況:
public LinkedHashMap(int initialCapacity,
float loadFactor, boolean accessOrder) {
super(initialCapacity, loadFactor);
this.accessOrder = accessOrder;
}
複製代碼
第二種狀況,也就是accessOrder爲true時,每次經過get/put方法訪問時,都把訪問的那個數據移到雙向隊列的尾部去,也就是說,雙向隊列最頭的那個數據就是最不常訪問的那個數據。具體實現以下,afterNodeAccess這個方法在HashMap中沒有實現,LinkedHashMap進行了實現,將元素進行排序。
void afterNodeAccess(Node<K,V> e) { // move node to last
LinkedHashMap.Entry<K,V> last;
if (accessOrder && (last = tail) != e) {
LinkedHashMap.Entry<K,V> p =
(LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
p.after = null;
if (b == null)
head = a;
else
b.after = a;
if (a != null)
a.before = b;
else
last = b;
if (last == null)
head = p;
else {
p.before = last;
last.after = p;
}
tail = p;
++modCount;
}
}
複製代碼
利用LinkedHashMap實現LRU緩存
LRU即Least Recently Used,最近最少使用,也就是說,當緩存滿了,會優先淘汰那些最近最不常訪問的數據。LinkedHashMap正好知足這個特性,當咱們開啓accessOrder爲true時,最新訪問(get或者put(更新操做))的數據會被丟到隊列的尾巴處,那麼雙向隊列的頭就是最不常用的數據了。
此外,LinkedHashMap還提供了一個方法,這個方法就是爲了咱們實現LRU緩存而提供的,removeEldestEntry(Map.Entry<K,V> eldest) 方法。該方法能夠提供在每次添加新條目時移除最舊條目的實現程序,默認返回 false。
下面是一個最簡單的LRU緩存的實現,當size超過maxElement時,每次新增一個元素時,就會移除最久遠的元素。
public class LRUCache extends LinkedHashMap
{
public LRUCache(int maxSize)
{
super(maxSize, 0.75F, true);
maxElements = maxSize;
}
protected boolean removeEldestEntry(java.util.Map.Entry eldest)
{
//邏輯很簡單,當大小超出了Map的容量,就移除掉雙向隊列頭部的元素,給其餘元素騰出點地來。
return size() > maxElements;
}
private static final long serialVersionUID = 1L;
protected int maxElements;
}
複製代碼
參考:
這節開始介紹併發容器,首先是ConcurrentHashMap,實現了線程安全的HashMap。以前也提到了HashMap在多線程環境下的問題,這小節先詳細分析爲何HashMap多線程下不安全。
首先說結論,爲何HashMap不是線程安全的?在多線程下,會致使HashMap的Entry鏈表造成環形數據結構,一旦造成環形,Entry的next節點永遠不爲空,不管是進行resize仍是get/size等操做時,就會產生死循環。
首先針對JDK7進行分析:
下面是resize部分的代碼,這段代碼將原HashMap中的元素依次移動到擴容後的HashMap中,
1: // Transfer method in java.util.HashMap -
2: // called to resize the hashmap
3: // 依次移動每一個bucket中的元素到新的buckets中
4: for (int j = 0; j < src.length; j++) {
5: Entry e = src[j];
6: if (e != null) {
7: src[j] = null;
8: do {
// Next指向下一個須要移動的元素
9: Entry next = e.next;
// 計算新Map中的位置
10: int i = indexFor(e.hash, newCapacity);
// 插入到bucket中第一個位置
11: e.next = newTable[i];
12: newTable[i] = e;
// 指向原bucket中下一個位置的元素
13: e = next;
14: } while (e != null);
15: }
16: }
複製代碼
在正常單線程的狀況下,若是有以下的HashMap的結構,爲了方便這裏只有2個bucket(java.util.HashMap中默認是 16)。
按照上面的resize流程,e和next分別指向A和B,A是第一次迭代將會被移動的元素,B是下一個。
在resize完成以後,每一個bucket的深度變小了,達到了resize的目的。整個過程在單線程下沒有任何問題,可是考慮到多線程的狀況,就會可能會出現競爭。
如今有兩個線程Thread1,Thread2同時進行resize的操做,假設Thread1在運行到第9行後,Thread2獲取了CPU而且也開始執行resize的操做。
1: // Transfer method in java.util.HashMap -
2: // called to resize the hashmap
3:
4: for (int j = 0; j < src.length; j++) {
5: Entry e = src[j];
6: if (e != null) {
7: src[j] = null;
8: do {
9: Entry next = e.next;
// Thread1 STOPS RIGHT HERE
10: int i = indexFor(e.hash, newCapacity);
11: e.next = newTable[i];
12: newTable[i] = e;
13: e = next;
14: } while (e != null);
15: }
16: }
複製代碼
Thread1運行後,對應的e1和next1別指向A和B,可是Thread1並無移動元素。
假設Thread2在獲取CPU後完整的運行了整個resize,新的Map結構將會以下圖所示:
注意到e1
和next1
仍是指向A和B,可是A和B的位置關係已經變了,按照resize的算法進行兩輪迭代以後,變成以下的結構,
注意此時e
和next
的指向,在下一次的迭代中,將把A放在第3個bucket的一個位置,可是B仍然是指向A的,因此出現了下面的相似於雙向鏈表的結構,
接着Thread1就會進入到無限循環中,此時若是有get操做的話,也會出現無限循環的狀況。這就是HashMap在多線程狀況下容易出現的問題。
接着針對JDK8進行分析:
前面已經提到,JDK8和7在Resize的不一樣之處就是8保留了鏈表中元素的前後位置,這樣基本能夠確保在resize過程當中不出現循環的問題,可是仍是可能出現數據丟失的問題。如下是resize的核心實現,
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
複製代碼
在實現中會使用兩個臨時鏈表,分別存儲新地址和舊地址的鏈表,最後將這兩個鏈表放到對應的位置。
假定出現以下的狀況,有ABC三個元素須要移動,首先線程1指向A,next即爲B,此後線程2一樣進行resize,並把high/low兩個鏈表的更新完成,這時返回線程1繼續運行。
可是線程1仍然按照正常的流程繼續,A會被放到High鏈表,B會被放到Low鏈表,這以後因爲B後面沒有元素,更新完成,所以C就漏掉了。
其實無論是JDK7仍是8,因爲鏈表的不少操做都沒有加鎖,每一個操做也不是原子操做,致使可能出現不少意想不到的結果,也是爲何須要引入專門的ConcurrentHashMap。
爲何不使用HashTable?
以前介紹的HashTable也能保證線程安全,可是HashTable使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法,其餘線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法來獲取元素,因此競爭越激烈效率越低。正由於如此,須要引入更加高效的多線程解決方案。
ConcurrentHashMap的結構在JDk1.7和1.8中有較大的不一樣,下面將會分別進行介紹。
ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成。Segment實際繼承自可重入鎖(ReentrantLock),在ConcurrentHashMap裏扮演鎖的角色;HashEntry則用於存儲鍵值對數據。一個ConcurrentHashMap裏包含一個Segment數組,每一個Segment裏包含一個HashEntry數組,咱們稱之爲table,每一個HashEntry是一個鏈表結構的元素。
Segment實際繼承自可重入鎖(ReentrantLock),這是與普通HashMap的最大區別。
面試點:ConcurrentHashMap實現原理是怎麼樣的或者ConcurrentHashMap如何在保證高併發下線程安全的同時實現了性能提高?
ConcurrentHashMap容許多個修改操做併發進行,其關鍵在於使用了鎖分離技術。它使用了多個鎖來控制對hash表的不一樣部分進行的修改。內部使用段(Segment)來表示這些不一樣的部分,每一個段其實就是一個小的hash table,只要多個修改操做發生在不一樣的段上,它們就能夠併發進行。
1.1 初始化過程
初始化有三個參數:
如下是對初始化函數的分析:
1.2 Hash值計算
對某個元素進行Put/Get操做以前,都須要定位該元素在哪一個segment元素的某個table元素中的,定位的過程,取得key的hashcode值進行一次再散列(經過Wang/Jenkins算法),拿到再散列值後,以再散列值的高位進行取模獲得當前元素在哪一個segment上。
具體的Hash實現以下:
1.3 Get方法
定位segment和定位table後,依次掃描這個table元素下的的鏈表,要麼找到元素,要麼返回null。
在高併發下的狀況下如何保證取得的元素是最新的?
用於存儲鍵值對數據的HashEntry,在設計上它的成員變量value等都是volatile類型的,這樣就保證別的線程對value值的修改,get方法能夠立刻看到。
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
複製代碼
1.4 Put方法
一、首先定位segment,當這個segment在map初始化後,還爲null,由ensureSegment方法負責填充這個segment。
二、對Segment加鎖,雖然value是volatile的,只能保證可見性,不能保證原子性。這裏put操做不是原子操做,所以須要加鎖。
三、定位所在的table元素,並掃描table下的鏈表,找到時:
注意到默認onlyIfAbsent爲false,也就是若是有相同key的元素,會覆蓋舊的值。不管是否覆蓋,都是返回舊值。
沒有找到時:
1.5 擴容操做
擴容操做不會擴容Segment,只會擴容對應的table數組,每次都是將數組翻倍。
以前也提到過,因爲數組長度爲2次冪,因此每次擴容以後,元素要麼在原處,要麼在原處加上偏移量爲舊的size的新位置。
1.6 Size方法
size的時候進行兩次不加鎖的統計,兩次一致直接返回結果,不一致,從新加鎖再次統計,
ConcurrentHashMap的弱一致性
get方法和containsKey方法都是經過對鏈表遍歷判斷是否存在key相同的節點以及得到該節點的value。但因爲遍歷過程當中其餘線程可能對鏈表結構作了調整,所以get和containsKey返回的多是過期的數據,這一點是ConcurrentHashMap在弱一致性上的體現。
相比JDK1.7的重要變化:
一、取消了segment數組,引入了Node結構,直接用Node數組來保存數據,鎖的粒度更小,減小併發衝突的機率。 二、存儲數據時採用了鏈表+紅黑樹的形式,純鏈表的形式時間複雜度爲O(n),紅黑樹則爲O(logn),性能提高很大。何時鏈表轉紅黑樹?當key值相等的元素造成的鏈表中元素個數超過8個的時候。
2.1 數據結構
ConcurrentHashMap在初始化時,只是給成員變量賦值,put時進行實際數組的填充。
2.2 Hash計算
先計算key的hash值,而後將高位加入計算來進行再散列。
2.3 Get方法
首先計算hash值,肯定在table中的位置。
注意到在初始化TreeBin,也就是設置紅黑樹所在的Node的第一個節點時,會設置對應的hash值,這些hash值定義以下。因此上面的代碼中,能夠經過判斷首節點的hash值<0來肯定該節點爲樹。
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
複製代碼
2.4 Put方法
PUT方法中會實際初始化數組,
2.5 擴容操做
線程執行put操做,發現容量已經達到擴容閾值,須要進行擴容操做。ConcurrentHashMap支持併發擴容,實現方式是,將表拆分,讓每一個線程處理本身的區間。以下圖:
遷移完畢的hash桶,會被設置成ForwardingNode節點,以此告知訪問此桶的其餘線程,此節點已經遷移完畢。此時線程2訪問到了ForwardingNode節點,若是線程2執行的put或remove等寫操做,那麼就會先幫其擴容。若是線程2執行的是get等讀方法,則會調用ForwardingNode的find方法,去nextTable裏面查找相關元素。
2.6 Size
Put操做時,addCount 方法用於 CAS 更新 baseCount,但頗有可能在高併發的狀況下,更新失敗,那麼這些節點雖然已經被添加到哈希表中了,可是數量卻沒有被統計。
當更新 baseCount 失敗的時候,會調用 fullAddCount 將這些失敗的結點包裝成一個 CounterCell 對象,保存在 CounterCell 數組中。
整張表實際的 size 實際上是 baseCount 加上 CounterCell 數組中元素的個數。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);
}
複製代碼
具體的計算count方法,
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
複製代碼
和JDK1.7同樣,這樣獲得的size也只是大概數字,也具備弱一致性。
ConcurrentSkipListMap是一個併發安全, 基於skiplist實現有序存儲的Map。能夠當作是TreeMap的併發版本。
ConcurrentHashMap採用空間換取時間, 但它有着ConcurrentHashMap不能比擬的優勢: 有序數據存儲.
SkipList的結構以下圖所示:
從圖中能夠得出ConcurrentSkipListMap的幾個特色:
ConcurrentSkipListSet基於ConcurrentSkipListMap實現,相似於TreeSet基於TreeMap實現。
ConcurrentLinkedQueue實現了一個高併發的隊列,底層使用鏈表做爲其數據結構。從性能角度看,能夠算是高併發環境下性能最好的隊列了。
ConcurrentLinkedQueue類中,核心節點Node的定義以下,item表示目標元素,next表示當前Node的下一個元素。
private static class Node<E> {
volatile E item;
volatile Node<E> next;
複製代碼
add,offer將元素插入到尾部,其中add實現上直接調用了offer。peek方法拿頭部的數據,可是不移除和poll拿頭部的數據,可是同時移除。
CopyOnWrite(寫時複製)的容器。通俗的理解是當咱們往一個容器添加元素的時候,不直接往當前容器添加,而是先將當前容器進行Copy,複製出一個新的容器,而後新的容器裏添加元素,添加完元素以後,再用新的容器替換舊的容器。
好處是咱們能夠對容器進行併發的讀,而不須要加鎖,由於當前容器不會添加任何元素。因此寫時複製容器也是一種讀寫分離的思想,讀和寫不一樣的容器。若是讀的時候有多個線程正在向容器添加數據,讀仍是會讀到舊的數據,由於寫的時候不會鎖住舊的,只能保證最終一致性。
下面介紹一下寫的過程,
/**
* Appends the specified element to the end of this list.
*
* @param e element to be appended to this list
* @return {@code true} (as specified by {@link Collection#add})
*/
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
複製代碼
首先,寫入操做使用鎖,主要是爲了控制寫寫的狀況。接着進行新數組的複製,將新的元素加入newElements,最後使用新的數組替換老的數組,修改就完成了。整個過程不會影響讀取,而且修改完成之後,讀取線程能夠「覺察」到這個修改,由於array是volatile類型,保證了可見性。
/** The array, accessed only via getArray/setArray. */
private transient volatile Object[] array;
複製代碼
容器的適用場景:適用讀多寫少的併發場景,常見應用:白名單/黑名單,商品類目的訪問和更新場景。可是因爲會複製舊的數組,全部可能存在內存佔用問題。
CopyOnWriteArraySet基於CopyOnWriteArrayList實現,爲了保證數據的惟一性,在往其中加入數據時,會check當前數組中是否存在該元素,若是不存在,則加入到當前數組。
/**
* Appends the element, if not present.
*
* @param e element to be added to this list, if absent
* @return {@code true} if the element was added
*/
public boolean addIfAbsent(E e) {
Object[] snapshot = getArray();
return indexOf(e, snapshot, 0, snapshot.length) >= 0 ? false :
addIfAbsent(e, snapshot);
}
複製代碼
阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:
阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。
阻塞隊列提供了四種處理方法:
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
JDK7 提供了 7 個阻塞隊列。分別是
1. ArrayBlockingQueue
ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認狀況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的全部生產者線程或消費者線程,當隊列可用時,能夠按照阻塞的前後順序訪問隊列,即先阻塞的生產者線程,能夠先往隊列裏插入元素,先阻塞的消費者線程,能夠先從隊列裏獲取元素。一般狀況下爲了保證公平性會下降吞吐量。咱們可使用如下代碼建立一個公平的阻塞隊列:
2. LinkedBlockingQueue
一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。
3. PriorityBlockingQueue
一個支持優先級的無界隊列。默認狀況下元素採起天然順序排列,也能夠經過比較器 comparator 來指定元素的排序規則。元素按照升序排列。
4. DelayQueue
一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現。隊列中的元素必須實現 Delayed 接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。咱們能夠將 DelayQueue 運用在如下應用場景:
緩存系統的設計:能夠用 DelayQueue 保存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示緩存有效期到了。
定時任務調度。使用 DelayQueue 保存當天將會執行的任務和執行時間,一旦從 DelayQueue 中獲取到任務就開始執行,從好比 TimerQueue 就是使用 DelayQueue 實現的。
隊列中的 Delayed 必須實現 compareTo 來指定元素的順序。好比讓延時時間最長的放在隊列的末尾。
5. SynchronousQueue
SynchronousQueue 是一個不存儲元素的阻塞隊列。每個 put 操做必須等待一個 take 操做,不然不能繼續添加元素。SynchronousQueue 能夠當作是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己並不存儲任何元素,很是適合於傳遞性場景, 好比在一個線程中使用的數據,傳遞給另一個線程使用,SynchronousQueue 的吞吐量高於 LinkedBlockingQueue 和 ArrayBlockingQueue。
6. LinkedTransferQueue
是一個由鏈表結構組成的無界阻塞 TransferQueue 隊列。相對於其餘阻塞隊列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
複製代碼
第一行代碼是試圖把存放當前元素的 s 節點做爲 tail 節點。第二行代碼是讓 CPU 自旋等待消費者消費元素。由於自旋會消耗 CPU,因此自旋必定的次數後使用 Thread.yield() 方法來暫停當前正在執行的線程,並執行其餘線程。
對於帶有時間限制的 tryTransfer(E e, long timeout, TimeUnit unit) 方法,則是試圖把生產者傳入的元素直接傳給消費者,可是若是沒有消費者消費該元素則等待指定的時間再返回,若是超時還沒消費元素,則返回 false,若是在超時時間內消費了元素,則返回 true。
7. LinkedBlockingDeque
一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你能夠從隊列的兩端插入和移出元素。雙端隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。相比其餘的阻塞隊列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。以 Last 單詞結尾的方法,表示插入,獲取或移除雙端隊列的最後一個元素。另外插入方法 add 等同於 addLast,移除方法 remove 等效於 removeFirst。可是 take 方法卻等同於 takeFirst,不知道是否是 Jdk 的 bug,使用時仍是用帶有 First 和 Last 後綴的方法更清楚。
在初始化 LinkedBlockingDeque 時能夠設置容量防止其過渡膨脹。另外雙向阻塞隊列能夠運用在「工做竊取」模式中。
在介紹阻塞隊列的實現以前,先介紹一下生產者與消費者模式:
生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發中,若是生產者處理速度很快,而消費者處理速度很慢,那麼生產者就必須等待消費者處理完,才能繼續生產數據。一樣的道理,若是消費者的處理能力大於生產者,那麼消費者就必須等待生產者。爲了解決這種生產消費能力不均衡的問題,便有了生產者和消費者模式。
生產者和消費者模式是經過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通訊,而是經過阻塞隊列來進行通訊,因此生產者生產完數據以後不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列裏取,阻塞隊列就至關於一個緩衝區,平衡了生產者和消費者的處理能力。
1)當隊列滿的時候,插入元素的線程被阻塞,直達隊列不滿。 2)隊列爲空的時候,獲取元素的線程被阻塞,直到隊列不空。
JDK是如何讓生產者和消費者可以高效率的進行通信呢?
答案是使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素後,會通知生產者當前隊列可用。
以ArrayBlockingQueue爲例:
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略其餘代碼
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
複製代碼
從上述代碼能夠看到,當隊列爲空,notEmpty進行等待;插入元素後,喚醒等待的線程。當隊列滿時,notFull進行等待;刪除元素後,喚醒等待的線程。
參考:
本文由『後端精進之路』原創,首發於博客 teckee.github.io/ , 轉載請註明出處
搜索『後端精進之路』關注公衆號,馬上獲取最新文章和價值2000元的BATJ精品面試課程。