Concurrent下的線程安全集合

1.ArrayBlockingQueuejava

ArrayBlockingQueue是由數組支持的線程安全的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使操做受阻塞;試圖從空隊列中提取元素將致使相似阻塞。此類支持對等待的生產者線程和消費者線程進行排序的可選公平策略。默認狀況下,不保證是這種排序。然而,經過將公平性 (fairness) 設置爲 true 而構造的隊列容許按照 FIFO 順序訪問線程。公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」。 node

 public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

從改造方法能夠看出,ArrayBlockingQueue的實現機制是ReentrantLock和Condition來實現的。數組

 

二、LinkedBlockingDeque緩存

LinkedBlockingDeque是用雙向鏈表實現的,須要說明的是LinkedList也已經加入了Deque的一部分安全

   /** Maximum number of items in the deque */
    private final int capacity;
 /**
     * Creates a {@code LinkedBlockingDeque} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }
 public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }
  1. 要想支持阻塞功能,隊列的容量必定是固定的,不然沒法在入隊的時候掛起線程。也就是capacity是final類型的。
  2. 既然是雙向鏈表,每個結點就須要先後兩個引用,這樣才能將全部元素串聯起來,支持雙向遍歷。也即須要prev/next兩個引用。
  3. 雙向鏈表須要頭尾同時操做,因此須要first/last兩個節點,固然能夠參考LinkedList那樣採用一個節點的雙向來完成,那樣實現起來就稍微麻煩點。
  4. 既然要支持阻塞功能,就須要鎖和條件變量來掛起線程。這裏使用一個鎖兩個條件變量來完成此功能。

 

三、LinkedBlockingQueue多線程

LinkedBlockingQueue是一個基於已連接節點的、範圍任意的blocking queue的實現,也是線程安全的。按 FIFO(先進先出)排序元素。隊列的頭部 是在隊列中時間最長的元素。隊列的尾部 是在隊列中時間最短的元素。併發

 /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

  可選的容量範圍構造方法參數做爲防止隊列過分擴展的一種方法。若是未指定容量,則它等於 Integer.MAX_VALUE。除非插入節點會使隊列超出容量,不然每次插入後會動態地建立連接節點。app

 此外它還不接受null值:less

  public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();

 

四、PriorityBlockingQueue函數

PriorityBlockingQueue是一個無界的線程安全的阻塞隊列,它使用與PriorityQueue相同的順序規則,而且提供了阻塞檢索的操做。

    public PriorityBlockingQueue(int initialCapacity) {
        this(initialCapacity, null);
    }
  public PriorityBlockingQueue(int initialCapacity,
                                 Comparator<? super E> comparator) {
        if (initialCapacity < 1)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        this.comparator = comparator;
        this.queue = new Object[initialCapacity];
    }

從其構造方法能夠看到到,有一個Comparator的接口。沒錯,這個就是判斷元素Priority的關鍵:當前和其餘對象比較,若是compare方法返回負數,那麼在隊列裏面的優先級就比較高。固然,你在建立PriorityBlockingQueue的時候能夠不指定Comparator對象,可是你被要求在被存放元素中去實現。

  public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            Comparator<? super E> cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
  private static <T> void siftUpComparable(int k, T x, Object[] array) {
        Comparable<? super T> key = (Comparable<? super T>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

  每次offer元素,都會有一個siftUpComparable操做,也就是排序,若是沒有構造的時候傳入本身實現的比較器,就採用天然排序,不然採用比較器規則,進行二分查找,比較,保持列頭是比較器但願的那個最大或則最小元素。

 

五、ConcurrentHashMap、ConcurrentLinkedQueue、ConcurrentLinkedDeque

ConcurrentHashMap支持高併發、高吞吐量的線程安全HashMap實現,其實現原理是鎖分離機制,將數據分Segment管理。每一個Segment擁有獨立的鎖。

    /**
     * The segments, each of which is a specialized hash table
     */
    final Segment<K,V>[] segments;

 下面代碼是Hash鏈中的元素:

    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
    }

  咱們能夠看到Key,hash,HashEntry都是final類型的,這就決定了ConcurrentHashMap的必須在鏈表頭插入,修改也只能從鏈表頭開始遍歷找到對應Key的元素進行修改,而刪除這就須要將要刪除節點的前面全部節點整個複製一遍,最後一個節點指向要刪除結點的下一個結點。注意到Value使用了volatile修飾,這樣程序在讀的時候就不用加鎖也能保證內存可見性。固然,在跨段操做(contains,size)中,仍是會獲取所有Segment中的鎖去操做的,儘可能避免跨段操做。

  ConcurrentLinkedQueue、ConcurrentLinkedDeque分別是使用單向鏈表和雙向鏈表實現,原理仍是鎖分離機制。

