queue | 阻塞與否 | 是否有界 | 線程安全保障 | 適用場景 | 注意事項 |
ArrayBlockingQueue | 阻塞 | 有界 | 一把全局鎖 | 生產消費模型,平衡兩邊處理速度 | 用於存儲隊列元素的存儲空間是預先分配的,使用過程當中內存開銷較小(無須動態申請存儲空間) |
LinkedBlockingQueue | 阻塞 | 可配置 | 存取採用2把鎖 | 生產消費模型,平衡兩邊處理速度 | 無界的時候注意內存溢出問題,用於存儲隊列元素的存儲空間是在其使用過程當中動態分配的,所以它可能會增加JVM垃圾回收的負擔。 |
ConcurrentLinkedQueue | 非阻塞 | 無界 | CAS | 對全局的集合進行操作的場景 | size() 是要遍歷一遍集合,慎用 |
Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
/**
 * Builds an {@code ArrayBlockingQueue} whose backing array holds exactly
 * {@code capacity} elements and whose lock uses the requested fairness
 * policy.
 *
 * @param capacity the fixed number of slots in this queue
 * @param fair {@code true} to have threads blocked on insertion or
 *        removal served in FIFO order; {@code false} to leave the
 *        access order unspecified
 * @throws IllegalArgumentException if {@code capacity < 1}
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity < 1) {
        throw new IllegalArgumentException();
    }
    this.items = new Object[capacity];
    this.lock = new ReentrantLock(fair);
    // Both wait conditions are bound to the single queue lock.
    this.notEmpty = this.lock.newCondition();
    this.notFull = this.lock.newCondition();
}
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

/**
 * Inserts the specified element at the tail of this queue if it is
 * possible to do so immediately without exceeding the queue's capacity,
 * returning {@code true} upon success and {@code false} if this queue
 * is full.
 * When using a capacity-restricted queue, this method is generally
 * preferable to method {@link BlockingQueue#add add}, which can fail to
 * insert an element only by throwing an exception.
 *
 * @param e the element to insert
 * @return {@code true} if the element was enqueued, {@code false} if the
 *         queue was at capacity
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // Fast path: reject without taking putLock when the queue already looks full.
    if (count.get() == capacity) return false;
    // c stays -1 unless an element is actually enqueued below.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // Re-check under the lock; the unlocked check above may be stale.
        if (count.get() < capacity) {
            enqueue(node);
            // getAndIncrement returns the count as it was BEFORE this insert.
            c = count.getAndIncrement();
            // Still room after this insert: wake another thread waiting on notFull.
            if (c + 1 < capacity) notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // The count was 0 before this insert. signalNotEmpty() is not shown here;
    // presumably it wakes a thread waiting on notEmpty under takeLock.
    if (c == 0) signalNotEmpty();
    return c >= 0;
}

/**
 * Removes and returns the element produced by {@code dequeue()}, waiting
 * up to the given time while the element count is zero.
 *
 * @param timeout how long to wait before giving up, in units of {@code unit}
 * @param unit the time unit of {@code timeout}
 * @return the dequeued element, or {@code null} if the wait time runs out
 *         while the count is still zero
 * @throws InterruptedException propagated from acquiring {@code takeLock}
 *         interruptibly or from waiting on {@code notEmpty}
 */
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    E x = null;
    // c stays -1 unless an element is actually dequeued below.
    int c = -1;
    long nanos = unit.toNanos(timeout);
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            // Time budget exhausted and still nothing to take.
            if (nanos <= 0) return null;
            // awaitNanos returns the remaining wait time, so the loop
            // resumes with the leftover budget after each wake-up.
            nanos = notEmpty.awaitNanos(nanos);
        }
        x = dequeue();
        // getAndDecrement returns the count as it was BEFORE this removal.
        c = count.getAndDecrement();
        // More than one element was present: wake another thread waiting on notEmpty.
        if (c > 1) notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // The count equalled capacity before this removal. signalNotFull() is not
    // shown here; presumably it wakes a thread waiting on notFull under putLock.
    if (c == capacity) signalNotFull();
    return x;
}
/**
 * Creates a thread pool whose core and maximum sizes are both
 * {@code nThreads}, with a zero keep-alive time, fed by a
 * {@code LinkedBlockingQueue} constructed without a capacity argument.
 *
 * @param nThreads the number of threads in the pool
 * @return the newly created thread pool
 */
public static ExecutorService newFixedThreadPool(int nThreads) {
    final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue);
}

/**
 * Same as {@link #newFixedThreadPool(int)}, but passes
 * {@code threadFactory} on to the {@code ThreadPoolExecutor}.
 *
 * @param nThreads the number of threads in the pool
 * @param threadFactory the factory handed to the executor
 * @return the newly created thread pool
 */
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    return new ThreadPoolExecutor(
            nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
}

/**
 * Creates an executor backed by a {@code ThreadPoolExecutor} with core
 * and maximum size 1, fed by a {@code LinkedBlockingQueue} constructed
 * without a capacity argument, and wrapped in a
 * {@code FinalizableDelegatedExecutorService}.
 *
 * @return the newly created single-threaded executor
 */
public static ExecutorService newSingleThreadExecutor() {
    final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    final ThreadPoolExecutor singleWorker =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue);
    return new FinalizableDelegatedExecutorService(singleWorker);
}

/**
 * Same as {@link #newSingleThreadExecutor()}, but passes
 * {@code threadFactory} on to the {@code ThreadPoolExecutor}.
 *
 * @param threadFactory the factory handed to the executor
 * @return the newly created single-threaded executor
 */
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
    final ThreadPoolExecutor singleWorker =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
    return new FinalizableDelegatedExecutorService(singleWorker);
}
public class BungeeLogger extends Logger { private final ColouredWriter writer; private final Formatter formatter = new ConciseFormatter(); // private final LogDispatcher dispatcher = new LogDispatcher(this); private final BlockingQueue<LogRecord> queue = new LinkedBlockingQueue<>(); volatile boolean running = true; Thread recvThread = new Thread(){ @Override public void run() { while (!isInterrupted() && running) { LogRecord record; try { record = queue.take(); } catch (InterruptedException ex) { continue; } doLog(record); } for (LogRecord record : queue) { doLog(record); } } }; public BungeeLogger() throws IOException { super("BungeeCord", null); this.writer = new ColouredWriter(new ConsoleReader()); try { FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true); handler.setFormatter(formatter); addHandler(handler); } catch (IOException ex) { System.err.println("Could not register logger!"); ex.printStackTrace(); } recvThread.start(); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { running = false; } }); } @Override public void log(LogRecord record) { if (running) { queue.add(record); } } void doLog(LogRecord record) { super.log(record); writer.print(formatter.format(record)); } }