Kafka之Producer

經過https://www.cnblogs.com/tree1123/p/11243668.html 已經對consumer有了必定的瞭解。producer比consumer要簡單一些。html

1、舊版本producer

0.9.0.0版本之前,是由scala編寫的舊版本producer。java

入口類:kafka.producer.Producer算法

代碼示例:apache

Properties properties = new Properties();
		properties.put("metadata.broker.list", "kafka01:9092,kafka02:9092");
		properties.put("serializer.class", "kafka.serializer.StringEncoder");
		properties.put("request.requird.acks", "1");
		ProducerConfig config = new ProducerConfig(properties);
		Producer<String, String> producer = new Producer<String, String>(config);
		KeyedMessage<String,String> msg = new KeyedMessage<String,String>("topic","hello");
		Producer.send(msg);

舊版本是同步機制,等待響應。吞吐性不好。在0.9.0.0版本之後,正式下架了。bootstrap

舊版本的方法:安全

send   發送
close   關閉
sync   異步發送  有丟失消息的可能性

2、新版本producer

舊版本producer由scala編寫,0.9.0.0版本之後,新版本producer由java編寫。網絡

新版本主要入口類是:org.apache.kafka.clients.producer.KafkaProducer多線程

經常使用方法:app

send  實現消息發送主邏輯
close  關閉producer   
metrics  獲取producer的實時監控指標數據 好比發送消息的速率

Kafka producer要比consumer設計簡單一些,主要就是向某個topic的某個分區發送一條消息。partitioner決定向哪一個分區發送消息。用戶指定key,默認的分區器會根據key的哈希值來選擇分區,若是沒有指定key就以輪詢的方式選擇分區。也能夠自定義分區策略。異步

肯定分區後,producer尋找到分區的leader,也就是該leader所在的broker,而後發送消息,leader會進行副本同步ISR。

producer會啓兩個線程,主線程封裝ProducerRecord類,序列化後發給partitioner,而後發送到內存緩衝區。

另外一個I/O線程,提取消息分batch統一發送給對應的broker。

示例代碼:

Properties properties = new Properties();
		properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
		properties.put("acks", "all");
		properties.put("retries", 0);
		properties.put("batch.size", 16384);
		properties.put("linger.ms", 1);
		properties.put("buffer.memory", 33554432);
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
		for (int i = 1; i <= 600; i++) {
			kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
			System.out.println("testkafka"+i);
		}
		kafkaProducer.close();

一、構造Properties對象,bootstrap.servers key.serializer value.serializer是必須指定的。

二、使用Properties構造KafkaProducer對象。

三、構造ProducerRecord 指定topic 分區 key value。

四、KafkaProducer的send方法發送。

五、關閉KafkaProducer。

Properties主要參數:

bootstrap.servers 和consumer同樣,指定部分broker便可。並且broker端若是沒有配ip地址,要寫成主機名。

key.serializer value.serializer 序列化參數 必定要全類名 沒有key也必須設置。

acks 三個值

​ 0: producer徹底無論broker的處理結果 回調也就沒有用了 並不能保證消息成功發送 可是這種吞吐量最高

​ all或者-1: leader broker會等消息寫入 而且ISR都寫入後 纔會響應,這種只要ISR有副本存活就確定不會丟失,但吞 吐量最低。

​ 1: 默認的值 leader broker本身寫入後就響應,不會等待ISR其餘的副本寫入,只要leader broker存活就不會丟失,即保證了不丟失,也保證了吞吐量。

buffer.memory 緩衝區大小 字節 默認是33554432 就是發送消息的內存緩衝區大小 太小的話會影響吞吐量

compression.type 設置是否壓縮消息 默認值是none 壓縮後能夠下降IO開銷提升吞吐,可是會增大CPU開銷。

​ 支持三種: GZIP Snappy LZ4 性能 LZ4 > Snappy > GZIP

retries 發送消息重試的次數 默認0 不重試 重試可能形成重複發送 可能形成亂序

​ retry.backoff.ms 設置重試間隔 默認100毫秒

batch.size 調優重要的參數 batch小 吞吐量也會小 batch大 內存壓力會大 默認值是16384 16KB

