Java併發——阻塞隊列集(上)

簡介

阻塞隊列是一個支持兩個附加操做的隊列,這兩個附加操做支持阻塞的插入和移除方法
①.支持阻塞的插入方法:當隊列滿時,隊列會阻塞插入元素的線程,直至隊列不滿
②.支持阻塞的移除方法:當隊列空時,獲取元素的線程會等待隊列變爲非空java

在阻塞隊列不可用時,這兩個附加操做提供了4種處理方式,以下node

方法/處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用

阻塞隊列

ArrayBlockingQueue:由數組結構組成的有界阻塞隊列
LinkedBlockingQueue:由鏈表結構組成的有界阻塞隊列
PriorityBlockingQueue:支持優先級排序的無界阻塞隊列
DelayQueue:使用優先級隊列實現的無界阻塞隊列
SynchronousQueue:不存儲元素的阻塞隊列
LinkedTransferQueue:由鏈表結構組成的無界阻塞隊列
LinkedBlockingDeque:由鏈表結構組成的雙向阻塞隊列
編程

ArrayBlockingQueue

ArrayBlockingQueue是一個用數組實現的有界阻塞隊列,隊列按照先進先出(FIFO)原則對元素進行排序。默認採用不公平訪問,由於公平性一般會下降吞吐量。數組

主要屬性

private static final long serialVersionUID = -817911632652898426L;
    /** 數組用來維護ArrayBlockingQueue中的元素 */
    final Object[] items;
    /** 出隊首位置索引 */
    int takeIndex;
    /** 入隊末位置索引 */
    int putIndex;
    /** 元素個數 */
    int count;
    
    final ReentrantLock lock;
    /** 出隊等待隊列 */
    private final Condition notEmpty;
    /** 入隊等待隊列 */
    private final Condition notFull;
複製代碼

put

ArrayBlockingQueue提供了不少方法入隊:add()、offer()、put()等。咱們以阻塞式方法爲主,put()方法其源碼以下
緩存

public void put(E e) throws InterruptedException {
        // 校驗元素是否爲空
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 響應中斷式獲取同步,若線程被中斷會拋出異常
        lock.lockInterruptibly();
        try {
            // 當隊列已滿,將線程添加到notFull等待隊列中
            while (count == items.length)
                notFull.await();
            // 若沒有滿,進行入隊    
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }
    
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
複製代碼

當隊列滿時,會調用Condition的await()方法將線程添加到等待隊列中。若隊列未滿調用enqueue()進行入隊操做(全部入隊方法最終都將調用該方法在隊列尾部插入元素安全

private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        // 入隊
        items[putIndex] = x;
        // 當數組添加滿後,從新從0開始
        if (++putIndex == items.length)
            putIndex = 0;
        // 元素個數+1    
        count++;
        // 喚醒出隊等待隊列中的線程
        notEmpty.signal();
    }
複製代碼

take

出隊方法有:poll()、remove(),take()等,take()方法其源碼以下併發

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 響應中斷式獲取同步,若線程被中斷會拋出異常
        lock.lockInterruptibly();
        try {
            // 若隊列空,將線程添加到notEmpty等待隊列中
            while (count == 0)
                notEmpty.await();
            // 獲取數據    
            return dequeue();
        } finally {
            lock.unlock();
        }
    }
複製代碼

當隊列爲空,會調用condition的await()方法將線程添加到notEmpty等待隊列中,若隊列不爲空則調用dequeue()獲取數據less

private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        // 獲取數據
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        // 元素個數-1    
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 通知入隊等待隊列中的線程
        notFull.signal();
        return x;
    }
複製代碼

從源碼中能夠發現ArrayBlockingQueue經過condition的等待喚醒機制完成可阻塞式的入隊和出隊函數

LinkedBlockingQueue

LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序高併發

主要屬性

