4.kafka API producer

1.Producer流程
首先構建待發送的消息對象ProducerRecord,而後調用KafkaProducer.send方法進行發送。KafkaProducer接收到消息後首先對其進行序列化,而後結合本地緩存的元數據信息一塊兒發送給partitioner去肯定目標分區,最後追加寫入到內存中的消息緩衝池(accumulator)。此時KafkaProducer.send方法成功返回。同時,KafkaProducer中還有一個專門的Sender IO線程負責將緩衝池中的消息分批次發送給對應的broker,完成真正的消息發送邏輯。
java

2.API 實現
2.1建立topic
bin/kafka-topics.sh --create --zookeeper h201:2181,h202:2181,h203:2181 --replication-factor 2 --partitions 3 --topic topic11apache

2.2建立 producer
[hadoop@h201 kkk]$ vi cp.java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class cp {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "h201:9092,h202:9092,h203:9093");//kafka集羣,broker-list
props.put("acks", "all");
props.put("retries", 1);//重試次數
props.put("batch.size", 16384);//批次大小
props.put("linger.ms", 1);//等待時間
props.put("buffer.memory", 33554432);//RecordAccumulator緩衝區大小
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("topic11", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}bootstrap

[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/javac -classpath /home/hadoop/kafka_2.12-0.10.2.1/libs/kafka-clients-0.10.2.1.jar cp.java
[hadoop@h201 kkk]$ /usr/jdk1.8.0_144/bin/java cp緩存

代碼解釋:
3.1Properties類
繼承於 Hashtable.表示一個持久的屬性集.屬性列表中每一個鍵及其對應值都是一個字符串。
類中properties是配置文件,主要的做用是經過修改配置文件能夠方便的修改代碼中的參數,實現不用改class文件便可靈活變動參數。安全

3.2KafkaProducer類
用於向kafka集羣發佈消息記錄,其中<K, V>爲泛型,指明發送的消息記錄key/value對的類型。
kafka producer(生產者)是線程安全的,多個線程共享一個producer實例,相比於多個producer實例,這樣作的效率更高、更快。
函數

從上述構造函數能夠看出,能夠經過Map和Properties兩種形式傳遞配置信息,用於構造Producer對象,配置信息均爲key/value對。
kafka的包org.apache.kafka.common.serialization中,提供了許多已經實現好的序列化和反序列化的類,能夠直接使用。你也能夠實現本身的序列化和反序列化類(實現Serializer接口),選擇合適的構造函數構造你的Producer類。

方法:
oop

 

3.3 ProducerRecord
消息記錄,記錄了要發送給kafka集羣的消息、分區等信息
topic:必須字段,表示該消息記錄record發送到那個topic。
value:必須字段,表示消息內容。
partition:可選字段,要發送到哪一個分區partition。
key:可選字段,消息記錄的key,可用於計算選定partition。
timestamp:可選字段,時間戳;表示該條消息記錄的建立時間createtime,若是不指定,則默認使用producer的當前時間。
headers:可選字段,(做用暫時不明,待再查證補充)。spa

相關文章
相關標籤/搜索