Kafka Producer網絡層源碼分析

上一篇 )講了Kafka Producer發送消息的主體流程,這一篇咱們關注下Kafka的網絡層是如何實現的。 對於發送消息而言,Producer是客戶端,Broker是服務器端。 Kafka使用了JavaNIO向服務器發送消息,因此在這以前須要瞭解java nio的基本知識。此次網絡層源碼分析從metadata request切入。java

開局一張圖

kafka_producer_network.png

上面是Kafka producer網絡層的主體流程,先看下有一個大致印象。node

Kafka的底層使用的是Java NIO,Kafka中針對NIO的Selector的封裝類也叫Selector,對Channel的封裝類叫作KafkaChannel。後面若是沒有特殊說明,Selector都是指Kafka中的Selector。git

metadata request

先來回顧下Kafka 發送消息的代碼github

KafkaProducer<String,String> producer = createProducer();
for (int i = 0; i < 10;i++) {
  producer.send(new ProducerRecord<>("codeTest","key" + (i+1),"value" + (i+1)));
}
複製代碼

上面的代碼中首先會初始化KafkaProducer,在初始化KafkaProducer的時候,同時咱們也會初始化Kafka發送消息的客戶端apache

KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                this.metrics, time, "producer", channelBuilder, logContext),
        ...);
複製代碼

建立NetworkClient(implements KafkaClient)的同時會建立一個Selector,這個是對java.nio.Selector的封裝,發送請求,接收響應,處理鏈接完成以及現有的斷開鏈接都是經過它的poll()方法調用完成的。api

等待metadata更新

咱們都知道Kafka讀寫消息都是經過leader的,只有知道了leader才能發送消息到kafka,在咱們的你好,Kafka一文中,咱們講了首先會發起metadata request,從中就能夠獲取到集羣元信息(leader,partiton,ISR列表等),那麼是在哪裏發起metadata request的呢?緩存

調用KafkaProducer的doSend()(send()-->doSend())方法時,第一步就是經過waitOnMetadata等待集羣元數據(topic,partition,node等)可用。服務器

Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic);
//若是此時已經有partition的信息了
if (partitionsCount != null && (partition == null || partition < partitionsCount))
    return new ClusterAndWaitTime(cluster, 0);

do {
  int version = metadata.requestUpdate();
  //喚醒Sender()線程,Sender會被poll阻塞(參見java.nio.Channels.Selector.poll())
  sender.wakeup();
  //等待metadata的更新,會一直阻塞直到當前版本大於最近一次版本
  metadata.awaitUpdate(version, remainingWaitMs);
  cluster = metadata.fetch();
  elapsed = time.milliseconds() - begin;
  remainingWaitMs = maxWaitMs - elapsed;
  partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
複製代碼

若是metadata中不包含對應topic的metadata信息,那麼就請求更新metadata,若是沒有更新則一直會在這個while循環中,這個循環主要作了如下幾件事網絡

  1. 調用metadata.requestUpdate();將needUpdate屬性設置爲true(表示強制更新),返回當前version(用於判斷是否更新過了)
  2. 喚醒Sender線程,其實是喚醒NetworkClient中Selector,避免Selector一直在poll中等待
  3. 執行metadata.awaitUpdate等待metadata的更新,未更新則一直阻塞。
//等待元數據更新,直到當前版本大於咱們知道的最新版本
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
  long begin = System.currentTimeMillis();
  long remainingWaitMs = maxWaitMs;
  while ((this.version <= lastVersion) && !isClosed()) {
      if (remainingWaitMs != 0)
          wait(remainingWaitMs);
      long elapsed = System.currentTimeMillis() - begin;
      remainingWaitMs = maxWaitMs - elapsed;
  }
}
複製代碼

發起請求

如今咱們知道了會等待元數據更新,那麼究竟是在哪裏更新的呢?上面有講到喚醒了Sender線程,在run()方法中會去調用KafkaClient.poll()方法,這裏會對metadata request進行處理數據結構

@Override
public List<ClientResponse> poll(long timeout, long now) {
    
  //處理metadata
  long metadataTimeout = metadataUpdater.maybeUpdate(now);

  //這裏會調用java.nio.Selector.select()方法
  this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
  ...

  //這裏會對metadata response進行處理,就能夠獲取到kafka metadata的信息
  handleCompletedReceives(responses, updatedNow);
  ...

  return responses;
}

複製代碼

如今看看metadata request如何發送的

