舒適提示:本文基於 Kafka 2.2.1 版本。node
上文 《源碼分析 Kafka 消息發送流程》 已經詳細介紹了 KafkaProducer send 方法的流程,該方法只是將消息追加到 KafKaProducer 的緩存中,並未真正的向 broker 發送消息,本文未來探討 Kafka 的 Sender 線程。api
@(本節目錄)
在 KafkaProducer 中會啓動一個單獨的線程,其名稱爲 「kafka-producer-network-thread | clientID」,其中 clientID 爲生產者的 id 。緩存
咱們先來看一下其各個屬性的含義:服務器
Sender#run網絡
public void run() { log.debug("Starting Kafka producer I/O thread."); while (running) { try { runOnce(); // @1 } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } } log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records."); while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) { // @2 try { runOnce(); } catch (Exception e) { log.error("Uncaught error in kafka producer I/O thread: ", e); } } if (forceClose) { // @3 log.debug("Aborting incomplete batches due to forced shutdown"); this.accumulator.abortIncompleteBatches(); } try { this.client.close(); // @4 } catch (Exception e) { log.error("Failed to close network client", e); } log.debug("Shutdown of Kafka producer I/O thread has completed."); }
代碼@1:Sender 線程在運行狀態下主要的業務處理方法,將消息緩存區中的消息向 broker 發送。
代碼@2:若是主動關閉 Sender 線程,若是不是強制關閉,則若是緩存區還有消息待發送,再次調用 runOnce 方法將剩餘的消息發送完畢後再退出。
代碼@3:若是強制關閉 Sender 線程,則拒絕未完成提交的消息。
代碼@4:關閉 Kafka Client 即網絡通訊對象。數據結構
接下來將分別探討其上述方法的實現細節。併發
Sender#runOnceapp
void runOnce() { // 此處省略與事務消息相關的邏輯 long currentTimeMs = time.milliseconds(); long pollTimeout = sendProducerData(currentTimeMs); // @1 client.poll(pollTimeout, currentTimeMs); // @2 }
本文不關注事務消息的實現原理,故省略了該部分的代碼。
代碼@1:調用 sendProducerData 方法發送消息。
代碼@2:調用這個方法的做用?工具
接下來分別對上述兩個方法進行深刻探究。
接下來將詳細分析其實現步驟。
Sender#sendProducerData
Cluster cluster = metadata.fetch(); // get the list of partitions with data ready to send RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
Step1:首先根據當前時間,根據緩存隊列中的數據判斷哪些 topic 的 哪些分區已經達到發送條件。達到可發送的條件將在 2.1.1.1 節詳細分析。
Sender#sendProducerData
if (!result.unknownLeaderTopics.isEmpty()) { for (String topic : result.unknownLeaderTopics) this.metadata.add(topic); log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics); this.metadata.requestUpdate(); }
Step2:若是在待發送的消息未找到其路由信息,則須要首先去 broker 服務器拉取對應的路由信息(分區的 leader 節點信息)。
Sender#sendProducerData
long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); if (!this.client.ready(node, now)) { iter.remove(); notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now)); } }
Step3:移除在網絡層面沒有準備好的分區,而且計算在接下來多久的時間間隔內,該分區都將處於未準備狀態。
一、在網絡環節沒有準備好的標準以下:
二、client pollDelayMs 預估分區在接下來多久的時間間隔內都將處於未轉變好狀態(not ready),其標準以下:
Sender#sendProducerData
// create produce requests Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
Step4:根據已準備的分區,從緩存區中抽取待發送的消息批次(ProducerBatch),而且按照 nodeId:List
Sender#sendProducerData
addToInflightBatches(batches); public void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches) { for (List<ProducerBatch> batchList : batches.values()) { addToInflightBatches(batchList); } } private void addToInflightBatches(List<ProducerBatch> batches) { for (ProducerBatch batch : batches) { List<ProducerBatch> inflightBatchList = inFlightBatches.get(batch.topicPartition); if (inflightBatchList == null) { inflightBatchList = new ArrayList<>(); inFlightBatches.put(batch.topicPartition, inflightBatchList); } inflightBatchList.add(batch); } }
Step5:將抽取的 ProducerBatch 加入到 inFlightBatches 數據結構,該屬性的聲明以下:Map<TopicPartition, List< ProducerBatch >> inFlightBatches,即按照 topic-分區 爲鍵,存放已抽取的 ProducerBatch,這個屬性的含義就是存儲待發送的消息批次。能夠根據該數據結構得知在消息發送時以分區爲維度反饋 Sender 線程的「積壓狀況」,max.in.flight.requests.per.connection 就是來控制積壓的最大數量,若是積壓達到這個數值,針對該隊列的消息發送會限流。
Sender#sendProducerData
accumulator.resetNextBatchExpiryTime(); List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now); List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now); expiredBatches.addAll(expiredInflightBatches);
Step6:從 inflightBatches 與 batches 中查找已過時的消息批次(ProducerBatch),判斷是否過時的標準是系統當前時間與 ProducerBatch 建立時間之差是否超過120s,過時時間能夠經過參數 delivery.timeout.ms 設置。
Sender#sendProducerData
if (!expiredBatches.isEmpty()) log.trace("Expired {} batches in accumulator", expiredBatches.size()); for (ProducerBatch expiredBatch : expiredBatches) { String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation"; failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false); if (transactionManager != null && expiredBatch.inRetry()) { // This ensures that no new batches are drained until the current in flight batches are fully resolved. transactionManager.markSequenceUnresolved(expiredBatch.topicPartition); } }
Step7:處理已超時的消息批次,通知該批消息發送失敗,即經過設置 KafkaProducer#send 方法返回的憑證中的 FutureRecordMetadata 中的 ProduceRequestResult result,使之調用其 get 方法不會阻塞。
Sender#sendProducerData
sensors.updateProduceRequestMetrics(batches);
Step8:收集統計指標,本文不打算詳細分析,但後續會專門對 Kafka 的 Metrics 設計進行一個深刻的探討與學習。
Sender#sendProducerData
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now); pollTimeout = Math.max(pollTimeout, 0); if (!result.readyNodes.isEmpty()) { log.trace("Nodes with data ready to send: {}", result.readyNodes); pollTimeout = 0; }
Step9:設置下一次的發送延時,待補充詳細分析。
Sender#sendProducerData
sendProduceRequests(batches, now); private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) { for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet()) sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue()); }
Step10:該步驟按照 brokerId 分別構建發送請求,即每個 broker 會將多個 ProducerBatch 一塊兒封裝成一個請求進行發送,同一時間,每個 與 broker 鏈接只會只能發送一個請求,注意,這裏只是構建請求,並最終會經過 NetworkClient#send 方法,將該批數據設置到 NetworkClient 的待發送數據中,此時並無觸發真正的網絡調用。
sendProducerData 方法就介紹到這裏了,既然這裏尚未進行真正的網絡請求,那在何時觸發呢?
咱們繼續回到 runOnce 方法。
public List<ClientResponse> poll(long timeout, long now) { ensureActive(); if (!abortedSends.isEmpty()) { // If there are aborted sends because of unsupported version exceptions or disconnects, // handle them immediately without waiting for Selector#poll. List<ClientResponse> responses = new ArrayList<>(); handleAbortedSends(responses); completeResponses(responses); return responses; } long metadataTimeout = metadataUpdater.maybeUpdate(now); // @1 try { this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // @2 } catch (IOException e) { log.error("Unexpected error during I/O", e); } // process completed actions long updatedNow = this.time.milliseconds(); List<ClientResponse> responses = new ArrayList<>(); // @3 handleCompletedSends(responses, updatedNow); handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleInitiateApiVersionRequests(updatedNow); handleTimedOutRequests(responses, updatedNow); completeResponses(responses); // @4 return responses; }
本文並不會詳細深刻探討其網絡實現部分,Kafka 的 網絡通信後續我會專門詳細的介紹,在這裏先點出其關鍵點。
代碼@1:嘗試更新雲數據。
代碼@2:觸發真正的網絡通信,該方法中會經過收到調用 NIO 中的 Selector#select() 方法,對通道的讀寫就緒事件進行處理,當寫事件就緒後,就會將通道中的消息發送到遠端的 broker。
代碼@3:而後會消息發送,消息接收、斷開鏈接、API版本,超時等結果進行收集。
代碼@4:並依次對結果進行喚醒,此時會將響應結果設置到 KafkaProducer#send 方法返回的憑證中,從而喚醒發送客戶端,完成一次完整的消息發送流程。
Sender 發送線程的流程就介紹到這裏了,接下來首先給出一張流程圖,而後對上述流程中一些關鍵的方法再補充深刻探討一下。
根據上面的源碼分析得出上述流程圖,圖中對重點步驟也詳細標註了其關鍵點。下面咱們對上述流程圖中 Sender 線程依賴的相關類的核心方法進行解讀,以便加深 Sender 線程的理解。
因爲在講解 Sender 發送流程中,大部分都是調用 RecordAccumulator 方法來實現其特定邏輯,故接下來重點對上述涉及到RecordAccumulator 的方法進行一個詳細剖析,增強對 Sender 流程的理解。
該方法主要就是根據緩存區中的消息,判斷哪些分區已經達到發送條件。
RecordAccumulator#ready
public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; Set<String> unknownLeaderTopics = new HashSet<>(); boolean exhausted = this.free.queued() > 0; for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) { // @1 TopicPartition part = entry.getKey(); Deque<ProducerBatch> deque = entry.getValue(); Node leader = cluster.leaderFor(part); // @2 synchronized (deque) { if (leader == null && !deque.isEmpty()) { // @3 // This is a partition for which leader is not known, but messages are available to send. // Note that entries are currently not removed from batches when deque is empty. unknownLeaderTopics.add(part.topic()); } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) { // @4 ProducerBatch batch = deque.peekFirst(); if (batch != null) { long waitedTimeMs = batch.waitedTimeMs(nowMs); boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs; long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; boolean full = deque.size() > 1 || batch.isFull(); boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed || flushInProgress(); if (sendable && !backingOff) { // @5 readyNodes.add(leader); } else { long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); // Note that this results in a conservative estimate since an un-sendable partition may have // a leader that will later be found to have sendable data. However, this is good enough // since we'll just wake up and then sleep again for the remaining time. nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); } } } } } return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); }
代碼@1:對生產者緩存區 ConcurrentHashMap<TopicPartition, Deque< ProducerBatch>> batches 遍歷,從中挑選已準備好的消息批次。
代碼@2:從生產者元數據緩存中嘗試查找分區(TopicPartition) 的 leader 信息,若是不存在,當將該 topic 添加到 unknownLeaderTopics (代碼@3),稍後會發送元數據更新請求去 broker 端查找分區的路由信息。
代碼@4:若是不在 readyNodes 中就須要判斷是否知足條件,isMuted 與順序消息有關,本文暫時不關注,在後面的順序消息部分會重點探討。
代碼@5:這裏就是判斷是否準備好的條件,先一個一個來解讀局部變量的含義。
RecordAccumulator#drain
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { // @1 if (nodes.isEmpty()) return Collections.emptyMap(); Map<Integer, List<ProducerBatch>> batches = new HashMap<>(); for (Node node : nodes) { List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now); // @2 batches.put(node.id(), ready); } return batches; }
代碼@1:咱們首先來介紹該方法的參數:
代碼@2:遍歷全部節點,調用 drainBatchesForOneNode 方法抽取數據,組裝成 Map<Integer /** brokerId */, List< ProducerBatch>> batches。
接下來重點來看一下 drainBatchesForOneNode。
RecordAccumulator#drainBatchesForOneNode
private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) { int size = 0; List<PartitionInfo> parts = cluster.partitionsForNode(node.id()); // @1 List<ProducerBatch> ready = new ArrayList<>(); int start = drainIndex = drainIndex % parts.size(); // @2 do { // @3 PartitionInfo part = parts.get(drainIndex); TopicPartition tp = new TopicPartition(part.topic(), part.partition()); this.drainIndex = (this.drainIndex + 1) % parts.size(); if (isMuted(tp, now)) continue; Deque<ProducerBatch> deque = getDeque(tp); // @4 if (deque == null) continue; synchronized (deque) { // invariant: !isMuted(tp,now) && deque != null ProducerBatch first = deque.peekFirst(); // @5 if (first == null) continue; // first != null boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs; // @6 // Only drain the batch if it is not during backoff period. if (backoff) continue; if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) { // @7 break; } else { if (shouldStopDrainBatchesForPartition(first, tp)) break; // 這裏省略與事務消息相關的代碼,後續會重點學習。 batch.close(); // @8 size += batch.records().sizeInBytes(); ready.add(batch); batch.drained(now); } } } while (start != drainIndex); return ready; }
代碼@1:根據 brokerId 獲取該 broker 上的全部主分區。
代碼@2:初始化 start。這裏首先來闡述一下 start 與 drainIndex 。
代碼@3:循環從緩存區抽取對應分區中累積的數據。
代碼@4:根據 topic + 分區號從生產者發送緩存區中獲取已累積的雙端Queue。
代碼@5:從雙端隊列的頭部獲取一個元素。(消息追加時是追加到隊列尾部)。
代碼@6:若是當前批次是重試,而且還未到阻塞時間,則跳過該分區。
代碼@7:若是當前已抽取的消息總大小 加上新的消息已超過 maxRequestSize,則結束抽取。
代碼@8:將當前批次加入到已準備集合中,並關閉該批次,即不在容許向該批次中追加消息。
關於消息發送就介紹到這裏,NetworkClient 的 poll 方法內部會調用 Selector 執行就緒事件的選擇,並將抽取的消息經過網絡發送到 Broker 服務器,關於網絡後面的具體實現,將在後續文章中單獨介紹。
做者介紹:
丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。歡迎加入個人知識星球,構建一個高質量的技術交流社羣。