The producer code is as follows. First, the Maven dependency:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.1</version>
</dependency>
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerStart {
    private static final String topic = "topic-demo";
    private static final String brokerList = "localhost:9092";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        // Equivalent configuration using the raw string keys:
        // props.put("acks", "all");
        // props.put("bootstrap.servers", brokerList);
        // props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // props.put("client.id", "producer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello,Kafka!");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
Let's start with the Javadoc of the KafkaProducer class. Generally speaking, the comments in open-source projects are concise and to the point:
The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server as well as a background I/O thread that is responsible for turning these records into requests and transmitting them to the cluster. Failure to close the producer after use will leak these resources. The {@link #send(ProducerRecord) send()} method is asynchronous. When called it adds the record to a buffer of pending record sends and immediately returns. This allows the producer to batch together individual records for efficiency. The <code>acks</code> config controls the criteria under which requests are considered complete. The "all" setting we have specified will result in blocking on the full commit of the record, the slowest but most durable setting.
Let me translate. The first sentence says the producer consists of a pool of buffer space that holds records not yet transmitted to the server, together with a background I/O thread responsible for turning those records into requests and transmitting them to the cluster. In other words, the Producer class is not the actual sender: it stores the messages and lets another thread ship them to the server.

The second sentence: send() is asynchronous. When called, it adds the record to a buffer of pending sends and returns immediately, which lets the producer batch individual records for efficiency. Again, this shows the Producer is not the real sender; it just places the message in the buffer and returns.

The third sentence: the acks config controls the criteria under which a request is considered complete. The sample code specifies acks=all, which blocks until the record is fully committed; this is the slowest but most durable setting. If you look at org.apache.kafka.clients.producer.ProducerConfig#ACKS_CONFIG, you will find the explanation right below it in ACKS_DOC. This parameter specifies how many replicas of the partition must receive the message before the producer considers the write successful. acks takes three values (all of them strings): "0" means don't wait for any acknowledgment, "1" (the default in this client version) means wait for the partition leader only, and "all" (equivalently "-1") means wait for all in-sync replicas.
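As a quick illustration of the three settings (plain java.util.Properties, no broker needed; AcksDemo and withAcks are names I made up for this sketch, but the config key and values are the real Kafka ones):

```java
import java.util.Properties;

public class AcksDemo {
    // Build a producer config for a given acks mode;
    // "0", "1", and "all" (alias "-1") are the only valid values.
    static Properties withAcks(String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // "0": don't wait; "1": leader only; "all"/"-1": all in-sync replicas
        props.put("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withAcks("0").getProperty("acks"));   // fastest, least durable
        System.out.println(withAcks("1").getProperty("acks"));   // the default trade-off
        System.out.println(withAcks("all").getProperty("acks")); // slowest, most durable
    }
}
```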
There are three main ways to send a message: fire-and-forget, synchronous (sync), and asynchronous (async). The sample code above uses fire-and-forget: it simply sends messages to Kafka without caring whether they arrive correctly. In most cases this is fine, but it can occasionally lose messages. It offers the highest performance and the weakest reliability. KafkaProducer's send() method returns a Future. send() has two overloads, defined as follows:
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)

A synchronous send can be implemented by chaining get() on the returned Future:
try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}
In fact, send() itself is always asynchronous; calling get() blocks waiting for Kafka's response until the message is sent successfully or an exception occurs. If an exception occurs, it must be caught and handled by the outer logic. KafkaProducer generally throws two kinds of exceptions: retriable and non-retriable. For a retriable exception, if the retries parameter is configured, no exception surfaces as long as the producer recovers within the configured number of retries. (In older clients retries defaulted to 0; since Kafka 2.1 it defaults to Integer.MAX_VALUE, bounded by delivery.timeout.ms.) It is configured as follows:
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// This configures 3 retries; if the producer still hasn't recovered after
// 3 retries, the exception is thrown and the outer logic must handle it.
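Conceptually, the retries behavior amounts to a loop like this hypothetical sketch (plain Java, not Kafka's actual code; a real producer additionally checks that the error is retriable before retrying):

```java
import java.util.concurrent.Callable;

public class RetrySketch {
    /**
     * Runs the task, allowing up to `retries` extra attempts on failure —
     * a rough model of what the producer's retries config does for retriable
     * errors. The last failure is rethrown for the outer logic to handle.
     */
    static <T> T withRetries(Callable<T> task, int retries) throws Exception {
        Exception last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // a real producer would also verify the error is retriable
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] failures = {2}; // simulate a task that fails twice, then succeeds
        String result = withRetries(() -> {
            if (failures[0]-- > 0) throw new RuntimeException("transient");
            return "ok";
        }, 3);
        System.out.println(result); // recovered within the retry budget
    }
}
```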
Synchronous sending performs poorly: you must block until one message has finished sending before you can send the next.
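To see why blocking per message is slow, here is a plain-Java sketch with no Kafka dependency; fakeSend and the thread pool are stand-ins I made up to mimic a producer whose send() returns a Future immediately while background threads do the round-trips:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SyncVsAsyncSketch {
    // Stand-in for KafkaProducer#send: returns a Future immediately
    // while a background thread performs the (simulated) round-trip.
    static final ExecutorService io = Executors.newFixedThreadPool(4);

    static Future<String> fakeSend(String record) {
        return io.submit(() -> {
            Thread.sleep(20); // pretend this is the broker round-trip
            return record + "@offset";
        });
    }

    public static void main(String[] args) throws Exception {
        // Synchronous style: block on each Future before sending the next record.
        long t0 = System.currentTimeMillis();
        for (int i = 0; i < 8; i++) {
            fakeSend("sync-" + i).get();
        }
        long syncMs = System.currentTimeMillis() - t0;

        // Asynchronous style: fire all sends first, then wait for the results.
        t0 = System.currentTimeMillis();
        List<Future<String>> pending = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            pending.add(fakeSend("async-" + i));
        }
        for (Future<String> f : pending) {
            f.get();
        }
        long asyncMs = System.currentTimeMillis() - t0;

        System.out.println("sync took ~" + syncMs + " ms, async took ~" + asyncMs + " ms");
        io.shutdown();
    }
}
```

The synchronous loop pays the full round-trip latency once per record, while the asynchronous version overlaps the waits, which is where the throughput difference comes from.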
For asynchronous sending, you generally pass a Callback to send(); Kafka invokes it when the response comes back, which serves as the asynchronous send confirmation. Using a Callback is simple and clear: as soon as Kafka has a response the callback fires, with either a successful send or an exception. It is used as follows:
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // A non-null exception means the send failed;
            // a real project would handle it according to its business needs
            exception.printStackTrace();
        } else {
            // The message was sent successfully
            System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset());
        }
    }
});
The two parameters of onCompletion() are mutually exclusive. The reason can be seen in the source below; I'll cover it in more detail in a later post:
org.apache.kafka.clients.producer.internals.ProducerBatch#completeFutureAndFireCallbacks
private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
    // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
    produceFuture.set(baseOffset, logAppendTime, exception);
    // execute callbacks
    for (Thunk thunk : thunks) {
        try {
            // Check whether the exception is null
            if (exception == null) {
                RecordMetadata metadata = thunk.future.value();
                if (thunk.callback != null)
                    thunk.callback.onCompletion(metadata, null);
            } else {
                if (thunk.callback != null)
                    thunk.callback.onCompletion(null, exception);
            }
        } catch (Exception e) {
            log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
        }
    }
    produceFuture.done();
}
When a message is sent successfully, metadata is non-null and exception is null; when it fails, metadata is null and exception is non-null.
If anything here is unclear or poorly written, feel free to comment or reach me by email at creazycoder@sina.com.
Reference: 《深入理解Kafka:核心設計與實踐原理》 (Understanding Kafka: Core Design and Practice)