linger.ms 發送延時 默認是0 0的話不用等batch滿就發送 延時的話能夠提升吞吐 看具體狀況進行調整

max.request.size producer可以發送最大消息的大小 默認1048576字節 若是消息很大 須要修改它

request.timeout.ms 發送請求後broker在規定時間返回 默認30秒 超過就是超時了。

Send方法

fire and forget 就是上邊的示例

Properties properties = new Properties();
		properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
		properties.put("acks", "all");
		properties.put("retries", 0);
		properties.put("batch.size", 16384);
		properties.put("linger.ms", 1);
		properties.put("buffer.memory", 33554432);
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
		for (int i = 1; i <= 600; i++) {
			kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i));
			System.out.println("testkafka"+i);
		}
		kafkaProducer.close();

異步回調 不阻塞

Properties properties = new Properties();
		properties.put("bootstrap.servers", "kafka01:9092,kafka02:9092");
		properties.put("acks", "all");
		properties.put("retries", 0);
		properties.put("batch.size", 16384);
		properties.put("linger.ms", 1);
		properties.put("buffer.memory", 33554432);
		properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
		for (int i = 1; i <= 600; i++) {
			kafkaProducer.send(new ProducerRecord<String, String>("z_test_20190430", "testkafka0613"+i),new Callback(){
              public void onCompletion(RecordMetadata metadata, Exception e) {
                         if(e != null) {
                            e.printStackTrace();
                         } else {
                            System.out.println("The offset of the record we just sent is: " + 		metadata.offset());
                         }
                     }           
			});
			System.out.println("testkafka"+i);
		}
		kafkaProducer.close();

同步發送 無限等待返回

producer.send(record).get()

重試機制

若是須要自定義重試機制,就要在回調裏對不一樣異常區別對待,常見的幾種以下:

可重試異常

LeaderNotAvailableException :分區的Leader副本不可用,這多是換屆選舉致使的瞬時的異常,重試幾回就能夠恢復 NotControllerException:Controller主要是用來選擇分區副本和每個分區leader的副本信息,主要負責統一管理分區信息等,也多是選舉所致。

NetWorkerException :瞬時網絡故障異常所致。

不可重試異常

SerializationException:序列化失敗異常

RecordToolLargeException:消息尺寸過大致使。

示例代碼:

producer.send(myRecord,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e ==null){
                               //正常處理邏輯
                               System.out.println("The offset of the record we just sent is: " + metadata.offset()); 
                               
                           }else{
                                   
                                 if(e instanceof RetriableException) {
                                    //處理可重試異常
                                    ......
                                 } else {
                                    //處理不可重試異常
                                    ......
                                 }
                           }
                       }
                   });
分區機制

partitioner決定向哪一個分區發送消息。用戶指定key,默認的分區器會根據key的哈希值來選擇分區,若是沒有指定key就以輪詢的方式選擇分區。也能夠自定義分區策略。

對於有key的消息,java版本的producer自帶的partitioner會根據murmur2算法計算消息key的哈希值。而後對總分區數求模獲得消息要被髮送到的目標分區號。

自定義分區策略:

建立一個類,實現org.apache.kafka.clients.producer.Partitioner接口

主要分區邏輯在Partitioner.partition中實現:經過topic key value 一同肯定分區

在構造KafkaProducer得Properties中設置partitioner.class 爲自定義類 注意是全類名

序列化機制

經常使用的serializer

ByteArraySerializer.class

ByteBufferSerializer.class

BytesSerializer.class

DoubleSerializer.class

IntegerSerializer.class

LongSerializer.class

StringSerializer.class

可是其餘一些複雜的就須要自定義序列化:

一、定義數據格式

二、建立自定義序列化類,實現org.apache.kafka.common.serialization.Serializer接口

三、在KafkaProducer的Properties中設置key.serializer value.serializer爲自定義類

以上均爲單線程的狀況,但producer是線程安全的,單線程適合分區較少的狀況,分區較多能夠多線程但對內存損耗較大。

更多實時計算,Kafka等相關技術博文,歡迎關注實時流式計算

相關文章
相關標籤/搜索