private long maybeUpdate(long now, Node node) {
  String nodeConnectionId = node.idString();
  //已經鏈接,且Channel已經ready,且沒有請求被髮送到給定節點
  if (canSendRequest(nodeConnectionId, now)) {
      //這裏會最終調用NetworkClient.doSend()方法,其實是將Send對象設置到KafkaChannel中,並無進行網絡IO
      sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
      return defaultRequestTimeoutMs;
  }
  if (connectionStates.canConnect(nodeConnectionId, now)) {
      //初始化鏈接
      initiateConnect(node, now);
      return reconnectBackoffMs;
  }
  return Long.MAX_VALUE;
}
複製代碼

上面的代碼先查看是否能夠發送請求,若是能夠發送請求就直接將數據設置到KafkaChannel中。若是不能發送就查看當前是否能夠鏈接,若是能夠則初始化鏈接,初始化鏈接的代碼在Selector.connect()方法中

@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    
  SocketChannel socketChannel = SocketChannel.open();
  //設置keepAlive以及socket的一些屬性(好比發送數據緩存區大小以及接收數據緩衝區大小)
  configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);

  //實際上調用socketChannel.connect(address);
  boolean connected = doConnect(socketChannel, address);

  //將socketChannel註冊到nioSelector中,同時將生成KafkaChannel(對java.nio.Channel的封裝)
  //並將KafkaChannel綁定到java.nio.SelectionKey中
  SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
// connectct爲true表明該鏈接不會再觸發CONNECT事件,因此這裏要單獨處理
  if (connected) {
      // 加入到一個單獨的集合中
      immediatelyConnectedKeys.add(key);
      // 取消對該鏈接的CONNECT事件的監聽
      key.interestOps(0);
  }
}

private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
  SelectionKey key = socketChannel.register(nioSelector, interestedOps);
  KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
  this.channels.put(id, channel);
  return key;
}
複製代碼

若是熟悉NIO的話,上面的代碼看上去就很熟悉,主要就是設置設置channel以及selectionKey的關係。 須要注意的是doConnect()方法返回爲true的狀況,在非阻塞模式下,對於local connection,鏈接可能立刻就創建好了,那該方法會返回true,對於這種狀況,不會再觸發以後的connect事件。所以kafka用一個單獨的集合immediatelyConnectedKeys將這些特殊的鏈接記錄下來。在接下來的步驟會進行特殊處理。 這裏要留意到KafkaChannel就是在這裏被建立的。到這裏咱們就要來看看KafkaChannel和Selector有哪些屬性是須要咱們注意的

public class KafkaChannel {
  //繼承java.nio.channels.Channel,可讀可寫,對socketChannel的封裝
  private final TransportLayer transportLayer;

  //經過它來建立Buffer和回收Buffer
  private final MemoryPool memoryPool;

  //收到的數據
  private NetworkReceive receive;

  //發送的數據
  private Send send;
}

public class Selector implements Selectable, AutoCloseable {

  //java nio中的Selector
  private final java.nio.channels.Selector nioSelector;

  //kafka服務器節點和Channel之間對應關係
  private final Map<String, KafkaChannel> channels;
  //發送完成的請求
  private final List<Send> completedSends;
  //完整的消息響應
  private final List<NetworkReceive> completedReceives;

  //暫存的消息響應
  private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;

  //當即鏈接上的SelectionKey
  private final Set<SelectionKey> immediatelyConnectedKeys;

  //用於分配ByteBuffer
  private final MemoryPool memoryPool;

}
複製代碼

輪詢數據

初始化鏈接完成以後,這個時候就是開始輪詢了,在Selector.poll()方法中關於數據讀寫的邏輯以下

public void poll(long timeout) throws IOException {
    
  /* check ready keys */
  int numReadyKeys = select(timeout);

  if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
      Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

      pollSelectionKeys(readyKeys, false, endSelect);
      // 清除全部SelectionKey,避免下一次在進行處理
      readyKeys.clear();
      
      //處理髮起鏈接時,立刻就創建鏈接的請求,這種通常只在broker和client在同一臺機器上才存在
      pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
      immediatelyConnectedKeys.clear();
  }
  //將暫存起來的網絡響應添加到已完成網絡響應集合裏面
  addToCompletedReceives();
}

void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
  for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
    KafkaChannel channel = channel(key);
 
    boolean sendFailed = false;
    //READ事件
    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
    && !explicitlyMutedChannels.contains(channel)) {

      NetworkReceive networkReceive;

      //read方法會從channel中將數據讀取到Buffer中(仍是經過KafkaChannel中的transportLayer),
      while ((networkReceive = channel.read()) != null) {
        if (!stagedReceives.containsKey(channel))
          stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

        //將讀到的請求存起來
        Deque<NetworkReceive> deque = stagedReceives.get(channel);
        deque.add(receive);
      }
  }

  //寫事件
  if (channel.ready() && key.isWritable()) {
    //從buffer中寫入數據到Channel(KafkaChannel中的transportLayer)
    Send send = channel.write();
  }
}

