> 舒適提示:本文基於 Kafka 2.2.1 版本。本文主要是以源碼的手段一步一步探究消息發送流程,若是對源碼不感興趣,能夠直接跳到文末查看消息發送流程圖與消息發送本地緩存存儲結構。java
從上文 初識 Kafka Producer 生產者,能夠經過 KafkaProducer 的 send 方法發送消息,send 方法的聲明以下:算法
Future<recordmetadata> send(ProducerRecord<k, v> record) Future<recordmetadata> send(ProducerRecord<k, v> record, Callback callback)
從上面的 API 能夠得知,用戶在使用 KafkaProducer 發送消息時,首先須要將待發送的消息封裝成 ProducerRecord,返回的是一個 Future 對象,典型的 Future 設計模式。在發送時也能夠指定一個 Callable 接口用來執行消息發送的回調。設計模式
咱們在學習消息發送流程以前先來看一下用於封裝一條消息的 ProducerRecord 的類圖,先來認識一下 kafka 是如何對一條消息進行抽象的。api
咱們首先來看一下 ProducerRecord 的核心屬性,即構成 消息的6大核心要素:緩存
其中Headers是一系列的 key-value 鍵值對。服務器
在瞭解 ProducerRecord 後咱們開始來探討 Kafka 的消息發送流程。架構
KafkaProducer 的 send 方法,並不會直接向 broker 發送消息,kafka 將消息發送異步化,即分解成兩個步驟,send 方法的職責是將消息追加到內存中(分區的緩存隊列中),而後會由專門的 Send 線程異步將緩存中的消息批量發送到 Kafka Broker 中。併發
消息追加入口爲 KafkaProducer#sendapp
public Future<recordmetadata> send(ProducerRecord<k, v> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<k, v> interceptedRecord = this.interceptors.onSend(record); // [@1](https://my.oschina.net/u/1198) return doSend(interceptedRecord, callback); // @2 }
代碼@1:首先執行消息發送攔截器,攔截器經過 interceptor.classes 指定,類型爲 List< String >,每個元素爲攔截器的全類路徑限定名。 代碼@2:執行 doSend 方法,後續咱們須要留意一下 Callback 的調用時機。異步
接下來咱們來看 doSend 方法。
KafkaProducer#doSend
ClusterAndWaitTime clusterAndWaitTime; try { clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs); } catch (KafkaException e) { if (metadata.isClosed()) throw new KafkaException("Producer closed while send in progress", e); throw e; } long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Step1:獲取 topic 的分區列表,若是本地沒有該topic的分區信息,則須要向遠端 broker 獲取,該方法會返回拉取元數據所耗費的時間。在消息發送時的最大等待時間時會扣除該部分損耗的時間。 >舒適提示:本文不打算對該方法進行深刻學習,後續會有專門的文章來分析 Kafka 元數據的同步機制,相似於專門介紹 RocketMQ 的 Nameserver 相似。
KafkaProducer#doSend
byte[] serializedKey; try { serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() + " specified in key.serializer", cce); }
Step2:序列化 key。注意:序列化方法雖然有傳入 topic、Headers 這兩個屬性,但參與序列化的只是 key 。
KafkaProducer#doSend
byte[] serializedValue; try { serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value()); } catch (ClassCastException cce) { throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() + " specified in value.serializer", cce); }
Step3:對消息體內容進行序列化。
KafkaProducer#doSend
int partition = partition(record, serializedKey, serializedValue, cluster); tp = new TopicPartition(record.topic(), partition);
Step4:根據分區負載算法計算本次消息發送該發往的分區。其默認實現類爲 DefaultPartitioner,路由算法以下:
KafkaProducer#doSend
setReadOnly(record.headers()); Header[] headers = record.headers().toArray();
Step5:若是是消息頭信息(RecordHeaders),則設置爲只讀。
KafkaProducer#doSend
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(), compressionType, serializedKey, serializedValue, headers); ensureValidRecordSize(serializedSize);
Step5:根據使用的版本號,按照消息協議來計算消息的長度,並是否超過指定長度,若是超過則拋出異常。
KafkaProducer#doSend
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp(); log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition); Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
Step6:先初始化消息時間戳,並對傳入的 Callable(回調函數) 加入到攔截器鏈中。
KafkaProducer#doSend
if (transactionManager != null && transactionManager.isTransactional()) transactionManager.maybeAddPartitionToTransaction(tp);
Step7:若是事務處理器不爲空,執行事務管理相關的,本節不考慮事務消息相關的實現細節,後續估計會有對應的文章進行解析。
KafkaProducer#doSend
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future;
Step8:將消息追加到緩存區,這將是本文重點須要探討的。若是當前緩存區已寫滿或建立了一個新的緩存區,則喚醒 Sender(消息發送線程),將緩存區中的消息發送到 broker 服務器,最終返回 future。這裏是經典的 Future 設計模式,從這裏也能得知,doSend 方法執行完成後,此時消息還不必定成功發送到 broker。
KafkaProducer#doSend
} catch (ApiException e) { log.debug("Exception occurred during message send:", e); if (callback != null) callback.onCompletion(null, e); this.errors.record(); this.interceptors.onSendError(record, tp, e); return new FutureFailure(e); } catch (InterruptedException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); throw new InterruptException(e); } catch (BufferExhaustedException e) { this.errors.record(); this.metrics.sensor("buffer-exhausted-records").record(); this.interceptors.onSendError(record, tp, e); throw e; } catch (KafkaException e) { this.errors.record(); this.interceptors.onSendError(record, tp, e); throw e; } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method this.interceptors.onSendError(record, tp, e); throw e; }
Step9:針對各類異常,進行相關信息的收集。
接下來將重點介紹如何將消息追加到生產者的發送緩存區,其實現類爲:RecordAccumulator。
RecordAccumulator#append
public RecordAppendResult append(TopicPartition tp, long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long maxTimeToBlock) throws InterruptedException {
在介紹該方法以前,咱們首先來看一下該方法的參數。
RecordAccumulator#append
Deque<producerbatch> dq = getOrCreateDeque(tp); synchronized (dq) { if (closed) throw new KafkaException("Producer closed while send in progress"); RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq); if (appendResult != null) return appendResult; }
Step1:嘗試根據 topic與分區在 kafka 中獲取一個雙端隊列,若是不存在,則建立一個,而後調用 tryAppend 方法將消息追加到緩存中。Kafka 會爲每個 topic 的每個分區建立一個消息緩存區,消息先追加到緩存中,而後消息發送 API 當即返回,而後由單獨的線程 Sender 將緩存區中的消息定時發送到 broker 。這裏的緩存區的實現使用的是 ArrayQeque。而後調用 tryAppend 方法嘗試將消息追加到其緩存區,若是追加成功,則返回結果。
在講解下一個流程以前,咱們先來看一下 Kafka 雙端隊列的存儲結構:
RecordAccumulator#append
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers)); log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition()); buffer = free.allocate(size, maxTimeToBlock);
Step2:若是第一步未追加成功,說明當前沒有可用的 ProducerBatch,則須要建立一個 ProducerBatch,故先從 BufferPool 中申請 batch.size 的內存空間,爲建立 ProducerBatch 作準備,若是因爲 BufferPool 中未有剩餘內存,則最多等待 maxTimeToBlock ,若是在指定時間內未申請到內存,則拋出異常。
RecordAccumulator#append
synchronized (dq) { // Need to check if producer is closed again after grabbing the dequeue lock. if (closed) throw new KafkaException("Producer closed while send in progress"); // 省略部分代碼 MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic); ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds()); FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds())); dq.addLast(batch); incomplete.add(batch); // Don't deallocate this buffer in the finally block as it's being used in the record batch buffer = null; return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true); }
Step3:建立一個新的批次 ProducerBatch,並將消息寫入到該批次中,並返回追加結果,這裏有以下幾個關鍵點:
縱觀 RecordAccumulator append 的流程,基本上就是從雙端隊列獲取一個未填充完畢的 ProducerBatch(消息批次),而後嘗試將其寫入到該批次中(緩存、內存中),若是追加失敗,則嘗試建立一個新的 ProducerBatch 而後繼續追加。
接下來咱們繼續探究如何向 ProducerBatch 中寫入消息。
ProducerBatch #tryAppend
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) { if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) { // [@1](https://my.oschina.net/u/1198) return null; } else { Long checksum = this.recordsBuilder.append(timestamp, key, value, headers); // @2 this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(), recordsBuilder.compressionType(), key, value, headers)); // @3 this.lastAppendTime = now; // FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length, Time.SYSTEM); // @4 // we have to keep every future returned to the users in case the batch needs to be // split to several new batches and resent. thunks.add(new Thunk(callback, future)); // @5 this.recordCount++; return future; } }
代碼@1:首先判斷 ProducerBatch 是否還能容納當前消息,若是剩餘內存不足,將直接返回 null。若是返回 null ,會嘗試再建立一個新的ProducerBatch。
代碼@2:經過 MemoryRecordsBuilder 將消息寫入按照 Kafka 消息格式寫入到內存中,即寫入到 在建立 ProducerBatch 時申請的 ByteBuffer 中。本文先不詳細介紹 Kafka 各個版本的消息格式,後續會專門寫一篇文章介紹 Kafka 各個版本的消息格式。
代碼@3:更新 ProducerBatch 的 maxRecordSize、lastAppendTime 屬性,分別表示該批次中最大的消息長度與最後一次追加消息的時間。
代碼@4:構建 FutureRecordMetadata 對象,這裏是典型的 Future模式,裏面主要包含了該條消息對應的批次的 produceFuture、消息在該批消息的下標,key 的長度、消息體的長度以及當前的系統時間。
代碼@5:將 callback 、本條消息的憑證(Future) 加入到該批次的 thunks 中,該集合存儲了 一個批次中全部消息的發送回執。
流程執行到這裏,KafkaProducer 的 send 方法就執行完畢了,返回給調用方的就是一個 FutureRecordMetadata 對象。
源碼的閱讀比較枯燥,接下來用一個流程圖簡單的闡述一下消息追加的關鍵要素,重點關注一下各個 Future。
上面的消息發送,其實用消息追加來表達更加貼切,由於 Kafka 的 send 方法,並不會直接向 broker 發送消息,而是首先先追加到生產者的內存緩存中,其內存存儲結構以下:ConcurrentMap< TopicPartition, Deque< ProducerBatch>> batches,那咱們天然而然的能夠得知,Kafka 的生產者爲會每個 topic 的每個 分區單獨維護一個隊列,即 ArrayDeque,內部存放的元素爲 ProducerBatch,即表明一個批次,即 Kafka 消息發送是按批發送的。其緩存結果圖以下:
KafkaProducer 的 send 方法最終返回的 FutureRecordMetadata ,是 Future 的子類,即 Future 模式。那 kafka 的消息發送怎麼實現異步發送、同步發送的呢?
其實答案也就蘊含在 send 方法的返回值,若是項目方須要使用同步發送的方式,只須要拿到 send 方法的返回結果後,調用其 get() 方法,此時若是消息還未發送到 Broker 上,該方法會被阻塞,等到 broker 返回消息發送結果後該方法會被喚醒並獲得消息發送結果。若是須要異步發送,則建議使用 send(ProducerRecord< K, V > record, Callback callback),但不能調用 get 方法便可。Callback 會在收到 broker 的響應結果後被調用,而且支持攔截器。
消息追加流程就介紹到這裏了,消息被追加到緩存區後,什麼是會被髮送到 broker 端呢?將在下一篇文章中詳細介紹。
若是文章對您有所幫助的話,麻煩幫忙點個贊,謝謝您的承認與支持。
做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號:中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接:中間件知識星球,一塊兒探討高併發、分佈式服務架構,交流源碼。
</producerbatch></k,></k,></recordmetadata></k,></recordmetadata></k,></recordmetadata>