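In the previous article, [Big Data Practice] Kafka Producer Programming (2): The Producer Send Flow, custom Interceptors and custom Partitioners were only introduced briefly, without an in-depth explanation. This article therefore fills in some of the theory behind Interceptors and Partitioners and shows how to write your own implementation of each.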
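An interceptor lets you apply custom logic to a record before it is sent, or to the send result before the producer's callback runs. An interceptor implements the following interface: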
```java
package org.apache.kafka.clients.producer;

import org.apache.kafka.common.Configurable;

public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    void close();
}
```
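onSend(): called before the record is sent. It can modify the ProducerRecord and returns the (possibly modified) ProducerRecord.

onAcknowledgement(): called when the record has been acknowledged or the send has failed, before the callback passed to send() runs. It can inspect or record the result.

close(): called when producer.close() is executed. Use it to release resources.

The interceptor chain (ProducerInterceptors) holds a list of interceptors, List<ProducerInterceptor<K, V>>. When the producer sends a record, receives an acknowledgement, or is closed, the chain's onSend, onAcknowledgement, or close method is called. That method then calls the corresponding method of each interceptor in the list, one after another. The record returned by one interceptor's onSend is passed on to the next, much like the stations on a production line.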
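The following minimal, self-contained model makes the chain behavior concrete without depending on the Kafka client. MiniInterceptor, MiniChain, TagInterceptor, BrokenInterceptor and send() are hypothetical names, and records are plain Strings instead of ProducerRecord. The sketch assumes only the behavior described above, plus the documented fact that Kafka's chain logs an exception thrown by one interceptor and moves on rather than failing the send.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

// Simplified model of an interceptor chain, loosely following Kafka's
// ProducerInterceptors. All names here are hypothetical; records are Strings.
public class InterceptorChainDemo {

    interface MiniInterceptor {
        String onSend(String record);
        void onAcknowledgement(String record, Exception exception);
        void close();
    }

    // Shared event log so the call order can be inspected afterwards.
    static final List<String> EVENTS = new ArrayList<>();

    // Appends its tag to the record and logs every callback it receives.
    static class TagInterceptor implements MiniInterceptor {
        private final String tag;
        TagInterceptor(String tag) { this.tag = tag; }
        public String onSend(String record) { EVENTS.add(tag + ".onSend"); return record + "|" + tag; }
        public void onAcknowledgement(String record, Exception e) { EVENTS.add(tag + ".onAck"); }
        public void close() { EVENTS.add(tag + ".close"); }
    }

    // Always throws in onSend, to show that one faulty interceptor does not stop the chain.
    static class BrokenInterceptor implements MiniInterceptor {
        public String onSend(String record) { throw new IllegalStateException("boom"); }
        public void onAcknowledgement(String record, Exception e) { }
        public void close() { }
    }

    static class MiniChain {
        private final List<MiniInterceptor> interceptors;
        MiniChain(List<MiniInterceptor> interceptors) { this.interceptors = interceptors; }

        // The record returned by one interceptor is the input of the next.
        String onSend(String record) {
            String current = record;
            for (MiniInterceptor i : interceptors) {
                try {
                    current = i.onSend(current);
                } catch (RuntimeException e) {
                    // Log and continue; keep the record produced by the previous step.
                    EVENTS.add("onSend failed: " + e.getMessage());
                }
            }
            return current;
        }

        void onAcknowledgement(String record, Exception exception) {
            for (MiniInterceptor i : interceptors) {
                try { i.onAcknowledgement(record, exception); }
                catch (RuntimeException e) { EVENTS.add("onAck failed: " + e.getMessage()); }
            }
        }

        void close() {
            for (MiniInterceptor i : interceptors) {
                try { i.close(); }
                catch (RuntimeException e) { EVENTS.add("close failed: " + e.getMessage()); }
            }
        }
    }

    // Simulated send: chain.onSend, then a pretend successful ack, then the user callback.
    static String send(MiniChain chain, String record, BiConsumer<String, Exception> callback) {
        String intercepted = chain.onSend(record);
        chain.onAcknowledgement(intercepted, null); // interceptors see the result first
        callback.accept(intercepted, null);         // the user callback runs afterwards
        return intercepted;
    }

    public static void main(String[] args) {
        MiniChain chain = new MiniChain(Arrays.<MiniInterceptor>asList(
                new TagInterceptor("A"), new BrokenInterceptor(), new TagInterceptor("B")));
        String sent = send(chain, "msg", (r, e) -> EVENTS.add("callback"));
        chain.close();
        System.out.println(sent);   // msg|A|B
        System.out.println(EVENTS); // [A.onSend, onSend failed: boom, B.onSend, A.onAck, B.onAck, callback, A.close, B.close]
    }
}
```

The printed record shows that B received A's output and that the failing interceptor was skipped. The event list shows each interceptor's onAcknowledgement running before the user callback.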
The chain class is located here:
```java
package org.apache.kafka.clients.producer.internals;

import java.io.Closeable;

public class ProducerInterceptors<K, V> implements Closeable {
    // ...
}
```
Here is a custom interceptor that counts records:
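```java
package myproducers;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Counts sent, succeeded, and failed records.
 * Interceptor callbacks may run on several threads (onSend on the caller's
 * thread, onAcknowledgement usually on the producer's I/O thread), so the
 * counters are AtomicIntegers rather than plain ints.
 */
public class CounterInterceptor implements ProducerInterceptor<Integer, String> {
    private final AtomicInteger sendCounter = new AtomicInteger();
    private final AtomicInteger succCounter = new AtomicInteger();
    private final AtomicInteger failCounter = new AtomicInteger();

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed.
    }

    @Override
    public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
        System.out.println("onSend called in CounterInterceptor, key = " + record.key());
        sendCounter.incrementAndGet();
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // metadata can be null on some error paths, so guard before using it.
        String where = metadata == null ? "unknown destination"
                : "topic = " + metadata.topic() + ", partition = " + metadata.partition();
        if (exception == null) {
            System.out.println("record send ok. " + where);
            succCounter.incrementAndGet();
        } else {
            System.out.println("record send failed. " + where);
            failCounter.incrementAndGet();
        }
    }

    @Override
    public void close() {
        System.out.println("sendCounter = " + sendCounter.get()
                + " succCounter = " + succCounter.get()
                + " failCounter = " + failCounter.get());
    }
}
```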
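The ProducerInterceptor Javadoc notes that interceptor callbacks may be called from multiple threads. A KafkaProducer is also commonly shared by several sending threads, so a counter such as sendCounter can be updated concurrently. The stdlib-only sketch below shows the AtomicInteger approach. CounterThreadSafetyDemo, SafeCounters and run() are hypothetical names, and plain threads simulate the Kafka calls. Every increment is preserved:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: counters shared by several threads, as interceptor counters can be.
public class CounterThreadSafetyDemo {

    // Hypothetical holder mirroring CounterInterceptor's three counters.
    static class SafeCounters {
        final AtomicInteger sendCounter = new AtomicInteger();
        final AtomicInteger succCounter = new AtomicInteger();
        final AtomicInteger failCounter = new AtomicInteger();
    }

    // Simulates `threads` sender threads, each "sending" `perThread` records;
    // every record whose index is a multiple of 3 is counted as a failure.
    static SafeCounters run(int threads, int perThread) {
        SafeCounters c = new SafeCounters();
        CountDownLatch done = new CountDownLatch(threads);
        for (int t = 0; t < threads; t++) {
            new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    c.sendCounter.incrementAndGet();     // what onSend would do
                    if (i % 3 == 0) {
                        c.failCounter.incrementAndGet(); // onAcknowledgement with an exception
                    } else {
                        c.succCounter.incrementAndGet(); // onAcknowledgement without an exception
                    }
                }
                done.countDown();
            }).start();
        }
        try {
            done.await(); // wait for all sender threads to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return c;
    }

    public static void main(String[] args) {
        SafeCounters c = run(8, 10_000);
        // prints "sendCounter = 80000 succCounter = 53328 failCounter = 26672"
        System.out.println("sendCounter = " + c.sendCounter.get()
                + " succCounter = " + c.succCounter.get()
                + " failCounter = " + c.failCounter.get());
    }
}
```

With plain int fields and `counter++`, the same run could report totals below 80000, because concurrent read-modify-write updates can overwrite each other.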
Register the interceptor in a custom producer:
```java
package myproducers;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Kafka message producer.
 */
public class GameRecordProducer {
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "myproducers.GameRecordProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Interceptors are configured as a list of fully qualified class names.
        List<String> interceptors = new ArrayList<>();
        interceptors.add("myproducers.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        // try-with-resources calls producer.close(), which also calls each interceptor's close().
        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            // No key: send value "message1" to topic "game-score" and wait for the result.
            producer.send(new ProducerRecord<>("game-score", "message1")).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
```
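A ProducerRecord holds the message content to be sent, together with the target topic, partition, and related information. Its definition is as follows: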
```java
package org.apache.kafka.clients.producer;

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
    // ... constructors and accessors omitted
}
```
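topic: required. The topic the record is sent to.

value: required. The message content.

partition: optional. The partition the record should be sent to.

key: optional. The record's key, which can be used to compute the target partition.

timestamp: optional. The record's creation time (CreateTime). If it is not set, the producer uses its current time.

headers: optional.

The Kafka producer chooses a partition as follows:

1. If the record specifies a partition, that partition is used.
2. If no partition is specified but the record has a key, the partition is chosen by hashing the key.
3. If neither a partition nor a key is given, partitions are chosen in round-robin order.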
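The following minimal sketch implements these three rules, assuming the pre-2.4 behavior described here. PartitionRulesSketch and choose() are hypothetical names. Arrays.hashCode stands in for Kafka's murmur2 hash, and the round-robin counter starts at 0 rather than at a random value. The concrete partition numbers therefore differ from what a real producer would pick, but the decision order is the same:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the partition-selection rules. Hypothetical names; Arrays.hashCode is a
// stand-in for murmur2, so results will NOT match a real Kafka producer.
public class PartitionRulesSketch {

    // One round-robin counter per topic (Kafka starts it at a random value; this sketch uses 0).
    private final Map<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // Same bit trick as Kafka's Utils.toPositive: clear the sign bit.
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // explicitPartition and keyBytes may be null; available lists the ids of available partitions.
    int choose(String topic, Integer explicitPartition, byte[] keyBytes,
               int numPartitions, List<Integer> available) {
        // Rule 1: a partition set on the record wins.
        if (explicitPartition != null) {
            return explicitPartition;
        }
        // Rule 2: hash the key, so the same key always maps to the same partition.
        if (keyBytes != null) {
            return toPositive(Arrays.hashCode(keyBytes)) % numPartitions;
        }
        // Rule 3: round robin over available partitions, or over all if none are available.
        int next = counters.computeIfAbsent(topic, t -> new AtomicInteger(0)).getAndIncrement();
        if (!available.isEmpty()) {
            return available.get(toPositive(next) % available.size());
        }
        return toPositive(next) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionRulesSketch s = new PartitionRulesSketch();
        List<Integer> available = Arrays.asList(0, 2, 3); // assume partition 1 is unavailable
        System.out.println(s.choose("game-score", 2, null, 4, available)); // 2
        byte[] key = "player-1".getBytes(StandardCharsets.UTF_8);
        int first = s.choose("game-score", null, key, 4, available);
        int second = s.choose("game-score", null, key, 4, available);
        System.out.println(first == second); // true
        for (int i = 0; i < 4; i++) {
            System.out.print(s.choose("game-score", null, null, 4, available) + " "); // 0 2 3 0
        }
        System.out.println();
    }
}
```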
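KafkaProducer applies an explicitly set partition itself before the partitioner is consulted. Otherwise, DefaultPartitioner decides. Its source in Kafka releases before 2.4 is shown below. From 2.4 onward, records without a key use a sticky partitioning strategy instead of round robin.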
```java
package org.apache.kafka.clients.producer.internals;

import ...

public class DefaultPartitioner implements Partitioner {
    // ...

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Get the topic's partition list and partition count from the cluster metadata.
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // No key (the serialized key is null): take the topic's next counter value.
            int nextValue = nextValue(topic);
            // Partitions of this topic that are currently available.
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                // At least one partition is available: round robin over the available ones.
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // No partition is available: round robin over all partitions.
                // toPositive guarantees a non-negative value. Math.abs cannot be used,
                // because Math.abs(Integer.MIN_VALUE) is still negative.
                // toPositive(Integer.MIN_VALUE) == 0
                // toPositive(-1) == 2147483647
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // Hash the key with murmur2, then take the remainder.
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    // Returns the next counter value for the topic.
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            // First record for this topic: start from a random value.
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                // Another thread registered a counter first; use that one.
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    // ...
}
```
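The toPositive comments are easy to check with plain Java. ToPositiveDemo and indicesAcrossOverflow() are hypothetical names. toPositive copies the sign-bit mask (number & 0x7fffffff) that Kafka's Utils.toPositive uses. The sketch also shows why the per-topic counter in nextValue() can overflow safely:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: why DefaultPartitioner uses toPositive instead of Math.abs.
public class ToPositiveDemo {

    // Clear the sign bit, so the result is never negative.
    static int toPositive(int number) {
        return number & 0x7fffffff;
    }

    // Round-robin indices from a counter that starts just below Integer.MAX_VALUE,
    // so it wraps around to Integer.MIN_VALUE part-way through.
    static List<Integer> indicesAcrossOverflow(int numPartitions, int calls) {
        AtomicInteger counter = new AtomicInteger(Integer.MAX_VALUE - 1);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < calls; i++) {
            result.add(toPositive(counter.getAndIncrement()) % numPartitions);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Math.abs(Integer.MIN_VALUE));     // -2147483648: still negative
        System.out.println(Math.abs(Integer.MIN_VALUE) % 3); // -2: not a valid index
        System.out.println(toPositive(Integer.MIN_VALUE));   // 0
        System.out.println(toPositive(-1));                  // 2147483647
        System.out.println(indicesAcrossOverflow(3, 4));     // [0, 1, 0, 1]
    }
}
```

Once the counter wraps from Integer.MAX_VALUE to Integer.MIN_VALUE, an index based on Math.abs could become negative and make the list lookup throw. Masking the sign bit keeps every index in range.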
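Besides the default Partitioner, you can plug in a custom Partitioner to implement your own routing rules, for example to balance load across partitions more evenly.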
```java
package myproducers;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class ConstantPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Always return 1, i.e. put every record in partition 1.
        // The topic must therefore have at least 2 partitions.
        return 1;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
```
When constructing the KafkaProducer, register the custom Partitioner class in the configuration:
```java
props.put("partitioner.class", "myproducers.ConstantPartitioner"); // same key as ProducerConfig.PARTITIONER_CLASS_CONFIG
```
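The following example is not from the article. It illustrates one custom rule aimed at balance: it keeps a single known heavy key alone on partition 0 and spreads every other key over the remaining partitions. HotKeyPartitionRule, partitionFor and the key "vip" are hypothetical, and Arrays.hashCode is a stand-in hash. Inside a real Partitioner.partition(), numPartitions would come from cluster.partitionsForTopic(topic).size(). The sketch also assumes every record has a key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

// Hypothetical routing rule: one hot key gets partition 0 to itself,
// all other keys are hashed across partitions 1..numPartitions-1.
public class HotKeyPartitionRule {

    static int partitionFor(String key, String hotKey, int numPartitions) {
        Objects.requireNonNull(key, "this sketch assumes every record has a key");
        if (numPartitions <= 1) {
            return 0; // only one partition: nothing to separate
        }
        if (key.equals(hotKey)) {
            return 0; // dedicated partition for the heavy key
        }
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(bytes) & 0x7fffffff; // non-negative stand-in hash
        return 1 + hash % (numPartitions - 1);          // spread the rest over 1..n-1
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("vip", "vip", 4));    // 0
        System.out.println(partitionFor("user-7", "vip", 4)); // some partition in 1..3
        System.out.println(partitionFor("user-7", "vip", 1)); // 0
    }
}
```

Whether this helps depends on the actual key distribution. If the hot key alone exceeds one partition's capacity, isolating it does not solve the problem.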
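This article covered two fairly independent concepts in the Kafka producer, both of which can serve as extension points in real-world development. The next article will look at the configuration details of KafkaProducer, to explain more of the details and mechanisms behind Kafka's send process.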