本篇是《關於Kafka producer管理TCP鏈接的討論》的續篇,主要討論Kafka java consumer是如何管理TCP鏈接。實際上,這兩篇大部分的內容是相同的,即consumer也是把TCP鏈接的管理交由底層的Selector類(org.apache.kafka.common.network)來維護。咱們依然以「什麼時候建立/建立多少/什麼時候關閉/潛在問題/總結」的順序來討論。和上一篇同樣,本文將無差異地混用名詞TCP和Socket。html
1、什麼時候建立TCP鏈接java
首先明確的是,在構建KafkaConsumer實例時是不會建立任何TCP鏈接的;另外在調用諸如subscribe或assign的時候也不會建立任何TCP鏈接。那麼TCP鏈接是在何時建立的呢?嚴格來講有幾個可能的時間點。從粗粒度層面來講,咱們能夠安全地認爲Socket鏈接是在調用consumer.poll()建立的;從細粒度層面來講,TCP鏈接建立的時機有3個:1. 請求METADATA時;2. 進行組協調時;3. 發送數據時。node
2、建立多少個TCP鏈接apache
對於每臺broker而言,kafka consumer實例一般會建立3個TCP鏈接,第一個TCP鏈接是consumer請求集羣元數據時建立的,以後consumer會使用這個Socket繼續請求元數據以及尋找group對應的coordinator,以下列日誌所示:緩存
[2019-01-01 17:38:22,301] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:914)
...
[2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending METADATA {topics=[bar,foo],allow_auto_topic_creation=true} with correlation id 2 to node -1 (org.apache.kafka.clients.NetworkClient:492)
...
[2019-01-01 17:38:22,360] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {coordinator_key=test,coordinator_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:492)安全
至於這裏爲何是node -1是由於首次請求元數據時尚不肯定broker.id,因此只能先用-1替代。session
第二個TCP鏈接供consumer執行組協調操做使用,這裏的組協調操做包括:JOIN_GROUP(加入組)、SYNC_GROUP(等待組分配方案)、HEARTBEAT(心跳請求)、OFFSET_FETCH(獲取位移)、OFFSET_COMMIT(提交位移)以及其餘請求(好比LEAVE_GROUP,但本例中沒有演示組成員退出的情形,故日誌中沒有出現這個請求類型),以下列日誌所示:測試
[2019-01-01 17:38:22,379] TRACE [Consumer clientId=consumer-1, groupId=test] Sending JOIN_GROUP {group_id=test,session_timeout=10000,rebalance_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=20 cap=20]}]} with correlation id 3 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:22,382] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for JOIN_GROUP with correlation id 3, received {throttle_time_ms=0,error_code=0,generation_id=9,group_protocol=range,leader_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,members=[{member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=20 cap=20]}]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:22,386] TRACE [Consumer clientId=consumer-1, groupId=test] Sending SYNC_GROUP {group_id=test,generation_id=9,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,group_assignment=[{member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]}]} with correlation id 5 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:22,388] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for SYNC_GROUP with correlation id 5, received {throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=36 cap=36]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:22,396] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_FETCH {group_id=test,topics=[{topic=bar,partitions=[{partition=0}]},{topic=foo,partitions=[{partition=0}]}]} with correlation id 6 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-03 17:38:22,397] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for OFFSET_FETCH with correlation id 6, received {throttle_time_ms=0,responses=[{topic=bar,partition_responses=[{partition=0,offset=0,leader_epoch=-1,metadata=,error_code=0}]},{topic=foo,partition_responses=[{partition=0,offset=0,leader_epoch=-1,metadata=,error_code=0}]}],error_code=0} (org.apache.kafka.clients.NetworkClient:810)
...
[2019-01-01 17:38:23,401] TRACE [Consumer clientId=consumer-1, groupId=test] Sending OFFSET_COMMIT {group_id=test,generation_id=9,member_id=consumer-1-b595483a-9a12-4de3-9c26-a04110ed1bfa,topics=[{topic=bar,partitions=[{partition=0,offset=0,leader_epoch=-1,metadata=}]},{topic=foo,partitions=[{partition=0,offset=0,leader_epoch=-1,metadata=}]}]} with correlation id 10 to node 2147483647 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:23,403] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 2147483647 for OFFSET_COMMIT with correlation id 10, received {throttle_time_ms=0,responses=[{topic=bar,partition_responses=[{partition=0,error_code=0}]},{topic=foo,partition_responses=[{partition=0,error_code=0}]}]} (org.apache.kafka.clients.NetworkClient:810)url
上面標紅的節點ID看上去有些奇怪,實際上它是由Integer.MAX_VALUE - coordinator的broker.id計算得來的,由於個人測試環境中只有一臺broker且該id是0,因此這個Socket鏈接的節點ID就是Integer.MAX_VALUE,即2147483647。針對這個node ID的計算方式,Kafka代碼是故意爲之的,目的就是要讓組協調請求和真正的數據獲取請求使用不一樣的Socket鏈接。
第三個Socket鏈接就很是好理解了,就是用於發送FETCH請求的。當consumer代碼使用第一個Socket鏈接獲取到集羣元數據以後,每一個broker的真實ID已經緩存在consumer本地的內存中,所以此時代碼會使用真實的ID建立第三個Socket鏈接並用於消息獲取,以下列日誌所示:spa
[2019-01-01 17:38:23,424] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=104064890,session_epoch=2,topics=[],forgotten_topics_data=[]} with correlation id 11 to node 0 (org.apache.kafka.clients.NetworkClient:492)
[2019-01-01 17:38:23,927] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node 0 for FETCH with correlation id 11, received {throttle_time_ms=0,error_code=0,session_id=104064890,responses=[]} (org.apache.kafka.clients.NetworkClient:810)
[2019-01-01 17:38:23,928] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=104064890,session_epoch=3,topics=[],forgotten_topics_data=[]} with correlation id 12 to node 0 (org.apache.kafka.clients.NetworkClient:492)
...
上面標紅的節點0是真實的broker.id,可見consumer是使用這個Socket進行消息獲取操做的。值得一提的是,當這個Socket鏈接成功創建以後,第一個Socket鏈接就會被廢棄掉,以後全部的元數據請求都經過第三個Socket發送。
3、什麼時候關閉TCP鏈接
和Producer原理相同,consumer關閉Socket也分爲主動關閉和Kafka自動關閉。主動關閉依然是由用戶發起,顯式調用consumer.close()以及相似方法亦或是kill -9;而Kafka自動關閉一樣由connections.max.idle.ms參數值控制。和producer有些不一樣的是,若是用戶寫consumer程序時使用了循環的方式來poll消息,那麼上面提到的全部請求都會不斷地發送到broker,故這些Socket鏈接上老是能保證有請求在發送,所以實現了「長鏈接」的效果。
4、可能的問題?
Consumer端和producer端的問題是同樣的,即第一個Socket鏈接僅僅是爲了首次(最多也就是幾回)獲取元數據之用,後面就會被廢棄掉。根本的緣由在於它使用了「假」的broker id去註冊,當 後面consumer獲取了真實的broker id以後它沒法區分哪一個broker id對應這個假ID,因此只能從新建立另外的Socket鏈接。
5、總結
最後總結一下當前的結論,針對最新版本Kafka(2.1.0)而言,Java consumer端管理TCP鏈接的方式是:
1. KafkaConsumer實例建立時不會建立任何Socket鏈接,實例建立以後首次請求元數據時會建立第一個Socket鏈接
2. KafkaConsumer實例拿到元數據信息以後隨機尋找其中一個broker去發現對應的coordinator,而後向coordinator所在broker建立第二個Socket鏈接。以後全部的組協調請求處理都經由該Socket
3. 步驟1中建立的TCP鏈接只用於首次獲取元數據信息,後面會被廢棄掉
4. 若是設置consumer端connections.max.idle.ms參數大於0,則步驟1中建立的TCP鏈接會被自動關閉;若是設置該參數=-1,那麼步驟1中建立的TCP鏈接將成爲「殭屍」鏈接
5. 當前consumer判斷是否存在與某broker的TCP鏈接依靠的是broker id,這是有問題的,依靠<host, port>對多是更好的方式