0.10以前的版本分爲高水平API和低水平API,到了0.10徹底使用java寫了客戶端源碼,以前是用的scala,而且再也不依賴zook和scala。java
新版本的 Consumer 綜合統一了以前「High Level」和「Simple」的接口,支持訂閱固定的TopicPartition,手動Commit的Offset等。安全
1、消息的消費流程:less
- KafkaConsumer:
- poll 方法在有限定的時間內不斷調用 pollOnce 方法,一旦有了可消費的消息則返回,返回前會預先調用 Fetcher 的 fetchedRecords 請求消息,或者超時返回空結果集;
- pollOnce 方法:
- 調用 ConsumerCoordinator 的 ensureCoordinatorReady 方法,檢查獲取當前 Consumer Group 的 coordinator (負責協調的 Broker) ;
- 調用 ConsumerCoordinator 的 ensurePartitionAssignment 的方法,得到指派給當前 Consumer 的 Partition;
- 若是不清楚 Partition 目前消費到的 position,例如剛加入 Group 時,調用 ConsumerCoordinator 的 refreshCommittedOffsetsIfNeeded 的方法,對指派到的 Partition 的 position 進行更新;
- 調用 ConsumerNetworkClient 的 executeDelayedTasks 方法,執行 auto commit、heartbeat 等後臺任務;
- 調用 Fetcher 的 fetchedRecords 方法,返回暫存着的、可直接拿來消費的消息。沒有的話調用 Fetcher 的 sendFetches 方法,發送 FetchRequest 給每一個 Partition 對應的 Leader Broker 節點,得到新的供消費的消息;
- Fetcher:
- 使用 records (List<PartitionRecords<K, V>>) 暫存着從 Broker Leader 拉取來的、能夠直接消費的新消息;
- fetchedRecords 方法中從 records 中取出暫存的消息,轉化成 Map<TopicPartition, List<ConsumerRecord<K, V>>> 的結構,並調用 SubscriptionState 的 position 方法,維護各個 Partition 目前消費到的 posision;
- sendFetches 方法中對於每一個 fetchable 的 Partition,帶上這個 Partition 目前消費到的 position,構形成 FetchRequest,發送到對應的 Leader Broker。對 FetchResponse 的處理中,將每一個 Partition 待消費的消息放到 records 中;
- ConsumerCoordinator:
- 初始化的時候,若是 auto commit = true,則產生一個 AutoCommitTask,該任務將經過 ConsumerNetworkClient 將 SubscriptionState 的 assignment 內的各個Partition 消費的 Offset 提交;
- ensureCoordinatorReady 方法中,選取其中一個低負載的 Broker 諮詢當前 Consumer Group 的 coordinator,獲取後調用 NetworkClient 的 ready 方法與 coordinator 創建鏈接;
- ensurePartitionAssignment 方法中,若是剛加入當前 Group 或者 HeartBeat 收到 rebalance 的 response,調用 ConsumerNetworkClient 的 send 方法,向 coordinator 發送 JoinGroupRequest。在 JoinGroupResponseHandler 中若是被 coordinator 指派爲 PartitionAssignor,則進行 Partition 的分配,分爲 Range 和 RoundRobin 兩種,經過 ConsumerNetworkClient 的 send 方法發送 SyncGroupRequest 並將結果上報給 coordinator。從 coordinator 那收到分配結果後,更新 SubscriptionState 內的 assignment;
- refreshCommittedOffsetsIfNeeded 方法中,經過 ConsumerNetworkClient 的 send 方法發送 OffsetFetchRequest,獲取 Partition 的 position,並更新 SubscriptionState 內的 assignment;
- SubscriptionState:
- 使用 assignment (Map<TopicPartition, TopicPartitionState>) 維護目前被指派消費的 Partition,以及各個 Partition 消費到的 position;
- ConsumerNetworkClient:
- send 方法中將發往指定 Broker 的各種請求,暫存到 unsent ( Map<Node, List<ClientRequest>> )中,不進行實際發送,直到 poll 方法被調用,才真正經過 NetworkClient 發送請求
- executeDelayedTasks 方法中,同步執行 delayedTasks 內到時的任務,auto commit、heartbeat 等;
- NetworkClient
- 使用 ClusterConnectionStates (Map<String, NodeConnectionState>) 維護着每一個 Broker 節點的鏈接狀態;
- ready 方法中判斷是否跟指定的 Broker 節點是 connected 的狀態,否的話會經過 Selector 的 connect 方法初始化跟其的鏈接,創建 SocketChannel 並 register,KafkaChannel 會 attach 在SelectionKey 上;
- poll 方法中調用 Selector 的 poll 方法,處理 Selector 內的 completedSends,completedReceives等,處理 ClientResponse, 遍歷 RecordBatch 內的List<Thunk>,完成回調邏輯的處理;
2、KafkaConsumer 線程不安全的問題
官方建議的兩種解決方法:
1. 每一個消費一個線程,而且給每一個消費者建立一個實例。
優勢:(1)很是容易實現; (2) 很是快,不須要 inter-thread線程協調(3) 很是容易實如今每一個分區順序處理數據。
缺點: (1) 消費者越多 TCP鏈接越多 (2) Multiple consumers means more requests being sent to the server and slightly less batching of data which can cause some drop in I/O throughput; (3) The number of total threads across all processes will be limited by the total number of partitions.fetch
2. Decouple Consumption and Processing: have one consumer fetch message from broker and have a pool of processor threads that actually handle the message processing.
優勢: (1) This option allows independently scaling the number of consumers and processors, avoiding any limitation on partitions.
缺點: (1) Guaranteeing order across the processors requires particular care as the threads will execute independently an earlier chunk of data may actually be processed after a later chunk of data; (2) Manually committing the position becomes harder as it requires that all threads co-ordinate to ensure that processing is complete for that partition. ui
3、總結: Kafka 的 Consumer 經過 Fetcher 從 Broker 獲取能夠消費的消息,SubscriptionState 內的 assignment 維護了目前各個 TopicPartition 的消費 position,周而復始運行的 AutoCommitTask 將目前消費的 offset 提交給 Broker。經過 HeartBeat 得知須要 rebalance 的時候,向 Coordinator 發送 JoinGroupRequest,若被選爲 Partition Assignor,則將進行 Partition 的分配,經過 SyncGroupRequest 上報結果。最終從 Coordinator 那收到的分配更新進 SubscriptionState 內的 assignment。