Kafka 異步消息也會阻塞?記一次 Dubbo 頻繁超時排查過程

線上某服務 A 調用服務 B 接口完成一次交易,一次晚上的生產變動以後,系統監控發現服務 B 接口頻繁超時,後續甚至返回線程池耗盡錯誤 Thread pool is EXHAUSTED。由於服務 B 依賴外部接口,剛開始誤覺得外部接口延時致使,因此臨時增長服務 B dubbo 線程池線程數量。配置變動以後,重啓服務,服務恢復正常。一段時間以後,服務 B 再次返回線程池耗盡錯誤。此次深刻排查問題以後,才發現 Kafka 異步發送消息阻塞了 dubbo 線程,從而致使調用超時。html

1、問題分析

Dubbo 2.6.5,Kafak maven 0.8.0-beta1java

服務 A 調用服務 B,收到以下錯誤:apache

2019-08-30 09:14:52,311 WARN method [%f [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-xxxx, Pool Size: 1000 (active: 1000, core: 1000, max: 1000, largest: 1000), Task: 6491 (completed: 5491), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://xxxx!, dubbo version: 2.6.0, current host: 127.0.0.1
複製代碼

能夠看到當前 dubbo 線程池已經滿載運行,不能再接受新的調用。正常狀況下 dubbo 線程能夠很快完成任務,而後歸還到線程池中。因爲線程執行的任務發生阻塞,消費者端調用超時。而服務提供者端因爲已有線程被阻塞,線程池必須不斷建立新線程處理任務,直到線程數量達到最大數量,系統返回 Thread pool is EXHAUSTED網絡

線程任務長時間被阻塞可能緣由有:app

  • 頻繁的 fullgc,致使系統暫停。
  • 調用某些阻塞 API,如 socket 鏈接未設置超時時間致使阻塞。
  • 系統內部死鎖

經過分析系統堆棧 dump 狀況,果真發現全部 dubbo 線程都處於 WATTING 狀態。異步

下圖爲應用堆棧 dump 日誌:socket

堆棧日誌

從堆棧日誌能夠看到 dubbo 線程最後阻塞在 LinkedBlockingQueue#put ,而該阻塞發生在 Kafka 發送消息方法內。async

這裏服務 B 須要使用 Kafka 發送監控消息,爲了消息發送不影響主業務,這裏使用 Kafka 異步發送消息。因爲 Kafka 服務端最近更換了對外的端口,而服務 B Kafka 配置未及時變動。最後服務 B 修改配置,服務從新啓動,該問題得以解決。maven

2、Kafka 異步模式

下面分析 Kafka 異步發送消息阻塞的實際緣由。ide

0.8.0 Kafka 默認使用同步模式發送消息,異步發送消息須要設置producer.type=async屬性。同步模式須要等待 Kafka 將消息發送到消息隊列,這個過程固然會阻塞主線程。而異步模式最大的優勢在於無須要等待 Kafka 這個發送過程。

本來認爲這裏的異步是使用子線程去運行任務,可是 Kafka 異步模式並不是這樣。查看 Kafka 官方文檔producer,能夠看到對異步模式描述。

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer has an asynchronous mode that accumulates data in memory and sends out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 100 messages or 5 seconds). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. Since this buffering happens in the client it obviously reduces the durability as any data buffered in memory and not yet sent will be lost in the event of a producer crash.

從上咱們能夠看到,Kafka 異步模式將會把多條消息打包一塊批量發送到服務端。這種模式將會先把消息放到內存隊列中,直到消息到達必定數量(默認爲 200)或者等待時間超限(默認爲 5000ms)。

這麼作最大好處在於提升消息發送的吞吐量,減小網絡 I/O。固然這麼作也存在明顯劣勢,若是生產者宕機,在內存中還未發送消息可能就會丟失。

下面從 kafka 源碼分析這個阻塞過程。

3、Kafka 源碼解析

Kafka 消息發送端採用以下配置:

Properties props = new Properties();

        props.put("metadata.broker.list", "localhost:9092");
	// 選擇異步發送
        props.put("producer.type", "async");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("queue.buffering.max.messages","1");
        props.put("batch.num.messages","1");
        Producer<Integer, String> producer= new Producer(new ProducerConfig(props));
        producer.send(new KeyedMessage("test", "hello world"));
複製代碼

這裏設置 producer.type=async,從而使 Kafka 異步發送消息。

send 方法源碼以下

ps: 這個版本 Kafka 源碼採用 Scala 編寫,不過源碼仍是比較簡單,比較容易閱讀。

def send(messages: KeyedMessage[K,V]*) {
    if (hasShutdown.get)
      throw new ProducerClosedException recordStats(messages) sync match {
      case true => eventHandler.handle(messages)
	// 因爲 producer.type=async 異步發送
      case false => asyncSend(messages)
    }
  }
複製代碼

因爲咱們上面設置 producer.type=async,這裏將會使用 asyncSend 異步發送模式。

asyncSend 源碼以下

private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0  =>
          queue.offer(message)
        case _  =>
          try {
            config.queueEnqueueTimeoutMs < 0 match {
	
            case true =>
              queue.put(message)
              true
            case _ =>
              queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case e: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      }else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }
複製代碼

asyncSend 將會把消息加入到 LinkedBlockingQueue 阻塞隊列中。這裏根據 config.queueEnqueueTimeoutMs參數使用不一樣方法。

config.queueEnqueueTimeoutMs=0,將會調用 LinkedBlockingQueue#offer,若是該隊列未滿,將會把元素插入隊列隊尾。若是隊列未滿,直接返回 false。因此若是此時隊列已滿,消息再也不會加入隊列中,而後 asyncSend 將會拋出 QueueFullException 異常。

config.queueEnqueueTimeoutMs < 0,將會調用 LinkedBlockingQueue#put 加入元素,若是該隊列已滿,該方法將會一直被阻塞直到隊列存在可用空間。

config.queueEnqueueTimeoutMs > 0,將會調用 LinkedBlockingQueue#offer,這裏與上面不一樣之處在於設置超時時間,若是隊列已滿將會阻塞知道超時。

config.queueEnqueueTimeoutMs參數經過 queue.enqueue.timeout.ms 配置生效,默認爲 -1。默認狀況下 LinkedBlockingQueue 最大數量爲 10000,能夠經過設置 queue.buffering.max.messages 改變隊列最大值。

消息放到隊列中後,Kafka 將會使用一個異步線程不斷從隊列中獲取消息,批量發送消息。

異步處理消息代碼以下

private def processEvents() {
    var lastSend = SystemTime.milliseconds
    var events = new ArrayBuffer[KeyedMessage[K,V]]
    var full: Boolean = false

    // drain the queue until you get a shutdown command
    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                      .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
      currentQueueItem =>
        val elapsed = (SystemTime.milliseconds - lastSend)
        // check if the queue time is reached. This happens when the poll method above returns after a timeout and
        // returns a null object
        val expired = currentQueueItem == null if(currentQueueItem != null) {
          trace("Dequeued item for topic %s, partition key: %s, data: %s"
              .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
          events += currentQueueItem
        }

        // check if the batch size is reached
        full = events.size >= batchSize if(full || expired) {
          if(expired)
            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
          if(full)
            debug("Batch full. Sending..")
          // if either queue time has reached or batch size has reached, dispatch to event handler
          tryToHandle(events)
          lastSend = SystemTime.milliseconds
          events = new ArrayBuffer[KeyedMessage[K,V]]
        }
    }
    // send the last batch of events
    tryToHandle(events)
    if(queue.size > 0)
      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
        .format(queue.size))
  }

複製代碼

這裏異步線程將會不斷從隊列中獲取任務,一旦條件知足,就會批量發送任務。該條件爲:

  1. 批量消息數量達到 200,能夠設置 batch.num.messages 參數改變配置。
  2. 等待時間到達最大的超時時間,默認爲 5000ms,能夠設置 queue.buffering.max.ms 改變改配置。

4、問題解決辦法

上面問題雖然經過更換 Kafka 正確地址解決,可是爲了預防下次該問題再發生,能夠採用以下方案:

  1. 改變 config.queueEnqueueTimeoutMs默認配置,像這種系統監控日誌容許丟失,因此能夠設置 config.queueEnqueueTimeoutMs=0
  2. 升級 Kafka 版本,最新版本 Kafka 使用 Java 重寫發送端邏輯,再也不使用阻塞隊列存儲消息。

本文首發於:studyidea.cn/kafka…

歡迎關注個人公衆號:程序通事,得到平常乾貨推送。若是您對個人專題內容感興趣,也能夠關注個人博客:studyidea.cn

其餘平臺.png
相關文章
相關標籤/搜索