bootstrap.servers
該參數爲 broker
地址,不須要所有都填,由於 kafka
會從當前 broker
中獲取其餘 broker
信息。不過爲了某個 broker
掛掉,通常填多個 broker
地址java
key.serializer
消息 key
如何序列化apache
value.serializer
消息內容如何序列化bootstrap
示例代碼ide
Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
在消息發送前,對消息進行處理,該動做發生在序列化器
、分區器
以前。編碼
實現 org.apache.kafka.clients.producer.ProducerInterceptor
接口,便可自定義攔截器code
介紹一下接口定義的方法server
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record)
在消息發送以前,能夠對消息進行處理接口
void onAcknowledgement(RecordMetadata metadata, Exception exception
消息被應答以前或者消息發送失敗時被調用get
void close()
producer
被關閉時,會調用kafka
kafka 容許配置攔截器鏈,多個攔截器用 , 號隔開便可。
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TestProducerInterceptor.class.getName() + "," + TestProducerInterceptor2.class.getName());
序列化發生在分區器
以前
實現 org.apache.kafka.common.serialization.StringSerializer
接口便可自定義序列化
介紹一下接口定義的方法
void configure(Map<String, ?> configs, boolean isKey)
在 StringSerializer
實現中,用於設置編碼
byte[] serialize(String topic, String data)
定義如何序列化
void close()
producer
關閉時,被調用
實現 org.apache.kafka.clients.producer.Partitioner
便可自定義分區器
kafka 可按 key 進行哈希(MurmurHash2),將消息發往同一個分區。若是未指定 key,那麼將會把消息發往隨機的一個分區。
介紹一下接口定義的方法
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)
定義發往哪一個分區;具體的實現可參考DefaultPartitioner
void close()
producer
關閉時,被調用
kafka
一致,rocketMQ
容許生產者將消息發送到指定的 'partition' 中rocketMQ
沒有 序列化器
的概念。消息內容由 rocketMQ
自行序列化rocketMQ
也沒有提供相似 攔截器
概念rocketMQ
提供了 hock
以此在消息發送前,和消息發送後,對消息進行處理例如:
DefaultMQProducer producer = new DefaultMQProducer("default"); producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() { @Override public String hookName() { return null; } @Override public void sendMessageBefore(SendMessageContext context) { } @Override public void sendMessageAfter(SendMessageContext context) { } });