歡迎你們關注 github.com/hsfxuebao/j… ,但願對你們有所幫助,要是以爲能夠的話麻煩給點一下Star哈java
本文是 Kafka 源碼解析的第四篇,在寫這篇文章以前,專門看了一下 Java NIO 相關的內容,只有理解了 Java NIO 模型才能更好地理解 NIO 在 Kafka 中是如何應用的以及 Producer 如何利用 Java NIO 構建其網絡模型(不瞭解的,能夠先看一下上一篇文章:談一談 Java IO 模型),同時,本文也是對 Producer 整個流程的一個總結,主要講述如下兩個問題:node
KafkaProducer 經過 Sender 進行相應的 IO 操做,而 Sender 又調用 NetworkClient 來進行 IO 操做,NetworkClient 底層是對 Java NIO 進行相應的封裝,其網絡模型以下圖所示(該圖參考:Kafka源碼深度解析-序列3 -Producer -Java NIO,在其基礎上增長一個 KafkaProducer 成員變量的圖形)。git
從圖中能夠看出,Sender 爲最上層的接口,即調用層,Sender 調用 NetworkClient,NetworkClient 調用 Selector,而 Selector 底層封裝了 Java NIO 的相關接口,從右邊的圖也能夠看出它們之間的關係。apache
有了對 Producer 網絡模型的大概框架認識以後,下面再深刻進去,看一下它們之間的調用關係以及 Producer 是如何調用 Java NIO 的相關接口,Producer 端的總體流程以下圖所示。api
這裏涉及到的主要方法是:markdown
KafkaProducer.dosend()
;Sender.run()
;NetworkClient.poll()
(NetworkClient.dosend()
);Selector.poll()
;下面會結合上圖,對這幾個方法作詳細的講解,本文下面的內容都是結合上圖進行講解。網絡
dosend()
方法是讀懂 Producer 的入口,具體能夠參考 dosend(),dosend()
主要作了兩個事情:app
waitOnMetadata()
:請求更新 tp(topic-partition) meta,中間會調用 sender.wakeup()
;accumulator.append()
:將 msg 寫入到其 tp 對應的 deque 中,若是該 tp 對應的 deque 新建了一個 Batch,最後也會調用 sender.wakeup()
。這裏主要關注的是 sender.wakeup()
方法,它的做用是將 Sender 線程從阻塞中喚醒。
這裏來看一下 sender.wakeup()
具體實現:
// org.apache.kafka.clients.producer.internals.Sender
/**
* Wake up the selector associated with this send thread
*/
public void wakeup() {
this.client.wakeup();
}
// org.apache.kafka.clients.NetworkClient
/**
* Interrupt the client if it is blocked waiting on I/O.
*/
@Override
public void wakeup() {
this.selector.wakeup();
}
// org.apache.kafka.common.network.Selector
/**
* Interrupt the nioSelector if it is blocked waiting to do I/O.
*/
//note: 若是 selector 是阻塞的話,就喚醒
@Override
public void wakeup() {
this.nioSelector.wakeup();
}
複製代碼
這個方法很簡單,但也頗有意思,其調用過程是下面這個樣子:
跟上面兩張圖中 KafkaProducer 的整體調用過程大概一致,它的做用就是將 Sender 線程從 select()
方法的阻塞中喚醒,select()
方法的做用是輪詢註冊在多路複用器上的 Channel,它會一直阻塞在這個方法上,除非知足下面條件中的一個:
不然 select()
將會一直輪詢,阻塞在這個地方,直到條件知足。
分析到這裏,KafkaProducer 中 dosend()
方法調用 sender.wakeup()
方法做用就很明顯的,做用就是:當有新的 RecordBatch 建立後,舊的 RecordBatch 就能夠發送了(或者此時有 Metadata 請求須要發送),若是線程阻塞在 select()
方法中,就將其喚醒,Sender 從新開始運行 run()
方法,在這個方法中,舊的 RecordBatch (或相應的 Metadata 請求)將會被選中,進而能夠及時將這些請求發送出去。
每次循環都是從 Sender 的 run()
方法開始,具體代碼以下:
//note: Sender 線程每次循環具體執行的地方
void run(long now) {
Cluster cluster = metadata.fetch();
//note: Step1 獲取那些已經能夠發送的 RecordBatch 對應的 nodes
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
//note: Step2 若是有 topic-partition 的 leader 是未知的,就強制 metadata 更新
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
//note: 若是與node 沒有鏈接(若是能夠鏈接,會初始化該鏈接),暫時先移除該 node
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {//note: 沒有創建鏈接的 broker,這裏會與其創建鏈接
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
//note: Step3 返回該 node 對應的全部能夠發送的 RecordBatch 組成的 batches(key 是 node.id,這些 batches 將會在一個 request 中發送)
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
//note: 保證一個 tp 只有一個 RecordBatch 在發送,保證有序性
//note: max.in.flight.requests.per.connection 設置爲1時會保證
if (guaranteeMessageOrder) {
// Mute all the partitions draine
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
//note: 將因爲元數據不可用而致使發送超時的 RecordBatch 移除
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
//note: Step4 發送 RecordBatch
sendProduceRequests(batches, now);
//note: 若是有 partition 能夠立馬發送數據,那麼 pollTimeout 爲0.
//note: Step5 關於 socket 的一些實際的讀寫操做
this.client.poll(pollTimeout, now);
}
複製代碼
Sender.run()
的大概流程總共有如下五步:
accumulator.ready()
:遍歷全部的 tp(topic-partition),若是其對應的 RecordBatch 能夠發送(大小達到 batch.size
大小或時間達到 linger.ms
),就將其對應的 leader 選出來,最後會返回一個能夠發送 Produce request 的 Set<Node>
(實際返回的是 ReadyCheckResult
實例,不過 Set<Node>
是最主要的成員變量);requestUpdate()
方法更新 metadata,實際上仍是在第一步對 tp 的遍歷中,遇到沒有 leader 的 tp 就將其加入到一個叫作 unknownLeaderTopics
的 set 中,而後會請求這個 tp 的 meta(meta 的更新策略能夠參考以前的一篇博客 Producer Metadata 的更新策略);accumulator.drain()
:遍歷每一個 leader (第一步中選出)上的全部 tp,若是該 tp 對應的 RecordBatch 不在 backoff 期間(沒有重試過,或者重試了可是間隔已經達到了 retryBackoffMs ),而且加上這個 RecordBatch 其大小不超過 maxSize(一個 request 的最大限制,默認爲 1MB),那麼就把這個 RecordBatch 添加 list 中,最終返回的類型爲 Map<Integer, List<RecordBatch>>
,key 爲 leader.id,value 爲要發送的 RecordBatch 的列表;sendProduceRequests()
:發送 Produce 請求,從圖中,能夠看出,這個方法會調用 NetworkClient.send()
來發送 clientRequest;NetworkClient.poll()
:關於 socket 的 IO 操做都是在這個方法進行的,它仍是調用 Selector 進行的相應操做,而 Selector 底層則是封裝的 Java NIO 的相關接口,這個下面會詳細講述。在第三步中,能夠看到,若是要向一個 leader 發送 Produce 請求,那麼這 leader 對應 tp,若是其 RecordBatch 沒有達到要求(batch.size
或 linger.ms
都沒達到)仍是可能會發送,這樣作的好處是:能夠減小 request 的頻率,有利於提供發送效率。
這個方法也是一個很是重要的方法,其做用簡單來講有三點:
具體代碼以下所示:
public List<ClientResponse> poll(long timeout, long now) {
//note: Step1 判斷是否須要更新 meta,若是須要就更新(請求更新 metadata 的地方)
long metadataTimeout = metadataUpdater.maybeUpdate(now);
//note: Step2 調用 Selector.poll() 進行 socket 相關的 IO 操做
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
//note: Step3 處理完成後的操做
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
//note: 處理已經完成的 send(不須要 response 的 request,如 send)
handleCompletedSends(responses, updatedNow);//note: 經過 selector 中獲取 Server 端的 response
//note: 處理從 server 端接收到 Receive(如 Metadata 請求)
handleCompletedReceives(responses, updatedNow);//note: 在返回的 handler 中,會處理 metadata 的更新
//note: 處理鏈接失敗那些鏈接,從新請求 meta
handleDisconnections(responses, updatedNow);
//note: 處理新創建的那些鏈接(還不能發送請求,好比:還未認證)
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutRequests(responses, updatedNow);
// invoke callbacks
for (ClientResponse response : responses) {
try {
response.onComplete();
} catch (Exception e) {
log.error("Uncaught error in request completion:", e);
}
}
return responses;
}
複製代碼
這個方法大體分爲三步,這裏詳述講述一下:
metadataUpdater.maybeUpdate()
:若是 Metadata 須要更新,那麼就選擇鏈接數最小的 node,發送 Metadata 請求,詳細流程能夠參考以前那篇博客Producer 的 Metadata 更新流程;selector.poll()
:進行 socket IO 相關的操做,下面會詳細講述;select()
過程以後的相關處理。
handleAbortedSends(responses)
:處理那麼在發送過程出現 UnsupportedVersionException
異常的 request;handleCompletedSends(responses, updatedNow)
:處理那些已經完成的 request,若是是那些不須要 response 的 request 的話,這裏直接調用 request.completed()
,標誌着這個 request 發送處理完成;handleCompletedReceives(responses, updatedNow)
:處理那些從 Server 端接收的 Receive,metadata 更新就是在這裏處理的(以及 ApiVersionsResponse
);handleDisconnections(responses, updatedNow)
:處理鏈接失敗那些鏈接,從新請求 metadata;handleConnections()
:處理新創建的那些鏈接(還不能發送請求,好比:還未認證);handleInitiateApiVersionRequests(updatedNow)
:對那些新創建的鏈接,發送 apiVersionRequest(默認狀況:第一次創建鏈接時,須要向 Broker 發送 ApiVersionRequest 請求);handleTimedOutRequests(responses, updatedNow)
:處理 timeout 的鏈接,關閉該鏈接,並刷新 Metadata。Selector 類是 Kafka 對 Java NIO 相關接口的封裝,socket IO 相關的操做都是這個類中完成的,這裏先看一下 poll()
方法,主要的操做都是這個方法中調用的,其代碼實現以下:
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
//note: Step1 清除相關記錄
clear();
if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
timeout = 0;
/* check ready keys */
//note: Step2 獲取就緒事件的數
long startSelect = time.nanoseconds();
int readyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
//note: Step3 處理 io 操做
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
}
//note: Step4 將處理獲得的 stagedReceives 添加到 completedReceives 中
addToCompletedReceives();
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
// we use the time at the end of select to ensure that we don't close any connections that
// have just been processed in pollSelectionKeys
//note: 每次 poll 以後會調用一次
//TODO: 鏈接雖然關閉了,可是 Client 端的緩存依然存在
maybeCloseOldestConnection(endSelect);
}
複製代碼
Selector.poll()
方法會進行四步操做,這裏分別來介紹一些。
clear()
方法是在每次 poll()
執行的第一步,它做用的就是清理上一次 poll 過程產生的部分緩存。
//note: 每次 poll 調用前都會清除如下緩存
private void clear() {
this.completedSends.clear();
this.completedReceives.clear();
this.connected.clear();
this.disconnected.clear();
// Remove closed channels after all their staged receives have been processed or if a send was requested
for (Iterator<Map.Entry<String, KafkaChannel>> it = closingChannels.entrySet().iterator(); it.hasNext(); ) {
KafkaChannel channel = it.next().getValue();
Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
boolean sendFailed = failedSends.remove(channel.id());
if (deque == null || deque.isEmpty() || sendFailed) {
doClose(channel, true);
it.remove();
}
}
this.disconnected.addAll(this.failedSends);
this.failedSends.clear();
}
複製代碼
Selector 的 select()
方法在實現上底層仍是調用 Java NIO 原生的接口,這裏的 nioSelector
其實就是 java.nio.channels.Selector
的實例對象,這個方法最壞狀況下,會阻塞 ms 的時間,若是在一次輪詢,只要有一個 Channel 的事件就緒,它就會立馬返回。
private int select(long ms) throws IOException {
if (ms < 0L)
throw new IllegalArgumentException("timeout should be >= 0");
if (ms == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(ms);
}
複製代碼
這部分是 socket IO 的主要部分,發送 Send 及接收 Receive 都是在這裏完成的,在 poll()
方法中,這個方法會調用兩次:
第一次調用的目的是:處理已經就緒的事件,進行相應的 IO 操做;
第二次調用的目的是:處理新創建的那些鏈接,添加緩存及傳輸層(Kafka 又封裝了一次,這裏後續文章會講述)的握手與認證。
private void pollSelectionKeys(Iterable selectionKeys, boolean isImmediatelyConnected, long currentTimeNanos) { Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); KafkaChannel channel = channel(key);
// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(channel.id());
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), currentTimeNanos);
try {
/* complete any connections that have finished their handshake (either normally or immediately) */
//note: 處理一些剛創建 tcp 鏈接的 channel
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {//note: 鏈接已經創建
this.connected.add(channel.id());
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
channel.id());
} else
continue;
}
/* if channel is not ready finish prepare */
//note: 處理 tcp 鏈接還未完成的鏈接,進行傳輸層的握手及認證
if (channel.isConnected() && !channel.ready())
channel.prepare();
/* if channel is ready read from any connections that have readable data */
if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
NetworkReceive networkReceive;
while ((networkReceive = channel.read()) != null)//note: 知道讀取一個完整的 Receive,才添加到集合中
addToStagedReceives(channel, networkReceive);//note: 讀取數據
}
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
Send send = channel.write();
if (send != null) {
this.completedSends.add(send);//note: 將完成的 send 添加到 list 中
this.sensors.recordBytesSent(channel.id(), send.size());
}
}
/* cancel any defunct sockets */
//note: 關閉斷開的鏈接
if (!key.isValid())
close(channel, true);
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException)
log.debug("Connection with {} disconnected", desc, e);
else
log.warn("Unexpected error from {}; closing connection", desc, e);
close(channel, true);
}
}
}
複製代碼
這個方法的目的是處理接收到的 Receive,因爲 Selector 這個類在 Client 和 Server 端都會調用,這裏分兩種狀況講述一下:
mute(String id)
和 unmute(String id)
,對該 KafkaChannel 作標記來保證同時只能處理這個 Channel 的一個 request(能夠理解爲排它鎖)。當 Server 端接收到 request 後,先將其放入 stagedReceives
集合中,此時該 Channel 還未 mute,這個 Receive 會被放入 completedReceives
集合中。Server 在對 completedReceives
集合中的 request 進行處理時,會先對該 Channel mute,處理後的 response 發送完成後再對該 Channel unmute,而後才能處理該 Channel 其餘的請求;mute()
和 unmute()
方法,client 的時序性而是經過 InFlightRequests
和 RecordAccumulator 的 mutePartition
來保證的(下篇文章會講述),所以對於 Client 端而言,這裏接收到的全部 Receive 都會被放入到 completedReceives
的集合中等待後續處理。這個方法只有配合 Server 端的調用才能看明白其做用,它統一 Client 和 Server 調用的 api,使得均可以使用 Selector 這個類。
/**
* checks if there are any staged receives and adds to completedReceives
*/
private void addToCompletedReceives() {
if (!this.stagedReceives.isEmpty()) {//note: 處理 stagedReceives
Iterator<Map.Entry<KafkaChannel, Deque<NetworkReceive>>> iter = this.stagedReceives.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
KafkaChannel channel = entry.getKey();
if (!channel.isMute()) {
Deque<NetworkReceive> deque = entry.getValue();
addToCompletedReceives(channel, deque);
if (deque.isEmpty())
iter.remove();
}
}
}
}
private void addToCompletedReceives(KafkaChannel channel, Deque<NetworkReceive> stagedDeque) {
NetworkReceive networkReceive = stagedDeque.poll();
this.completedReceives.add(networkReceive); //note: 添加到 completedReceives 中
this.sensors.recordBytesReceived(channel.id(), networkReceive.payload().limit());
}
複製代碼
至此,文章的主要內容已經講述得差很少了,第二張圖中最上面的那個調用關係已經講述完,下面講述一下另一個小分支,也就是從 Sender.run()
調用 NetworkClient.send()
開始的那部分,其調用過程以下:
Sender.run()
Sender.sendProduceRequests()
NetworkClient.send()
NetworkClient.dosend()
Selector.send()
KafkaChannel.setSend()
複製代碼
//note: 發送請求
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
String nodeId = clientRequest.destination();
if (!isInternalRequest) {
// If this request came from outside the NetworkClient, validate
// that we can send data. If the request is internal, we trust
// that that internal code has done this validation. Validation
// will be slightly different for some internal requests (for
// example, ApiVersionsRequests can be sent prior to being in
// READY state.)
if (!canSendRequest(nodeId))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest request = null;
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
//note: 構建 AbstractRequest, 檢查其版本信息
try {
NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
// Note: if versionInfo is null, we have no server version information. This would be
// the case when sending the initial ApiVersionRequest which fetches the version
// information itself. It is also the case when discoverBrokerVersions is set to false.
if (versionInfo == null) {
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending message of type {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), nodeId, builder.version());
} else {
short version = versionInfo.usableVersion(clientRequest.apiKey());
builder.setVersion(version);
}
// The call to build may also throw UnsupportedVersionException, if there are essential
// fields that cannot be represented in the chosen version.
request = builder.build();//note: 當爲 Produce 請求時,轉化爲 ProduceRequest,Metadata 請求時,轉化爲 Metadata 請求
} catch (UnsupportedVersionException e) {
// If the version is not supported, skip sending the request over the wire.
// Instead, simply add it to the local queue of aborted requests.
log.debug("Version mismatch when attempting to send {} to {}",
clientRequest.toString(), clientRequest.destination(), e);
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(),
clientRequest.callback(), clientRequest.destination(), now, now,
false, e, null);
abortedSends.add(clientResponse);
return;
}
RequestHeader header = clientRequest.makeHeader();
if (log.isDebugEnabled()) {
int latestClientVersion = ProtoUtils.latestVersion(clientRequest.apiKey().id);
if (header.apiVersion() == latestClientVersion) {
log.trace("Sending {} to node {}.", request, nodeId);
} else {
log.debug("Using older server API v{} to send {} to node {}.",
header.apiVersion(), request, nodeId);
}
}
//note: Send是一個接口,這裏返回的是 NetworkSend,而 NetworkSend 繼承 ByteBufferSend
Send send = request.toSend(nodeId, header);
InFlightRequest inFlightRequest = new InFlightRequest(
header,
clientRequest.createdTimeMs(),
clientRequest.destination(),
clientRequest.callback(),
clientRequest.expectResponse(),
isInternalRequest,
send,
now);
this.inFlightRequests.add(inFlightRequest);
//note: 將 send 和對應 kafkaChannel 綁定起來,並開啓該 kafkaChannel 底層 socket 的寫事件
selector.send(inFlightRequest.send);
}
複製代碼
Producer 端的請求都是經過 NetworkClient.dosend()
來發送的,其做用就是:
apiKey()
構建 Request;NetworkSend
實例;Selector.send
發送該 Send。這個方法就比較容易理解了,它的做用就是獲取該 Send 對應的 KafkaChannel,調用 setSend()
向 KafkaChannel 註冊一個 Write
事件。
//note: 發送請求
public void send(Send send) {
String connectionId = send.destination();
if (closingChannels.containsKey(connectionId))
this.failedSends.add(connectionId);
else {
KafkaChannel channel = channelOrFail(connectionId, false);
try {
channel.setSend(send);
} catch (CancelledKeyException e) {
this.failedSends.add(connectionId);
close(channel, false);
}
}
}
複製代碼
//note: 每次調用時都會註冊一個 OP_WRITE 事件
public void setSend(Send send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
//note: 調用 send() 發送 Send
public Send write() throws IOException {
Send result = null;
if (send != null && send(send)) {
result = send;
send = null;
}
return result;
}
//note: 發送完成後,就刪除這個 WRITE 事件
private boolean send(Send send) throws IOException {
send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return send.completed();
}
複製代碼
setSend()
方法須要配合 write()
(該方法是在 Selector.poll()
中調用的) 方法一塊兒來看
setSend()
:將當前 KafkaChannel 的 Send 賦值爲要發送的 Send,並註冊一個 OP_WRITE
事件;write()
:發送當前的 Send,發送完後刪除註冊的 OP_WRITE
事件。最後,簡單總結一下,能夠回過頭再看一下第一張圖,對於 KafkaProducer 而言,其直接調用是 Sender,而 Sender 底層調用的是 NetworkClient,NetworkClient 則是經過 Selector 實現,Selector 則是對 Java NIO 原生接口的封裝。
轉自:Kafka 源碼分析系列