你真的瞭解LinkedBlockingQueue的put,add和offer的區別嗎

概述

LinkedBlockingQueue的put,add和offer這三個方法功能很類似,都是往隊列尾部添加一個元素。既然都是一樣的功能,爲啥要有有三個方法呢?java

這三個方法的區別在於:node

  • put方法添加元素,若是隊列已滿,會阻塞直到有空間能夠放
  • add方法在添加元素的時候,若超出了度列的長度會直接拋出異常
  • offer方法添加元素,若是隊列已滿,直接返回false

索引這三種不一樣的方法在隊列滿時,插入失敗會有不一樣的表現形式,咱們能夠在不一樣的應用場景中選擇合適的方法。app

用法示例

先看看add方法,less

public class LinkedBlockingQueueTest {

    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2);
        fruitQueue.add("apple");
        fruitQueue.add("orange");
        fruitQueue.add("berry");
    }

當咱們執行這個方法的時候,會報下面的異常,async

Exception in thread "main" java.lang.IllegalStateException: Queue full
    at java.util.AbstractQueue.add(AbstractQueue.java:98)
    at com.pony.app.LinkedBlockingQueueTest.testAdd(LinkedBlockingQueueTest.java:23)
    at com.pony.app.LinkedBlockingQueueTest.main(LinkedBlockingQueueTest.java:16)

而後再來看看put用法,ide

public class LinkedBlockingQueueTest implements Runnable {

    static LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2);


    public static void main(String[] args) throws InterruptedException {
        new Thread(new LinkedBlockingQueueTest()).start();

        fruitQueue.put("apple");
        fruitQueue.put("orange");
        fruitQueue.put("berry");

        System.out.println(fruitQueue.toString());

    }

    @Override
    public void run() {

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        fruitQueue.poll();
    }
}

運行這段代碼,你會發現首先程序會卡住(隊列阻塞)3秒左右,而後打印隊列的orangeberry兩個元素。源碼分析

由於我在程序的啓動的時候順便啓動了一個線程,這個線程會在3秒後從隊列頭部移除一個元素。ui

最後看看offer的用法,this

public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> fruitQueue = new LinkedBlockingQueue<>(2);
        
        System.out.println(fruitQueue.offer("apple"));
        System.out.println(fruitQueue.offer("orange"));
        System.out.println(fruitQueue.offer("berry"));

    }

運行結果:線程

true
true
false

源碼分析

先來看看add方法的實現,

public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

因此add實際上是包裝了一下offer,沒什麼能夠說的。

而後來看看putoffer的實現,兩個放在一塊兒說。

put方法源碼,

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

offer方法源碼,

public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

咱們重點關注他們的區別,offer方法在插入的時候會等一個超時時間timeout,若是時間到了隊列仍是滿的(count.get() == capacity),就會返回false。

而put方法是無限期等待,

while (count.get() == capacity) {
                notFull.await();
            }

因此咱們在應用層使用的時候,若是隊列滿再插入會阻塞。

實際場景應用

在早期版本的kafka中,生產者端發送消息使用了阻塞隊列,代碼以下:

private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0  =>
          queue.offer(message)
        case _  =>
          try {
            if (config.queueEnqueueTimeoutMs < 0) {
              queue.put(message)
              true
            } else {
              queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case _: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      }else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

能夠看到,config.queueEnqueueTimeoutMs是0的時候,使用的是offer方法,小於0的時候則使用put方法。

咱們在使用kafka的時候,能夠經過queue.enqueue.timeout.ms來決定使用哪一種方式。好比某些應用場景下,好比監控,物聯網等場景,容許丟失一些消息,能夠把queue.enqueue.timeout.ms配置成0,這樣就kafka底層就不會出現阻塞了。

新版的kafka(我印象中是2.0.0版本開始?)用java重寫了,再也不使用阻塞隊列,因此沒有上面說的問題。

相關文章
相關標籤/搜索