In this article we will look at how to use the Kafka producer and how it works. The kafka-clients version used is 2.6.0. Let's start with an example that shows how to send a message with the producer API.
public class Producer {
    public static void main(String[] args) {
        // 1. Configure the required parameters
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 2. Create the KafkaProducer instance (the producer) from the configuration
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // 3. Create the ProducerRecord instance (the message)
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-demo", "hello kafka");
        // 4. Send the message
        producer.send(record);
        // 5. Close the producer
        producer.close();
    }
}
First, a Properties instance is created and the three required parameters are set: bootstrap.servers (the address list used to establish the initial connection to the Kafka cluster), and key.serializer and value.serializer (the serializer classes for the message key and value).
Because the broker expects to receive byte arrays, the key and value of the message must be serialized into byte arrays. After the parameters are set, a KafkaProducer instance, i.e. the producer used to send messages, is created from them; next, the ProducerRecord instance (the message to send) is created, then the message is sent with KafkaProducer's send method, and finally the producer is closed.
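Writing the configuration keys and serializer names as string literals is error-prone. A minimal, equivalent sketch of the same configuration using the constants defined in ProducerConfig and the StringSerializer class name:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());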
For now, remember two points about KafkaProducer:
As for configuration, we will only cover these three required parameters for now. Next, let's look at the send method. There are three ways to send a message:
Fire-and-forget
The message is sent to Kafka without caring whether it actually arrives; we are only responsible for firing it off, so messages may be lost. The example given above uses this approach.
Synchronous send (sync)

Calling get() on the Future returned by send blocks until the broker has responded:

Future<RecordMetadata> recordMetadataFuture = producer.send(record);
RecordMetadata recordMetadata = recordMetadataFuture.get();
The RecordMetadata object contains metadata about the message, such as its topic, partition number, offset within the partition, timestamp, and so on.
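Since get() also surfaces send failures as exceptions, the synchronous style is usually wrapped in a try/catch. A minimal sketch (topic name and message content are made up for illustration):

try {
    RecordMetadata metadata = producer.send(new ProducerRecord<>("topic-demo", "hello kafka")).get();
    // The broker has acknowledged the record; print where it was written.
    System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
} catch (ExecutionException | InterruptedException e) {
    // get() throws if the send failed or the wait was interrupted.
    e.printStackTrace();
}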
Asynchronous send (async)

A Callback is passed as the second argument to send; it is invoked once the broker has acknowledged the record or the send has failed:

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println(recordMetadata.topic() + "-" + recordMetadata.partition() + ":" + recordMetadata.offset());
        }
    }
});
onCompletion has two parameters, of type RecordMetadata and Exception respectively. When the message is sent successfully, recordMetadata is non-null and e is null; when the send fails, it is the other way around.
Now let's get to know the message object, ProducerRecord, which encapsulates the message to be sent. It is defined as follows:
public class ProducerRecord<K, V> {
    private final String topic;        // topic
    private final Integer partition;   // partition number
    private final Headers headers;     // message headers
    private final K key;               // key
    private final V value;             // value
    private final Long timestamp;      // timestamp
    // ... constructors and other members
}
Of these, the topic and value are required and the rest are optional. For example, when a partition number is given, the partition is effectively fixed; when no partition number is given but a key is, the key can be used to compute the partition number. Headers and timestamps are not covered here.
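ProducerRecord provides constructors for these combinations. A quick sketch of the common ones (topic names, keys and values are made up for illustration):

// topic + value: no key, the partitioner decides which partition to use
ProducerRecord<String, String> r1 = new ProducerRecord<>("topic-demo", "hello kafka");
// topic + key + value: records with the same key go to the same partition
ProducerRecord<String, String> r2 = new ProducerRecord<>("topic-demo", "user-1", "hello kafka");
// topic + partition + key + value: the partition is fixed explicitly
ProducerRecord<String, String> r3 = new ProducerRecord<>("topic-demo", 0, "user-1", "hello kafka");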
Now that we know the producer object KafkaProducer and the message object ProducerRecord, let's look at the components involved when the producer sends a message: the producer interceptor, the serializer, and the partitioner. The (partial) architecture is shown below:
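All three components can be customized. As an illustration (not part of the original example; the class name and the prefixing behavior are made up, while interceptor.classes is the real configuration key), a minimal producer interceptor might look like this:

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PrefixInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Called before serialization and partitioning; here we simply prefix the value.
        return new ProducerRecord<>(record.topic(), record.partition(), record.key(),
                "prefix-" + record.value());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // Called when the broker acknowledges the record or the send fails.
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}

// Registered through the producer configuration:
// properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PrefixInterceptor.class.getName());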
Let's now walk through the code to see how a record is processed.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // Interceptors: intercept and process the record
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}
The above is KafkaProducer's send method: the record is first handed to the interceptors' onSend method and then passed into doSend. The doSend method is fairly long but not complicated; comments marking the main steps are given below.
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // 1. Make sure the metadata of the topic the record is being sent to is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // 2. Serializers: serialize the record's key and value
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        // 3. Partitioner: obtain or compute the partition number
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }
        // 4. Record accumulator: buffer the record
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // 5. If the batch is full, or the record did not fit and a new batch had to be
        //    created, wake up the Sender thread to send the buffered records
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
The doSend method consists of five main steps:
1. Make sure the metadata of the topic the record is being sent to is available.
2. Serialize the record's key and value with the serializers.
3. Obtain or compute the partition number with the partitioner.
4. Buffer the record in the record accumulator.
5. If the batch is full, or a new batch had to be created, wake up the Sender thread to send the data.
This article will not dig into metadata, and the serializer and partitioner were introduced above, so next we mainly look at the record accumulator.
The record accumulator buffers records so that they can be sent in batches. Inside RecordAccumulator, records are kept in a map variable named batches of type ConcurrentMap<TopicPartition, Deque<ProducerBatch>>. The key, a TopicPartition, wraps the topic and partition number, and the value is a double-ended queue of ProducerBatch; in other words, records bound for the same partition are buffered together in ProducerBatch objects. When a record is sent, it is appended at the tail of the queue, i.e. added to the ProducerBatch at the tail; if that ProducerBatch has no room left, or the queue is empty, a new ProducerBatch is created and the record is appended to it. When a ProducerBatch becomes full or a new ProducerBatch is created, the Sender thread is woken up to take ProducerBatches from the head of the queue and send them.
RecordAccumulator
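As a rough illustration only (a simplified toy model, not Kafka's actual implementation; Batch and ToyAccumulator are made-up stand-ins for ProducerBatch and RecordAccumulator, and synchronization is omitted), the buffering structure described above can be pictured like this:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Stand-in for ProducerBatch: accumulates records up to a size limit.
class Batch {
    private final StringBuilder records = new StringBuilder();
    private int bytesUsed = 0;
    private static final int CAPACITY = 16_384; // roughly plays the role of batch.size

    boolean hasRoomFor(int recordSize) { return bytesUsed + recordSize <= CAPACITY; }

    void append(String record) {
        records.append(record).append('\n');
        bytesUsed += record.length();
    }
}

class ToyAccumulator {
    // One deque of batches per partition, keyed here by a simple "topic-partition" string.
    private final ConcurrentMap<String, Deque<Batch>> batches = new ConcurrentHashMap<>();

    // Append at the tail: reuse the last batch if it has room, otherwise create a new one.
    void append(String topicPartition, String record) {
        Deque<Batch> deque = batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        Batch last = deque.peekLast();
        if (last == null || !last.hasRoomFor(record.length())) {
            last = new Batch();
            deque.addLast(last);
        }
        last.append(record);
    }

    // The Sender drains from the head, so older batches are sent first.
    Batch drainOne(String topicPartition) {
        Deque<Batch> deque = batches.get(topicPartition);
        return deque == null ? null : deque.pollFirst();
    }
}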
In the Sender thread, the ProducerBatches to be sent are converted into the form <Integer, List<ProducerBatch>>, grouped by the ID of the Kafka node they are destined for, so that all ProducerBatches going to the same node can be sent in a single request.
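Conceptually, the regrouping works like the following sketch (illustrative only; the partition-to-leader map stands in for the cluster metadata and the real drain logic):

import java.util.*;

// Which broker leads each partition (made-up data for illustration).
Map<String, Integer> leaderByPartition = Map.of(
        "topic-demo-0", 1,
        "topic-demo-1", 2,
        "topic-demo-2", 1);

// Group partitions (and hence their batches) by the destination node ID.
Map<Integer, List<String>> partitionsByNode = new HashMap<>();
for (Map.Entry<String, Integer> e : leaderByPartition.entrySet()) {
    partitionsByNode.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
}
// Result: node 1 -> [topic-demo-0, topic-demo-2], node 2 -> [topic-demo-1];
// each node then receives one produce request carrying all of its batches.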
That is all for the Kafka producer for now. The mind map below gives a brief recap of this article: