talk is easy,show me the code,先來看一段建立producer的代碼java
public class KafkaProducerDemo {
public static void main(String[] args) {
KafkaProducer<String,String> producer = createProducer();
//指定topic,key,value
ProducerRecord<String,String> record = new ProducerRecord<>("test1","newkey1","newvalue1");
//異步發送
producer.send(record);
producer.close();
System.out.println("發送完成");
}
public static KafkaProducer<String,String> createProducer() {
Properties props = new Properties();
//bootstrap.servers 必須設置
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.131:9092");
// key.serializer 必須設置
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value.serializer 必須設置
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//client.id
props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-0");
//retries
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//acks
props.put(ProducerConfig.ACKS_CONFIG, "all");
//max.in.flight.requests.per.connection
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
//linger.ms
props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
//batch.size
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10240);
//buffer.memory
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10240);
return new KafkaProducer<>(props);
}
}
複製代碼
生產者的API使用仍是比較簡單,建立一個ProducerRecord對象(這個對象包含目標主題和要發送的內容,固然還能夠指定鍵以及分區),而後調用send方法就把消息發送出去了。在發送ProducerRecord對象時,生產者要先把鍵和值對象序列化成字節數組,這樣才能在網絡上進行傳輸。 在深刻源碼以前,我先給出一張源碼分析圖給你們(其實應該在結尾的時候給出來),這樣看着圖再看源碼跟容易些 node
簡要說明:算法
new KafkaProducer()
後建立一個後臺線程KafkaThread(實際運行線程是Sender,KafkaThread是對Sender的封裝)掃描RecordAccumulator中是否有消息apache
調用KafkaProducer.send()
發送消息,實際是將消息保存到RecordAccumulator中,實際上就是保存到一個Map中(ConcurrentMap<TopicPartition, Deque<ProducerBatch>>
),這條消息會被記錄到同一個記錄批次(相同主題相同分區算同一個批次)裏面,這個批次的全部消息會被髮送到相同的主題和分區上bootstrap
後臺的獨立線程掃描到RecordAccumulator
中有消息後,會將消息發送到kafka集羣中(不是一有消息就發送,而是要看消息是否ready)數組
若是發送成功(消息成功寫入kafka),就返回一個RecordMetaData
對象,它包換了主題和分區信息,以及記錄在分區裏的偏移量。緩存
若是寫入失敗,就會返回一個錯誤,生產者在收到錯誤以後會嘗試從新發送消息(若是容許的話,此時會將消息在保存到RecordAccumulator中),幾回以後若是仍是失敗就返回錯誤消息安全
KafkaClient client = new NetworkClient(...);
this.sender = new Sender(.,client,...);
String ioThreadName = "kafka-producer-network-thread" + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
複製代碼
上面的代碼就是構造KafkaProducer
時核心邏輯,它會構造一個KafkaClient
負責和broker通訊,同時構造一個Sender
並啓動一個異步線程,這個線程會被命名爲:kafka-producer-network-thread|${clientId},若是你在建立producer的時候指定client.id
的值爲myclient,那麼線程名稱就是kafka-producer-network-thread|myclientbash
KafkaProducer<String,String> producer = createProducer();
//指定topic,key,value
ProducerRecord<String,String> record = new ProducerRecord<>("test1","newkey1","newvalue1");
//異步發送,能夠設置回調函數
producer.send(record);
//同步發送
//producer.send(record).get();
複製代碼
發送消息有同步發送以及異步發送兩種方式,咱們通常不使用同步發送,畢竟太過於耗時,使用異步發送的時候能夠指定回調函數,當消息發送完成的時候(成功或者失敗)會經過回調通知生產者。服務器
發送消息其實是將消息緩存起來,核心代碼以下:
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp,
serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);
複製代碼
RecordAccumulator
的核心數據結構是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>
,會將相同主題相同Partition的數據放到一個Deque(雙向隊列)中,這也是咱們以前提到的同一個記錄批次裏面的消息會發送到同一個主題和分區的意思。append()方法的核心源碼以下:
//從batchs(ConcurrentMap<TopicPartition, Deque<ProducerBatch>>)中
//根據主題分區獲取對應的隊列,若是沒有則new ArrayDeque<>返回
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
//計算同一個記錄批次佔用空間大小,batchSize根據batch.size參數決定
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
maxUsableMagic, compression, key, value, headers));
//爲同一個topic,partition分配buffer,若是同一個記錄批次的內存不足,
//那麼會阻塞maxTimeToBlock(max.block.ms參數)這麼長時間
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
//建立MemoryRecordBuilder,經過buffer初始化appendStream(DataOutputStream)屬性
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
//將key,value寫入到MemoryRecordsBuilder中的appendStream(DataOutputStream)中
batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
//將須要發送的消息放入到隊列中
dq.addLast(batch);
}
複製代碼
上面已經將消息存儲RecordAccumulator
中去了,如今看看怎麼發送消息。上面咱們提到了建立KafkaProducer的時候會啓動一個異步線程去從RecordAccumulator中取得消息而後發送到Kafka,發送消息的核心代碼是Sender.java
,它實現了Runnable接口並在後臺一直運行處理髮送請求並將消息發送到合適的節點,直到KafkaProducer被關閉
/** * The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata * requests to renew its view of the cluster and then sends produce requests to the appropriate nodes. */
public class Sender implements Runnable {
public void run() {
// 一直運行直到kafkaProducer.close()方法被調用
while (running) {
run(time.milliseconds());
}
//從日誌上看是開始處理KafkaProducer被關閉後的邏輯
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
//當非強制關閉的時候,可能還仍然有請求而且accumulator中還仍然存在數據,此時咱們須要將請求處理完成
while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
run(time.milliseconds());
}
if (forceClose) {
//若是是強制關閉,且還有未發送完畢的消息,則取消發送並拋出一個異常new KafkaException("Producer is closed forcefully.")
this.accumulator.abortIncompleteBatches();
}
...
}
}
複製代碼
KafkaProducer的關閉方法有2個,close()
以及close(long timeout,TimeUnit timUnit)
,其中timeout參數的意思是等待生產者完成任何待處理請求的最長時間,第一種方式的timeout爲Long.MAX_VALUE毫秒,若是採用第二種方式關閉,當timeout=0的時候則表示強制關閉,直接關閉Sender(設置running=false)。
run(long)方法中咱們先跳過對transactionManager的處理,查看發送消息的主要流程以下:
//將記錄批次轉移到每一個節點的生產請求列表中
long pollTimeout = sendProducerData(now);
//輪詢進行消息發送
client.poll(pollTimeout, now);
複製代碼
首先查看sendProducerData()方法,它的核心邏輯在sendProduceRequest()
方法(處於Sender.java)中
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
//將ProducerBatch中MemoryRecordsBuilder轉換爲MemoryRecords(發送的數據就在這裏面)
MemoryRecords records = batch.records();
produceRecordsByPartition.put(tp, records);
}
ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
produceRecordsByPartition, transactionalId);
//消息發送完成時的回調
RequestCompletionHandler callback = new RequestCompletionHandler() {
public void onComplete(ClientResponse response) {
//處理響應消息
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
};
//根據參數構造ClientRequest,此時須要發送的消息在requestBuilder中
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
//將clientRequest轉換成Send對象(Send.java,包含了須要發送數據的buffer),
//給KafkaChannel設置該對象,記住這裏尚未發送數據
client.send(clientRequest, now);
複製代碼
上面的client.send()方法最終會定位到NetworkClient.doSend()方法,全部的請求(不管是producer發送消息的請求仍是獲取metadata的請求)都是經過該方法設置對應的Send對象。所支持的請求在ApiKeys.java中都有定義,這裏面能夠看到每一個請求的request以及response對應的數據結構。
上面只是設置了發送消息所須要準備的內容,如今進入到發送消息的主流程,發送消息的核心代碼在Selector.java的pollSelectionKeys()方法中,代碼以下:
/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
//底層實際調用的是java8 GatheringByteChannel的write方法
channel.write();
}
複製代碼
就這樣,咱們的消息就發送到了broker中了,發送流程分析完畢,這個是完美的狀況,可是總會有發送失敗的時候(消息過大或者沒有可用的leader),那麼發送失敗後重發又是在哪裏完成的呢?還記得上面的回調函數嗎?沒錯,就是在回調函數這裏設置的,先來看下回調函數源碼
private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
RequestHeader requestHeader = response.requestHeader();
if (response.wasDisconnected()) {
//若是是網絡斷開則構造Errors.NETWORK_EXCEPTION的響應
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);
} else if (response.versionMismatch() != null) {
//若是是版本不匹配,則構造Errors.UNSUPPORTED_VERSION的響應
for (ProducerBatch batch : batches.values())
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);
} else {
if (response.hasResponse()) {
//若是存在response就返回正常的response
...
}
} else {
//若是acks=0,那麼則構造Errors.NONE的響應,由於這種狀況只須要發送不須要響應結果
for (ProducerBatch batch : batches.values()) {
completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
}
}
}
}
複製代碼
而在completeBatch方法中咱們主要關注失敗的邏輯處理,核心源碼以下:
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now, long throttleUntilTimeMs) {
Errors error = response.error;
//若是發送的消息太大,須要從新進行分割發送
if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
(batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
this.accumulator.splitAndReenqueue(batch);
this.accumulator.deallocate(batch);
this.sensors.recordBatchSplit();
} else if (error != Errors.NONE) {
//發生了錯誤,若是此時能夠retry(retry次數未達到限制以及產生異常是RetriableException)
if (canRetry(batch, response)) {
if (transactionManager == null) {
//把須要重試的消息放入隊列中,等待重試,實際就是調用deque.addFirst(batch)
reenqueueBatch(batch, now);
}
}
}
複製代碼
Producer發送消息的流程已經分析完畢,如今回過頭去看流程圖會更加清晰。
更多關於Kafka協議的涉及能夠參考這個連接
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
//若是key爲null,則使用Round Robin算法
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// 根據key進行散列
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
複製代碼
Kafka中對於分區的算法有兩種狀況
若是你想要實現自定義分區,那麼只須要實現Partitioner接口便可。
分析了KafkaProducer的源碼以後,咱們會發現不少參數是貫穿在整個消息發送流程,下面列出了一些KafkaProducer中用到的配置參數。
acks acks參數指定了必需要有多少個分區副本收到該消息,producer纔會認爲消息寫入是成功的。有如下三個選項
acks=0,生產者不須要等待服務器的響應,也就是說若是其中出現了問題,致使服務器沒有收到消息,生產者就無從得知,消息也就丟失了,當時因爲不須要等待響應,因此能夠以網絡可以支持的最大速度發送消息,從而達到很高的吞吐量。
acks=1, 只須要集羣的leader收到消息,生產者就會收到一個來自服務器的成功響應。若是消息沒法到達leader,生產者會收到一個錯誤響應,此時producer會重發消息。不過若是一個沒有收到消息的節點稱爲leader,消息仍是會丟失。
acks=all,當全部參與複製的節點所有收到消息的時候,生產者纔會收到一個來自服務器的成功響應,最安全不過延遲比較高。
buffer.memory
設置生產者內存緩衝區的大小,若是應用程序發送消息的速度超過生產者發送到服務器的速度,那麼就會致使生產者空間不足,此時send()方法要麼被阻塞,要麼拋出異常。取決於如何設置max.block.ms,表示在拋出異常以前能夠阻塞一段時間。
發送消息到服務器收到的錯誤多是能夠臨時的錯誤(好比找不到leader),這種狀況下根據該參數決定生產者重發消息的次數。注意:此時要根據重試次數以及是不是RetriableException來決定是否重試。
當有多個消息須要被髮送到同一個分區的時候,生產者會把他們放到同一個批次裏面(Deque),該參數指定了一個批次可使用的內存大小,按照字節數計算,當批次被填滿,批次裏的全部消息會被髮送出去。不過生產者並不必定會等到批次被填滿才發送,半滿甚至只包含一個消息的批次也有可能被髮送。
指定了生產者在發送批次以前等待更多消息加入批次的時間。KafkaProducer會在批次填滿或linger.ms達到上限時把批次發送出去。把linger.ms設置成比0大的數,讓生產者在發送批次以前等待一下子,使更多的消息加入到這個批次,雖然這樣會增長延遲,當時也會提高吞吐量。
指定了在調用send()方法或者partitionsFor()方法獲取元數據時生產者的阻塞時間。當生產者的發送緩衝區已滿,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到max.block.ms時,就會拋出new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
任意字符串,用來標識消息來源,咱們的後臺線程就會根據它來起名兒,線程名稱是kafka-producer-network-thread|{client.id}
該參數指定了生產者在收到服務器響應以前能夠發送多少個消息。它的值越高,就會佔用越多的內存,不過也會提高吞吐量。把它設爲1能夠保證消息是按照發送的順序寫入服務器的,即使發生了重試。
request.timeout.ms指定了生產者在發送數據時等待服務器返回響應的時間,metadata.fetch.timeout.ms指定了生產者在獲取元數據(好比目標分區的leader)時等待服務器返回響應的時間。若是等待響應超時,那麼生產者要麼重試發送數據,要麼返回一個錯誤。timeout.ms指定了broker等待同步副本返回消息確認的時間,與asks的配置相匹配——若是在指定時間內沒有收到同步副本的確認,那麼broker就會返回一個錯誤。
該參數用於控制生產者發送的請求大小。broker對可接收的消息最大值也有本身的限制(message.max.bytes),因此兩邊的配置最好能夠匹配,避免生產者發送的消息被broker拒絕。
這兩個參數分別制定了TCP socket接收和發送數據包的緩衝區大小(和broker通訊仍是經過socket)。若是他們被設置爲-1,就使用操做系統的默認值。若是生產者或消費者與broker處於不一樣的數據中心,那麼能夠適當增大這些值,由於跨數據中心的網絡通常都有比較高的延遲和比較低的帶寬。