首先來看下HashMap的類繼承結構:
public class HashMap extends AbstractMap<K,V> impement Map<K,V>,Coloneable,Serializable{java
}
能夠看出HashMap實現了Map接口。其裏面的方法都是非線程安全的,且不支持併發操做。
對於HashMap主要看的是get/put方法實現,其在jdk1.7,及1.8在解決哈希衝突的上有所不一樣。
1、Java7 HashMapnode
從上面的結構圖中,能夠大體看出,HashMap由數組:transient Entry<K,V>[] table = (Entry<K,V>[]) EMPTY_TABLE;沒個元素對應爲一個單向鏈表,鏈表數據結構以下: static class Entry<K,V> implements Map.Entry<K,V> { final K key; V value; Entry<K,V> next; int hash; }
在HashMap中定義的成員變量:
capacity:當前數組容量,始終保持 2^n,能夠擴容,擴容後數組大小爲當前的 2 倍。
loadFactor:負載因子,默認爲 0.75。
threshold:擴容的閾值,等於 capacity * loadFactor,當容量超過這個值時,數組將擴容。
transient int modCount; //HashMap修改次數,這個值用於和expectedModCount指望修改次數比較。數組
一、put方法解析:
public V put(K key, V value) {
//1.當插入第一個元素時,須要建立並初始化指定大小的數組
if (table == EMPTY_TABLE) {
inflateTable(threshold);
}緩存
//2.若是 key 爲 null,循環遍歷table[0]上的鏈表,最終會將這個 entry 放到 table[0] 中 if (key == null) return putForNullKey(value); //3.計算key的哈希值 int hash = hash(key); //四、經過h & (length-1)即h%length求模找到鍵值對放在哪一個位置。 int i = indexFor(hash, table.length); for (Entry<K,V> e = table[i]; e != null; e = e.next) { //循環對應位置的單向鏈表,逐個查找每一個鏈表節點上是否有整個鍵了。 Object k; if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {//hash值爲整數,比較性能比equals高;另外短路運算,哈希值系統了就不必在比較equals。 V oldValue = e.value;//先將當前節點的鍵對應的值取出來。 e.value = value; //替換爲新值。 e.recordAccess(this); return oldValue; } } modCount++; //容器修改次數加1 addEntry(hash, key, value, i); //在指定的位置上添加數據,若空間不夠則動態擴充,當前容量乘以2,新建一個數組,長度爲capacity*2;並將原來的數組拷貝過來,更新對應變量。 return null; } 數組初始化: private void inflateTable(int toSize) { // Find a power of 2 >= toSize int capacity = roundUpToPowerOf2(toSize); //指定數組容量,默認爲16 threshold = (int) Math.min(capacity * loadFactor, MAXIMUM_CAPACITY + 1); table = new Entry[capacity]; //改變數組的引用,指向新建立的數組 initHashSeedAsNeeded(capacity); } 計算鍵值對的位置: static int indexFor(int h, int length) { // assert Integer.bitCount(length) == 1 : "length must be a non-zero power of 2"; return h & (length-1); //等同於求模:h%length } 添加節點到鏈表中 void addEntry(int hash, K key, V value, int bucketIndex) { //假如map的元素個數大於等於閾值,而且bucketIndex的位置已經元素,則須要動態擴容 if ((size >= threshold) && (null != table[bucketIndex])) { //擴容 resize(2 * table.length); hash = (null != key) ? hash(key) : 0; //從新計算應該存儲下標 bucketIndex = indexFor(hash, table.length); } //建立元素及鏈表節點 createEntry(hash, key, value, bucketIndex); } //新建一個Entry對象,插入單向鏈表表頭,並增長size(不管是否擴容這一步都要進行) void createEntry(int hash, K key, V value, int bucketIndex) { Entry<K,V> e = table[bucketIndex]; table[bucketIndex] = new Entry<>(hash, key, value, e); size++; } 數組擴容: void resize(int newCapacity) { Entry[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity == MAXIMUM_CAPACITY) { threshold = Integer.MAX_VALUE; return; } //新建一個容量擴充2倍的數組 Entry[] newTable = new Entry[newCapacity]; //調用transfer方法將舊數組中的鍵值對拷貝過來 transfer(newTable, initHashSeedAsNeeded(newCapacity)); //舊數組原來的堆空間設置爲引用切斷,指向新數組 table = newTable; //從新計算閾值 threshold = (int)Math.min(newCapacity * loadFactor, MAXIMUM_CAPACITY + 1); } 鍵值對移植: /** * Transfers all entries from current table to newTable. */ void transfer(Entry[] newTable, boolean rehash) { int newCapacity = newTable.length; for (Entry<K,V> e : table) { while(null != e) { Entry<K,V> next = e.next; //是否從新計算key的哈希 if (rehash) { e.hash = null == e.key ? 0 : hash(e.key); } //從新計算元素位置 int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } } } 以上就是保存鍵值對的主要代碼,基本步驟: 1)、計算key的哈希值; 2)、根據哈希值計算數組元素的保存位置(h&(length-1)或h%length); 3)、根據須要擴充數組大小; 4)、將鍵值對插入到對應的鏈表頭部或更新已有值; 二、get方法解析 public V get(Object key) { //若是key爲空則直接,在存放元素時是直接存放到table[0],因此直接調用getForNullKey方法遍歷對應鏈表便可。 if (key == null) return getForNullKey(); Entry<K,V> entry = getEntry(key); return null == entry ? null : entry.getValue(); } 遍歷table[0]位置的鏈表,返回對應key==null的值,若果返回null,則有兩種狀況,要麼沒有key==null的鍵值對,要麼對應位置上的值爲null。 private V getForNullKey() { if (size == 0) { return null; } for (Entry<K,V> e = table[0]; e != null; e = e.next) { if (e.key == null) return e.value; } return null; } key值不爲空,則調用返回對應的值: final Entry<K,V> getEntry(Object key) { if (size == 0) { return null; } int hash = (key == null) ? 0 : hash(key); for (Entry<K,V> e = table[indexFor(hash, table.length)]; e != null; e = e.next) { Object k; if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } return null; } 總結基本流程: 一、計算鍵的哈希值; 二、根據哈希值找到數組中對於的鏈表; 三、遍歷鏈表,查找對應key的值; 四、在比較查找的過程當中,先快速比較哈希值,hash相同則再繼續經過equals比較;
2、java7 ConcurrentHashMap
在java7 下ConcurrentHashMap結構以下:安全
ConcurrentHashMap是併發版的HashMap,支持複雜的併發操做,經過下降鎖的粒度和cas等實現了高併發,支持原子條件的更新操做,不會拋出ConcurrentModificationException,實現了弱一致性。 ConCurrentHashMap是一個Segment數組,每一個segment元素對應一個哈希表(結構相似於HashMap),每一個Segment段的數組一經構造方法初始化,不可再擴容。而Segment[i]段上的HashEntry[]數組則能夠作相似HashMap結構同樣擴容,只是操做比HashMap複雜。 初始化: initialCapacity:初始容量,這裏指的是ConcurrentHashMap容量,會平均分配給每一個Segment。 loadFactor:負載因子,因爲Segment[]數組不能夠擴展,因此這個參數是給Segment[i]內部使用的. concurrencyLevel:併發級別,默認爲16,表示ConcurrentHashMap被分紅16段。若是初始化時傳入的不是2的n次冪,則將會取最近最大的一個2的冪,好比14則擴展爲16,28擴展爲32。 @SuppressWarnings("unchecked") public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { //參數檢查 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //MAX_SEGMENTS 爲1<<16=65536,也就是最大併發數爲65536 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; //位移位數 int ssize = 1; //segment數組長度, while (ssize < concurrencyLevel) { ++sshift; //左移一位 ssize <<= 1; } this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; //掩嗎 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; //根據ConcurrentHashMap容量,計算segment每一個段中數組的長度,也必定是2的n次 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; //segment[i]中數組容量最小爲2,這樣不至於插入一個元素時,不會發生擴容。只有插入第二個的時候纔會擴容。 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] 建立segment數組並初始化第一個元素,其他的延時初始化 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; //Class sc = Segment[].class; //SBASE = UNSAFE.arrayBaseOffset(sc);獲取數組中第一個元素的地址偏移量 //putOrderedObject是putObjectVolitle的volitile版本, UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; } Segment數組的大小ssize是由concurrentLevel來決定的,可是卻不必定等於concurrentLevel,ssize必定是大於或等於concurrentLevel的最小的2的次冪。 put方法解析: @SuppressWarnings("unchecked") public V put(K key, V value) { Segment<K,V> s; //這地方明確說明了ConcurrentHashMap不容許null的key/value if (value == null) throw new NullPointerException(); //hash方法不是簡單的取得hashCode,是再hashCode的基礎上進行了位運算,保證散列均勻 int hash = hash(key); //返回的hashcode先無符號位右移segmentShift位,在段掩碼位運算 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment //ensureSegment(j) 對 segment[j] 進行初始化 s = ensureSegment(j); //插入key、value到s段中 return s.put(key, hash, value, false); } 此ConcurrentHashMap的put方法就作了兩件事情: 一、定位segment段,並初始化segment[j]。 二、調用segment段上的put方法。 接下來,接着看ConcurrentHashMap代理到Segment上的put方法,這裏就須要考慮併發,只是鎖的粒度被細化了而已。 final V put(K key, int hash, V value, boolean onlyIfAbsent) { //寫入前先得到獨佔鎖,tryLock不成功時會遍歷定位到的HashEnry位置的鏈表(遍歷主要是爲了使CPU緩存鏈表),若找不到,則建立HashEntry。tryLock必定次數後(MAX_SCAN_RETRIES變量決定,單個1,多核64),則lock等待阻塞。若遍歷過程當中,因爲其餘線程的操做致使鏈表頭結點變化,則須要從新遍歷。 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table;// // 這個是 segment 內部的數組 int index = (tab.length - 1) & hash; //和HashMap相似,取模,定位HashEntry,能夠看到,這個hash值在定位Segment時和在Segment中定位HashEntry都會用到,只不過定位Segment時只用到高几位(無符號右移)。 HashEntry<K,V> first = entryAt(tab, index); //取得制定位置的節點 , first 是數組該位置處的鏈表的表頭 //循環遍歷鏈表,考慮到鏈表是否存在 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; //修改次數 } break; } e = e.next; //不存在這樣的key,則指針指向下一個節點 } else { //若是不爲 null,那就直接將它設置爲鏈表表頭;若是是null,初始化並設置爲鏈表表頭。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //判斷是否須要擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else // 沒有達到閾值,將 node 放到數組 tab 的 index 位置, // 其實就是將新的節點設置成原鏈表的表頭 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; //告訴GC能夠回收 break; } } } finally { unlock(); //釋放鎖 } return oldValue; } 初始化segment段: private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; //之因此強調是「當前」,是由於在ConcurrentHashMap構造時,初始化了segment[0]. long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck //之因此DCL是爲了不其餘線程已經初始化了。 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 使用 while 循環,內部用 CAS,當前線程成功設值或其餘線程成功設值後,退出 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; } 獲取寫入鎖:在往segment[i]上進行put操做的時候,HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);先是經過快速tryLock獲取獨佔鎖,若是獲取失敗,則通scanAndLockForPut獲取。 private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node //循環獲取鎖,再次使用trylock快速的去獲取鎖。 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { //鏈表爲空,則再次判斷node是否爲空,避免其餘線程獲取鎖後初始化了node 節點 if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; }//重試次數超過MAX_SCAN_RETRIES(單核1次,多核64次)後,則進入阻塞隊列等待鎖 else if (++retries > MAX_SCAN_RETRIES) { lock();//lock是阻塞方法,直到得到鎖後才返回。 break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { //此時有新元素進入了鏈表中,成爲了表頭,這裏處理方法是從新遍歷鏈表,在走一遍scanAndLockForPut e = first = f; // re-traverse if entry changed retries = -1; } } return node; } 這個方法作了兩件事情:1:、得到segment獨佔鎖,二、若是鏈表爲空則初始化node節點 擴容:rehash 首先須要注意segment[]一旦被初始化是不能夠被擴容的,能擴容的只是segment[i]位置上的HashEntry[]數組,當數組實際容量超過閾值,則須要擴容,容量*2。這個過程當中不用再考慮併發的問題了,由於這個時候必定得到了獨佔鎖。 //node爲本次擴容要添加到鏈表的節點 private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; //容量*2 threshold = (int)(newCapacity * loadFactor); HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; //建立新數組 int sizeMask = newCapacity - 1; //新的掩碼 // 遍歷原數組,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置 for (int i = 0; i < oldCapacity ; i++) { //e是鏈表的第一個元素 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 計算應該放置在新數組中的位置, // 假設原數組長度爲 16,e 在 oldTable[3] 處,那麼 idx 只多是 3 或者是 3 + 16 = 19 int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; //只有一個元素,直接放到idx位置 else { // Reuse consecutive sequence at same slot HashEntry<K,V> lastRun = e; // idx 是當前鏈表的頭結點 e 的新位置 int lastIdx = idx; //下面這個 for 循環會找到一個 lastRun 節點,這個節點以後的全部元素是將要放到一塊兒的 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // 將 lastRun 及其以後的全部節點組成的這個鏈表放到 lastIdx 這個位置 newTable[lastIdx] = lastRun; // Clone remaining nodes // 下面的操做是處理 lastRun 以前的節點, // 這些節點可能分配在另外一個鏈表中,也可能分配到上面的那個鏈表中 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } get方法分析: public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead 手工集成訪問方法以減小開銷 HashEntry<K,V>[] tab; //計算hash值 int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; //根據hash值找到對應位置的segment段 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { //找到segment內部數組對應位置上的鏈表,遍歷 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; } 其基本流程爲: 一、計算hash值,根據hash找到對應的segment段。 二、找到segment中數組的具體位置。 三、遍歷鏈表查找。 併發問題分析
如今咱們已經說完了 put 過程和 get 過程,咱們能夠看到 get 過程當中是沒有加鎖的,那天然咱們就須要去考慮併發問題。數據結構
添加節點的操做 put 和刪除節點的操做 remove 都是要加 segment 上的獨佔鎖的,因此它們之間天然不會有問題,咱們須要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操做。併發
put 操做的線程安全性。
初始化槽,這個咱們以前就說過了,使用了 CAS 來初始化 Segment 中的數組。
添加節點到鏈表的操做是插入到表頭的,因此,若是這個時候 get 操做在鏈表遍歷的過程已經到了中間,是不會影響的。固然,另外一個併發問題就是 get 操做在 put 以後,須要保證剛剛插入表頭的節點被讀取,這個依賴於 setEntryAt 方法中使用的 UNSAFE.putOrderedObject。
擴容。擴容是新建立了數組,而後進行遷移數據,最後面將 newTable 設置給屬性 table。因此,若是 get 操做此時也在進行,那麼也不要緊,若是 get 先行,那麼就是在舊的 table 上作查詢操做;而 put 先行,那麼 put 操做的可見性保證就是 table 使用了 volatile 關鍵字。ssh
remove 操做的線程安全性。
get 操做須要遍歷鏈表,可是 remove 操做會」破壞」鏈表。
若是 remove 破壞的節點 get 操做已通過去了,那麼這裏不存在任何問題。
若是 remove 先破壞了一個節點,分兩種狀況考慮。 一、若是此節點是頭結點,那麼須要將頭結點的 next 設置爲數組該位置的元素,table 雖然使用了 volatile 修飾,可是 volatile 並不能提供數組內部操做的可見性保證,因此源碼中使用了 UNSAFE 來操做數組,請看方法 setEntryAt。二、若是要刪除的節點不是頭結點,它會將要刪除節點的後繼節點接到前驅節點中,這裏的併發保證就是 next 屬性是 volatile 的。ide