本文主要解析一下spring for apache kafka對原生的kafka client producer的封裝與集成。java
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaProducerFactory.javaspring
public class DefaultKafkaProducerFactory<K, V> implements ProducerFactory<K, V>, Lifecycle, DisposableBean { @Override public void destroy() throws Exception { //NOSONAR CloseSafeProducer<K, V> producer = this.producer; this.producer = null; if (producer != null) { producer.delegate.close(this.physicalCloseTimeout, TimeUnit.SECONDS); } } @Override public void start() { this.running = true; } @Override public void stop() { try { destroy(); } catch (Exception e) { logger.error("Exception while stopping producer", e); } } @Override public boolean isRunning() { return this.running; } @Override public Producer<K, V> createProducer() { if (this.producer == null) { synchronized (this) { if (this.producer == null) { this.producer = new CloseSafeProducer<K, V>(createKafkaProducer()); } } } return this.producer; } }
集成spring的第一步就是集成到spring容器託管,而後跟隨spring容器的生命週期正常啓動和銷燬。這裏建立了CloseSafeProducer,它實際的操做都委託給kafka producerapache
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/KafkaTemplate.java
實現了以下接口ide
public interface KafkaOperations<K, V> { /** * Send the data to the default topic with no key or partition. * @param data The data. * @return a Future for the {@link SendResult}. */ ListenableFuture<SendResult<K, V>> sendDefault(V data); /** * Send the data to the default topic with the provided key and no partition. * @param key the key. * @param data The data. * @return a Future for the {@link SendResult}. */ ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); /** * Send the data to the default topic with the provided key and partition. * @param partition the partition. * @param key the key. * @param data the data. * @return a Future for the {@link SendResult}. */ ListenableFuture<SendResult<K, V>> sendDefault(int partition, K key, V data); /** * Send the data to the provided topic with no key or partition. * @param topic the topic. * @param data The data. * @return a Future for the {@link SendResult}. */ ListenableFuture<SendResult<K, V>> send(String topic, V data); /** * Send the data to the provided topic with the provided key and no partition. * @param topic the topic. * @param key the key. * @param data The data. * @return a Future for the {@link SendResult}. */ ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); /** * Send the data to the provided topic with the provided partition and no key. * @param topic the topic. * @param partition the partition. * @param data The data. * @return a Future for the {@link SendResult}. */ ListenableFuture<SendResult<K, V>> send(String topic, int partition, V data); /** * Send the data to the provided topic with the provided key and partition. * @param topic the topic. * @param partition the partition. * @param key the key. * @param data the data. * @return a Future for the {@link SendResult}. */ ListenableFuture<SendResult<K, V>> send(String topic, int partition, K key, V data); /** * Send a message with routing information in message headers. The message payload * may be converted before sending. * @param message the message to send. * @return a Future for the {@link SendResult}. * @see org.springframework.kafka.support.KafkaHeaders#TOPIC * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY */ ListenableFuture<SendResult<K, V>> send(Message<?> message); /** * See {@link Producer#partitionsFor(String)}. * @param topic the topic. * @return the partition info. * @since 1.1 */ List<PartitionInfo> partitionsFor(String topic); /** * See {@link Producer#metrics()}. * @return the metrics. * @since 1.1 */ Map<MetricName, ? extends Metric> metrics(); /** * Execute some arbitrary operation(s) on the producer and return the result. * @param callback the callback. * @param <T> the result type. * @return the result. * @since 1.1 */ <T> T execute(ProducerCallback<K, V, T> callback); /** * Flush the producer. */ void flush(); /** * A callback for executing arbitrary operations on the {@link Producer}. * @param <K> the key type. * @param <V> the value type. * @param <T> the return type. * @since 1.1 */ interface ProducerCallback<K, V, T> { T doInKafka(Producer<K, V> producer); } }
主要的send方法以下,這就是spring對producer的主要包裝的地方:this
/** * Send the producer record. * @param producerRecord the producer record. * @return a Future for the {@link RecordMetadata}. */ protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) { final Producer<K, V> producer = getTheProducer(); if (this.logger.isTraceEnabled()) { this.logger.trace("Sending: " + producerRecord); } final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>(); producer.send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { try { if (exception == null) { future.set(new SendResult<>(producerRecord, metadata)); if (KafkaTemplate.this.producerListener != null && KafkaTemplate.this.producerListener.isInterestedInSuccess()) { KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata); } } else { future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception)); if (KafkaTemplate.this.producerListener != null) { KafkaTemplate.this.producerListener.onError(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), exception); } } } finally { producer.close(); } } }); if (this.autoFlush) { flush(); } if (this.logger.isTraceEnabled()) { this.logger.trace("Sent: " + producerRecord); } return future; }
不要被CloseSafeProducer的close方法誤導,它裏頭是個空方法。rest