// Minimal producer example: configure a String/String producer and publish 100 records.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
try {
    // Key and value are both the decimal string of the loop index.
    for (int i = 0; i < 100; i++) {
        String indexText = Integer.toString(i);
        producer.send(new ProducerRecord<>("my-topic", indexText, indexText));
    }
} finally {
    // Close in finally so the producer is released even when a send() call throws.
    producer.close();
}
/** * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>. * See {@link #send(ProducerRecord, Callback)} for details. */ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record) { return send(record, null); } /** * Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged. * <p> * The send is asynchronous and this method will return immediately once the record has been stored in the buffer of * records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one. */ @Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
// Send with a completion callback: print the offset on success, the stack trace on failure.
producer.send(myRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        // No exception was reported: print the offset taken from the metadata.
        if (e == null) {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
            return;
        }
        e.printStackTrace();
    }
});
/**
 * Hook points a producer interceptor implements.
 */
public interface ProducerInterceptor<K, V> extends Configurable {

    /**
     * Called before the message is sent.
     */
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

    /**
     * Called after the message has been sent, when the server returns a result
     * (success or error).
     */
    void onAcknowledgement(RecordMetadata metadata, Exception exception);

    /**
     * Called when the interceptor is closed.
     */
    void close();
}
// Registers an interceptor on the producer configuration via the "interceptor.classes" key.
Properties props = new Properties();
// The "..." below is the excerpt's own elision standing in for the remaining settings.
...
// "your.interceptor.class.name" looks like a placeholder for the real class name — confirm.
props.put("interceptor.classes", "your.interceptor.class.name");
public class KafkaProducer<K, V> implements Producer<K, V> { // ... other class members private final ProducerInterceptors<K, V> interceptors; // Producer構造函數 KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer, Metadata metadata, KafkaClient kafkaClient) { // ...其餘步驟省略 // 從config中獲取攔截器實例,config從properties中構造 List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class); this.interceptors = new ProducerInterceptors<>(interceptorList); } }
@Override public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record); return doSend(interceptedRecord, callback); }
/** * A container that holds the list {@link org.apache.kafka.clients.producer.ProducerInterceptor} * and wraps calls to the chain of custom interceptors. */ public class ProducerInterceptors<K, V> implements Closeable { private final List<ProducerInterceptor<K, V>> interceptors; public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) { this.interceptors = interceptors; } public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) { ProducerRecord<K, V> interceptRecord = record; // 按順序執行每個攔截器的onSend方法 for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { interceptRecord = interceptor.onSend(interceptRecord); } catch (Exception e) { // do not propagate interceptor exception, log and continue calling other interceptors // be careful not to throw exception from here if (record != null) log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e); else log.warn("Error executing interceptor onSend callback", e); } } return interceptRecord; } /** * 1. 當發送的消息被服務器接受並返回時調用 * 2. 
當發送的消息未到達服務器以前就失敗時調用(見下方onSendError方法) **/ public void onAcknowledgement(RecordMetadata metadata, Exception exception) { for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { interceptor.onAcknowledgement(metadata, exception); } catch (Exception e) { // do not propagate interceptor exceptions, just log log.warn("Error executing interceptor onAcknowledgement callback", e); } } } /** * Producer在發送數據前要構建多種不一樣的信息,每一步都有可能拋出異常,本方法由producer在遇到異常時調用, * TopicPartition記錄了topic和partition信息,由producer構建,但若異常發生在其構建以前,該參數爲空,所以從record裏提取topic和partition數據構建。 **/ public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) { for (ProducerInterceptor<K, V> interceptor : this.interceptors) { try { if (record == null && interceptTopicPartition == null) { interceptor.onAcknowledgement(null, exception); } else { if (interceptTopicPartition == null) { interceptTopicPartition = new TopicPartition(record.topic(), record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition()); } interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception); } } catch (Exception e) { // do not propagate interceptor exceptions, just log log.warn("Error executing interceptor onAcknowledgement callback", e); } } } }