Kafka Producer Consumer

Producer API

org.apache.kafka.clients.producer.KafkaProducerapache

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。bootstrap

 
1 props.put("bootstrap.servers", "192.168.1.128:9092"); 2 props.put("acks", "all"); 3 props.put("retries", 0); 4 props.put("batch.size", 16384); 5 props.put("linger.ms", 1); 6 props.put("buffer.memory", 33554432); 7 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 8 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 9 10 Producer<String, String> producer = new KafkaProducer<String, String>(props); 11 for (int i = 0; i < 10; i++) { 12 producer.send(new ProducerRecord<String, String>("foo", Integer.toString(i), Integer.toString(i)), new Callback() { 13 @Override 14 public void onCompletion(RecordMetadata recordMetadata, Exception e) { 15 if (null != e) { 16 e.printStackTrace(); 17 }else { 18 System.out.println("callback: " + recordMetadata.topic() + " " + recordMetadata.offset()); 19 } 20 } 21 }); 22 } 23 producer.close();

producer由一個緩衝池組成,這個緩衝池中維護着那些尚未被傳送到服務器上的記錄,並且有一個後臺的I/O線程負責將這些記錄轉換爲請求並將其傳送到集羣上去。服務器

send()方法是異步的。當調用它之後就把記錄放到buffer中並當即返回。這就容許生產者批量的發送記錄。異步

acks配置項控制的是完成的標準,即什麼樣的請求被認爲是完成了的。本例中其值設置的是"all"表示客戶端會等待直到全部記錄徹底被提交,這是最慢的一種方式也是持久化最好的一種方式。分佈式

若是請求失敗了,生產者能夠自動重試。由於這裏咱們設置retries爲0,因此它不重試。ide

生產者對每一個分區都維護了一個buffers,其中放的是未被髮送的記錄。這些buffers的大小是經過batch.size配置項來控制的。微服務

默認狀況下,即便一個buffer還有未使用的空間(PS:buffer沒滿)也會當即發送。若是你想要減小請求的次數,你能夠設置linger.ms爲一個大於0的數。這個指令將告訴生產者在發送請求以前先等待多少毫秒,以但願能有更多的記錄到達好填滿buffer。在本例中,咱們設置的是1毫秒,表示咱們的請求將會延遲1毫秒發送,這樣作是爲了等待更多的記錄到達,1毫秒以後即便buffer沒有被填滿,請求也會發送。(PS:稍微解釋一下這段話,producer調用send()方法只是將記錄放到buffer中,而後由一個後臺線程將buffer中的記錄傳送到服務器上。這裏所說的請求指的是從buffer到服務器。默認狀況下記錄被放到buffer之後當即被髮送到服務器,爲了減小請求服務器的次數,能夠經過設置linger.ms,這個配置項表示等多少毫秒之後再發送,這樣作是但願每次請求能夠發送更多的記錄,以此減小請求次數)源碼分析

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。性能

buffer.memory控制的是總的buffer內存數量學習

key.serializer 和 value.serializer表示怎樣將key和value對象轉成字節

從kafka 0.11開始,KafkaProducer支持兩種模型:the idempotent producer and the transactional producer(冪等producer和事務producer)。冪等producer強調的是至少一次精確的投遞。事務producer容許應用程序原子的發送消息到多個分區或者主題。

爲了啓用冪等性,必須將enable.idempotence這個配置的值設爲true。若是你這樣設置了,那麼retries默認是Integer.MAX_VALUE,而且acks默認是all。爲了利用冪等producer的優點,請避免應用程序級別的從新發送。

