本文主要簡單介紹下ArrayBlockingQueue與LinkedBlockingQueue。
queue | 阻塞與否 | 是否有界 | 線程安全保障 | 適用場景 | 注意事項 |
---|---|---|---|---|---|
ArrayBlockingQueue | 阻塞 | 有界 | 一把全局鎖 | 生產消費模型,平衡兩邊處理速度 | 用於存儲隊列元素的存儲空間是預先分配的,使用過程中內存開銷較小(無須動態申請存儲空間) |
LinkedBlockingQueue | 阻塞 | 可配置 | 存取採用2把鎖 | 生產消費模型,平衡兩邊處理速度 | 無界的時候注意內存溢出問題,用於存儲隊列元素的存儲空間是在其使用過程中動態分配的,因此它可能會增加JVM垃圾回收的負擔。 |
ConcurrentLinkedQueue | 非阻塞 | 無界 | CAS | 對全局的集合進行操作的場景 | size() 是要遍歷一遍集合,慎用 |
LinkedBlockingQueue在大多數併發的場景下吞吐量比ArrayBlockingQueue高,但是性能較不穩定(較難預測)。
Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
測試結果表明,LinkedBlockingQueue的可伸縮性要高於ArrayBlockingQueue。初看起來,這個結果有些奇怪:鏈表隊列在每次插入元素時,都必須分配一個鏈表節點對象,這似乎比基於數組的隊列執行了更多的工作。然而,雖然它擁有更多的內存分配與GC等開銷,但與基於數組的隊列相比,鏈表隊列的put和take等方法支持併發性更高的訪問,因為一些優化後的鏈表隊列算法能將隊列頭節點的更新操作與尾節點的更新操作分離開來。由於內存分配操作通常是線程本地的,因此如果算法能通過多執行一些內存分配操作來降低競爭程度,那麼這種算法通常具有更高的可伸縮性。
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
此外還支持公平鎖
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning {@code true} upon success and {@code false} if this queue
 * is full.
 * When using a capacity-restricted queue, this method is generally
 * preferable to method {@link BlockingQueue#add add}, which can fail to
 * insert an element only by throwing an exception.
 *
 * @param e the element to insert
 * @return {@code true} if the element was inserted, {@code false} if the queue was full
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // Fast path: give up without touching putLock when the queue already looks full.
    if (count.get() == capacity)
        return false;
    // c stays -1 unless the insert actually happens, so "c >= 0" doubles as the success flag.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Re-check under the lock; the unlocked check above may be stale.
        if (count.get() < capacity) {
            enqueue(node);
            // c is the element count from BEFORE this insert.
            c = count.getAndIncrement();
            // Still room after this insert: wake another thread waiting on notFull.
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // The queue was empty before this insert. signalNotEmpty() is defined outside this
    // excerpt; presumably it wakes a thread waiting on notEmpty — confirm against its source.
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

/**
 * Removes and returns the head of the queue, waiting up to the given
 * timeout for an element to become available.
 *
 * @param timeout how long to wait before giving up, in units of {@code unit}
 * @param unit the time unit of {@code timeout}
 * @return the removed element, or {@code null} if the wait time ran out while the queue was empty
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            // Time budget used up and still nothing to take.
            if (nanos <= 0)
                return null;
            // awaitNanos returns the remaining wait time, so the budget shrinks across wake-ups.
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        // c is the element count from BEFORE this removal.
        c = count.getAndDecrement();
        // More elements remain: wake another thread waiting on notEmpty.
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // The queue was full before this removal. signalNotFull() is defined outside this
    // excerpt; presumably it wakes a thread waiting on notFull — confirm against its source.
    if (c == capacity)
        signalNotFull();
    return x;
}
Executors裏頭用了LinkedBlockingQueue
/**
 * Creates a pool whose core and maximum size are both {@code nThreads},
 * with a zero keep-alive time and a {@code LinkedBlockingQueue} work queue
 * built with the no-argument constructor (no explicit capacity).
 *
 * @param nThreads the number of threads in the pool
 * @return the newly created thread pool
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue);
}

/**
 * Same as {@link #newFixedThreadPool(int)}, but new threads are obtained
 * from the supplied factory.
 *
 * @param nThreads the number of threads in the pool
 * @param threadFactory the factory passed through to the {@code ThreadPoolExecutor}
 * @return the newly created thread pool
 */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    return new ThreadPoolExecutor(
        nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
}

/**
 * Creates an executor backed by a pool of exactly one thread (core and
 * maximum size 1) and a {@code LinkedBlockingQueue} work queue, wrapped in
 * a {@code FinalizableDelegatedExecutorService}.
 *
 * @return the newly created single-threaded executor
 */
public static ExecutorService newSingleThreadExecutor() {
    final ThreadPoolExecutor singleWorker = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    return new FinalizableDelegatedExecutorService(singleWorker);
}

/**
 * Same as {@link #newSingleThreadExecutor()}, but the single thread is
 * obtained from the supplied factory.
 *
 * @param threadFactory the factory passed through to the {@code ThreadPoolExecutor}
 * @return the newly created single-threaded executor
 */
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    final ThreadPoolExecutor singleWorker = new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
    return new FinalizableDelegatedExecutorService(singleWorker);
}
使用LinkedBlockingQueue實現logger
public class BungeeLogger extends Logger { private final ColouredWriter writer; private final Formatter formatter = new ConciseFormatter(); // private final LogDispatcher dispatcher = new LogDispatcher(this); private final BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<>(); volatile boolean running = true; Thread recvThread = new Thread(){ @Override public void run() { while (!isInterrupted() && running) { LogRecord record; try { record = queue.take(); } catch (InterruptedException ex) { continue; } doLog(record); } for (LogRecord record : queue) { doLog(record); } } }; public BungeeLogger() throws IOException { super("BungeeCord", null); this.writer = new ColouredWriter(new ConsoleReader()); try { FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true); handler.setFormatter(formatter); addHandler(handler); } catch (IOException ex) { System.err.println("Could not register logger!"); ex.printStackTrace(); } recvThread.start(); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { running = false; } }); } @Override public void log(LogRecord record) { if (running) { queue.add(record); } } void doLog(LogRecord record) { super.log(record); writer.print(formatter.format(record)); } }