The Kafka Consumer API also lets you store offsets yourself, so that the offset and the data are updated atomically. This gives consumption exactly-once semantics, which is stronger than Kafka's default at-least-once.
You can set the consumer to start pulling data from a custom position, for example from the offset right after the last message consumed before the program stopped. Using this feature requires that the data and offset updates happen atomically; otherwise data consistency may be broken.
/*
 * Manually set the offset of specific partitions. This only applies when partitions
 * are added via Consumer.assign; it does not apply when Kafka manages the consumers
 * in a consumer group automatically. For that scenario, use a
 * ConsumerRebalanceListener for failure recovery instead.
 */
@Test
public void controlsOffset() {
    Properties props = new Properties();
    // Kafka cluster address
    props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
    // Consumer group; consumers with the same group name belong to one group
    props.put("group.id", "my_group");
    // Disable automatic offset commits
    props.put("enable.auto.commit", "false");
    // Deserializers
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Instantiate a consumer
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Instead of subscribing, assign the partitions to pull from.
    // Add code here to load your saved partitions and offsets into memory.
    TopicPartition partition0 = new TopicPartition("mytopic3", 0);
    consumer.assign(Arrays.asList(partition0));
    // Tell the consumer where each partition should start pulling from;
    // the offset comes from the values you loaded
    consumer.seek(partition0, 4140L);
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }
}
The code is mostly the same as above; the difference is that you load the partition information yourself and set each partition's offset on the consumer.
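To get the exactly-once behavior described at the top, the data and the offset must be written in one atomic unit, typically a single database transaction. Below is a minimal sketch of that idea, assuming a JDBC store; the events and consumer_offsets tables and their columns are hypothetical, not part of the original code. On restart you would read next_offset back from consumer_offsets and feed it to consumer.seek as in controlsOffset above.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class AtomicOffsetStore {

    // Write a batch of records and the next offset to consume in ONE database
    // transaction, so the data and the offset can never get out of sync.
    // The table/column names here are illustrative only.
    public static void saveBatch(String jdbcUrl, TopicPartition partition,
                                 List<ConsumerRecord<String, String>> partitionRecords)
            throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl)) {
            conn.setAutoCommit(false);
            try (PreparedStatement insert =
                         conn.prepareStatement("INSERT INTO events(value) VALUES (?)")) {
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    insert.setString(1, record.value());
                    insert.executeUpdate();
                }
            }
            long nextOffset = partitionRecords.get(partitionRecords.size() - 1).offset() + 1;
            try (PreparedStatement update = conn.prepareStatement(
                    "UPDATE consumer_offsets SET next_offset = ? WHERE topic = ? AND part = ?")) {
                update.setLong(1, nextOffset);
                update.setString(2, partition.topic());
                update.setInt(3, partition.partition());
                update.executeUpdate();
            }
            // Either both the data and the offset commit, or neither does.
            conn.commit();
        }
    }
}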
Kafka provides this listener for cases where you want a callback when the consumers of a topic change (one joins or leaves) and the partitions are reassigned (each partition is first unbound from its consumer, then rebound). onPartitionsRevoked is called when partitions are unbound from a consumer; onPartitionsAssigned is called when they are rebound.
Listener code:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/*
 * Kafka provides this listener to handle partition changes: onPartitionsRevoked is
 * called when partitions are revoked; onPartitionsAssigned is called when partitions
 * are assigned.
 */
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {

    static Map<TopicPartition, Long> partitionMap = new ConcurrentHashMap<>();

    private Consumer<?, ?> consumer;

    // Pass the Consumer in when instantiating the listener
    public MyConsumerRebalanceListener(Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    /*
     * When a new consumer joins the group or an existing one leaves, Kafka's rebalance
     * mechanism is triggered, and this method is called before the rebalance runs.
     * Here you can record each partition and its offset in external storage such as a
     * DBMS, a file, or a cache database, and run any business logic of your own.
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition partition : partitions) {
            // Record the partition and its offset (local caches could also be cleared here)
            partitionMap.put(partition, consumer.position(partition));
            System.out.println("onPartitionsRevoked partition:" + partition.partition() + " - offset" + consumer.position(partition));
        }
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Restore each partition's offset
        for (TopicPartition partition : partitions) {
            System.out.println("onPartitionsAssigned partition:" + partition.partition() + " - offset" + consumer.position(partition));
            if (partitionMap.get(partition) != null) {
                consumer.seek(partition, partitionMap.get(partition));
            } else {
                // Custom handling for partitions seen for the first time
            }
        }
    }
}
Test code:
@Test
public void autoCommitAddListner() {
    Properties props = new Properties();
    // Kafka cluster address
    props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
    // Consumer group; consumers with the same group name belong to one group
    props.put("group.id", "my_group");
    // Offset auto-commit: true = enabled, false = disabled
    props.put("enable.auto.commit", "false");
    // Deserializers
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Instantiate a consumer
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    MyConsumerRebalanceListener myListener = new MyConsumerRebalanceListener(consumer);
    // Subscribe to one or more topics, registering the rebalance listener
    consumer.subscribe(Arrays.asList("mytopic3"), myListener);
    //consumer.subscribe(Arrays.asList("mytopic3"));
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                /*
                 * The offset commit could also be moved into the listener's
                 * onPartitionsRevoked method; that is more flexible, but also
                 * much easier to get wrong.
                 */
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    } finally {
        consumer.close();
    }
}
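As the comment in the loop notes, the commit can be moved into the listener. A minimal sketch of that variant of onPartitionsRevoked, assuming MyConsumerRebalanceListener's existing fields plus imports of OffsetAndMetadata and java.util.HashMap; the rebalance callbacks run on the polling thread, so calling commitSync here is safe:

// Sketch: synchronously commit the current position of each revoked partition,
// so the next owner of the partition starts exactly where this consumer stopped.
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
    for (TopicPartition partition : partitions) {
        toCommit.put(partition, new OffsetAndMetadata(consumer.position(partition)));
    }
    if (!toCommit.isEmpty()) {
        consumer.commitSync(toCommit); // blocks until the broker confirms the commit
    }
}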
Use pause and resume to suspend and resume consumption of a partition:
consumer.pause(Arrays.asList(new TopicPartition("topic_name", partition_num)));
consumer.resume(Arrays.asList(new TopicPartition("topic_name", partition_num)));
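A typical use of this pair is backpressure: pause the assigned partitions while locally buffered work piles up, then resume them once it drains. Below is a minimal sketch; the backlog counter and its threshold are illustrative, and worker threads are assumed to decrement backlog as they finish records:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.concurrent.atomic.AtomicInteger;

public class PauseResumeLoop {
    private static final int MAX_BACKLOG = 1000;

    // `backlog` counts records handed off but not yet processed; worker threads
    // (not shown) decrement it as they finish.
    static void pollWithBackpressure(KafkaConsumer<String, String> consumer,
                                     AtomicInteger backlog) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            backlog.addAndGet(records.count());
            // ... hand the records off to worker threads here ...

            if (backlog.get() > MAX_BACKLOG) {
                // Keep calling poll() for heartbeats, but stop fetching from these partitions
                consumer.pause(consumer.assignment());
            } else {
                // Resuming partitions that are not paused is a no-op
                consumer.resume(consumer.assignment());
            }
        }
    }
}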
Reading data transactionally
該操做與Producer的按事務寫相匹配,在Consumer代碼的配置中增長一行:
props.put("isolation.level","read_committed");
Note that transactional reads cannot be used in consumers that pull data from manually specified partitions.
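For reference, the matching transactional write on the producer side looks roughly like the sketch below; the topic name and transactional.id are placeholders. Records sent inside the transaction only become visible to a read_committed consumer after commitTransaction:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TransactionalWrite {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        // A stable, unique id per producer instance enables transactions
        // (and implicitly enables idempotence)
        props.put("transactional.id", "my-tx-producer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("mytopic3", "key", "value"));
            // Only after commit do these records become visible to read_committed consumers
            producer.commitTransaction();
        } catch (Exception e) {
            // Simplified for the sketch; a fenced producer must be closed, not aborted
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }
}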
KafkaConsumer is not thread-safe. The Kafka documentation provides a coding pattern that avoids thread-safety problems:
ConsumerRunner:
package com.jv.parallel;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class ConsumerRunner implements Runnable {

    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String, String> consumer;
    private final CountDownLatch latch;

    public ConsumerRunner(KafkaConsumer<String, String> consumer, CountDownLatch latch) {
        this.consumer = consumer;
        this.latch = latch;
    }

    @Override
    public void run() {
        System.out.println("threadName...." + Thread.currentThread().getName());
        try {
            consumer.subscribe(Arrays.asList("mytopic3"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n",
                            Thread.currentThread().getName(), record.offset(), record.key(), record.value());
            }
        } catch (WakeupException e) {
            // Ignore the wakeup if we are shutting down; otherwise it is a real error
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
            latch.countDown();
        }
    }

    public void shutdown() {
        System.out.println("close ConsumerRunner");
        closed.set(true);
        // wakeup() is the one thread-safe KafkaConsumer method; it aborts a blocking poll()
        consumer.wakeup();
    }
}
Driver method:
@Test
public void autoCommitParallelTest() {
    Properties props = new Properties();
    // Kafka cluster address
    props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
    // Consumer group; consumers with the same group name belong to one group
    props.put("group.id", "my_group");
    // Enable automatic offset commits
    props.put("enable.auto.commit", "true");
    // Auto-commit interval
    props.put("auto.commit.interval.ms", "1000");
    // Deserializers
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // One consumer instance per thread
    final List<ConsumerRunner> consumers = new ArrayList<>();
    final List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
        kafkaConsumers.add(new KafkaConsumer<String, String>(props));
    }
    final CountDownLatch latch = new CountDownLatch(2);
    final ExecutorService executor = Executors.newFixedThreadPool(2);
    for (int i = 0; i < 2; i++) {
        ConsumerRunner c = new ConsumerRunner(kafkaConsumers.get(i), latch);
        consumers.add(c);
        executor.submit(c);
    }
    /*
     * This registers a shutdown hook with the JVM. When the JVM shuts down, it runs all
     * hooks registered via addShutdownHook and only exits once they have finished, so
     * hooks are the place to clean up memory, destroy objects, close connections, etc.
     */
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            System.out.println("....................");
            for (ConsumerRunner consumer : consumers) {
                consumer.shutdown();
            }
            executor.shutdown();
            try {
                executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
    try {
        latch.await();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
Even so, Kafka still officially recommends against sharing a single Consumer across multiple threads; doing so leads to a ConcurrentModificationException.
Kafka offers the following two approaches to concurrency:
1. One Consumer per thread
Each thread owns its own Consumer.
Pros:
The code is easy to write
Relatively fast, since no coordination or scheduling across threads is needed
Per-partition ordering is very easy to achieve
Cons:
More TCP connections; with a very large number of partitions this approach is no longer feasible
With more consumers, batches get smaller and I/O throughput may drop
The degree of parallelism is tied to the partition count (the number of consumers can be at most the number of partitions)
2. Separate Consumers and Processors
Use one or more Consumers to pull data from Kafka into a blocking queue; Processors take data from the queue and do the business processing.
Pros:
Consumption and business processing are split vertically, so each side can be scaled out independently
Cons:
Per-partition ordering is hard to guarantee
Offset commits become very awkward
To address the per-partition ordering problem of this approach, you can give each consumer its own blocking queue. And because the Consumer and the Processor are decoupled, letting the Consumer know when the Processor has actually finished a record (so its offset can be committed) is the awkward part. A sketch of the pattern follows.
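Here is a minimal sketch of the Consumer/Processor split, assuming one blocking queue per consumer as suggested above; the queue capacity, topic name, and processing logic are illustrative:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ConsumerProcessorDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        props.put("group.id", "my_group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // One queue owned by this consumer; records from its partitions stay in order
        BlockingQueue<ConsumerRecord<String, String>> queue = new ArrayBlockingQueue<>(1000);

        // Processor thread: drains the queue and runs the business logic
        Thread processor = new Thread(() -> {
            try {
                while (true) {
                    ConsumerRecord<String, String> record = queue.take();
                    System.out.printf("processed offset=%d value=%s%n", record.offset(), record.value());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        processor.start();

        // Consumer thread (here: the main thread) only pulls and enqueues
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("mytopic3"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    queue.put(record); // blocks when the processor falls behind (backpressure)
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

Note that with enable.auto.commit=true an offset may be committed before the Processor has finished the record, so a crash can lose in-flight records; solving that is exactly the Consumer-to-Processor acknowledgement problem described above.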