關於Kafka producer管理TCP鏈接的討論

  在Kafka中,TCP鏈接的管理交由底層的Selector類(org.apache.kafka.common.network)來維護。Selector類定義了不少數據結構,其中最核心的當屬java.nio.channels.Selector實例,故全部的IO事件其實是使用Java的Selector來完成的。本文咱們探討一下producer與Kafka集羣進行交互時TCP鏈接的管理與維護。java

1、什麼時候建立TCP鏈接node

  Producer端在建立KafkaProducer實例時就會建立與broker的TCP鏈接——這個表述嚴格來講不是很準確,應當這麼說:在建立KafkaProducer實例時會建立並啓動Sender線程實例。Sender線程開始運行時首先就會建立與broker的TCP鏈接,以下面這段日誌所示:apache

[2018-12-09 09:35:45,620] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: -2 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,622] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: -2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)bootstrap


[2018-12-09 09:35:45,814] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: -1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 09:35:45,815] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)

[2018-12-09 09:35:45,828] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=) to node localhost:9093 (id: -2 rack: null) (org.apache.kafka.clients.NetworkClient:1068)安全

  在個人樣例代碼中,bootstrap.servers指定了"localhost:9092, localhost:9093"。由上面的日誌能夠看到KafkaProducer實例建立後(此時還沒有開始發送消息)producer會建立與這兩臺broker的TCP鏈接。特別注意我標紅的broker id——這裏的id都是負值,我會在後文詳細說說這裏面的事情。另外,上述日誌中最後一行代表producer選擇了向localhost:9093的broker發送METADATA請求去獲取集羣的元數據信息——實際上producer選擇的是當前負載最少的broker。這裏的負載指的是未處理完的網絡請求數。網絡

  總的來講,TCP鏈接是在Sender線程運行過程當中建立的,因此即便producer不發送任何消息(即顯式調用producer.send),底層的TCP鏈接也是會被建立出來的。數據結構

  在轉到下一個話題以前,我想聊聊針對這種設計的一些本身的理解:如社區文檔所說,KafkaProducer類是線程安全的。我雖然沒有詳盡地去驗證過是否真的thread-safe,但根據瀏覽源碼大體能夠得出這樣的結論:producer主線程和Sender線程共享的可變數據結構大概就只有RecordAccumulator類,所以維護RecordAccumulator類的線程安全也就實現了KafkaProducer的線程安全,而RecordAccumulator類中主要的數據結構是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,並且凡是用到Deque的地方基本上都由Java monitor lock來保護,因此基本上能夠認定RecordAccumulator的線程安全性。this

  我這裏真正想說的是,即便KafkaProducer類是線程安全的,我其實也不太贊同建立KafkaProducer實例時當即啓動Sender線程的作法。Brian Goetz大神著做《Java Concurrency in Practice》中明確給出了這樣作的風險:在對象構造器中啓動線程會形成this指針的逃逸——理論上Sender線程徹底可以看到一個未構造完整的KafkaProducer實例。固然在構造KafkaProducer實例時建立Sender線程實例自己沒有任何問題,但最好不要啓動它。spa

2、建立多少個TCP鏈接.net

咱們仍是結合日誌來看。此次producer開始發送消息,日誌以下:

[2018-12-09 10:06:46,761] DEBUG [Producer clientId=producer-1] 開始發送消息...
[2018-12-09 10:06:46,762] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9092 (id: 0 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 10:06:46,762] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)


[2018-12-09 10:06:46,765] DEBUG [Producer clientId=producer-1] Initialize connection to node localhost:9093 (id: 1 rack: null) for sending metadata request (org.apache.kafka.clients.NetworkClient:1084)
[2018-12-09 10:06:46,766] DEBUG [Producer clientId=producer-1] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)