爲了使用事務producer,你必須配置transactional.id。若是transactional.id被設置,冪等性自動被啓用。

 
1 Properties props = new Properties(); 2 props.put("bootstrap.servers", "192.168.1.128:9092"); 3 props.put("transactional.id", "my-transactional-id"); 4 5 Producer<String, String> producer = new KafkaProducer<String, String>(props, new StringSerializer(), new StringSerializer()); 6 7 producer.initTransactions(); 8 9 try { 10 producer.beginTransaction(); 11 12 for (int i = 11; i < 20; i++) { 13 producer.send(new ProducerRecord<String, String>("bar", Integer.toString(i), Integer.toString(i))); 14 } 15 // This method will flush any unsent records before actually committing the transaction 16 producer.commitTransaction(); 17 } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { 18 producer.close(); 19 } catch (KafkaException e) { 20 // By calling producer.abortTransaction() upon receiving a KafkaException we can ensure 21 // that any successful writes are marked as aborted, hence keeping the transactional guarantees. 22 producer.abortTransaction(); 23 } 24 25 producer.close();

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

Consumer API

org.apache.kafka.clients.consumer.KafkaConsumer

Offsets and Consumer Position

對於分區中的每條記錄,kafka維護一個數值偏移量。這個偏移量是分區中一條記錄的惟一標識,同時也是消費者在分區中的位置。例如,一個消費者在分區中的位置是5,表示它已經消費了偏移量從0到4的記錄,而且接下來它將消費偏移量爲5的記錄。相對於消費者用戶來講,這裏實際上有兩個位置的概念。

消費者的position表示下一條將要消費的記錄的offset。每次消費者經過調用poll(long)接收消息的時候這個position會自動增長。

committed position表示已經被存儲的最後一個偏移量。消費者能夠自動的週期性提交offsets,也能夠經過調用提交API(e.g. commitSync and commitAsync)手動的提交position。

Consumer Groups and Topic Subscriptions

Kafka用"consumer groups"(消費者組)的概念來容許一組進程分開處理和消費記錄。這些處理在同一個機器上進行,也能夠在不一樣的機器上。同一個消費者組中的消費者實例有相同的group.id

組中的每一個消費者能夠動態設置它們想要訂閱的主題列表。Kafka給每一個訂閱的消費者組都投遞一份消息。這歸功於消費者組中全部成員之間的均衡分區,以致於每一個分區均可以被指定到組中精確的一個消費者。假設一個主題有4個分區,一個組中有2個消費者,那麼每一個消費者將處理2個分區。

消費者組中的成員是動態維護的:若是一個消費者處理失敗了,那麼分配給它的分區將會被從新分給組中其它消費者。

在概念上,你能夠把一個消費者組想象成一個單個的邏輯訂閱者,而且每一個邏輯訂閱者由多個進程組成。做爲一個多訂閱系統,Kafka天生就支持對於給定的主題能夠有任意數量的消費者組。

Automatic Offset Committing

 
1 Properties props = new Properties(); 2 props.put("bootstrap.servers", "192.168.1.128:9092"); 3 props.put("group.id", "test"); 4 props.put("enable.auto.commit", "true"); 5 props.put("auto.commit.interval.ms", "1000"); 6 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 7 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 8 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 9 consumer.subscribe(Arrays.asList("foo", "bar")); 10 while (true) { 11 ConsumerRecords<String, String> records = consumer.poll(100); 12 for (ConsumerRecord<String, String> record : records) { 13 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); 14 } 15 }

設置enable.auto.commit意味着自動提交已消費的記錄的offset

Manual Offset Control

代替消費者週期性的提交已消費的offsets,用戶能夠控制何時記錄被認爲是已經消費並提交它們的offsets。

 
1 Properties props = new Properties(); 2 props.put("bootstrap.servers", "localhost:9092"); 3 props.put("group.id", "test"); 4 props.put("enable.auto.commit", "false"); 5 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 6 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 7 KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 8 consumer.subscribe(Arrays.asList("foo", "bar")); 9 final int minBatchSize = 200; 10 List<ConsumerRecord<String, String>> buffer = new ArrayList<>(); 11 while (true) { 12 ConsumerRecords<String, String> records = consumer.poll(100); 13 for (ConsumerRecord<String, String> record : records) { 14 buffer.add(record); 15 } 16 if (buffer.size() >= minBatchSize) { 17 insertIntoDb(buffer); 18 consumer.commitSync(); 19 buffer.clear(); 20 } 21 }

代碼演示

服務器端

 

客戶端

 

 

 

 

若是想學習Java工程化、高性能及分佈式、深刻淺出。微服務、Spring,MyBatis,Netty源碼分析的朋友能夠加個人Java高級交流:854630135,羣裏有阿里大牛直播講解技術,以及Java大型互聯網技術的視頻免費分享給你們。

相關文章
相關標籤/搜索