關於控制Broker端入站鏈接數的討論

Kafka Broker端處理請求採用Reactor模型。每臺Broker上有個相似於Dispatcher的Acceptor線程,還有若干個處理請求的Processor線程(固然真正處理請求邏輯的線程不是Processor,其實是KafkaRequestHandler)。每一個Processor線程啓動後大體作如下這麼幾件事情:java

1. 設置新的入站鏈接bootstrap

2. 處理新的請求響應(所謂的處理也就是放入到響應隊列中)安全

3. 執行Selector.select操做獲取那些準備完畢的IO操做socket

4. 接收新的入站請求spa

5. 執行已發送響應的回調邏輯.net

6. 處理已斷開鏈接線程

每一個Broker啓動以後它建立的Processor線程會不停地執行以上這些動做,循環往復,直至Broker被關閉。debug

咱們重點看看第一步中的邏輯,如下是1.1.1版本的源碼(選擇1.1.1版本不是特地的,其實全部2.3版本以前都是差很少的情形):code

/**
   * Register any new connections that have been queued up
   */
  private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      val channel = newConnections.poll()
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        selector.register(connectionId(channel.socket), channel)
      } catch {
        // We explicitly catch all exceptions and close the socket to avoid a socket leak.
        case e: Throwable =>
          val remoteAddress = channel.socket.getRemoteSocketAddress
          // need to close the channel here to avoid a socket leak.
          close(channel)
          processException(s"Processor $id closed connection from $remoteAddress", e)
      }
    }
  }

注意我標成紅色的語句。基本上Processor線程設置新入站鏈接的方式就是一次性處理完才罷休。代碼中的newConnections是java.util.concurrent.ArrayBlockingQueue實例。Acceptor線程也會訪問newConnections,所以必須是線程安全的。server

這種一次性處理完成才收手的作法在某些狀況下是有風險的,好比當Kafka集羣遭遇到DDOS攻擊時,外部IP會建立海量的入站鏈接所有砸向newConnections中。此時Processor線程運行時會一直嘗試消耗掉這些新鏈接,不然它不會幹其餘事情——好比處理請求等。換句話說,目前Kafka對新入站鏈接的處理優先級要高於已有鏈接。當遭遇鏈接風暴時,Kafka Broker端會優先處理新鏈接,所以可能形成已有鏈接上的請求處理被暫停,並最終致使超時。這樣客戶端獲得請求超時通知後會會進一步地發送新的請求,於是出現雪崩效應。

 

另外Broker端維護每一個鏈接也不是沒有開銷的。鏈接信息自己確定要佔用一些內容資源。若是是啓用了SSL的鏈接,Kafka爲額外爲其維護一個48KB的臨時緩衝區。所以一旦遭遇鏈接風暴,OOM錯誤是很常見的。

 

鑑於這些緣由,社區在2.3版本改進了Broker端處理新鏈接請求的方式。首先阻塞隊列保存新鏈接的個數再也不是沒有限制了,而是被固定爲20,即每一個Processor的新鏈接隊列最大就是20個鏈接——這個寫死在代碼裏面了,目前無法修改。第2、社區引入了新參數max.connections,用於控制Broker端所容許鏈接的最大鏈接數。你能夠調節這個參數來控制一個Broker最多能接收多少個入站鏈接。這個參數能夠在server.properties中被設置,也可使用kafka-configs腳本動態修改。max.connections是全局性的,你也能夠給每一個監聽器設置不一樣的鏈接數上限。好比你的監聽器中同時使用了PLAINTEXT和SSL,那麼你可以使用listener.name.plaintext.max.connections和listener.name.ssl.max.connections來爲這兩個listeners配置各自的鏈接數,命令以下:

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config max.connections=100$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config listener.name.plaintext.max.connections=80
Completed updating config for broker: 0.

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config listener.name.ssl.max.connections=80
Completed updating config for broker: 0.

 

第三是Kafka Broker的每一個Processor線程會在每輪任務結束以前嘗試去關閉多餘的鏈接。判斷是否須要關閉多餘鏈接的依據有兩點:1. 總的鏈接數超過了max.connections值;2. 你爲Broker設置了多個監聽器,但Kafka會保護Broker內部鏈接使用的那個監聽器。好比你若是設置了多個監聽器:PLAINTEXT://9092, SSL://9093,SASL://9094,而後設置inter.broker.listener.name=SSL,那麼SSL這套監聽器下的鏈接是不會被Processor強行關閉的。

 

最後提一句,若是全部Processor的阻塞隊列都滿了, 那麼前面的Acceptor線程會阻塞住,不會再接收任何入站請求。社區新增長了一個JMX指標來計算Acceptor線程被阻塞的時間比例:kafka.network:type=Acceptor,name=AcceptorBlockedPercent,listener={listenerName}

相關文章
相關標籤/搜索