大哥你先走 浪尖聊大數據 git
如今,Kafka指標的採集都僅包括客戶端或broker,這使得用戶跟蹤消息在集羣內的傳遞路徑,構建系統端到端的性能和行爲畫像變的困難。從技術上講,經過修改應用以收集或跟蹤額外的信息來測量系統端到端的性能是可行的,但對於關鍵的基礎設施應用來講,這種方案並不必定是切實可行的。在生產環境中,可以快速部署工具來觀察,測量和監控Kafka客戶端行爲(粒度直至消息級別),是很是有用的。同時,不一樣應用的度量指標須要的上下文元數據各異。無需從新編寫代碼或從新編譯便可實現監控客戶端的能力十分重要(在某些場景下,這種能力有助於鏈接到正在運行的應用程序)。
爲了實現這個功能,kafka 更加傾向於增長生產者和消費者攔截器,攔截器能夠在生產者和消費者處理消息的不一樣階段攔截消息。在Apache Flume 攔截器接口的啓發下,kafka開發瞭如今的機制。雖然,有不少功能均可以使用攔截器實現(例如,異常檢測,數據加密,字段過濾等),可是每一個功能都須要仔細的評估是否應該使用攔截器仍是其餘機制來完成。當這些場景有明確的使用動機時,提供明確的API是一種良好的實踐。所以,kafka提供了最小化的生產者和消費者攔截器接口,旨在僅支持測量和監控。github
儘管增長更多的指標或改進kafka的監控是可能的,可是基於如下緣由咱們認爲提供靈活的,用戶可定製的接口更加有益:算法
構建通用監控工具。在一家大公司,不一樣的團隊合做構建系統。一般來講,隨着時間的推移,不一樣的團隊開發部署不一樣的組件。此外,組織對於通用的指標、數據格式和數據系統但願實現標準化。對於一個組織,咱們認爲開發部署通用的Kafka客戶端監控工具並在全部使用Kafka的應用中部署該工具是很是有價值的。apache
高昂的監控代價。向kafka添加其餘指標可能會影響kafka的性能。不幸的是,有時候須要在系統性能和數據收集之間進行權衡。舉個例子,考慮檢測消息大小的場景。代價最低,最簡單,最直接的方法是計算消息的平均大小。計算分佈式系統中的百分比要比計算簡單的平均值代價更高,更復雜,可是在不少應用中這是很是有用的。咱們但願可以讓客戶使用不一樣的算法收集指標數據,或者不收集。bootstrap
應用對指標的要求不一樣。例如,一個用戶可能認爲監控kafka中不一樣key的消息數很是的重要。在kafka內部提供全部的指標是不切實際的。插件化的攔截系統爲指標的定製化提供了簡單可行的能力。分佈式
爲了支持攔截器功能,Kafka在0.10.0.0版本增長了兩個全新的接口:ProducerInterceptor和ConsumerInterceptor並支持實現和配置攔截器鏈。攔截器API容許修改消息以支持給消息增長額外元數據實現端到端跟蹤的能力。ide
生產者攔截器 ProducerInterceptor工具
public interface ProducerInterceptor<K, V> extends Configurable { public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record); public void onAcknowledgement(RecordMetadata metadata, Exception exception); public void close(); }
消費者攔截器 ConsumerInterceptor性能
public interface ConsumerInterceptor<K, V> extends Configurable { ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1); void onCommit(Map<TopicPartition, OffsetAndMetadata> var1); void close(); }
下面以實現一個簡單的kafka指標採集小功能爲例,進一步瞭解kafka攔截器的功能和使用方法。採集指標包括:測試
生產和消費消費的線程名
3.1 修改消息,增長處理線程名
在生產端,實現ProducerInterceptor接口並覆寫onSend方法,修改ProducerRecord,在Heads中增長生產者線程名:
public class TraceProducerInterceptor implements ProducerInterceptor<String, String> { @Override public ProducerRecord<String,String> onSend(ProducerRecord<String,String > record) { Header producerThread = new RecordHeader("producerThread",Thread.currentThread().getName().getBytes()); record.headers().add(producerThread); return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),record.value(),record.headers()); } }
在消費端,實現ConsumerInterceptor接口並覆寫onConsume方法,修改ConsumerRecord,在Heads中增長消費者線程名:
public class TraceConsumerInterceptor implements ConsumerInterceptor<String, String> { @Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { byte[] currentThreadName = Thread.currentThread().getName().getBytes(); Header header = new RecordHeader("consumer Thread", currentThreadName); records.forEach(record -> record.headers().add(header)); return records; } }
3.2 實現生產者消息成功失敗統計
在生產端,實現ProducerInterceptor接口並覆寫onAcknowledgement方法,對發送成功和失敗的消息進行統計,並在攔截器關閉時將數據打印到控制檯:.
public class TraceProducerInterceptor implements ProducerInterceptor<String, String> { private AtomicLong successCounts = new AtomicLong(0); private AtomicLong failedCounts = new AtomicLong(0); @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { if (null == exception) { successCounts.getAndIncrement(); } else { failedCounts.getAndIncrement(); } } @Override public void close() { System.out.println("success counts " + successCounts.get()); System.out.println("failed counts " + failedCounts); } }
3.3 . 攔截器配置:
生產者和消費者能夠經過interceptor.classes屬性配置攔截器,屬性的值爲一個字符串集合,集合中的元素爲攔截器類的全路徑名(包括包名)。
生產者只包含攔截器的配置以下:
Properties props = new Properties(); List<String> interceptors = new ArrayList<>(); interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
消費者只包含攔截器的配置以下:
Properties props = new Properties(); List<String> interceptors = new ArrayList<>(); interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor"); props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
3.4. 測試
生產者使用三個線程,每一個線程發送一個消息到kafka,在主線程啓動消費者消費kafka的消息,收到的每條消息打印消息的Heads信息。
private static ExecutorService executor = Executors.newFixedThreadPool(3);
爲了不主線程退出致使發送消息失敗,在添加任務時,將返回的Future對象保存到隊列中,而後逐個檢查任務是否完成,詳細的代碼以下:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("delivery.timeout.ms", 300000); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); List<String> interceptors = new ArrayList<>(); interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceProducerInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Producer<String, String> producer = new KafkaProducer<>(props); List<Future> futures = new ArrayList<>(3); for (int i = 0; i < 3; i++) { futures.add(executor.submit(() -> { producer.send(new ProducerRecord<>("TEST", "hello world ")); })); } futures.forEach(future -> { try { future.get(); } catch (Exception e) { System.out.println(e.getMessage()); } }); producer.close();
代碼的輸出結果以下:
success counts 3 failed counts 0
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "chentong"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); List<String> interceptors = new ArrayList<>(); interceptors.add("io.github.ctlove0523.stackoverflow.kafka.TraceConsumerInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors); Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.assign(Arrays.asList(new TopicPartition("TEST", 0))); consumer.seek(new TopicPartition("TEST", 0), 0L); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); records.forEach(record -> { record.headers().headers("producer thread") .forEach(header -> System.out.print("producer thread = " + new String(header.value()))); record.headers().headers("consumer thread") .forEach(header -> System.out.println("\t consumer thread = " + new String(header.value()))); }); }
代碼輸出結果以下:
producer thread = pool-1-thread-2 consumer thread = main producer thread = pool-1-thread-1 consumer thread = main producer thread = pool-1-thread-3 consumer thread = main
本文首先介紹了kafka攔截器引入的動機,主要爲了解決當前kafka指標採集和監控的痛點問題;接着簡單介紹了ProducerInterceptor和ConsumerInterceptor兩個接口,最後以一個實際修改kafka消息Heads的例子進一步闡述瞭如何使用kafka提供的攔截器功能。
轉自:https://www.jianshu.com/p/a344b3bba8f0