The previous article covered the main flow of how the Kafka Producer sends messages; this one looks at how Kafka's network layer is implemented. For message sending, the Producer is the client and the Broker is the server. Kafka uses Java NIO to send messages to the server, so some basic knowledge of Java NIO helps before reading on. This source-code walkthrough of the network layer starts from the metadata request.
The diagram above shows the overall flow of the Kafka producer's network layer; skim it first to get a general picture.
Kafka is built on Java NIO. Kafka's wrapper around NIO's Selector is also called Selector, and its wrapper around Channel is called KafkaChannel. Unless stated otherwise, Selector below refers to Kafka's Selector.
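As a quick NIO refresher, here is a minimal, hypothetical non-blocking client skeleton (the class name, host, and port are made up for illustration); Kafka's Selector and KafkaChannel are essentially a production-grade elaboration of this event-loop pattern:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

public class NioClientSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);                  // non-blocking, as Kafka configures it
        ch.connect(new InetSocketAddress("localhost", 9092));
        ch.register(selector, SelectionKey.OP_CONNECT);
        while (true) {
            selector.select(1000);                    // block until an event is ready, or timeout
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable()) {            // TCP handshake finished
                    ((SocketChannel) key.channel()).finishConnect();
                    key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                } else if (key.isReadable()) {
                    // read from the channel into a ByteBuffer
                } else if (key.isWritable()) {
                    // write a pending ByteBuffer to the channel
                }
            }
            selector.selectedKeys().clear();          // must clear the processed keys
        }
    }
}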
First, recall the code for sending a message with Kafka:
KafkaProducer<String, String> producer = createProducer();
for (int i = 0; i < 10; i++) {
    producer.send(new ProducerRecord<>("codeTest", "key" + (i + 1), "value" + (i + 1)));
}
The code above first initializes a KafkaProducer; while the KafkaProducer is being initialized, the client Kafka uses to send messages is initialized as well:
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
        new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                this.metrics, time, "producer", channelBuilder, logContext),
        ...);
Creating the NetworkClient (which implements KafkaClient) also creates a Selector, the wrapper around java.nio.channels.Selector. Sending requests, receiving responses, handling completed connections, and handling disconnections of existing connections are all driven through its poll() method.
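To make the division of labor concrete, here is a condensed sketch of how a caller (the Sender thread, in the producer's case) typically drives a KafkaClient; this is a schematic outline, not the actual Sender code:
// Schematic: the caller never touches sockets directly; everything funnels through poll().
long now = time.milliseconds();
if (client.ready(node, now)) {        // connection established and channel ready?
    client.send(clientRequest, now);  // only queues the Send on the KafkaChannel
}
// poll() performs the actual network I/O: connecting, writing the queued Send,
// reading responses, and detecting disconnections
List<ClientResponse> responses = client.poll(pollTimeout, time.milliseconds());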
As we know, Kafka reads and writes messages through the leader; only after the leader is known can messages be sent to Kafka. In the Hello, Kafka article we saw that a metadata request is issued first, from which the cluster metadata (leader, partitions, ISR list, and so on) is obtained. So where is the metadata request actually issued?
When KafkaProducer.doSend() is invoked (send() --> doSend()), the first step is to wait, via waitOnMetadata(), for the cluster metadata (topics, partitions, nodes, and so on) to become available:
// (surrounding variable declarations elided)
Cluster cluster = metadata.fetch();
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// if partition info for the topic is already available, return right away
if (partitionsCount != null && (partition == null || partition < partitionsCount))
    return new ClusterAndWaitTime(cluster, 0);
do {
    int version = metadata.requestUpdate();
    // wake up the Sender thread, which may be blocked in java.nio.channels.Selector.select()
    sender.wakeup();
    // wait for the metadata update; blocks until the current version exceeds the last known one
    metadata.awaitUpdate(version, remainingWaitMs);
    cluster = metadata.fetch();
    elapsed = time.milliseconds() - begin;
    remainingWaitMs = maxWaitMs - elapsed;
    partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null);
If the metadata does not contain information for the topic, an update is requested; until it arrives, execution stays inside this while loop. The key step is awaitUpdate(), which blocks until the metadata version advances:
// wait for a metadata update until the current version exceeds the last version we know of
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while ((this.version <= lastVersion) && !isClosed()) {
        if (remainingWaitMs != 0)
            wait(remainingWaitMs);
        long elapsed = System.currentTimeMillis() - begin;
        remainingWaitMs = maxWaitMs - elapsed;
    }
}
Now we know the producer waits for a metadata update, but where does the update actually happen? As mentioned above, the Sender thread is woken up; in its run() method it calls KafkaClient.poll(), which is where the metadata request is handled:
@Override
public List<ClientResponse> poll(long timeout, long now) {
    // handle the metadata update, if one is due
    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    // this ends up calling java.nio.channels.Selector.select()
    this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    ...
    // metadata responses are handled here; this is where the Kafka cluster metadata is obtained
    handleCompletedReceives(responses, updatedNow);
    ...
    return responses;
}
Now let's look at how the metadata request is sent:
private long maybeUpdate(long now, Node node) {
    String nodeConnectionId = node.idString();
    // connected, the channel is ready, and no request is currently in flight to the given node
    if (canSendRequest(nodeConnectionId, now)) {
        // eventually calls NetworkClient.doSend(), which only attaches the Send
        // to the KafkaChannel; no network I/O happens here
        sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
        return defaultRequestTimeoutMs;
    }
    if (connectionStates.canConnect(nodeConnectionId, now)) {
        // initiate the connection
        initiateConnect(node, now);
        return reconnectBackoffMs;
    }
    return Long.MAX_VALUE;
}
The code above first checks whether a request can be sent; if so, the data is attached to the KafkaChannel directly. If not, it checks whether a connection can be made, and if so it initiates one. The connection setup code lives in Selector.connect():
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
    SocketChannel socketChannel = SocketChannel.open();
    // set keepAlive and other socket options (e.g. send/receive buffer sizes)
    configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
    // actually calls socketChannel.connect(address)
    boolean connected = doConnect(socketChannel, address);
    // register the socketChannel with the nioSelector, build the KafkaChannel
    // (the wrapper around java.nio.channels.Channel), and attach it to the SelectionKey
    SelectionKey key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
    // connected == true means this connection will never fire a CONNECT event,
    // so it has to be handled separately
    if (connected) {
        // record it in a dedicated set
        immediatelyConnectedKeys.add(key);
        // stop listening for CONNECT events on this key
        key.interestOps(0);
    }
}
private SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
    SelectionKey key = socketChannel.register(nioSelector, interestedOps);
    KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
    this.channels.put(id, channel);
    return key;
}
If you are familiar with NIO, the code above will look familiar: it mainly wires up the channel and the SelectionKey. Note the case where doConnect() returns true: in non-blocking mode, a local connection may be established immediately, in which case the method returns true and no CONNECT event will ever fire for that connection. Kafka therefore records these special connections in a dedicated set, immediatelyConnectedKeys, and handles them separately in a later step; a small demo of this case follows below. Also note that this is where the KafkaChannel gets created.
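The immediately-connected case is easy to reproduce; in the small hypothetical snippet below, a non-blocking connect() to a loopback address may return true on the spot, in which case the selector would never deliver an OP_CONNECT event for the key:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class ImmediateConnectDemo {
    public static void main(String[] args) throws IOException {
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);
        // On a loopback connection this can return true immediately, meaning the
        // connection is already established and no CONNECT event will ever fire --
        // exactly the case Kafka records in immediatelyConnectedKeys.
        boolean connected = ch.connect(new InetSocketAddress("127.0.0.1", 9092));
        System.out.println("connected immediately: " + connected);
    }
}
With that covered, here are the fields of KafkaChannel and Selector worth paying attention to: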
public class KafkaChannel {
    // implements java.nio.channels.Channel (readable and writable); wraps the socketChannel
    private final TransportLayer transportLayer;
    // used to allocate and release Buffers
    private final MemoryPool memoryPool;
    // data received
    private NetworkReceive receive;
    // data to be sent
    private Send send;
}
public class Selector implements Selectable, AutoCloseable {
    // the underlying java.nio Selector
    private final java.nio.channels.Selector nioSelector;
    // mapping between Kafka broker nodes and Channels
    private final Map<String, KafkaChannel> channels;
    // requests whose send has completed
    private final List<Send> completedSends;
    // complete responses received
    private final List<NetworkReceive> completedReceives;
    // staged responses
    private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
    // SelectionKeys whose connections were established immediately
    private final Set<SelectionKey> immediatelyConnectedKeys;
    // used to allocate ByteBuffers
    private final MemoryPool memoryPool;
}
Once connection setup is done, polling begins. The read/write logic in Selector.poll() looks like this:
public void poll(long timeout) throws IOException {
    /* check ready keys */
    int numReadyKeys = select(timeout);
    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
        pollSelectionKeys(readyKeys, false, endSelect);
        // clear all SelectionKeys so they are not processed again on the next poll
        readyKeys.clear();
        // handle connections that were established immediately when initiated; this
        // generally only happens when broker and client run on the same machine
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
        immediatelyConnectedKeys.clear();
    }
    // move staged responses into the completed-receives collection
    addToCompletedReceives();
}
void pollSelectionKeys(Set<SelectionKey> selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        boolean sendFailed = false;
        // READ event
        if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
                && !explicitlyMutedChannels.contains(channel)) {
            NetworkReceive networkReceive;
            // read() pulls data from the channel into a Buffer (again via the KafkaChannel's transportLayer)
            while ((networkReceive = channel.read()) != null) {
                if (!stagedReceives.containsKey(channel))
                    stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());
                // stage the receive
                Deque<NetworkReceive> deque = stagedReceives.get(channel);
                deque.add(networkReceive);
            }
        }
        // WRITE event
        if (channel.ready() && key.isWritable()) {
            // write data from the buffer to the Channel (the KafkaChannel's transportLayer)
            Send send = channel.write();
        }
    }
}
The code above mainly does two things: it reads data from ready channels and stages the resulting NetworkReceives, and it writes any pending Send out to writable channels.
After data is read, it is first placed into the stagedReceives collection; then addToCompletedReceives() takes one NetworkReceive per channel from stagedReceives (if there is one) and moves it into completedReceives.
private void addToCompletedReceives() {
    if (!this.stagedReceives.isEmpty()) {
        Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
            KafkaChannel channel = entry.getKey();
            // muted channels are kept in explicitlyMutedChannels; muting is performed on the server side (Scala)
            if (!explicitlyMutedChannels.contains(channel)) {
                Deque<NetworkReceive> deque = entry.getValue();
                addToCompletedReceives(channel, deque);
                if (deque.isEmpty())
                    iter.remove();
            }
        }
    }
}
According to the comments in the official code there are two reasons for doing it this way; see also (github.com/apache/kafk…).
Now that the response data has arrived, KafkaClient.poll() calls handleCompletedReceives() to process the completed responses:
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
                throttleTimeSensor, now);
        // parse the response body according to the returned schema
        AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
        maybeThrottle(body, req.header.apiVersion(), req.destination, now);
        // handle MetadataResponse data, extracting the topic/partition/broker relationships
        if (req.isInternalRequest && body instanceof MetadataResponse)
            metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            responses.add(req.completed(body, now));
    }
}
That completes the analysis of sending the metadata request. Sending an actual message follows largely the same flow, so it is not analyzed further here.
To summarize the sending flow:
Only after the sender thread has gone through the three poll() rounds described above does the requested metadata get updated; at that point the Producer thread stops blocking and starts sending messages.
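Laid out as a timeline, the three rounds look roughly like this (a schematic sketch; the method names match NetworkClient, but the pacing is simplified):
client.poll(timeout, now); // round 1: maybeUpdate() finds no usable connection
                           //          -> initiateConnect(); the selector then
                           //          completes the TCP connection
client.poll(timeout, now); // round 2: canSendRequest() is now true ->
                           //          sendInternalMetadataRequest() attaches the
                           //          Send; the selector writes it on the WRITE event
client.poll(timeout, now); // round 3: READ event -> handleCompletedReceives()
                           //          parses the MetadataResponse; the metadata
                           //          version bumps and awaitUpdate() in the
                           //          producer thread returns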
When analyzing the structure of Kafka's network layer, make sure you understand the NIO processing flow first; that makes Kafka's Selector and KafkaChannel much easier to grasp.
This source-code analysis is based on kafka-client 2.0.0.