A picture up front makes for happier readers, so without further ado, here is the architecture diagram.
We introduced this diagram in the previous post, 《kafka生產者的蓄水池機制》. That post covered the message-accumulation half of the picture (what we called the "reservoir" mechanism); this one covers the other half: the message-sending mechanism.
All message sending starts from the Sender thread, which is a daemon thread, so the first thing to look at is Sender's run method. The outermost run() is a main loop that repeatedly invokes the per-iteration run(long now) method, which holds the actual logic:
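For reference, the outer run() that drives this loop looks roughly like the following; this is an abridged sketch of the 1.1.x source (graceful-shutdown handling omitted), not a verbatim copy:

```java
// Abridged sketch of Sender.run(), the daemon I/O thread's entry point
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // Main loop: runs until close() flips the running flag
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    // ... graceful shutdown: drain remaining requests, then close the client (omitted)
}
```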
```java
void run(long now) {
    // Producer transaction handling; not analyzed in this post -- a later post will
    // cover it in detail, so just get a feel for it here
    if (transactionManager != null) {
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();
            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, now);
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, now);
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    // The actual send of produce data, plus handling of the broker's responses
    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}
```
Next we will look at this from two angles: how messages are sent, and how the responses coming back are handled.
Sending the messages
First, the concrete logic of sendProducerData:
```java
private long sendProducerData(long now) {
    // Fetch the cluster metadata
    Cluster cluster = metadata.fetch();

    // Get the set of partitions that have data ready to send
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // If some partitions have no known leader, force a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // A leader can be unknown during leader election, or when the topic has expired;
        // re-add these topics and request an update from the broker, since we still have
        // messages to send to them
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // Walk through all the returned nodes, check each one's connection state, and drop
    // any node that is not usable
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        // Connection-state check; initiates a new connection if one is allowed
        if (!this.client.ready(node, now)) {
            // Remove nodes that are not ready
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // Collect all batches waiting to be sent, keyed by their leader node
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);

    // If strict message ordering is required, remember the topic-partitions being sent, so
    // we never have more than one unfinished in-flight batch per partition at a time
    if (guaranteeMessageOrder) {
        // Add each batch's partition to the mute set; it is a Set, so duplicate
        // topic-partitions are not added twice
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // Collect locally expired batches; they are failed with a TimeoutException and their space is freed
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
    // Handle the expired batches
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.
            transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
        }
    }

    // Update metrics
    sensors.updateProduceRequestMetrics(batches);

    // Compute pollTimeout; if there are messages ready to send, set pollTimeout to 0 so the
    // request goes out immediately, shortening how long the remaining messages sit in the
    // buffer and avoiding a backlog
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        pollTimeout = 0;
    }

    // Hand the batches to NetworkClient to be sent to the brokers
    sendProduceRequests(batches, now);
    return pollTimeout;
}
```
To summarize, the core flow of sendProducerData is:
1. Get the set of partitions ready for sending via accumulator.ready;
2. Check connectivity of every returned network node via client.ready;
3. Collect all batches waiting to be sent, grouped by leader node, via accumulator.drain;
4. When strict per-partition ordering is required, add the partitions to the mute set via accumulator.mutePartition;
5. Send the produce requests via sendProduceRequests.
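For orientation before the step-by-step walkthrough: everything below is triggered by an ordinary send() on the client side. Here is a minimal, self-contained usage sketch (the bootstrap address, topic name, and serializer choices are illustrative, not taken from the source):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() only appends the record to the accumulator (the "reservoir");
            // the daemon Sender thread analyzed in this post does the network I/O
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            // flush() blocks until the Sender has drained the accumulator
            producer.flush();
        }
    }
}
```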
Let's go through each step in turn:
Getting the set of sendable partitions via accumulator.ready:
```java
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    // Nodes that can accept messages
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    // Topics whose leader replica could not be found
    Set<String> unknownLeaderTopics = new HashSet<>();

    // Whether any thread is blocked waiting for the BufferPool to allocate space
    boolean exhausted = this.free.queued() > 0;

    // Walk every partition that has batches waiting to send, and evaluate its leader
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();

        // The node currently hosting this partition's leader replica
        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {
                // The leader is unknown but there are messages queued for this partition;
                // record it so that the later flow can force a metadata update against the broker
                unknownLeaderTopics.add(part.topic());
            }
            // A sendable node must not be in the mute set: to keep ordering, no new send may
            // be appended while a batch for the same partition is still unfinished
            else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    // Is this batch in a retry backoff?
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;

                    // Whether this leader can currently be sent to
                    boolean sendable = full    // 1. several batches queued, or the first batch is full
                        || expired             // 2. we have waited long enough (linger/backoff elapsed)
                        || exhausted           // 3. other threads are waiting for BufferPool space, i.e. the local buffer is full
                        || closed              // 4. the producer has been closed
                        || flushInProgress();  // 5. a thread is waiting for a flush to complete
                    if (sendable && !backingOff) {
                        // Sendable and not in retry backoff: add the leader to the ready set
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Update the delay before the next ready check
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }

    // Return the check result
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
```
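The lingerMs and retryBackoffMs fields used in the sendable check map directly to producer configuration. A quick sketch of the knobs behind the full/expired conditions (the values here are illustrative):

```java
Properties props = new Properties();
// batch.size: the first batch in a deque counts as "full" once it reaches this many bytes
props.put("batch.size", "16384");
// linger.ms: how long a non-full batch may sit before it counts as "expired" and becomes sendable
props.put("linger.ms", "5");
// retry.backoff.ms: the wait used instead of linger.ms while a batch is backing off for a retry
props.put("retry.backoff.ms", "100");
```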
Checking connectivity of the returned nodes via client.ready:
```java
public boolean ready(Node node, long now) {
    if (node.isEmpty())
        throw new IllegalArgumentException("Cannot connect to empty node " + node);

    // connectionStates says the node is ready: it can be sent to right away
    if (isReady(node, now))
        return true;

    // The connection state allows establishing a connection
    if (connectionStates.canConnect(node.idString(), now))
        // so ask the selector to initiate it
        initiateConnect(node, now);

    return false;
}
```
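For context, isReady in this version boils down to three checks on top of metadata freshness; the following is a paraphrased sketch of the 1.1.x NetworkClient internals, not a verbatim copy:

```java
// Paraphrased: what NetworkClient.isReady(node, now) amounts to
boolean isReady(Node node, long now) {
    // Hold back regular requests while a metadata refresh is due, so the refresh is not starved
    return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
}

boolean canSendRequest(String node) {
    return connectionStates.isReady(node)       // TCP connection established
        && selector.isChannelReady(node)        // channel ready (e.g. security handshake done)
        && inFlightRequests.canSendMore(node);  // under max.in.flight.requests.per.connection
}
```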
Collecting the batches to send, grouped by leader node, via accumulator.drain:
```java
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();

    // The batches to send, keyed by node id
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    // Iterate over each reachable node
    for (Node node : nodes) {
        int size = 0;
        List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
        List<ProducerBatch> ready = new ArrayList<>();
        /* drainIndex records where the previous drain stopped, so this drain resumes from
         * that position. If every drain started at position 0, partitions near the end of
         * the list could starve; this is a simple load-balancing strategy. */
        int start = drainIndex = drainIndex % parts.size();
        do {
            PartitionInfo part = parts.get(drainIndex);
            TopicPartition tp = new TopicPartition(part.topic(), part.partition());
            // When strict ordering is required, muted partitions must be skipped entirely,
            // otherwise messages could end up out of order
            if (!muted.contains(tp)) {
                // Get the RecordBatch deque for this partition
                Deque<ProducerBatch> deque = getDeque(tp);
                if (deque != null) {
                    synchronized (deque) {
                        ProducerBatch first = deque.peekFirst();
                        if (first != null) {
                            // Is the first batch still inside its retry-backoff window?
                            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
                            // Never retried, or the backoff has elapsed
                            if (!backoff) {
                                if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                                    // This drain has hit its byte cap -- roughly one request's
                                    // worth -- so stop to keep the request from growing too large
                                    break;
                                } else {
                                    // (transaction handling for batches in retry is omitted here)

                                    // Nodes are visited round-robin, each deque contributes only
                                    // its first batch, and deques are drained in turn -- all of
                                    // this keeps sending balanced and fair across partitions
                                    ProducerBatch batch = deque.pollFirst();
                                    // close() seals the batch: it can still be read, but no longer written
                                    batch.close();
                                    size += batch.records().sizeInBytes();
                                    ready.add(batch);
                                    batch.drained(now);
                                }
                            }
                        }
                    }
                }
            }
            // Advance drainIndex for the next iteration
            this.drainIndex = (this.drainIndex + 1) % parts.size();
        } while (start != drainIndex);
        batches.put(node.id(), ready);
    }
    return batches;
}
```
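The drainIndex fairness trick is easier to see in isolation. Below is a tiny standalone sketch of the same pattern (the class and method names are mine, not Kafka's):

```java
import java.util.Arrays;
import java.util.List;

public class RoundRobinDrainDemo {
    // Persists across drains, like RecordAccumulator's drainIndex field
    private static int drainIndex = 0;

    static void drainOnce(List<String> partitions, int maxItems) {
        int start = drainIndex = drainIndex % partitions.size();
        int taken = 0;
        StringBuilder order = new StringBuilder();
        do {
            if (taken == maxItems)
                break; // byte/size cap hit, like the maxSize check in drain()
            order.append(partitions.get(drainIndex)).append(' ');
            taken++;
            drainIndex = (drainIndex + 1) % partitions.size();
        } while (start != drainIndex);
        System.out.println(order);
    }

    public static void main(String[] args) {
        List<String> parts = Arrays.asList("p0", "p1", "p2");
        drainOnce(parts, 2); // prints: p0 p1
        drainOnce(parts, 2); // prints: p2 p0 -- resumes where the cap cut off, so p2 is not starved
    }
}
```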
Calling accumulator.mutePartition to add partitions to the mute set: this step is straightforward. While iterating over the batches about to be sent, if strict ordering is enabled, each batch's partition is recorded in the mute set. Before every send the producer checks whether a partition is already in this set and, if so, skips it for this round; after each send completes, the partition is removed from the mute set so the next message for it can go out.
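Worth noting: as far as I can tell from the 1.1.x KafkaProducer constructor, guaranteeMessageOrder is not a standalone switch but is derived from the in-flight request limit. A config sketch:

```java
Properties props = new Properties();
// With at most one unacknowledged request per connection, the producer sets
// guaranteeMessageOrder = true and the mute-set logic above takes effect
props.put("max.in.flight.requests.per.connection", "1");
// Retries then stay ordering-safe: a failed batch cannot be overtaken by a later one
props.put("retries", "3");
```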
Calling sendProduceRequests to send the produce requests:
```java
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    if (batches.isEmpty())
        return;

    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    // Walk all batches and find the minimum magic (message format) version
    byte minUsedMagic = apiVersions.maxUsableProduceMagic();
    for (ProducerBatch batch : batches) {
        if (batch.magic() < minUsedMagic)
            minUsedMagic = batch.magic();
    }

    // Walk the RecordBatch collection again, filling produceRecordsByPartition and recordsByPartition
    for (ProducerBatch batch : batches) {
        TopicPartition tp = batch.topicPartition;
        MemoryRecords records = batch.records();

        // Down-convert for backward compatibility where needed; e.g. when a partition
        // migrates from a broker on a newer version to one on an older version, the
        // MemoryRecords must be rebuilt in the older format
        if (!records.hasMatchingMagic(minUsedMagic))
            records = batch.records().downConvert(minUsedMagic, 0, time).records();
        produceRecordsByPartition.put(tp, records);
        recordsByPartition.put(tp, batch);
    }

    String transactionalId = null;
    if (transactionManager != null && transactionManager.isTransactional()) {
        transactionalId = transactionManager.transactionalId();
    }

    // Build the ProduceRequest builder; produceRecordsByPartition feeds the request itself
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    // Build the callback that will handle the response; recordsByPartition is used there
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };

    String nodeId = Integer.toString(destination);
    // Build the ClientRequest; acks != 0 means we expect a response from the broker
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
    // Hand the request to NetworkClient for sending
    client.send(clientRequest, now);
    log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
```
Next we need a look at NetworkClient's send flow; every send ultimately goes through its doSend method:
```java
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    // The id of the destination node
    String nodeId = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    // (logging omitted)
    Send send = request.toSend(nodeId, header);
    // Wrap the request in a new InFlightRequest and enqueue it
    InFlightRequest inFlightRequest = new InFlightRequest(
            header,
            clientRequest.createdTimeMs(),
            clientRequest.destination(),
            clientRequest.callback(),
            clientRequest.expectResponse(),
            isInternalRequest,
            request,
            send,
            now);
    this.inFlightRequests.add(inFlightRequest);
    // Hand the bytes to the selector for network transmission
    selector.send(inFlightRequest.send);
}
```
That completes the walkthrough of the send path; next we cover how responses are pulled back and handled.
Response handling starts from NetworkClient's poll method, whose logic breaks down as follows:
```java
public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // Sends aborted because a connection dropped or an API version is unsupported
        // are turned into responses first, before any network I/O
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    // Kick off a metadata update if one is due
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        // This poll performs all the actual network work: connects, disconnects, initiating
        // new sends, and progressing in-flight sends and receives; completed receives end up
        // in completedReceives
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // Process all the completed operations and responses
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);

    return responses;
}
```
響應操做的核心處理幾個函數就是handle*的幾個函數,咱們分別介紹一下:
handleCompletedSends iterates over every completed send and, for requests that do not expect a response, creates a local response and adds it to the response list:
```java
private void handleCompletedSends(List<ClientResponse> responses, long now) {
    // Iterate over all Send objects that have finished sending
    for (Send send : this.selector.completedSends()) {
        // Look up the most recent in-flight request for this destination
        InFlightRequest request = this.inFlightRequests.lastSent(send.destination());
        // A request that was sent successfully but expects no broker response is
        // completed locally right away
        if (!request.expectResponse) {
            // Entries are added to inFlightRequests on send and removed once the exchange completes
            this.inFlightRequests.completeLastSent(send.destination());
            // Add a locally created response (null body) to the response list
            responses.add(request.completed(null, now));
        }
    }
}
```
handleCompletedReceives fetches the broker responses and dispatches them by type: metadata responses, API-version responses, and ordinary request responses:
```java
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    // Iterate over everything received; completedReceives was filled by selector.poll above
    for (NetworkReceive receive : this.selector.completedReceives()) {
        // The id of the node the response came from
        String source = receive.source();
        // Retrieve the matching request cached in inFlightRequests
        InFlightRequest req = inFlightRequests.completeNext(source);
        // Parse the response payload
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        // (logging omitted)
        AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
        if (req.isInternalRequest && body instanceof MetadataResponse)
            // Handle a metadata-update response
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            // An API-versions response: refresh the locally cached API versions for that node
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            // An ordinary response: add it to the local response list
            responses.add(req.completed(body, now));
    }
}
```
handleDisconnections ultimately calls Selector#disconnected to obtain the IDs of nodes whose connections dropped, updates each such node's connection state to DISCONNECTED, and clears the locally cached data related to that node, finally creating a disconnected ClientResponse for each affected request and adding it to the result list. If this step actually found any dropped connections, the locally cached node metadata is marked for refresh.
handleConnections calls Selector#connected to obtain the IDs of nodes whose connections are healthy. If a node has just connected for the first time, its supported API versions must be fetched: the method sets that node's connection state to CHECKING_API_VERSIONS and adds its ID to the NetworkClient#nodesNeedingApiVersionsFetch set. For all other nodes, the connection state is simply updated to READY.
handleInitiateApiVersionRequests serves the nodes that handleConnections marked as needing an API-version fetch, i.e. those recorded in the NetworkClient#nodesNeedingApiVersionsFetch set. It iterates over those nodes and, whenever the target node can currently accept a request, builds an ApiVersionsRequest to fetch the API versions that node supports; the request is wrapped into a ClientRequest and goes out on the next Selector#poll.
handleTimedOutRequests walks the nodes that have timed-out requests cached in inFlightRequests and treats each such node as disconnected: it creates a disconnected ClientResponse, adds it to the result list, and marks the locally cached cluster metadata for refresh.
The last one is completeResponses. Its flow is simple: it fires the producer-side callbacks, delivering the broker's response to the caller:
```java
private void completeResponses(List<ClientResponse> responses) {
    for (ClientResponse response : responses) {
        try {
            // Iterate over the responses accumulated by the handle* stages above and invoke
            // each one's callback; this is how the producer learns the broker's answer
            response.onComplete();
        } catch (Exception e) {
            log.error("Uncaught error in request completion:", e);
        }
    }
}
```
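For a produce request, response.onComplete() invokes the RequestCompletionHandler built back in sendProduceRequest, which runs handleProduceResponse and ultimately fires the per-record Callback the application passed to send(). A minimal sketch of that application-side callback, reusing the producer from the earlier usage example (the topic name is illustrative):

```java
producer.send(new ProducerRecord<>("demo-topic", "key", "value"),
    (metadata, exception) -> {
        // Runs once the response-handling chain traced above completes the batch
        if (exception != null)
            System.err.println("send failed: " + exception);
        else
            System.out.printf("acked: partition=%d offset=%d%n",
                    metadata.partition(), metadata.offset());
    });
```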
Note: all Kafka architecture and source-code analysis articles on this account are based on version 1.1.2; any exceptions will be called out explicitly.