[2018-12-09 10:06:46,770] DEBUG [Producer clientId=producer-1] Sending metadata request (type=MetadataRequest, topics=test) to node localhost:9092 (id: 0 rack: null) (org.apache.kafka.clients.NetworkClient:1068)

  日誌告訴咱們,producer又建立了與localhost:909二、localhost:9093的TCP鏈接。加上最開始建立的兩個TCP鏈接,目前producer總共建立了4個TCP鏈接,連向localhost:9092和localhost:9093各有兩個。再次注意標紅的broker id——此時id再也不是負值了,或者說此時它們是真正的broker id了(即在server.properties中broker.id指定的值)。這個結論告訴了咱們一個有意思的事實:當前版本下(2.1.0),Kafka producer會爲bootstrap.servers中指定的每一個broker都建立兩個TCP鏈接:第一個TCP鏈接用於首次獲取元數據信息;第二個TCP鏈接用於消息發送以及以後元數據信息的獲取。注意,第一個TCP鏈接中broker id是假的;第二個TCP鏈接中broker id纔是真實的broker id。

  另外,注意上面日誌的最後一行。當producer再次發送METADATA請求時它使用的是新建立的TCP鏈接,而非最開始的那個TCP鏈接。這點很是關鍵!這揭示了一個事實:最開始建立的TCP鏈接將再也不被使用,或者說徹底被廢棄掉了。

3、什麼時候關閉TCP鏈接

  Producer端關閉TCP鏈接的方式有兩種:一種是用戶主動關閉;一種是Kafka自動關閉。咱們先說第一種,這裏的主動關閉其實是廣義的主動關閉,甚至包括用戶調用kill -9主動「殺掉」producer應用。固然最推薦的方式仍是調用producer.close方法來關閉。第二種則是Kafka幫你關閉,這與producer端參數connections.max.idle.ms的值有關。默認狀況下該參數值是9分鐘,即若是在9分鐘內沒有任何請求「流過」該某個TCP鏈接,那麼Kafka會主動幫你把該TCP鏈接關閉。用戶能夠在producer端設置connections.max.idle.ms=-1禁掉這種機制。一旦被設置成-1,TCP鏈接將成爲永久長鏈接。固然這只是軟件層面的「長鏈接」機制,因爲Kafka建立的這些Socket鏈接都開啓了keepalive,所以keepalive探活機制仍是會遵照的。

4、可能的問題?

  顯然,這種機制存在一個問題:假設你的producer指定了connections.max.idle.ms = -1(由於TCP鏈接的關閉與建立也是有開銷的,故不少時候咱們確實想要禁掉自動關閉機制)並且bootstrap.servers指定了集羣全部的broker鏈接信息。咱們假設你的broker數量是N,那麼producer啓動後它會建立2 * N個TCP鏈接,而其中的N個TCP鏈接在producer正常工做以後不再會被使用且不會被關閉。實際上,producer只須要N個TCP鏈接便可與N個broker進行通信。爲了請求元數據而建立的N個TCP鏈接徹底是浪費——我我的傾向於認爲Kafka producer應該重用最開始建立的那N個鏈接,所以我以爲這是一個bug。

  形成重複建立TCP鏈接的根本緣由在於broker id的記錄。就像以前說到的,最開始producer請求元數據信息時它確定不知道broker的id信息,故它作了一個假的id(從-1開始,而後是-2, -3。。。。),同時它將這個id保存起來以判斷是否存在與這個broker的TCP鏈接。Broker端返回元數據信息後producer獲知了真正的broker id,因而它拿着這個broker id去判斷是否存在與該broker的TCP鏈接——天然是不存在,所以它從新建立了一個新的Socket鏈接。這裏的問題就在於咱們不能僅僅依靠broker id來判斷是否存在鏈接。實際上使用host:port對來判斷多是更好的方法。也許社區能夠考慮在後續修正這個問題。

5、總結

  簡單總結一下當前的結論,針對最新版本Kafka(2.1.0)而言,Java producer端管理TCP鏈接的方式是:

1. KafkaProducer實例建立時啓動Sender線程,從而建立與bootstrap.servers中全部broker的TCP鏈接

2. KafkaProducer實例拿到元數據信息以後還會再次建立與bootstrap.servers中全部broker的TCP鏈接

3. 步驟1中建立的TCP鏈接只用於首次獲取元數據信息(實際上也只是會用到其中的一個鏈接,其餘的N - 1個甚至徹底不會被用到)

4. 若是設置producer端connections.max.idle.ms參數大於0,則步驟1中建立的TCP鏈接會被自動關閉;若是設置該參數=-1,那麼步驟1中建立的TCP鏈接將成爲「殭屍」鏈接

5. 當前producer判斷是否存在與某broker的TCP鏈接依靠的是broker id,這是有問題的,依靠<host, port>對多是更好的方式

相關文章
相關標籤/搜索