七、ConcurrentSkipListMap

 ConcurrentSkipListMap提供了一種線程安全的併發訪問的排序映射表。內部是SkipList(跳錶)結構實現,在理論上可以在O(log(n))時間內完成查找、插入、刪除操做。 在非多線程的狀況下,應當儘可能使用TreeMap。此外對於併發性相對較低的並行程序可使用Collections.synchronizedSortedMap將TreeMap進行包裝,也能夠提供較好的效率。對於高併發程序,應當使用ConcurrentSkipListMap,可以提供更高的併發度。一樣,ConcurrentSkipListMap支持Map的鍵值進行排序(參考:http://hi.baidu.com/yao1111yao/item/0f3008163c4b82c938cb306d)

 

  concurrentHashMap與ConcurrentSkipListMap性能測試   在4線程1.6萬數據的條件下,ConcurrentHashMap 存取速度是ConcurrentSkipListMap 的4倍左右。 
 但ConcurrentSkipListMap有幾個ConcurrentHashMap 不能比擬的優勢:   一、ConcurrentSkipListMap 的key是有序的。 
二、ConcurrentSkipListMap 支持更高的併發。ConcurrentSkipListMap 的存取時間是log(N),和線程數幾乎無關。也就是說在數據量必定的狀況下,併發的線程越多,ConcurrentSkipListMap越能體現出他的優點(參考:http://wenku.baidu.com/link?url=n40zltjgTbXUuV2CtXX1E4sBila9SI5rBs_qK1flOkwmJThF5ICLpF1xvU504PyUYGxx5RmqDdJdnYljcMro9gQ8AQe7RXgxKVfs2MV1J7m)。 

八、ConcurrentSkipListSet

ConcurrentSkipListSet是線程安全的有序的集合,適用於高併發的場景。ConcurrentSkipListSet和TreeSet,它們雖然都是有序的集合。可是,第一,它們的線程安全機制不一樣,TreeSet是非線程安全的,而ConcurrentSkipListSet是線程安全的。第二,ConcurrentSkipListSet是經過ConcurrentSkipListMap實現的,而TreeSet是經過TreeMap實現的。

 

九、CopyOnWriteArrayList、CopyOnWriteArraySet

傳統的List在多線程同時讀寫的時候會拋出java.util.ConcurrentModificationException,而CopyOnWriteArrayList是使用CopyOnWrite(寫時複製)技術解決了這個問題,這通常須要很大的開銷,可是當遍歷操做的數量大大超過可變操做的數量時,這種方法可能比其餘替代方法更有效。

寫時複製:

 /**
     * Appends the specified element to the end of this list.
     *
     * @param e element to be appended to this list
     * @return {@code true} (as specified by {@link Collection#add})
     */
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

咱們能夠看到寫的過程當中加了鎖,由於若是不加鎖的話,每條線程都會生成一個快照,形成內存消耗。先Arrays.copyOf了一分內存快照,而後寫這分內存快照,寫完最後將這分內存快照的應用轉移到CopyOnWriteArrayList中。

/** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;

關於讀,存儲的變量使用volatile關鍵字,能夠不加鎖的狀況下解決內存可見性的問題。對於CopyOnWriteArraySet而言就簡單多了,只是持有一個CopyOnWriteArrayList,僅僅在add/addAll的時候檢測元素是否存在,若是存在就不加入集合中。

  最後關於CopyOnWrite的建議,因爲插入會Copy內存,最後會致使垃圾回收,因此儘可能少使用add操做,若是須要,儘可能使用批量插入操做。對於常常插入的容器是不建議用這個的。

 

十、DelayQueue

  DelayQueue是一個無界阻塞隊列,只有在延遲期滿時才能從中提取元素。該隊列的頭部是延遲期滿後保存時間最長的Delayed 元素。根據這個特性麼咱們可使用DelayQueue來實現緩存系統、實時調度系統等。

  DelayQueue是一個BlockingQueue,其特化的參數是Delayed。Delayed擴展了Comparable接口,比較的基準爲延時的時間值,Delayed接口的實現類getDelay的返回值應爲固定值(final)。DelayQueue內部是使用PriorityQueue實現的。咱們能夠說DelayQueue = BlockingQueue + PriorityQueue + Delayed;

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

查看器take方法的實現,能夠了解到它確實是根據元素的延遲期來決定是否可讀的:

  public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

 

十一、LinkedTransferQueue

LinkedTransferQueue=ConcurrentLinkedQueue+SynchronousQueue (in 「fair」 mode)+LinkedBlockingQueue,LinkedTransferQueue實現了一個重要的接口TransferQueue,該接口含有下面幾個重要方法:

1. transfer(E e)   若當前存在一個正在等待獲取的消費者線程,即馬上移交之;不然,會插入當前元素e到隊列尾部,而且等待進入阻塞狀態,到有消費者線程取走該元素。2. tryTransfer(E e)   若當前存在一個正在等待獲取的消費者線程(使用take()或者poll()函數),使用該方法會即刻轉移/傳輸對象元素e;   若不存在,則返回false,而且不進入隊列。這是一個不阻塞的操做。3. tryTransfer(E e, long timeout, TimeUnit unit)   若當前存在一個正在等待獲取的消費者線程,會當即傳輸給它; 不然將插入元素e到隊列尾部,而且等待被消費者線程獲取消費掉,   若在指定的時間內元素e沒法被消費者線程獲取,則返回false,同時該元素被移除。4. hasWaitingConsumer()   判斷是否存在消費者線程5. getWaitingConsumerCount()   獲取全部等待獲取元素的消費線程數量

相關文章
相關標籤/搜索