When Kafka initializes the producer client, it creates and starts the Sender thread. The Sender thread is responsible for sending messages and handling their responses. A "volatile boolean running" flag keeps the Sender thread polling in a loop, repeatedly calling NetworkClient's poll method. NetworkClient is the class Kafka implements for communicating with brokers; it implements the KafkaClient interface and is ultimately built on JDK NIO, which Kafka wraps in its own Selector.
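To make that loop structure concrete, here is a minimal, self-contained sketch of the pattern: a dedicated I/O thread whose lifetime is governed by a volatile flag and which delegates each iteration to a poll-style call. The class and method bodies below are illustrative stand-ins, not Kafka's actual Sender code.

public class SenderLoopSketch implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            long now = System.currentTimeMillis();
            long pollTimeout = sendProducerData(now); // stand-in: prepare batches, register writes
            poll(pollTimeout, now);                   // stand-in: drive the underlying NIO selector
        }
    }

    public void shutdown() {
        running = false; // the write is immediately visible to the I/O thread because the flag is volatile
    }

    private long sendProducerData(long now) { return 100L; }      // placeholder
    private void poll(long timeout, long now) { /* placeholder */ }

    public static void main(String[] args) throws InterruptedException {
        SenderLoopSketch sender = new SenderLoopSketch();
        Thread ioThread = new Thread(sender, "producer-network-thread-sketch");
        ioThread.start();
        Thread.sleep(50);
        sender.shutdown();
        ioThread.join();
    }
}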
The Sender's execution can be roughly divided into two phases: preparing the send and performing the send.
void run(long now) {
    long pollTimeout = sendProducerData(now); // prepare the send
    client.poll(pollTimeout, now);            // perform the send
}
private long sendProducerData(long now) {
    // convert the partition -> batch queue mapping into a node -> batch queue mapping
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    // prepare the produce requests
    sendProduceRequests(batches, now);
    return pollTimeout;
}

private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) {
            // callback invoked once the request completes
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    // build the request object
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
            requestTimeoutMs, callback);
    client.send(clientRequest, now);
}

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    // build a NetworkSend (the concrete implementation of Send)
    Send send = request.toSend(destination, header);
    InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
    // track the request in InFlightRequests
    this.inFlightRequests.add(inFlightRequest);
    // bind the NetworkSend to the KafkaChannel and register interest in writes
    selector.send(send);
}

public void send(Send send) {
    String connectionId = send.destination();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId); // look up the KafkaChannel
    channel.setSend(send);
}

public void setSend(Send send) {
    this.send = send;                                           // bind the Send to this KafkaChannel
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);  // register interest in write events
}
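Seen from the application side, this chain is what ultimately services an ordinary producer send: send() only appends the record to the accumulator, the Sender drains it into a ProduceRequest, and the user callback fires once handleProduceResponse has processed the broker's reply. A small usage example with the public producer API (the broker address and topic name below are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerCallbackExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");
            // send() only enqueues the record; the Sender/NetworkClient machinery shown above
            // performs the network I/O and later invokes this callback with the response
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("acked: %s-%d@%d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        } // close() flushes outstanding batches and shuts down the Sender thread
    }
}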
// this is where the actual network send happens
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    // delegate to kafka.Selector.poll()
    this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));

    // handle the responses
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    ...
    completeResponses(responses); // invoke the callbacks registered for each response
    return responses;
}

// kafka.Selector
public void poll(long timeout) throws IOException {
    // run the underlying NIO Selector; returns the number of keys whose channels are ready
    int numReadyKeys = select(timeout);
    long endSelect = time.nanoseconds();

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
    }
    // move fully received responses into the completedReceives collection
    addToCompletedReceives();
}

// handle I/O on the SelectionKeys that are ready
void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        try {
            // is the channel readable?
            if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
                    && !explicitlyMutedChannels.contains(channel)) {
                NetworkReceive networkReceive;
                // channel.read() only returns a receive once a complete message has arrived
                while ((networkReceive = channel.read()) != null) {
                    madeReadProgressLastPoll = true;
                    addToStagedReceives(channel, networkReceive);
                }
            }
            // is the channel writable?
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write(); // write the bound Send to the SocketChannel
            }
        }
    }
}
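Since Kafka's Selector is a wrapper over JDK NIO, the write path above boils down to registering OP_WRITE on a channel that has pending data and flushing that data when the selector reports the channel writable. The following plain-NIO sketch illustrates only that underlying mechanism; it is not Kafka's Selector, and the host/port are placeholders.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

public class NioWriteSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        boolean connected = channel.connect(new InetSocketAddress("localhost", 9092)); // placeholder address
        // like KafkaChannel.setSend(): there is data to send, so watch for writability
        channel.register(selector, connected ? SelectionKey.OP_WRITE : SelectionKey.OP_CONNECT);

        ByteBuffer pending = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));

        while (pending.hasRemaining()) {
            selector.select(); // like kafka.Selector.poll(): block until a channel is ready
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable() && channel.finishConnect()) {
                    key.interestOps(SelectionKey.OP_WRITE);
                } else if (key.isWritable()) {
                    channel.write(pending); // like channel.write() inside pollSelectionKeys()
                    if (!pending.hasRemaining()) {
                        // the send is fully flushed, so stop watching for writability
                        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
                    }
                }
            }
            selector.selectedKeys().clear();
        }
        channel.close();
        selector.close();
    }
}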