kafka2.12_1.0.1生產者示範代碼

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;java

public class ProducerDemo {
//主題
private final static String TOPIC="test02";
//要發送的數據
private final static String CONNECT="this is a message";
public static void main(String[] args) {
// 聲明鏈接屬性
Properties properties = new Properties();
properties.put("zookeeper.connect", "192.168.157.131:2181");// 聲明zk
properties.put("group.id", "g");// 必需要使用別的組名稱,
// 若是生產者和消費者都在同一組,則不能訪問同一組內的topic數據
properties.put("auto.offset.reset", "smallest");
properties.put("metadata.broker.list", "192.168.157.131:9092");
properties.put("serializer.class", "kafka.serializer.StringEncoder"); //根據本身存儲數據類型選擇不一樣的編碼器

ProducerConfig conf=new ProducerConfig(properties);
//建立Kafka的生產者, key是消息的key的類型, value是消息的類型
Producer<String,String> producer=new Producer<String,String>(conf);
KeyedMessage<String,String> message=new KeyedMessage<String,String>(TOPIC,CONNECT);
producer.send(message);
}
}apache

producer包含一個用於保存待發送消息的緩衝池,緩衝池中消息是還沒來得及傳輸到kafka集羣的消息。位於底層的kafka I/O線程負責將緩衝池中的消息轉換成請求發送到集羣。若是在結束produce時,沒有調用close()方法,那麼這些資源會發生泄露。 
用於創建消費者的相關參數說明及其默認值參見producerconfigs,此處對代碼中用到的幾個參數進行解釋: 
bootstrap.servers:用於初始化時創建連接到kafka集羣,以host:port形式,多個以逗號分隔host1:port1,host2:port2; 
acks:生產者須要server端在接收到消息後,進行反饋確認的尺度,主要用於消息的可靠性傳輸;acks=0表示生產者不須要來自server的確認;acks=1表示server端將消息保存後便可發送ack,而沒必要等到其餘follower角色的都收到了該消息;acks=all(or acks=-1)意味着server端將等待全部的副本都被接收後才發送確認。 
retries:生產者發送失敗後,重試的次數 
batch.size:當多條消息發送到同一個partition時,該值控制生產者批量發送消息的大小,批量發送能夠減小生產者到服務端的請求數,有助於提升客戶端和服務端的性能。 
linger.ms:默認狀況下緩衝區的消息會被當即發送到服務端,即便緩衝區的空間並無被用完。能夠將該值設置爲大於0的值,這樣發送者將等待一段時間後,再向服務端發送請求,以實現每次請求能夠儘量多的發送批量消息。 
batch.size和linger.ms是兩種實現讓客戶端每次請求儘量多的發送消息的機制,它們能夠並存使用,並不衝突。 
buffer.memory:生產者緩衝區的大小,保存的是還將來得及發送到server端的消息,若是生產者的發送速度大於消息被提交到server端的速度,該緩衝區將被耗盡。 
key.serializer,value.serializer說明了使用何種序列化方式將用戶提供的key和vaule值序列化成字節。bootstrap

注意:代碼運行IP必需要配置host映射關係,zookeeper裏記錄的是host名,運行代碼鏈接kafka裏是利用zookeeper裏記錄的host名來訪問kafka,若沒有配置host映射,則會提示「kafka Failed to send messages after 3 tries」異常api

相關文章
相關標籤/搜索