/** 容量 */
    private final int capacity;
    /** 元素個數 */
    private final AtomicInteger count = new AtomicInteger();
    /** 頭節點 */
    transient Node head;
    /** 尾節點 */
    private transient Node last;
    /** 出隊鎖 */
    private final ReentrantLock takeLock = new ReentrantLock();
    /** 出隊等待隊列 */
    private final Condition notEmpty = takeLock.newCondition();
    /** 入隊鎖 */
    private final ReentrantLock putLock = new ReentrantLock();
    /** 入隊等待隊列 */
    private final Condition notFull = putLock.newCondition();
複製代碼

從屬性上來看LinkedBlockingQueue維護兩個鎖在入隊和出隊時保證線程安全,兩個鎖下降線程因爲線程沒法獲取lock而進入WAITING狀態的可能性提升了線程併發執行的效率,而且count屬性使用AtomicInteger原子操做類(可能兩個線程一個出隊一個入隊操做count,各自的鎖顯然起不到用處)

put

public void put(E e) throws InterruptedException {
        // 若新增元素爲null拋異常
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        // 獲取當前元素個數
        final AtomicInteger count = this.count;
        // 響應中斷式獲取鎖,若線程被中斷會拋出異常
        putLock.lockInterruptibly();
        try {
            // 若當前隊列已滿,將線程添加到notFull等待隊列中
            while (count.get() == capacity) {
                notFull.await();
            }
            // 若沒有滿,進行入隊
            enqueue(node);
            // 元素個數+1
            c = count.getAndIncrement();
            // 若當前元素個數+1還未到定義的最大容量,則喚醒入隊等待隊列中的線程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
 複製代碼

take

public E take() throws InterruptedException {
        E x;
        int c = -1;
        // 獲取當前元素個數
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        // 響應中斷式獲取鎖,若線程被中斷會拋出異常
        takeLock.lockInterruptibly();
        try {
            // 若當前隊列爲空,則將線程添加到notEmpty等待隊列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 獲取數據
            x = dequeue();
            // 當前元素個數-1
            c = count.getAndDecrement();
            // 若隊列中還有元素,喚醒阻塞的出隊線程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
 複製代碼

PriorityBlockingQueue

PriorityBlockingQueue是一個支持優先級的無界阻塞隊列,雖然無界但因爲資源耗盡,嘗試的添加可能會失敗(致使OutOfMemoryError ),默認狀況下元素採起天然順序升序排序,也能夠經過構造函數來指定Comparator來對元素進行排序,須要注意的是PriorityBlockingQueue不能保證同優先級元素的順序

主要屬性

/** 默認容量 */
    private static final int DEFAULT_INITIAL_CAPACITY = 11;
    /** 最大容量 */
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    /** 內置數組 */
    private transient Object[] queue;
    /** 元素個數 */
    private transient int size;
    /** 比較器,爲空則天然排序 */
    private transient Comparator comparator;
    
    private final ReentrantLock lock;
    /** 出隊等待隊列 */
    private final Condition notEmpty;
    /** 用於CAS擴容時用 */
    private transient volatile int allocationSpinLock;

    private PriorityQueue q;
複製代碼

能夠發現PriorityBlockingQueue只有一個condition,由於PriorityBlockingQueue是一個無界隊列,插入始終成功,也正由於此因此其入隊用lock.lock()方法不響應中斷,而出隊用lock.lockInterruptibly()響應中斷式獲取鎖

put

public void put(E e) {
        // 不須要阻塞
        offer(e); // never need to block
    }
    
    public boolean offer(E e) {
        // 判空
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        // 獲取鎖
        lock.lock();
        int n, cap;
        Object[] array;
        // 若大於等於當前數組長度則擴容
        while ((n = size) >= (cap = (array = queue).length))
            tryGrow(array, cap);
        try {
            // 獲取比較器
            Comparator cmp = comparator;
            if (cmp == null)
                siftUpComparable(n, e, array);
            else
                siftUpUsingComparator(n, e, array, cmp);
            // 元素個數+1    
            size = n + 1;
            // 喚醒
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
複製代碼

tryGrow擴容

private void tryGrow(Object[] array, int oldCap) {
        // 必須先釋放鎖
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // CAS設置佔用
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {
            try {
                // 新容量
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                //  新容量若超過最大值                  
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                // 若新容量大於舊容量且當前數組相等,建立新容量數組
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        // CAS設置allocationSpinLock失敗,代表有其餘線程也正在擴容,讓給其餘線程處理
        if (newArray == null) // back off if another thread is allocating
            Thread.yield();
        // 獲取鎖    
        lock.lock();
        if (newArray != null && queue == array) {
            queue = newArray;
            // 數組複製
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }
複製代碼

從源碼中能夠發現爲了儘量提升併發效率,先釋放鎖在計算新容量時利用CAS設置allocationSpinLock來保證線程安全,再最後獲取鎖進行數組複製擴容。擴容完後,根據比較器的排序規則進行新增

siftUpComparable(),比較器comparator爲null時採起天然排序調用此方法

private static  void siftUpComparable(int k, T x, Object[] array) {
        Comparable key = (Comparable) x;
        // 若當前元素個數大於0,即隊列不爲空
        while (k > 0) {
            // (n - 1) / 2
            int parent = (k - 1) >>> 1;
            // 獲取parent位置上的元素
            Object e = array[parent];
            // 從隊列的最後往上調整堆,直到不小於其父節點爲止
            if (key.compareTo((T) e) >= 0)
                break;
            // 若是當前節點小於其父節點,則將其與父節點進行交換,並繼續往上訪問父節點    
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }
複製代碼

此方法爲建堆過程,假定PriorityBlockingQueue內部數組以下:

轉換爲堆(堆是一種二叉樹結構):

往其添加元素2,k爲當前元素個數12,計算parent爲5,e爲6,e大於2,交換位置

第二次循環,k=5,parent=2,e=5,5>2交換位置

第三次循環,k=2,parent=0,e=1,1<2退出循環,第2個位置給新元素2

其主要思路 末位置尋找其父節點,若新增元素小於父節點則將其與父節點進行交換,並繼續往上訪問父節點,直到大於等於其父節點爲止

siftUpUsingComparator(),當比較器不爲null,採用指定比較器,調用此方法

private static  void siftUpUsingComparator(int k, T x, Object[] array,
                                       Comparator cmp) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (cmp.compare(x, (T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = x;
    }
複製代碼

take

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
複製代碼

獲取鎖後,調用dequeue()

private E dequeue() {
        // 若隊列爲空,返回null
        int n = size - 1;
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            // 出隊元素,首元素
            E result = (E) array[0];
            // 最後一個元素
            E x = (E) array[n];
            array[n] = null;
            Comparator cmp = comparator;
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }
複製代碼

天然排序處理siftDownComparable()

private static  void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        if (n > 0) {
            Comparable key = (Comparable)x;
            int half = n >>> 1;           // loop while a non-leaf
            while (k < half) {
                // 左節點
                int child = (k << 1) + 1; // assume left child is least
                Object c = array[child];
                // 右節點
                int right = child + 1;
                if (right < n &&
                    ((Comparable) c).compareTo((T) array[right]) > 0)
                    c = array[child = right];
                if (key.compareTo((T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }
    }
複製代碼

指定排序siftDownUsingComparator()

private static  void siftDownUsingComparator(int k, T x, Object[] array,
                                                    int n,
                                                    Comparator cmp) {
        if (n > 0) {
            int half = n >>> 1;
            while (k < half) {
                int child = (k << 1) + 1;
                Object c = array[child];
                int right = child + 1;
                if (right < n && cmp.compare((T) c, (T) array[right]) > 0)
                    c = array[child = right];
                if (cmp.compare(x, (T) c) <= 0)
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = x;
        }
    }
複製代碼

以上面最後一個圖爲基礎出隊第一個元素

第一次循環:k=0,n=12,half=6,child=1,c爲圖中節點3,right=2,通過子節點比較找出較小值2,2與末尾值節點6相比,末位置更大,首位置與右子節點交換位置

第二次循環:k=2,child=5,c爲圖中節點5,right=6,通過子節點比較找出較小值5,5與末位置節點6相比,末位置更大,與左子節點交換位置

第三次循環:k=5,child=11,c爲圖中節點8,right=12,通過子節點比較找出較小值末位置節點6相比

其主要思路:首位置尋找其子節點,找出兩個子節點的較小的與末尾位置節點比較若末尾節點小,則將其置入首位置,不然首位置與較小子節點替換位置,以此略推繼續往下找

DelayQueue

DelayQueue是一個支持延時獲取元素的無界阻塞隊列,隊列使用PriorityQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素,只有在延遲期滿時才能從隊列中提取元素,能夠將其應用在緩存、定時任務調度等場景

Delayed接口

DelayQueue隊列中的元素必須實現Delayed接口,咱們先看Delayed接口繼承關係

從圖中咱們能夠知道,實現Delayed接口,咱們必須實現其自定義的getDelay()方法以及繼承過來的compareTo()方法

主要屬性

private final transient ReentrantLock lock = new ReentrantLock();
    /** 優先級隊列 */
    private final PriorityQueue q = new PriorityQueue();

    private Thread leader = null;

    private final Condition available = lock.newCondition();
複製代碼

put

public void put(E e) {
        offer(e);
    }
    
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 向PriorityQueue添加元素
            q.offer(e);
            // 若當前元素
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
複製代碼

其添加操做基於PriorityQueue的offer方法

public boolean offer(E e) {
        // 判空
        if (e == null)
            throw new NullPointerException();
        // 修改次數
        modCount++;
        int i = size;
        // 判斷是否須要擴容
        if (i >= queue.length)
            grow(i + 1);
        // 元素個數+1    
        size = i + 1;
        // 若隊列爲空,首元素置爲e
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);
        return true;
    }
    
    private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        // 天然排序
        else
            siftUpComparable(k, x);
    }
    
    /**
     * 天然排序
     */
    private void siftUpComparable(int k, E x) {
        Comparable key = (Comparable) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
    }
    
    /**
     * 指定比較器
     */
    private void siftUpUsingComparator(int k, E x) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (comparator.compare(x, (E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = x;
    }
複製代碼

PriorityQueue的天然排序或指定比較器處理新增操做與PriorityBlockingQueue的邏輯差很少,這裏就再也不過多分析,可是從源碼咱們發現了modCount,代表PriorityQueue是線程不安全的,可是因爲DelayQueue能夠依靠ReentrantLock來確保同步安全。新增完後會判斷新增元素是否爲隊列首元素,如果將leader設置爲空,並喚醒全部等待線程

take

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            // 死循環
            for (;;) {
                // 獲取隊列首元素,若隊列爲空返回null
                E first = q.peek();
                // 若隊列爲空
                if (first == null)
                    available.await();
                else {
                    // 獲取剩餘延遲時間
                    long delay = first.getDelay(NANOSECONDS);
                    // 若小於0代表已過時,出隊
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    // 若leader!= null 代表有其餘線程正在操做
                    if (leader != null)
                        available.await();
                    else {
                        // 不然將leader置爲當前線程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 指定時間等待
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
複製代碼

總體出隊邏輯再也不多述,來講下leader和first

  • leader
  • 從源碼咱們能夠看到leader屬性在put()與take()方法中都有出現,其做用在於減小沒必要要的競爭,若leader不爲空說明已經有線程正在操做,直接一直等待便可不必再爭。舉個例子假定有線程A、B、C依次要出隊,線程A先獲取鎖因爲首元素未過時,指定剩餘時間等待,若不採用leader直接一直等待,線程B和C也指定時間等待,那麼會形成三個線程同時競爭首元素,原本A→B→C的順序可能致使亂序不是線程所想要的元素

  • first
  • 在take()方法中爲何要將first置爲null,英文註解當等待時不要持有依賴。若不置空假定線程A等待,其棧幀中會存有first局部變量所指元素引用,線程B請求仍然等待其棧幀也存有first局部變量所指元素引用,以此略推後來線程等待後棧幀中都會存有,那麼當線程A成功出隊首元素,其餘線程依然佔有其引用, 致使一直回收不了,這樣就可能會形成內存泄漏

    感謝

    《java併發編程的藝術》
    cmsblogs.com/?p=2407

    相關文章
    相關標籤/搜索