【java基礎】ConcurrentHashMap1.7和1.8的不一樣實現

ConcurrentHashMap

在多線程環境下,使用HashMap進行put操做時存在丟失數據的狀況,爲了不這種bug的隱患,強烈建議使用ConcurrentHashMap代替HashMap,爲了對ConcurrentHashMap有更深刻的瞭解,本文將對ConcurrentHashMap1.7和1.8的不一樣實現進行分析。數組

1.7實現

數據結構

jdk1.7中採用Segment + HashEntry的方式進行實現,結構以下:安全

ConcurrentHashMap初始化時,計算出Segment數組的大小ssize和每一個SegmentHashEntry數組的大小cap,並初始化Segment數組的第一個元素;其中ssize大小爲2的冪次方,默認爲16,cap大小也是2的冪次方,最小值爲2,最終結果根據根據初始化容量initialCapacity進行計算,計算過程以下:數據結構

1多線程

2併發

3dom

4this

5spa

if (c * ssize < initialCapacity)線程

    ++c;設計

int cap = MIN_SEGMENT_TABLE_CAPACITY;

while (cap < c)

    cap <<= 1;

其中Segment在實現上繼承了ReentrantLock,這樣就自帶了鎖的功能。

put實現

當執行put方法插入數據時,根據key的hash值,在Segment數組中找到相應的位置,若是相應位置的Segment還未初始化,則經過CAS進行賦值,接着執行Segment對象的put方法經過加鎖機制插入數據,實現以下:

場景:線程A和線程B同時執行相同Segment對象的put方法

一、線程A執行tryLock()方法成功獲取鎖,則把HashEntry對象插入到相應的位置;
二、線程B獲取鎖失敗,則執行scanAndLockForPut()方法,在scanAndLockForPut方法中,會經過重複執行tryLock()方法嘗試獲取鎖,在多處理器環境下,重複次數爲64,單處理器重複次數爲1,當執行tryLock()方法的次數超過上限時,則執行lock()方法掛起線程B;
三、當線程A執行完插入操做時,會經過unlock()方法釋放鎖,接着喚醒線程B繼續執行;

size實現

由於ConcurrentHashMap是能夠併發插入數據的,因此在準確計算元素時存在必定的難度,通常的思路是統計每一個Segment對象中的元素個數,而後進行累加,可是這種方式計算出來的結果並不同的準確的,由於在計算後面幾個Segment的元素個數時,已經計算過的Segment同時可能有數據的插入或則刪除,在1.7的實現中,採用了以下方式:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

try {

    for (;;) {

        if (retries++ == RETRIES_BEFORE_LOCK) {

            for (int j = 0; j < segments.length; ++j)

                ensureSegment(j).lock(); // force creation

        }

        sum = 0L;

        size = 0;

        overflow = false;

        for (int j = 0; j < segments.length; ++j) {

            Segment<K,V> seg = segmentAt(segments, j);

            if (seg != null) {

                sum += seg.modCount;

                int c = seg.count;

                if (c < 0 || (size += c) < 0)

                    overflow = true;

            }

        }

        if (sum == last)

            break;

        last = sum;

    }

} finally {

    if (retries > RETRIES_BEFORE_LOCK) {

        for (int j = 0; j < segments.length; ++j)

            segmentAt(segments, j).unlock();

    }

}

先採用不加鎖的方式,連續計算元素的個數,最多計算3次:
一、若是先後兩次計算結果相同,則說明計算出來的元素個數是準確的;
二、若是先後兩次計算結果都不一樣,則給每一個Segment進行加鎖,再計算一次元素的個數;

1.8實現

數據結構

1.8中放棄了Segment臃腫的設計,取而代之的是採用Node + CAS + Synchronized來保證併發安全進行實現,結構以下:

只有在執行第一次put方法時纔會調用initTable()初始化Node數組,實現以下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

private final Node<K,V>[] initTable() {

    Node<K,V>[] tab; int sc;

    while ((tab = table) == null || tab.length == 0) {

        if ((sc = sizeCtl) < 0)

            Thread.yield(); // lost initialization race; just spin

        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {

            try {

                if ((tab = table) == null || tab.length == 0) {

                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

                    @SuppressWarnings("unchecked")

                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];

                    table = tab = nt;

                    sc = n - (n >>> 2);

                }

            } finally {

                sizeCtl = sc;

            }

            break;

        }

    }

    return tab;

}

