ArrayBlockingQueue與LinkedBlockingQueue

本文主要簡單介紹下ArrayBlockingQueue與LinkedBlockingQueue。node

對比

queue 阻塞與否 是否有界 線程安全保障 適用場景 注意事項
ArrayBlockingQueue 阻塞 有界 一把全局鎖 生產消費模型,平衡兩邊處理速度 用於存儲隊列元素的存儲空間是預先分配的,使用過程當中內存開銷較小(無須動態申請存儲空間)
LinkedBlockingQueue 阻塞 可配置 存取採用2把鎖 生產消費模型,平衡兩邊處理速度 無界的時候注意內存溢出問題,用於存儲隊列元素的存儲空間是在其使用過程當中動態分配的,所以它可能會增長JVM垃圾回收的負擔。
ConcurrentLinkedQueue 非阻塞 無界 CAS 對全局的集合進行操做的場景 size() 是要遍歷一遍集合,慎用

內存方面

  • ArrayBlockingQueue
    用於存儲隊列元素的存儲空間是預先分配的,使用過程當中內存開銷較小(無須動態申請存儲空間)
  • LinkedBlockingQueue
    用於存儲隊列元素的存儲空間是在其使用過程當中動態分配的,所以它可能會增長JVM垃圾回收的負擔。

有界無界

  • ArrayBlockingQueue
    有界,適合已知最大存儲容量的場景
  • LinkedBlockingQueue
    可有界能夠無界

吞吐量

LinkedBlockingQueue在大多數併發的場景下吞吐量比ArrayBlockingQueue,可是性能不穩定。git

Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.github

圖片描述

測試結果代表,LinkedBlockingQueue的可伸縮性要高於ArrayBlockingQueue。初看起來,這個結果有些奇怪:鏈表隊列在每次插入元素時,都必須分配一個鏈表節點對象,這彷佛比基於數組的隊列執行了更多的工做。然而,雖然它擁有更好的內存分配與GC等開銷,但與基於數組的隊列相比,鏈表隊列的put和take等方法支持併發性更高的訪問,由於一些優化後的連接隊列算法能將隊列頭節點的更新操做與尾節點的更新操做分離開來。因爲內存分配操做一般是線程本地的,所以若是算法能經過多執行一些內存分配操做來下降競爭程度,那麼這種算法一般具備更高的可伸縮性。算法

併發方面

  • ArrayBlockingQueue
    採用一把鎖,兩個condition
/** Main lock guarding all access */
    final ReentrantLock lock;

    /** Condition for waiting takes */
    private final Condition notEmpty;

    /** Condition for waiting puts */
    private final Condition notFull;
    
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

    /**
     * Extracts element at current take position, advances, and signals.
     * Call only when holding lock.
     */
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        notFull.signal();
        return x;
    }

此外還支持公平鎖數組

/**
     * Creates an {@code ArrayBlockingQueue} with the given (fixed)
     * capacity and the specified access policy.
     *
     * @param capacity the capacity of this queue
     * @param fair if {@code true} then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if {@code false} the access order is unspecified.
     * @throws IllegalArgumentException if {@code capacity < 1}
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
  • LinkedBlockingQueue
    頭尾各1把鎖
/** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();
    
    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

應用實例

Executors

裏頭用了LinkedBlockingQueue安全

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
    
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

使用LinkedBlockingQueue實現logger併發

public class BungeeLogger extends Logger {

    private final ColouredWriter writer;
    private final Formatter formatter = new ConciseFormatter();
//    private final LogDispatcher dispatcher = new LogDispatcher(this);

    private final BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<>();

    volatile boolean running = true;

    Thread recvThread = new Thread(){
        @Override
        public void run() {
            while (!isInterrupted() && running) {
                LogRecord record;
                try {
                    record = queue.take();
                } catch (InterruptedException ex) {
                    continue;
                }

                doLog(record);
            }
            for (LogRecord record : queue) {
                doLog(record);
            }
        }
    };

    public BungeeLogger() throws IOException {
        super("BungeeCord", null);
        this.writer = new ColouredWriter(new ConsoleReader());

        try {
            FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true);
            handler.setFormatter(formatter);
            addHandler(handler);
        } catch (IOException ex) {
            System.err.println("Could not register logger!");
            ex.printStackTrace();
        }
        recvThread.start();

        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                running = false;
            }
        });
    }

    @Override
    public void log(LogRecord record) {
        if (running) {
            queue.add(record);
        }
    }

    void doLog(LogRecord record) {
        super.log(record);
        writer.print(formatter.format(record));
    }
}

doc

相關文章
相關標籤/搜索