Java 併發集合類

集合

 1 ConcurrentHashMap

  基於散列鏈表+紅黑樹實現,相似於 HashMap,JDK 8 進行了優化,利用 volatile + CAS 實現無鎖化操做,保證線程安全的同時,提升性能。默認容量16,默認加載因子0.75;
  散列鏈表和紅黑樹的內部類定義以下:數組

// 基本結構
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
}
// 紅黑樹結構,鏈表長度大於8時轉換
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
}

  和 HashMap 比較,多了內部類 TreeBin,它不存儲鍵值,而是指向 TreeNode 列表和它們的根節點,而 ConcurrentHashMap 也是操做 TreeBin。此外,TreeBin 還維護了讀寫鎖狀態,這會使得在樹重組以前,持有鎖的寫操做會等待未持鎖的讀操做完成。安全

// 指向TreeNode列表和它們的根節點,
static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    static final int WRITER = 1; // 持有寫鎖時
    static final int WAITER = 2; // 等待寫鎖時
    static final int READER = 4; // 用來設置讀鎖的增量值
}

  如何作到線程安全的呢?
  1. sizeCtl:控制表的初始化和重建。負數表示表正在初始化或者重建:
    -1表示在初始化;
    -(1+N)表示有N個正在進行重建的線程;
  2. 節點哈希值表示的狀態
    MOVED=-1 表示 forward 節點;
    TREEBIN=-2 表示 treeBin 的根節點;
  3. V put(K key, V value) 操做
    若是表爲空,則初始化表;
    若是hash位置爲空,則經過CAS設置值;
    若是hash=-1,則幫組擴容;
    若是節點既不爲空,也不等於-1,那麼鎖住該節點,在鏈表或者紅黑樹上添加值;
  4. void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) 擴容
    初始化新表,容量是原表的2倍;
    複製元素到新表,處理一個節點就把節點設置爲forward;
    當這個節點爲空時,經過CAS來設置forward;
    當節點值爲-1時,表示forward已經處理過了;
    當節點不爲空且不爲-1時,鎖住節點進行處理;
    其餘線程看到節點爲forward時,向後遍從來完成;
  5. V get(Object key) 操做
    若是hash值匹配,則直接獲取;
    若是hash值小於0,則從樹上查找;
    不然,遍歷鏈表尋找;
  6. mappingCount()(推薦,由於其返回值時long) 和 size(),都是調用 sumCount() 來計算
    定義了內部類 CounterCell 來計數,計算總數時,就是把 CounterCell[] 數組的值累加起來便可;併發

// put 源碼
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
    tab = initTable(); // 表長度爲空時,初始化表
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
        break;                   // hash的位置爲空時,經過CAS設置值
}
else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f); // 若是節點是 forward 節點,幫助擴容
else {
    synchronized (f) { // 不爲空,不是 forward 節點時,synchronized 鎖住節點
        if (tabAt(tab, i) == f) { // 鎖住後再次判斷節點有沒有變化
            if (fh >= 0) { 
                ... // 表示要操做鏈表節點
            }
            else if (f instanceof TreeBin) {
                ... // 表示操做的是TreeBin節點
            }
        }
    }
    if (binCount != 0) {
        if (binCount >= TREEIFY_THRESHOLD)
            treeifyBin(tab, i); // 鏈表長度大於8,轉成紅黑樹
    }
}
// 併發擴容的方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // 初始化新的表,容量是原表的2倍
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
        nextTable = nextTab; // nextTable 是定義的臨時表,僅在表重建時不爲空
        transferIndex = n; // 表重建時的下一個表的索引
    }
    int nextn = nextTab.length; // 擴容後的表長度
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true; // true 表示該節點已處理
    boolean finishing = false; // 確保已經完成了
    for (int i = 0, bound = 0;;) {
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                ... // 完成了
                return;
            }
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1,表示多了一個線程來擴容
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd); // 節點位置是空的,經過CAS設置值爲forward
        else if ((fh = f.hash) == MOVED)
            advance = true; // 這個位置是forward節點,表示已經處理了
        else {
            synchronized (f) { // 節點不爲空,且不是forward節點,鎖住該節點再處理
                ... // 相似put的操做
            }
        }
    }
}
// get 源碼
if ((eh = e.hash) == h) {
    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
        return e.val; // 直接得到值
}
else if (eh < 0)
    return (p = e.find(h, key)) != null ? p.val : null; // 在樹上查找
while ((e = e.next) != null) {
    if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
        return e.val; // 遍歷鏈表查找
}
// 計數方法
private transient volatile CounterCell[] counterCells; // 數組,存儲統計值
@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value; // 統計值累加
        }
    }
    return sum;
}

 2 ConcurrentSkipListMap

  基於跳錶實現,按照 key 天然排序,key 不能爲 null,相似 TreeMap。
  利用 volatile+CAS 來保證線程安全。app

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile Node<K,V> next;
}
boolean casValue(Object cmp, Object val) {
    return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
}

 3 ConcurrentSkipListSet

  使用 ConcurrentSkipListMap 實現。ide

 4 CopyOnWriteArrayList

  基於數組實現,至關於支持併發的 ArrayList,剛建立時初始化爲長度0的數組。
  利用寫時複製來保證線程安全。
  寫時複製:數組是 volatile 類型的,修改數組時,首先 ReentrantLock 加鎖,而後複製一個副本數組,對副本進行修改完成後,把原來的數組指向這個新的數組完成賦值。讀時不用加鎖。工具