複製代碼

上面的代碼主要作了兩件事兒

  1. 讀取網絡返回的請求,從Channel讀進Buffer,Buffer是有容量限制的,因此可能一次只能讀取一個req的部分數據。只有讀到一個完整的req的狀況下,channel.read()方法才返回非null
  2. 發送數據,從Buffer寫入Channel,這裏發起了真正的網絡IO

讀出數據後,會先放到stagedReceives集合中,而後在addToCompletedReceives()方法中對於每一個channel都會從stagedReceives取出一個NetworkReceive(若是有的話),放入到completedReceives中。

private void addToCompletedReceives() {
  if (!this.stagedReceives.isEmpty()) {
    Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
        KafkaChannel channel = entry.getKey();
        //被mute的channel會被放到explicitlyMutedChannels中,chanel被mute是在服務端(scala)執行的
        if (!explicitlyMutedChannels.contains(channel)) {
            Deque<NetworkReceive> deque = entry.getValue();
            addToCompletedReceives(channel, deque);
            if (deque.isEmpty())
                iter.remove();
        }
    }
  }
}
複製代碼

根據官方代碼註釋這樣作的緣由有2點,還能夠參考(github.com/apache/kafk…)。

  1. 對於SSL鏈接,數據內容是加密的,不能精準的肯定本次須要讀取的數據大小,只能儘量的多讀,這樣會致使可能會比請求的數據讀的要多。那若是該channel以後沒有數據能夠讀,會致使多讀的數據將不會被處理。
  2. kafka須要確保一個channel上request被處理的順序是其發送的順序。所以對於每一個channel而言,每次poll上層最多隻能看見一個請求,當該請求處理完成以後,再處理其餘的請求。對Server端和Client端來講處理方式不同。Selector這個類在Client和Server端都會調用,因此這裏存在兩種狀況
  3. 應用在 Server 端時,Server 爲了保證消息的時序性,在 Selector 中提供了兩個方法:mute(String id) 和 unmute(String id),對該 KafkaChannel 作標記來保證同時只能處理這個 Channel 的一個 request(能夠理解爲排它鎖)。當 Server 端接收到 request 後,先將其放入 stagedReceives 集合中,此時該 Channel 還未 mute,這個 Receive 會被放入 completedReceives 集合中。Server 在對 completedReceives 集合中的 request 進行處理時,會先對該 Channel mute,處理後的 response 發送完成後再對該 Channel unmute,而後才能處理該 Channel 其餘的請求
  4. 應用在 Client 端時,Client 並不會調用 Selector 的 mute() 和 unmute() 方法,client 發送消息的時序性而是經過 InFlightRequests(保存了max.in.flight.requests.per.connection參數的值) 和 RecordAccumulator 的 mutePartition 來保證的,所以對於 Client 端而言,這裏接收到的全部 Receive 都會被放入到 completedReceives 的集合中等待後續處理。

處理響應

如今響應數據已經收到了,在KafkaClient.poll方法中會調用handleCompletedReceives()方法處理已經完成的響應

private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
      
       //根據返回的數據結構解析對應body
        AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
        maybeThrottle(body, req.header.apiVersion(), req.destination, now);
        //處理MetadataResponse數據,從中解析topic,partition,broker的對應關係
        if (req.isInternalRequest && body instanceof MetadataResponse)
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            responses.add(req.completed(body, now));
    }
}
複製代碼

至此,發送metadata request的流程已經分析完畢,發送消息的流程和metadata request的流程大致是一致的,這裏就不作過多分析了。

總結

總結下發送流程

  1. sender 線程第一次調用 poll() 方法時,初始化與 node 的鏈接;
  2. sender 線程第二次調用 poll() 方法時,發送 Metadata 請求;
  3. sender 線程第三次調用 poll() 方法時,獲取 metadataResponse,並更新 metadata。

通過上述 sender 線程三次調用 poll()方法,所請求的 metadata 信息纔會獲得更新,此時 Producer 線程也不會再阻塞,開始發送消息。

分析Kafka網絡層的構成的時候,必定要搞清楚NIO的處理流程,進一步理解Kafka中的Selector和KafkaChannel。

本次源代碼分析基於kafka-client-2.0.0版本。

相關文章
相關標籤/搜索