kafka生產者java客戶端

producerbootstrap

  包含一個用於保存待發送消息的緩衝池,緩衝池中消息是還沒來得及傳輸到kafka集羣的消息。
  位於底層的kafka I/O線程負責將緩衝池中的消息轉換成請求發送到集羣。若是在結束produce時,沒有調用close()方法,那麼這些資源會發生泄露。
 緩存

經常使用配置dom

bootstrap.servers異步

 用於初始化時創建連接到kafka集羣,以host:port形式,多個以逗號分隔host1:port1,host2:port2; 

 

ackside

生產者須要server端在接收到消息後,進行反饋確認的尺度,主要用於消息的可靠性傳輸;acks=0表示生產者不須要來自server的確認;
acks=1表示server端將消息保存後便可發送ack,而沒必要等到其餘follower角色的都收到了該消息;acks=all(or acks=-1)意味着server端將等待全部的副本都被接收後才發送確認。

 

retries性能

生產者發送失敗後,重試的次數 

 


batch.sizethis

當多條消息發送到同一個partition時,該值控制生產者批量發送消息的大小,批量發送能夠減小生產者到服務端的請求數,有助於提升客戶端和服務端的性能。 

 


linger.msspa

默認狀況下緩衝區的消息會被當即發送到服務端,即便緩衝區的空間並無被用完。
能夠將該值設置爲大於0的值,這樣發送者將等待一段時間後,再向服務端發送請求,以實現每次請求能夠儘量多的發送批量消息。

 

batch.size線程

batch.size和linger.ms是兩種實現讓客戶端每次請求儘量多的發送消息的機制,它們能夠並存使用,並不衝突。 

 


buffer.memorycode

生產者緩衝區的大小,保存的是還將來得及發送到server端的消息,若是生產者的發送速度大於消息被提交到server端的速度,該緩衝區將被耗盡。 

 

key.serializer,value.serializer

說明了使用何種序列化方式將用戶提供的key和vaule值序列化成字節。

 

Producer

  

public class Producer {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);  
    private KafkaProducer<String, String> kafkaProducer;  
    private Random random = new Random();  
    private String topic;  
    private int retry;  
    
    public Producer() {
        this("my_init");
    }
    
    public Producer(String topic) {
        this(topic,3);  
    }
    
    public Producer(String topic,int retry) {
         this.topic = topic;  
         this.retry = retry; 
         if (null == kafkaProducer) {  
             Properties props = new Properties();  
             InputStream inStream = null;  
             try {  
                 inStream = this.getClass().getClassLoader().getResourceAsStream("kafka-producer.properties");  
                 props.load(inStream);  
                 kafkaProducer = new KafkaProducer<String, String>(props);  
             } catch (IOException e) {  
                 LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e);  
             } finally {  
                 if (null != inStream) {  
                     try {  
                         inStream.close();  
                     } catch (IOException e) {  
                         LOGGER.error("kafkaProducer初始化失敗:" + e.getMessage(), e);  
                     }  
                 }  
             }  
         }  
    }
    
    /** 
     * 經過kafkaProducer發送消息 
     * @param topic 消息接收主題 
     * @param partitionNum  哪個分區 
     * @param retry  重試次數 
     * @param message 具體消息值 
     */  
    public RecordMetadata sendKafkaMessage(final String message) {  
  
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", message);  
        Future<RecordMetadata> meta = kafkaProducer.send(record, new Callback() {  
//send方法是異步的,添加消息到緩存區等待發送,並當即返回,這使生產者經過批量發送消息來提升效率 public void onCompletion(RecordMetadata recordMetadata,Exception exception) { if (null != exception) { LOGGER.error("kafka發送消息失敗:" + exception.getMessage(),exception); retryKakfaMessage(message); } } }); RecordMetadata metadata = null; try { metadata = meta.get(); } catch (InterruptedException e) { } catch (ExecutionException e) {} return metadata; } /** * 當kafka消息發送失敗後,重試 */ private void retryKakfaMessage(final String retryMessage) { ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, random.nextInt(3), "", retryMessage); for (int i = 1; i <= retry; i++) { try { kafkaProducer.send(record); return; } catch (Exception e) { LOGGER.error("kafka發送消息失敗:" + e.getMessage(), e); retryKakfaMessage(retryMessage); } } } /** * kafka實例銷燬 */ public void close() { if (null != kafkaProducer) { kafkaProducer.flush(); kafkaProducer.close(); } } public String getTopic() { return topic; } public void setTopic(String topic) { this.topic = topic; } public int getRetry() { return retry; } public void setRetry(int retry) { this.retry = retry; } }

 

TestProducer

  

public class TestProducer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);  
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for(int i=0;i<3;i++){
            executor.submit(new Runnable() {
                @Override
                public void run() {
                     String topic = "2017-11-6-test";
                     Producer p = new Producer(topic);
                     for(int n=1;n<=5;n++){
                         String str = "hello world => "+n;
                         RecordMetadata message = p.sendKafkaMessage(str);
                         LOGGER.info("發送信息: "+message.topic()+"---"+message.partition()+"---"+message.offset());
                     }
                     p.close();
                }
            });
        }
        System.out.println("this is main");
        executor.shutdown();//這個表示 線程執行完以後自動退出
        System.out.println("hello world");
    }
}
相關文章
相關標籤/搜索