private transient volatile Object[] array;
public boolean add(E e) {
// 加鎖進行寫時複製
final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        Object[] elements = getArray();
        int len = elements.length;
        // 拷貝新數組,長度+1
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e; 
        // set給volatile的array
        setArray(newElements);
        return true;
  } finally {
        lock.unlock();
    }
}

 5 CopyOnWriteArraySet

  使用 CopyOnWriteArrayList 實現。去重的,可是按照插入順序排序的。性能

非阻塞隊列

 1 ConcurrentLinkedQueue

  基於鏈表實現的無界的線程安全的非阻塞隊列,遵循 FIFO,利用 volatile+CAS 來保證線程安全。優化

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;
}
// 初始化 head 和 tail
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
// 利用 CAS 修改鏈表
private boolean casTail(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

 2 ConcurrentLinkedDeque

  基於雙向鏈表實現的無界的線程安全的非阻塞隊列,實現方式相似 ConcurrentLinkedQueue。this

// 雙向鏈表
static final class Node<E> {
    volatile Node<E> prev;
    volatile E item;
    volatile Node<E> next;
}

阻塞隊列

  實現:經過 ReentrantLock 和 Condition 實現的等待通知模型來實現阻塞隊列。線程

 1 ArrayBlockingQueue

  基於數組實現的阻塞隊列,須要指定容量。

// poll 相似
public boolean offer(E e) {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖
    try {
        if (count == items.length)
            return false; // 超過長度,返回false,數據丟失
        final Object[] items = this.items;
        items[putIndex] = x; // putIndex表示下一次加元素的索引
        if (++putIndex == items.length)
            putIndex = 0; // 達到長度後,索引位歸零
        count++; // 計數+1
        notEmpty.signal(); // 通知能夠取值了
        return true;
    } finally {
        lock.unlock(); // 解鎖
    }
}

 2 LinkedBlockingQueue

  基於鏈表實現的阻塞隊列,默認容量爲 Integer.MAX_VALUE。
  實現相似 ArrayBlockingQueue,計數用的原子類 AtomicInteger。

 3 PriorityBlockingQueue

  基於二叉小頂堆實現的阻塞隊列,保證取出的元素是最小的,默認初始化容量11。

 4 DelayQueue

  基於數組實現的延遲阻塞隊列。使用時必須實現 Delayed。

原子操做類

  以 AtomicInteger 爲例,利用 volatile+CAS 來保證原子操做,直接看源碼註釋

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;

private volatile int value;

// 直接獲取 volatile 變量
public final int get() {
    return value;
}
// 經過 Unsafe 的 CAS 原子操做 volatile 變量
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// 經過 Unsafe 的 CAS 原子操做 + 1
public final int incrementAndGet() {
    return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}

併發工具類

 1 CountDownLatch

  功能:指定 N 個線程等待所有完成後,繼續執行。
  實現:內部類 Sync 實現了 AQS 同步器,初始化時設置 AQS 的同步狀態來表示 countDown 的數量,await() 方法把當前線程加入到 AQS 等待隊列,讓當前線程阻塞住,執行 countDown() 方法會把同步狀態減1,當減到0時,喚醒等待隊列中的線程。

 2 CyclicBarrier

  功能:相似 CountDownLatch,可是支持 reset() 重置狀態,能指定到達數量時執行的動做。
  實現:基於 ReentrantLock 和 Condition 實現,核心源碼以下

private int dowait(boolean timed, long nanos) {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 加鎖,保護 count
    try {
        
        if (Thread.interrupted()) {
            breakBarrier(); // 使用 signalAll 喚醒全部線程
            throw new InterruptedException();
        }

        int index = --count; // 線程數量遞減
        if (index == 0) {  // 若是線程數量到達 0
            final Runnable command = barrierCommand;
            if (command != null)
                command.run(); // 執行 barrierAction
            return 0;
        }

        // 此時線程數量還沒到 0
        for (;;) {
            try {
                if (!timed)
                    trip.await(); // 調用 Condition 的 await 進行等待
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos); // 待超時的等待
            }
        }
    } finally {
        lock.unlock(); // 釋放鎖
    }
}

線程池

 ThreadPoolExecutor 參數說明:
  1. 核心線程池
  2. 最大線程池
  3. 線程空閒時間
  4. 線程空閒時間單位
  5. 阻塞隊列
  6. 線程工廠:建立具備相同特性的一組線程。
  7. 拒絕策略
   CallerRunsPolicy:重試添加當前的任務,會自動重複調用 execute() 方法,直到成功。
   AbortPolicy:對拒絕任務拋棄處理,而且拋出異常。
   DiscardPolicy:對拒絕任務直接無聲拋棄,沒有異常信息。
   DiscardOldestPolicy:對拒絕任務不拋棄,而是拋棄隊列裏面等待最久的一個線程,而後把拒絕任務加到隊列。

   線程池數量理論值 -> CPU 密集型:N+1;IO 密集型:2N+1

   線程的提交方式:
   1. execute():用於提交不須要返回值的任務
   2. submit():用於提交須要返回值的任務,返回future對象。

   線程池線程的執行流程:核心 -> 隊列 -> 最大 -> 拒絕策略
   1. 若是當前運行的線程少於核心線程池時,則建立新的線程執行任務;
   2. 若是當前運行的線程大於等於核心線程池時,則把任務加入阻塞隊列;
   3. 若是阻塞隊列已經滿了,則建立新的線程執行任務;
   4. 若是線程數超過了最大線程數,則調用拒絕策略

相關文章
相關標籤/搜索