歡迎你們關注 github.com/hsfxuebao/j… ,但願對你們有所幫助,要是以爲能夠的話麻煩給點一下Star哈html
今天把 Kafka Producer 最後一部分給講述一下,Producer 大部份內容都已經在前面幾篇文章介紹過了,這裏簡單作個收尾,但並非對前面的總結,本文從兩塊來說述:RecordAccumulator 類的實現、Kafka Producer 如何保證其順序性以及 Kafka Producer 的配置說明,每一個 Producer 線程都會有一個 RecordAccumulator 對象,它負責緩存要發送 RecordBatch、記錄發送的狀態而且進行相應的處理,這裏會詳細講述 Kafka Producer 如何保證單 Partition 的有序性。最後,簡單介紹一下 Producer 的參數配置說明,只有正確地理解 Producer 相關的配置參數,才能更好地使用 Producer,發揮其相應的做用。java
這裏再看一下 RecordAccumulator 的數據結構,以下圖所示,每一個 topic-partition 都有一個對應的 deque,deque 中存儲的是 RecordBatch,它是發送的基本單位,只有這個 topic-partition 的 RecordBatch 達到大小或時間要求才會觸發發送操做(但並非只有達到這兩個條件之一纔會被髮送,這點要理解清楚)。node
再看一下 RecordAccumulator 類的主要方法介紹,以下圖所示。github
RecordAccumulator 主要方法及其說明apache
這張圖基本上涵蓋了 RecordAccumulator 的主要方法,下面會選擇其中幾個方法詳細講述,會圍繞着 Kafka Producer 如何實現單 Partition 順序性這個主題來說述。緩存
先看下 mutePartition()
與 unmutePartition()
這兩個方法,它們是保證有序性關鍵之一,其主要作用就是將指定的 topic-partition 從 muted 集合中加入或刪除,後面會看到它們的做用。markdown
private final Set<TopicPartition> muted;
public void mutePartition(TopicPartition tp) {
muted.add(tp);
}
public void unmutePartition(TopicPartition tp) {
muted.remove(tp);
}
複製代碼
這裏先說一下這兩個方法調用的條件,這樣的話,下面在介紹其餘方法時纔會更容易理解:數據結構
mutePartition()
:若是要求保證順序性,那麼這個 tp 對應的 RecordBatch 若是要開始發送,就將這個 tp 加入到 muted
集合中;unmutePartition()
:若是 tp 對應的 RecordBatch 發送完成,tp 將會從 muted
集合中移除。也就是說,muted
是用來記錄這個 tp 是否有還有未完成的 RecordBatch。less
ready()
是在 Sender 線程中調用的,其做用選擇那些能夠發送的 node,也就是說,若是這個 tp 對應的 batch 能夠發送(達到時間或大小要求),就把 tp 對應的 leader 選出來。
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
TopicPartition part = entry.getKey();
Deque<RecordBatch> deque = entry.getValue();
Node leader = cluster.leaderFor(part);
synchronized (deque) {
if (leader == null && !deque.isEmpty()) {
// This is a partition for which leader is not known, but messages are available to send.
// Note that entries are currently not removed from batches when deque is empty.
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !muted.contains(part)) {//note: part 若是 mute 就不會遍歷
RecordBatch batch = deque.peekFirst();
if (batch != null) {
boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs;
//note: 是不是在重試
long waitedTimeMs = nowMs - batch.lastAttemptMs;
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
boolean full = deque.size() > 1 || batch.isFull(); //note: batch 滿了
boolean expired = waitedTimeMs >= timeToWaitMs; //note: batch 超時
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
readyNodes.add(leader);// note: 將能夠發送的 leader 添加到集合中
} else {
// Note that this results in a conservative estimate since an un-sendable partition may have
// a leader that will later be found to have sendable data. However, this is good enough
// since we'll just wake up and then sleep again for the remaining time.
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
複製代碼
能夠看到這一行 (!readyNodes.contains(leader) && !muted.contains(part))
,若是 muted
集合包含這個 tp,那麼在遍歷時將不會處理它對應的 deque,也就是說,若是一個 tp 加入了 muted
集合中,即便它對應的 RecordBatch 能夠發送了,也不會觸發引發其對應的 leader 被選擇出來。
drain()
是用來遍歷可發送請求的 node,而後再遍歷在這個 node 上全部 tp,若是 tp 對應的 deque 有數據,將會被選擇出來直到超過一個請求的最大長度(max.request.size
)爲止,也就說說即便 RecordBatch 沒有達到條件,但爲了保證每一個 request 儘快多地發送數據提升發送效率,這個 RecordBatch 依然會被提早選出來並進行發送。
//note: 返回該 node 對應的能夠發送的 RecordBatch 的 batches,並從 queue 中移除(最大的大小爲maxSize,超過的話,下次再發送)
public Map<Integer, List<RecordBatch>> drain(Cluster cluster,
Set<Node> nodes,
int maxSize,
long now) {
if (nodes.isEmpty())
return Collections.emptyMap();
Map<Integer, List<RecordBatch>> batches = new HashMap<>();
for (Node node : nodes) {
int size = 0;
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<RecordBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
int start = drainIndex = drainIndex % parts.size();
do {
PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
// Only proceed if the partition has no in-flight batches.
if (!muted.contains(tp)) {//note: 被 mute 的 tp 依然不會被遍歷
Deque<RecordBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
if (deque != null) {
synchronized (deque) {
RecordBatch first = deque.peekFirst();
if (first != null) {
boolean backoff = first.attempts > 0 && first.lastAttemptMs + retryBackoffMs > now;
// Only drain the batch if it is not during backoff period.
if (!backoff) {
if (size + first.sizeInBytes() > maxSize && !ready.isEmpty()) {
// there is a rare case that a single batch size is larger than the request size due
// to compression; in this case we will still eventually send this batch in a single
// request
break;
} else {
RecordBatch batch = deque.pollFirst();
batch.close();
size += batch.sizeInBytes();
ready.add(batch);
batch.drainedMs = now;
}
}
}
}
}
}
this.drainIndex = (this.drainIndex + 1) % parts.size();
} while (start != drainIndex);
batches.put(node.id(), ready);
}
return batches;
}
複製代碼
在遍歷 node 的全部 tp 時,能夠看到是有條件的 —— !muted.contains(tp)
,若是這個 tp 被添加到 muted
集合中,那麼它將不會被遍歷,也就不會做爲 request 一部分被髮送出去,這也就保證了 tp 若是還有未完成的 RecordBatch,那麼其對應 deque 中其餘 RecordBatch 即便達到條件也不會被髮送,就保證了 tp 在任什麼時候刻只有一個 RecordBatch 在發送。
是否保證順序性,仍是在 Sender 線程中實現的,mutePartition()
與 unmutePartition()
也都是在 Sender 中調用的,這裏看一下 KafkaProducer 是如何初始化一個 Sender 對象的。
// from KafkaProducer
this.sender = new Sender(client,
this.metadata,
his.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
Time.SYSTEM,
this.requestTimeoutMs);//NOTE: Sender 實例,發送請求的後臺線程
// from Sender
public Sender(KafkaClient client,
Metadata metadata,
RecordAccumulator accumulator,
boolean guaranteeMessageOrder,
int maxRequestSize,
short acks,
int retries,
Metrics metrics,
Time time,
int requestTimeout) {
this.client = client;
this.accumulator = accumulator;
this.metadata = metadata;
this.guaranteeMessageOrder = guaranteeMessageOrder;
this.maxRequestSize = maxRequestSize;
this.running = true; //note: 默認爲 true
this.acks = acks;
this.retries = retries;
this.time = time;
this.sensors = new SenderMetrics(metrics);
this.requestTimeout = requestTimeout;
}
複製代碼
對於上述過程能夠這樣進行解讀
this.guaranteeMessageOrder = (config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1)
若是 KafkaProducer 的 max.in.flight.requests.per.connection
設置爲1,那麼就能夠保證其順序性,不然的話,就不保證順序性,從下面這段代碼也能夠看出。
//from Sender
//note: max.in.flight.requests.per.connection 設置爲1時會保證
if (guaranteeMessageOrder) {
// Mute all the partitions draine
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
複製代碼
也就是說,若是要保證單 Partition 的順序性,須要在 Producer 中配置 max.in.flight.requests.per.connection=1
,而其實現機制則是在 RecordAccumulator 中實現的。
這裏是關於 Kafka Producer 一些配置的說明,內容來自官方文檔Producer Configs以及本身的一些我的理解,這裏以官方文檔保持一致,按其重要性分爲三個級別進行講述(涉及到權限方面的參數,這裏先不介紹)。
下面的這些參數雖然被描述爲 medium,但實際上對 Producer 的吞吐量等影響也一樣很大,在實踐中跟 high 參數的重要性基本同樣。
至此,Kafka Producer 部分的源碼分析已經結束,從下週開始將開始對 Kafka Consumer 部分進行分析。對於不一樣的場景,合理配置相應的 Kafka Producer 參數。
轉自:Kafka 源碼分析系列