一、首先拋出幾個問題(文章最後有答案):java
a、ConcurrentHashMap在put的時候,key通過幾回hash計算?node
b、segment 會增大嗎?數組
c、新的值是放在鏈表的表頭仍是表尾?安全
二、ConcurrentHashMap是如何存儲數據的?數據結構
先看圖:併發
從圖中咱們能夠看出ConcurrentHashMap有兩個種數據結構:數組和單向鏈表ssh
那ConcurrentHashMap和如何存放一對key和value呢?高併發
put的具體過程:性能
a、根據key計算hash值優化
b、根據hash值找到segment數組的下標
c、根據上面的下標獲取tab數組,
d、根據hash值,獲取tab數組的下標
c、若是tab當前下標位置上沒有值,就直接把存儲有key和value的HashEntry存放在tab的當前下標下,不然就是造成一個鏈表(解決了Hash值衝突)
這就是整個put的大概過程。
是否是有小夥伴說,褲子都脫了,你給我看這個?哈哈哈哈哈,好,上代碼
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); // 根據key獲取hash值 int j = (hash >>> segmentShift) & segmentMask; // 定位segment數組的下標j if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) // 根據下標j獲取數組該下標的元素s s = ensureSegment(j); return s.put(key, hash, value, false); }
看代碼,你們是否是也有些不太明白呢?那我就一行一行的解釋吧
解釋以前咱們得先了解幾個參數:(注:涉及到unsafe的相關用法參考:https://my.oschina.net/huangy/blog/1620321)
segmentShift、segmentMask、SSHIFT,SBASE,segments
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { // 默認值initialCapacity=16 loadFactor=0.75 concurrencyLevel=16 // initialCapacity 決定tab數組的初始化長度,concurrencyLevel決定segment數組的長度 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments // 找到一個ssize不小於concurrencyLevel且必須是2的n次冪,爲何呢?下面解釋 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } this.segmentShift = 32 - sshift; // sshift=4,segmentShift=28 this.segmentMask = ssize - 1; // segment數組的長度=ssize=16,segmentMask=15 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // create segments and segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; // segments長度=16 }
static { int ss, ts; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class tc = HashEntry[].class; Class sc = Segment[].class; TBASE = UNSAFE.arrayBaseOffset(tc);// 獲取HashEntry[]的基本偏移量=6 SBASE = UNSAFE.arrayBaseOffset(sc);// 獲取Segment[]的基本偏移量=6 ts = UNSAFE.arrayIndexScale(tc);// 獲取HashEntry[]單位偏移量=4 ss = UNSAFE.arrayIndexScale(sc);//獲取Segment[]單位偏移量=4 HASHSEED_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("hashSeed")); SEGSHIFT_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentShift")); SEGMASK_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segmentMask")); SEGMENTS_OFFSET = UNSAFE.objectFieldOffset( ConcurrentHashMap.class.getDeclaredField("segments")); } catch (Exception e) { throw new Error(e); } if ((ss & (ss-1)) != 0 || (ts & (ts-1)) != 0) throw new Error("data type scale not a power of two"); SSHIFT = 31 - Integer.numberOfLeadingZeros(ss);//把Segment[]單位偏移量轉成移位的次數=2 TSHIFT = 31 - Integer.numberOfLeadingZeros(ts);//把HashEntry[]單位偏移量轉成移位的次數=2 }
開始解釋代碼:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); // 根據key獲取hash值 int j = (hash >>> segmentShift) & segmentMask; // 定位segment數組的下標j // 上面知道segmentShift=28,segmentMask=15(二進制:00000000000000000000000000001111) // 假設hash=994162679 二進制:00111011010000011011011111110111 // (hash >>> segmentShift) // 這句話的意思就是hash右移28 結果=3 (二級制:00000000000000000000000000000011) // int j = (hash >>> segmentShift) & segmentMask; // 就變成 // int j = 3 & segmentMask; // 00000000000000000000000000000011 & 00000000000000000000000000001111 // 的結果=00000000000000000000000000000011 // 因此 j = 3 // 這就根據hash值定位segments的數組下標 j=3 // 咱們回想一下:segments數組長度ssize=16 // segmentMask = ssize-1 = 15 // 而後每一個通過右移28位的hash值和segmentMask進行與操做, // 就能保證j必定落在數組內,保證了不越界,同時效率也很是高 // 這就是要找到一個ssize不小於concurrencyLevel且必須是2的n次冪的緣由 // HashMap 也是這麼幹的,能夠關注我查看個人相關文章。 if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null) // 根據下標j獲取數組該下標的元素s // UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) // 根據上面知道:SSHIFT=2 SBASE=6 // 同時剛剛求出 j=3 // (j << SSHIFT) + SBASE 的結果是12+6=20 // UNSAFE.getObject()會根據segments和偏移量獲得數組下標=3的元素 // UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) 能夠簡單認爲是 segments[3] // 只不過這是直接內存操做很是高效而已,這樣的操做ConcurrentHashMap用的很是多 // 不明白能夠查看個人相關文章 s = ensureSegment(j); return s.put(key, hash, value, false); }
private Segment<K,V> ensureSegment(int k) { // 這裏一系列的unsafe操做請查看個人相關文章 final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; //仍是獲取偏移量 Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype int cap = proto.table.length; // cap=16 float lf = proto.loadFactor; // lf =0.75 int threshold = (int)(cap * lf); HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // UNSAFE.compareAndSwapObject 保證了原子性,能夠思考一下沒有原子保證,會有什麼後果 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
分析:s.put(key, hash, value, false);
分析以前咱們瞭解Segment,它繼承了ReentrantLock,咱們知道ConcurrentHashMap 是線程安全的,這就是關鍵點了
s.put()執行過程
a、嘗試獲取鎖
b、根據hash值獲取tab數組下標
c、tab數組當前下標,是否有HashEntry,有則遍歷,沒有則建立一個HashEntry
d、釋放鎖
大概就是這麼一個過程
看代碼
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 嘗試獲取鎖 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; // tab.length=16 16-1=15(二進制00000000000000000000000000001111) // 看到這個是否是很熟悉,不解釋了 HashEntry<K,V> first = entryAt(tab, index);// 根據數組和下標獲取元素,仍是unsafe操做 //遍歷一次就搞定了全部事情,若是是你寫,你會怎麼寫? for (HashEntry<K,V> e = first;;) { if (e != null) { // e,若是不爲空,則表示tab數組當前這個下標,已經有值,極可能造成一個鏈表, // K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { // 若是key和hash都相同,表示同一個,按要求看是否要更新value oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first);// 從這裏能夠看出新進來的,會作鏈表頭 else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
其實最讓人膜拜的代碼是:
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
tryLock()嘗試獲取鎖,獲取不到就執行scanAndLockForPut,咱們看看scanAndLockForPut都幹嗎了
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { // entryForHash根據hash值獲取tab中對應的元素,看不懂能夠參考以前的分析 HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 不斷的在嘗試獲取鎖 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else e = e.next; } else if (++retries > MAX_SCAN_RETRIES) { // 爲了防止無限次嘗試,作了個限制,通常MAX_SCAN_RETRIES=64 lock(); break; } else if ((retries & 1) == 0 && (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node; } // 這段代碼主要作兩件事: // 一、獲取鎖,執行完這個方法確定能獲得鎖 // 二、在獲取鎖等待的過程當中,有必要的建立新HashEntry // 這段代碼主要優化的是: // 利用在獲取鎖等待的時間,若是發現tab當前這個下標的值爲空 // 那麼建立HashEntry,而後繼續獲取鎖,知道超過MAX_SCAN_RETRIES的次數 // 執行到lock(),而後整個線程就會進入等待 // 若是不是使用tryLock(),而是上來就是lock(),那麼整個線程就會進入等待,什麼都幹不了 // 這是一個很是小的優化,可是絕大部分應用場景都是新建立HashEntry這樣的狀況的, // 因此這個優化仍是很是值得確定的 // 大神寫的代碼,是否是有種眼界提升的感受,哈哈哈哈哈哈
到此,ConcurrentHashMap的最關鍵的代碼就是這些了,只要你能看懂這些,其餘的都不在話下。
Unsafe的相關操做參考:
https://my.oschina.net/huangy/blog/1620321
三、總結
開頭的問題有答案了嗎?
好,揭曉答案
a、ConcurrentHashMap在put的時候,key通過幾回hash計算?一次
b、segment 會增大嗎?不會
c、新的值是放在鏈表的表頭仍是表尾?表頭
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel)
initialCapacity:控制tab數組的大小(默認16)
loadFactor:tab進行rehash閾值百分數(默認0.75)
concurrencyLevel:控制segment的大小 (默認16)
因此,一旦concurrencyLevel指定了就不能改變了
那麼ConcurrentHashMap裏爲何分segment呢?
這就是ConcurrentHashMap高明之處,經過以前的分析咱們都知道鎖只在segment中存在,這樣就把鎖的粒度變小,提升併發,同時仍是線程安全的,
因此,若是咱們使用ConcurrentHashMap存放數據的時候,數據很是大的時候,concurrencyLevel的指定就尤其重要了,合適concurrencyLevel的可讓ConcurrentHashMap性能最佳。
最後留個問題:
hash值相同的兩個對象是同一個嗎?歡迎你們評論裏留言
歡迎關注,轉發
持續更新有意思的代碼