LinkedBlockingQueue的put,add和offer這三個方法功能很類似,都是往隊列尾部添加一個元素。既然都是一樣的功能,爲啥要有有三個方法呢?java
這三個方法的區別在於:node
索引這三種不一樣的方法在隊列滿時,插入失敗會有不一樣的表現形式,咱們能夠在不一樣的應用場景中選擇合適的方法。app
先看看add
方法,less
public class LinkedBlockingQueueTest { public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2); fruitQueue.add("apple"); fruitQueue.add("orange"); fruitQueue.add("berry"); }
當咱們執行這個方法的時候,會報下面的異常,async
Exception in thread "main" java.lang.IllegalStateException: Queue full at java.util.AbstractQueue.add(AbstractQueue.java:98) at com.pony.app.LinkedBlockingQueueTest.testAdd(LinkedBlockingQueueTest.java:23) at com.pony.app.LinkedBlockingQueueTest.main(LinkedBlockingQueueTest.java:16)
而後再來看看put
用法,ide
public class LinkedBlockingQueueTest implements Runnable { static LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2); public static void main(String[] args) throws InterruptedException { new Thread(new LinkedBlockingQueueTest()).start(); fruitQueue.put("apple"); fruitQueue.put("orange"); fruitQueue.put("berry"); System.out.println(fruitQueue.toString()); } @Override public void run() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } fruitQueue.poll(); } }
運行這段代碼,你會發現首先程序會卡住(隊列阻塞)3秒左右,而後打印隊列的orange
和berry
兩個元素。源碼分析
由於我在程序的啓動的時候順便啓動了一個線程,這個線程會在3秒後從隊列頭部移除一個元素。ui
最後看看offer
的用法,this
public static void main(String[] args) throws InterruptedException { LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2); System.out.println(fruitQueue.offer("apple")); System.out.println(fruitQueue.offer("orange")); System.out.println(fruitQueue.offer("berry")); }
運行結果:線程
true true false
先來看看add
方法的實現,
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
因此add實際上是包裝了一下offer,沒什麼能夠說的。
而後來看看put
和offer
的實現,兩個放在一塊兒說。
put方法源碼,
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
offer方法源碼,
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1; final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return true; }
咱們重點關注他們的區別,offer方法在插入的時候會等一個超時時間timeout
,若是時間到了隊列仍是滿的(count.get() == capacity
),就會返回false。
而put方法是無限期等待,
while (count.get() == capacity) { notFull.await(); }
因此咱們在應用層使用的時候,若是隊列滿再插入會阻塞。
在早期版本的kafka中,生產者端發送消息使用了阻塞隊列,代碼以下:
private def asyncSend(messages: Seq[KeyedMessage[K,V]]) { for (message <- messages) { val added = config.queueEnqueueTimeoutMs match { case 0 => queue.offer(message) case _ => try { if (config.queueEnqueueTimeoutMs < 0) { queue.put(message) true } else { queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS) } } catch { case _: InterruptedException => false } } if(!added) { producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark() producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark() throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString) }else { trace("Added to send queue an event: " + message.toString) trace("Remaining queue size: " + queue.remainingCapacity) } } }
能夠看到,config.queueEnqueueTimeoutMs
是0的時候,使用的是offer
方法,小於0的時候則使用put
方法。
咱們在使用kafka的時候,能夠經過queue.enqueue.timeout.ms
來決定使用哪一種方式。好比某些應用場景下,好比監控,物聯網等場景,容許丟失一些消息,能夠把queue.enqueue.timeout.ms
配置成0,這樣就kafka底層就不會出現阻塞了。
新版的kafka(我印象中是2.0.0版本開始?)用java重寫了,再也不使用阻塞隊列,因此沒有上面說的問題。