Kafka在0.8.1版本的時候重寫了Producer。在0.9版本中又重寫了Consumer,純Java,沒有了對Scala和ZK的依賴。網絡
1、消息的發送流程:app
- KafkaProducer:
- 等待 topic meteData 數據的更新,序列化 key,value;
- 根據 topic 的 partition 個數和 key 的值,計算該條消息所屬的 partition,將消息 append 給 RecordAccumulator;
- RecordAccumulator:
- 使用Map類型的 batches ( ConcurrentMap<TopicPartition, Deque<RecordBatch>> ) 維護髮向全部 topic 的消息數據;
- append 方法內將發送的這條消息 tryAppend 進對應 Deque 最後一個 RecordBatch 中。若是空間不夠,該 RecordBatch 就會 flip ByteBuffer,進入只讀狀態。空間不夠或者失敗則會在 Deque 末端嘗試新起一個 RecordBatch;
- Sender:
- KafkaProducer 初始化的時候會啓一個 KafkaThread 線程,運行 Runnable 的 Sender 對象,不停地發送 RecordAccumulator 內累積的消息;
- 調用 RecordAccumulator 的 ready 方法收集到這次發送任務的目的地,即 Broker Leader 的列表,消息都是發送給所屬 Partition 目前是 Leader 的那個 Broker 節點的;
- 調用 NetworkClient 的 ready 方法,判斷收集到的每一個 Leader 節點是不是 connected 狀態,否的話會被移除;
- 調用 RecordAccumulator 的 drain 方法,得到發送給每一個 Broker 節點的 RecordBatch 列表。將發往每一個 Broker 節點的 RecordBatch 數據,封裝成一個 ClientRequest,主要的消息內容由 RequestSend 內的 Struct 結構表示。Struct 內部已將消息按 topic 分開,並是按 kafka 消息的 schema 生成,具備以下的嵌套結構:{「acks」:1,」topic_data」:[{"topic": "xxx", "data": [{"partition": 1, "record_set": ByteBuffer}]}]}。ClientRequest 內還包括髮完消息後的 CallBack 處理邏輯;
- 遍歷每一個 ClientRequest,調用 NetworkClient 的 send 方法,將 RequestSend 放進 Selector.channels 內對應的 KafkaChannel 中;
- 調用 NetworkClient 的 poll 方法,將 RequestSend 真正的發送給 Broker;
- RecordAccumulator:
- ready 方法中檢查每一個 Deque 的第一個 RecordBatch 是不是 ready 的狀態,並把 RecordBatch 對應的 Broker Leader 節點收集起來好向它們發送消息。判斷 RecordBatch 是否 ready 涉及到這個 bath 是否滿了、距離上一次檢查是否夠久等。例如若是 RecordBatch 所在的 Deque 長度大於1,證實這個 RecordBatch 曾今被 append 的時候發現已經滿了,如今是隻讀待發狀態,是 ready 的。須要等待的時長受是否處在 backoff 時期,是否超過 linger 時長等影響;
- drain 方法中遍歷收集到的、 connected 狀態的 Broker Leader 節點,根據每一個節點下歸屬的 Partition 對應從 batches 中的 Deque 中取出第一個 RecordBatch,拼裝成 Map<Integer, List<RecordBatch>> 的結構,key 是 Broker 節點的 id, value 是發給該節點的 RecordBatch 列表;
- NetworkClient:
- 使用 ClusterConnectionStates (Map<String, NodeConnectionState>) 維護着每一個 Broker 節點的鏈接狀態;
- ready 方法中判斷是否跟指定的 Broker 節點是 connected 的狀態,否的話會經過 Selector 的 connect 方法初始化跟其的鏈接,創建 SocketChannel 並 register,KafkaChannel 會 attach 在 SelectionKey 上 ;
- poll 方法中調用 Selector 的 poll 方法,處理 Selector 內的 completedSends,completedReceives等,處理 ClientResponse, 遍歷 RecordBatch 內的List<Thunk>,完成回調邏輯的處理;
- Selector:
- 使用 channels (Map<String, KafkaChannel>) 維護着與每一個 Broker 節點的 Channel;
- 使用 completedSends (List<Send>) 維護着已經發送完畢的 RequestSend
- 使用 completedReceives (List<NetworkReceive>) 維護着來自 Broker 的 response;
- poll 方法中遍歷 SelectionKey, 若是 KafkaChannel ready + SelectionKeywritable,那麼就將 KafkaChannel 中的 RequestSend 發送,並維護更新 completedSends;若是 KafkaChannel ready + SelectionKey readable,那麼就接受來自 Broker 的 NetworkReceive,並維護更新 completedReceives;
2、延遲與吞吐量的問題:
Case1: Producer將消息一條接一條發送到 Broker,假設發送延遲是 2ms,那麼 1s 能夠發送 500 條消息;ui
Case2: Producer將消息延遲 8ms 發送,假設 8ms 內收集到 20 條消息,那麼 1s 能夠發送 2000 條消息;this
兩個重要的參數:
batch.size: This is an upper limit of how many messages Kafka Producer will attempt to batch before sending, specified in bytes (default is 16K bytes). Kafka may send batches before this limit is, but will always send when this limit is reached. Therefore setting this limit too low will hurt throughput without improving latency. The main reason to set this low is lack of memory – Kafka will always allocate enough memory for the entire batch size, even if latency requirements cause it to send half-empty batches.線程
linger.ms: How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch. (default is 0). Sometimes we are willing to wait a bit longer in order to improve the overall throughput at the expense of a little higher latency.對象
3、總結:
Kafka 的 Producer 經過把將要發送的消息先放在 RecordAccumulator 的 batches 內累積一段時間,而後進行小批量提交給 Broker 的方式,減小網絡往返的開銷,犧牲一點latency 換取 throughput。ip