Kafka2.0生產者客戶端源碼分析 - Sender線程

  Kafka 在初始化生產者客戶端時,建立並啓動 Sender 線程。經過 Sender 線程來發送消息、處理消息的響應。經過「volatile boolean running」狀態控制 Sender 線程不斷輪詢,調用 NetworkClient 的 poll 方法。NetworkClient 是 Kafka 實現的用來和 broker 通訊的類,實現了 KafkaClient 接口,底層實際上就是利用 JDK NIO 來實現的,而 Kafka 把 NIO 又封裝成 Selector。node

調用關係

  Sender 的執行過程能夠粗略地分爲:發送準備、開始發送。緩存

void run(long now) {
    long pollTimeout = sendProducerData(now); // 發送準備
    client.poll(pollTimeout, now); // 開始發送
}

發送準備

  1. 取出記錄累加器中的記錄,轉換成節點->消息隊列的映射 Map<Integer, List> batches
  2. 使用上述 batches 構造能夠發送的請求,緩存到 InFlightRequests
  3. 獲取 KafkaChannel,添加消息 NetworkSend,並註冊寫事件 OP_WRITE
private long sendProducerData(long now) {
    // 把分區->消息隊列的映射關係轉換成節點->消息隊列的映射關係
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
    // 準備發送消息
    sendProduceRequests(batches, now);
    return pollTimeout;
}
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
    Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
    final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

    ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
            produceRecordsByPartition, transactionalId);
    RequestCompletionHandler callback = new RequestCompletionHandler() {
        public void onComplete(ClientResponse response) { // 請求完成後的回調
            handleProduceResponse(response, recordsByPartition, time.milliseconds());
        }
    };
    // 構造請求對象
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, requestTimeoutMs, callback); 
    client.send(clientRequest, now);
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    // 構造 Send 的實現類 NetworkSend
    Send send = request.toSend(destination, header);
    InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
    // 加入 InFlightRequests
    this.inFlightRequests.add(inFlightRequest);
    // 將 NetworkSend 綁定到 KafkaChannel,並註冊寫操做
    selector.send(send);
}
public void send(Send send) {
    String connectionId = send.destination();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId); // 獲取 KafkaChannel 通道
    channel.setSend(send);
}
public void setSend(Send send) {
    this.send = send; // 綁定到當前 KafkaChannel
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE); // 註冊寫操做
}

開始發送

  1. 調用 NIO.Selector.select() 方法阻塞輪詢,當有事件時,返回準備就緒的 key 數量
  2. 根據事件類型(可讀/可寫)處理通道內的記錄
  3. 把不一樣事件處理後的響應加入集合,回調準備階段實現的請求完成處理器來處理響應
  4. 把處理完的響應再次回調 Trunk.onCompletion(),即發送消息時定義的異步回調
// 真正開始發送
public List<ClientResponse> poll(long timeout, long now) {
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs)); // 調用 kafka.Selector.poll()

    // 處理響應
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    ...
    completeResponses(responses); // 回調處理響應

    return responses;
}
// kafka.Selector
public void poll(long timeout) throws IOException {
    // 執行 NIO.Selector 當有通道準備就緒時,返回 key 的數量
    int numReadyKeys = select(timeout); 
    long endSelect = time.nanoseconds();

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
    }

    // 把已經接收完成的加入 completedReceives 集合
    addToCompletedReceives();
}
// 處理 SelectionKey 準備就緒的 IO
void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        try {
            // 判斷通道是否可讀
           if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel) && !explicitlyMutedChannels.contains(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null) { // 保證接收到了完整消息
                    madeReadProgressLastPoll = true;
                    addToStagedReceives(channel, networkReceive);
                }
            }
            // 判斷通道是否可寫
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write(); // 寫到 SocketChannel
            }
        }
    }
}

總體流程

總體流程

相關文章
相關標籤/搜索