一、首先看下HashMap的put方法。java
final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { Node<K,V>[] tab; Node<K,V> p; int n, i; if ((tab = table) == null || (n = tab.length) == 0) // 1.1 初始化數組長度 n = (tab = resize()).length; // 1.2 查詢數組對應下標是否有值,下標計算tab[i = (n - 1) & hash] // hash表示經過key.hashcode和key.hashcode的高16位進行異或操做,爲何要這樣作?,n-1表示15,二進制表示01111 /** * 解釋:好比key.hashcode = 10010110101010,那麼如何直接拿 key.hashcode & (n-1) 此處n-1 = 15 * 10010110101010 * 01111 *----------------------- * 01010 * 此時是否發現key.hashcode的高16位壓根就沒有進行計算,那麼只用低位計算是否是衝突的機率較大? * 這就是爲何將key.hashcode的高位和低位進行^操做後在與15進行&操做 */ if ((p = tab[i = (n - 1) & hash]) == null) // 1.3 將對應的值放入數組對應下標 tab[i] = newNode(hash, key, value, null); else { // hash 發生碰撞 Node<K,V> e; K k; // 而且發現key已存在,那麼將新值和舊值互換 if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; else if (p instanceof TreeNode) // 紅黑二叉樹 e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); else { // 鏈表 for (int binCount = 0; ; ++binCount) { // 判斷鏈表的next是否有值 if ((e = p.next) == null) { p.next = newNode(hash, key, value, null); // 當鏈表的數量大於等於8時,鏈表會自動轉換成紅黑二叉樹,由於鏈表過長會致使鏈表的查詢效率太低 if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st treeifyBin(tab, hash); break; } if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; p = e; } } if (e != null) { // existing mapping for key V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } ++modCount; // 當map中的數據大於16*0.75=12 map的初始化大小位16,負載因子0.75 if (++size > threshold) resize(); afterNodeInsertion(evict); return null; }
二、 經過解讀源碼發現hashmap不時線程安全
那麼此時hashtable登場 經過源碼發現hashtable的put方法上加了synchronized關鍵字,HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的狀況下HashTable的效率很是低下。由於當一個線程訪問HashTable的同步方法時,其餘線程訪問HashTable的同步方法時,可能會進入阻塞或輪詢狀態。如線程1使用put進行添加元素,線程2不但不能使用put方法添加元素,而且也不能使用get方法來獲取元素,因此競爭越激烈效率越低,此時會發現多線程操做時效率明顯太低對吧!那麼有辦法解決嗎?node
public synchronized V put(K key, V value) { // Make sure the value is not null if (value == null) { throw new NullPointerException(); } // Makes sure the key is not already in the hashtable. Entry<?,?> tab[] = table; int hash = key.hashCode(); int index = (hash & 0x7FFFFFFF) % tab.length; @SuppressWarnings("unchecked") Entry<K,V> entry = (Entry<K,V>)tab[index]; for(; entry != null ; entry = entry.next) { if ((entry.hash == hash) && entry.key.equals(key)) { V old = entry.value; entry.value = value; return old; } } addEntry(hash, key, value, index); return null; }
三、 當前有辦法了,ConcurrentHashMap閃亮登場數組
/** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) // 初始化數組,剛纔咱們提到多線程,難道每一個線程都去初始化?下面單獨拉出來看下 tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 利用cas無鎖機制,cas原子操做,在指定位置設定值 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) // 此處map擴容後,數據轉移 tab = helpTransfer(tab, f); else { // 鎖住當前node節點,這樣其餘線程繼續操做時,由於node對象不是同一個了, // 因此不會影響其餘線程的操做,這樣效率就提升了。 V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
四、回到剛纔ConcurrentMap初始化 tab = initTable()安全
private transient volatile int sizeCtl; // 注意此處sizeCtl爲volatile private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 第一次進來sizeCtl值默認爲0,那麼其餘線程過來時發現sizeCtl已經變成了-1, // 直接Thread.yield()讓出cpu時間片。故此就實現了只有一個線程去初始化數組 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 將sc的值置爲-1 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { // 將sizeCtl置爲 -1 sizeCtl = sc; } break; } } return tab; }
五、ConcurrentMap擴容注意點多線程
ConcurrentMap擴容時,其餘線程不能再往map中添加/刪除數據了,那麼難道其餘線程在那邊等待,直到擴容完成?固然不是了,其餘線程會去幫助擴容線程一塊兒進行擴容,每一個線程都會去領取屬於本身的任務app
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }