producerbootstrap
包含一個用於保存待發送消息的緩衝池,緩衝池中消息是還沒來得及傳輸到kafka集羣的消息。
位於底層的kafka I/O線程負責將緩衝池中的消息轉換成請求發送到集羣。若是在結束produce時,沒有調用close()方法,那麼這些資源會發生泄露。
緩存
經常使用配置dom
bootstrap.servers異步
用於初始化時創建連接到kafka集羣,以host:port形式,多個以逗號分隔host1:port1,host2:port2;
ackside
生產者須要server端在接收到消息後,進行反饋確認的尺度,主要用於消息的可靠性傳輸;acks=0表示生產者不須要來自server的確認;
acks=1表示server端將消息保存後便可發送ack,而沒必要等到其餘follower角色的都收到了該消息;acks=all(or acks=-1)意味着server端將等待全部的副本都被接收後才發送確認。
retries性能
生產者發送失敗後,重試的次數
batch.sizethis
當多條消息發送到同一個partition時,該值控制生產者批量發送消息的大小,批量發送能夠減小生產者到服務端的請求數,有助於提升客戶端和服務端的性能。
linger.msspa
默認狀況下緩衝區的消息會被當即發送到服務端,即便緩衝區的空間並無被用完。
能夠將該值設置爲大於0的值,這樣發送者將等待一段時間後,再向服務端發送請求,以實現每次請求能夠儘量多的發送批量消息。
batch.size線程
batch.size和linger.ms是兩種實現讓客戶端每次請求儘量多的發送消息的機制,它們能夠並存使用,並不衝突。
buffer.memorycode
生產者緩衝區的大小,保存的是還將來得及發送到server端的消息,若是生產者的發送速度大於消息被提交到server端的速度,該緩衝區將被耗盡。
key.serializer,value.serializer
說明了使用何種序列化方式將用戶提供的key和vaule值序列化成字節。
Producer
public class Producer { private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class); private KafkaProducer<String, String> kafkaProducer; private Random random = new Random(); private String topic; private int retry; public Producer() { this("my_init"); } public Producer(String topic) { this(topic,3); } public Producer(String topic,int retry) { this.topic = topic; this.retry = retry; if (null == kafkaProducer) { Properties props = new Properties(); InputStream inStream = null; try { inStream = this.getClass().getClassLoader().getResourceAsStream("kafka-producer.properties"); props.load(inStream); kafkaProducer = new KafkaProducer<String, String>(props); } catch (IOException e) { LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e); } finally { if (null != inStream) { try { inStream.close(); } catch (IOException e) { LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e); } } } } } /** * 經過kafkaProducer發送消息 * @param topic 消息接收主題 * @param partitionNum 哪個分區 * @param retry 重試次數 * @param message 具體消息值 */ public RecordMetadata sendKafkaMessage(final String message) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", message); Future<RecordMetadata> meta = kafkaProducer.send(record, new Callback() {
//send方法是異步的,添加消息到緩存區等待發送,並當即返回,這使生產者經過批量發送消息來提升效率 public void onCompletion(RecordMetadata recordMetadata,Exception exception) { if (null != exception) { LOGGER.error("kafka發送消息失敗:" + exception.getMessage(),exception); retryKakfaMessage(message); } } }); RecordMetadata metadata = null; try { metadata = meta.get(); } catch (InterruptedException e) { } catch (ExecutionException e) {} return metadata; } /** * 當kafka消息發送失敗後,重試 */ private void retryKakfaMessage(final String retryMessage) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", retryMessage); for (int i = 1; i <= retry; i++) { try { kafkaProducer.send(record); return; } catch (Exception e) { LOGGER.error("kafka發送消息失敗:" + e.getMessage(), e); retryKakfaMessage(retryMessage); } } } /** * kafka實例銷燬 */ public void close() { if (null != kafkaProducer) { kafkaProducer.flush(); kafkaProducer.close(); } } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getRetry() { return retry; } public void setRetry(int retry) { this.retry = retry; } }
TestProducer
public class TestProducer { private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class); public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3); for(int i=0;i<3;i++){ executor.submit(new Runnable() { @Override public void run() { String topic = "2017-11-6-test"; Producer p = new Producer(topic); for(int n=1;n<=5;n++){ String str = "hello world => "+n; RecordMetadata message = p.sendKafkaMessage(str); LOGGER.info("發送信息: "+message.topic()+"---"+message.partition()+"---"+message.offset()); } p.close(); } }); } System.out.println("this is main"); executor.shutdown();//這個表示 線程執行完以後自動退出 System.out.println("hello world"); } }