Following on from the previous article on how the Kafka Producer sends messages, this one digs deeper.
Let's start with ProducerRecord. It is not just the raw message: it carries several attributes. The class is defined as follows:
public class ProducerRecord<K, V> {
private final String topic; // topic
private final Integer partition; // partition number
private final Headers headers; // message headers
private final K key; // key
private final V value; // value
private final Long timestamp; // record timestamp
// other members omitted
}
The key field specifies the message key. It is more than extra information attached to the record: it can be used to compute the partition number and thus route the message to a specific partition (more on that later). ProducerRecord has many constructors; the example code uses the simplest one. For reference:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
public ProducerRecord(String topic, V value) // used in the example code; equivalent to leaving all other attributes null
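As a quick illustration, a record can be built with just a topic and a value, or with a key so the partitioner can route it consistently (the topic name and key below are made-up placeholders):

// uses ProducerRecord(String topic, V value): partition, timestamp, key and headers stay null
ProducerRecord<String, String> plain = new ProducerRecord<>("demo-topic", "hello kafka");
// uses ProducerRecord(String topic, K key, V value): records sharing a key land in the same partition
ProducerRecord<String, String> keyed = new ProducerRecord<>("demo-topic", "user-42", "hello kafka");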
Now let's look at the send(ProducerRecord, Callback) method:
public class KafkaProducer<K, V> implements Producer<K, V> {
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
// 1.1
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}
···
}
Before a record is sent, org.apache.kafka.clients.producer.ProducerInterceptor#onSend is called so the record can be customised (it may be modified; the method is not expected to throw).
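As a minimal sketch of what such an interceptor might look like (the class name and the header it adds are made up for illustration):

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class TracingInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // runs before serialization and partitioning; may return a modified record
        record.headers().add("trace-id", "demo".getBytes());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // runs when the broker acknowledges the record or the send fails
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

Such an interceptor is registered through the interceptor.classes producer config (ProducerConfig.INTERCEPTOR_CLASSES_CONFIG).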
Next, the doSend(ProducerRecord, Callback) method:
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// make sure the producer has not been closed
throwIfProducerClosed();
// first make sure that metadata for the topic is available
ClusterAndWaitTime clusterAndWaitTime;
try {
// 2.1
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
// 2.2
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
// 2.3
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
// 2.4
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
// 2.5
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
// 2.6
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs);
// 2.7
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
}
return result.future;
···
}
KafkaProducer#waitOnMetadata triggers an update of the Kafka cluster metadata and blocks the calling thread until the update completes; under the hood it wakes up the sender thread, which refreshes the cluster information held in metadata.
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
// fetch the current Cluster information
Cluster cluster = metadata.fetch();
// if the cluster's invalid-topic list contains this topic, throw InvalidTopicException
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
// add the topic to the metadata (explained below)
metadata.add(topic);
// get the number of partitions for the topic
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// if the partition count is known and the requested partition is either unspecified or within range, return a ClusterAndWaitTime immediately
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long begin = time.milliseconds();
long remainingWaitMs = maxWaitMs;
long elapsed;
do {
if (partition != null) {
log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
log.trace("Requesting metadata update for topic {}.", topic);
}
metadata.add(topic);
// request a metadata update and remember the current update version
int version = metadata.requestUpdate();
// wake up the sender thread
sender.wakeup();
try {
// block until the metadata has been updated
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException(
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs));
}
// fetch the cluster again
cluster = metadata.fetch();
elapsed = time.milliseconds() - begin;
// if updating the metadata took longer than the maximum wait time, throw a TimeoutException
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
// after the update, check that the topic is still valid; throw if it is not
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
// get the partition count for the topic again
partitionsCount = cluster.partitionCountForTopic(topic);
// loop while the partition count is still unknown, or a partition was requested that is out of range (>= partition count)
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
return new ClusterAndWaitTime(cluster, elapsed);
}
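The maxWaitMs passed in here ultimately comes from the producer's max.block.ms setting, which caps how long send() may block waiting for metadata (and, later, for buffer space). A minimal sketch with an arbitrary 5-second cap:

Properties props = new Properties();
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000"); // send() blocks at most 5 s on metadata / buffer allocation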
// adding the topic to the metadata
metadata.add(topic);
org.apache.kafka.clients.producer.internals.ProducerMetadata#add(String)
public synchronized void add(String topic) {
Objects.requireNonNull(topic, "topic cannot be null");
// if this topic is not already being tracked, request a metadata update for the new topic
if (topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE) == null) {
requestUpdateForNewTopics();
}
}
public synchronized void requestUpdateForNewTopics() {
// Override the timestamp of last refresh to let immediate update.
this.lastRefreshMs = 0; // reset the last refresh timestamp to 0 so an update can happen immediately
this.requestVersion++;
requestUpdate();
}
public synchronized int requestUpdate() {
this.needUpdate = true; // set needUpdate to true to force a metadata update
return this.updateVersion; // return the current update version
}
2.2 and 2.3: record.key() and record.value() are serialized with the configured key and value serializers.
During a send, if the ProducerRecord specifies the partition field, no partitioner is needed, because that field already names the target partition. If partition is not set, the partitioner computes one, usually from the key; the partitioner's job is to assign a partition to every record. Kafka's default partitioner is org.apache.kafka.clients.producer.internals.DefaultPartitioner#partition:
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
// if the key is non-null, hash it to choose the partition; records with the same key are written to the same partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
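If the default key-hash / round-robin strategy does not fit, a custom Partitioner can be plugged in through the partitioner.class config. The sketch below is only an illustration (the class name and the null-key fallback are made up) of the interface a custom partitioner must implement:

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;
import java.util.Map;

public class HashOnlyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // always hash the key bytes; a null key simply falls back to partition 0 in this sketch
        return keyBytes == null ? 0 : Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be registered with props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, HashOnlyPartitioner.class.getName()). Next, step 2.5: ensureValidRecordSize checks that the serialized record does not exceed the configured limits: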
private void ensureValidRecordSize(int size) {
if (size > this.maxRequestSize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the maximum request size you have configured with the " +
ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
" configuration.");
if (size > this.totalMemorySize)
throw new RecordTooLargeException("The message is " + size +
" bytes when serialized which is larger than the total memory buffer you have configured with the " +
ProducerConfig.BUFFER_MEMORY_CONFIG +
" configuration.");
}
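The two limits checked above correspond to the max.request.size and buffer.memory producer configs named in the exception messages; a minimal sketch with arbitrary values:

Properties props = new Properties();
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(4 * 1024 * 1024)); // reject records larger than ~4 MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, String.valueOf(64 * 1024 * 1024));   // 64 MB total accumulator memory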
RecordAccumulator is used mainly to buffer records so that the Sender thread can send them in batches, reducing the resources consumed by network transmission.
public final class RecordAccumulator {
// size of the ByteBuffer underlying each ProducerBatch
private final int batchSize;
// compression type
private final CompressionType compression;
// the BufferPool used to allocate and reuse ByteBuffers
private final BufferPool free;
// maps each TopicPartition to its Deque of ProducerBatch objects
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
// ProducerBatches that have not been acknowledged yet, whether already sent or not; backed by a Set<ProducerBatch>
private final IncompleteBatches incomplete;
private int drainIndex;
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
// We keep track of the number of appending thread to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// 2.6.1
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
}
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
// 2.6.2
buffer = free.allocate(size, maxTimeToBlock);
// 2.6.3
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
return appendResult;
}
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
dq.addLast(batch);
incomplete.add(batch);
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
2.6.1: Look up the Deque for this TopicPartition; if none exists, create one and add it to the batches map. Then, under the deque's lock, call tryAppend() to append the record; if the append succeeds, return the RecordAppendResult.
2.6.2: If the append fails, allocate a new ByteBuffer from the BufferPool. This allocation may block, which is why it is done without holding the lock, so several threads may end up allocating ByteBuffers at the same time.
2.6.3: Re-acquire the lock and call tryAppend() again; this retry prevents the buffers allocated concurrently in 2.6.2 from each turning into a half-empty batch (internal fragmentation). If the retry succeeds, return; otherwise use the ByteBuffer allocated in 2.6.2 to create a new ProducerBatch, append the record to it, add the batch to the Deque and to the incomplete set, and return the RecordAppendResult.
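The batchSize used when allocating these buffers is controlled by the batch.size config; together with linger.ms (which the Sender, covered in the next article, uses to decide how long to wait for a batch to fill), it determines how much batching actually happens. A minimal sketch with arbitrary values:

Properties props = new Properties();
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(32 * 1024)); // 32 KB ByteBuffer per ProducerBatch
props.put(ProducerConfig.LINGER_MS_CONFIG, "10");                       // wait up to 10 ms for more records before sending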
// find the last ProducerBatch in the Deque and try to append the record to it
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
Callback callback, Deque<ProducerBatch> deque) {
ProducerBatch last = deque.peekLast();
if (last != null) {
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
if (future == null)
last.closeForRecordAppends();
else
return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
}
return null;
}
The sender thread is woken up when the last ProducerBatch in the record's queue is full, when the queue contains more than one ProducerBatch, or when a new ProducerBatch has just been created.
The Sender thread will be covered in detail in the next article.
The producer client is coordinated by two threads: the main thread and the sender thread (the thread that actually performs the send). The main thread wraps business data into ProducerRecord objects, then passes them through the interceptors, serializers, and partitioner before placing them in the RecordAccumulator (also called the message accumulator). The Sender thread builds requests from the accumulated records and performs the network I/O, fetching records from the RecordAccumulator and sending them to Kafka in batches.
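Putting it all together, a minimal end-to-end sketch of the send path discussed in this article (the broker address, topic, key, and value are placeholders):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "user-42", "hello");
            // send() returns as soon as the record has been appended to the RecordAccumulator;
            // the callback runs once the Sender thread receives the broker's response
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("sent to %s-%d at offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        }
    }
}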
If anything is unclear or could be written better, feel free to comment or reach me by email at creazycoder@sina.com.
References:
Figures are from 《深入理解Kafka:核心设计与实践原理》
《Apache Kafka源码剖析》
《深入理解Kafka:核心设计与实践原理》