put實現

當執行put方法插入數據時,根據key的hash值,在Node數組中找到相應的位置,實現以下:

一、若是相應位置的Node還未初始化,則經過CAS插入相應的數據;

1

2

3

4

else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {

    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))

        break;                   // no lock when adding to empty bin

}

二、若是相應位置的Node不爲空,且當前該節點不處於移動狀態,則對該節點加synchronized鎖,若是該節點的hash不小於0,則遍歷鏈表更新節點或插入新節點;

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

if (fh >= 0) {

    binCount = 1;

    for (Node<K,V> e = f;; ++binCount) {

        K ek;

        if (e.hash == hash &&

            ((ek = e.key) == key ||

             (ek != null && key.equals(ek)))) {

            oldVal = e.val;

            if (!onlyIfAbsent)

                e.val = value;

            break;

        }

        Node<K,V> pred = e;

        if ((e = e.next) == null) {

            pred.next = new Node<K,V>(hash, key, value, null);

            break;

        }

    }

}

三、若是該節點是TreeBin類型的節點,說明是紅黑樹結構,則經過putTreeVal方法往紅黑樹中插入節點;

1

2

3

4

5

6

7

8

9

else if (f instanceof TreeBin) {

    Node<K,V> p;

    binCount = 2;

    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {

        oldVal = p.val;

        if (!onlyIfAbsent)

            p.val = value;

    }

}

四、若是binCount不爲0,說明put操做對數據產生了影響,若是當前鏈表的個數達到8個,則經過treeifyBin方法轉化爲紅黑樹,若是oldVal不爲空,說明是一次更新操做,沒有對元素個數產生影響,則直接返回舊值;

1

2

3

4

5

6

7

if (binCount != 0) {

    if (binCount >= TREEIFY_THRESHOLD)

        treeifyBin(tab, i);

    if (oldVal != null)

        return oldVal;

    break;

}

五、若是插入的是一個新節點,則執行addCount()方法嘗試更新元素個數baseCount

size實現

1.8中使用一個volatile類型的變量baseCount記錄元素的個數,當插入新數據或則刪除數據時,會經過addCount()方法更新baseCount,實現以下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

if ((as = counterCells) != null ||

    !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {

    CounterCell a; long v; int m;

    boolean uncontended = true;

    if (as == null || (m = as.length - 1) < 0 ||

        (a = as[ThreadLocalRandom.getProbe() & m]) == null ||

        !(uncontended =

          U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {

        fullAddCount(x, uncontended);

        return;

    }

    if (check <= 1)

        return;

    s = sumCount();

}

一、初始化時counterCells爲空,在併發量很高時,若是存在兩個線程同時執行CAS修改baseCount值,則失敗的線程會繼續執行方法體中的邏輯,使用CounterCell記錄元素個數的變化;

二、若是CounterCell數組counterCells爲空,調用fullAddCount()方法進行初始化,並插入對應的記錄數,經過CAS設置cellsBusy字段,只有設置成功的線程才能初始化CounterCell數組,實現以下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

else if (cellsBusy == 0 && counterCells == as &&

         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {

    boolean init = false;

    try {                           // Initialize table

        if (counterCells == as) {

            CounterCell[] rs = new CounterCell[2];

            rs[h & 1] = new CounterCell(x);

            counterCells = rs;

            init = true;

        }

    } finally {

        cellsBusy = 0;

    }

    if (init)

        break;

}

三、若是經過CAS設置cellsBusy字段失敗的話,則繼續嘗試經過CAS修改baseCount字段,若是修改baseCount字段成功的話,就退出循環,不然繼續循環插入CounterCell對象;

1

2

else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))

    break;

因此在1.8中的size實現比1.7簡單多,由於元素個數保存baseCount中,部分元素的變化個數保存在CounterCell數組中,實現以下:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

public int size() {

    long n = sumCount();

    return ((n < 0L) ? 0 :

            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :

            (int)n);

}

 

final long sumCount() {

    CounterCell[] as = counterCells; CounterCell a;

    long sum = baseCount;

    if (as != null) {

        for (int i = 0; i < as.length; ++i) {

            if ((a = as[i]) != null)

                sum += a.value;

        }

    }

    return sum;

}

經過累加baseCountCounterCell數組中的數量,便可獲得元素的總個數;

相關文章
相關標籤/搜索