Kafka Source Code Reading Notes: ProducerInterceptor

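As one of today's most popular message brokers, Kafka is widely used in message queues, microservice architectures, and big-data platforms. If a platform were a human body, Kafka would be its nervous system, carrying messages between the parts. In this series I use spare moments to read the Kafka source code and dig into how each module works and is implemented; posts are updated irregularly. All code in this article comes from https://github.com/apache/kafka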

A Simple Kafka Producer Example

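KafkaProducer sends events from a client application to a Kafka cluster. The producer is thread-safe, and sharing a single instance across threads is generally faster than creating several instances. The following example comes from the org.apache.kafka.clients.producer.KafkaProducer class: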

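import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

        // close() sends any records still buffered in memory before shutting down
        producer.close();
    }
}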

The props object holds the producer's properties and basic configuration:

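  • bootstrap.servers: the address(es) of the Kafka broker(s) the producer connects to first; from them it discovers the rest of the cluster. In production, several brokers are usually listed, separated by commas, so the client can still connect if a single broker suddenly goes down.
  • acks: when an event reaches the Kafka cluster, the data is replicated between a leader and its followers; acks determines when a send is considered successful.
    • acks = 0: the producer does not wait for any response from the server and treats the send as successful immediately.
    • acks = 1: the send succeeds once the leader has written the message, without waiting for the followers to replicate it.
    • acks = all: the producer is told the send succeeded only after the leader and its followers (the in-sync replicas) have all written the message.
  • retries: how many times the producer automatically resends after a failed send. Not every error triggers an automatic retry, and retries can change the order in which messages are written; details will be covered in a later post.
  • key.serializer/value.serializer: all data sent to Kafka is stored as bytes; the key/value serializers convert Java objects into bytes.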
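To make the serializer bullet concrete, here is a JDK-only sketch of what a key/value serializer does. The `ByteSerializer` interface is a hypothetical stand-in shaped like Kafka's `Serializer<T>` (whose core method is `byte[] serialize(String topic, T data)`); the two implementations assume UTF-8 for strings (the default encoding of Kafka's `StringSerializer`) and a 4-byte big-endian encoding for integers.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerializerSketch {

    // Hypothetical stand-in shaped like Kafka's Serializer<T>:
    // turns a Java object into the bytes that are actually sent to the broker.
    interface ByteSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Strings are encoded as UTF-8; null stays null.
    static class Utf8StringSerializer implements ByteSerializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Integers are encoded as 4 bytes, most significant byte first (ByteBuffer is big-endian by default).
    static class IntSerializer implements ByteSerializer<Integer> {
        @Override
        public byte[] serialize(String topic, Integer data) {
            return data == null ? null : ByteBuffer.allocate(4).putInt(data).array();
        }
    }

    public static void main(String[] args) {
        ByteSerializer<String> strings = new Utf8StringSerializer();
        ByteSerializer<Integer> ints = new IntSerializer();
        System.out.println(Arrays.toString(strings.serialize("my-topic", "42"))); // prints [52, 50]
        System.out.println(Arrays.toString(ints.serialize("my-topic", 258)));     // prints [0, 0, 1, 2]
    }
}
```

The topic parameter is unused here; it is kept only to mirror the shape of Kafka's interface, where a serializer may choose an encoding per topic.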

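After initializing the producer with this configuration, we can build a ProducerRecord (here from a topic, a key, and a value) and call producer.send to send it to the Kafka cluster. Always call producer.close before the program exits: by default the producer batches multiple events in memory and sends them together to improve performance, and close forces any events still held in memory to be sent.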
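Why close() matters can be seen with a toy model of batching. `ToyBatchingProducer` below is hypothetical and far simpler than Kafka's real RecordAccumulator/Sender pipeline: it ignores time-based sending (`linger.ms`) and networking, and only buffers records until a batch is full, then flushes the remainder in `close()`. Records still in the buffer when a program exits without `close()` would never be delivered.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy: only shows why buffered records need close(); no networking, no linger.ms timer.
public class ToyBatchingProducer implements AutoCloseable {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();    // records waiting in memory
    private final List<String> delivered = new ArrayList<>(); // what the "broker" has received

    public ToyBatchingProducer(int batchSize) {
        this.batchSize = batchSize;
    }

    // Like send(): the record is only buffered; a batch goes out once it is full.
    public void send(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Ships everything currently buffered as one batch.
    public void flush() {
        delivered.addAll(buffer);
        buffer.clear();
    }

    // Like close(): send whatever is still buffered before shutting down.
    @Override
    public void close() {
        flush();
    }

    public List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        ToyBatchingProducer producer = new ToyBatchingProducer(3);
        for (int i = 0; i < 5; i++) {
            producer.send(Integer.toString(i));
        }
        System.out.println("before close: " + producer.delivered()); // before close: [0, 1, 2]
        producer.close();
        System.out.println("after close:  " + producer.delivered()); // after close:  [0, 1, 2, 3, 4]
    }
}
```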

Sending Messages

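In the example above we called the send method that takes only a ProducerRecord; internally it calls the other send overload with a null callback. Kafka sends messages asynchronously, and a callback receives the outcome: on success it gets the record's metadata, including the topic, partition, and offset; on failure it gets the exception.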

/**
* Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
* See {@link #send(ProducerRecord, Callback)} for details.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

/**
* Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
* <p>
* The send is asynchronous and this method will return immediately once the record has been stored in the buffer of
* records waiting to be sent. This allows sending many records in parallel without blocking to wait for the response after each one.
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

An example callback:

producer.send(myRecord,
                   new Callback() {
                       public void onCompletion(RecordMetadata metadata, Exception e) {
                           if(e != null) {
                              e.printStackTrace();
                           } else {
                              System.out.println("The offset of the record we just sent is: " + metadata.offset());
                           }
                       }
                   });
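The asynchronous contract described above (send returns a future immediately, `send(record)` just delegates to `send(record, null)`, and the callback later receives either metadata or an exception) can be modeled with the JDK's `CompletableFuture`. `ToyAsyncProducer` and its `Callback` interface are hypothetical simplifications: the metadata is reduced to a `Long` offset, a null value stands in for a failed send, and a single background daemon thread plays the role of the producer's I/O thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class ToyAsyncProducer implements AutoCloseable {

    // Hypothetical callback: on success offset is set and exception is null, on failure the reverse.
    public interface Callback {
        void onCompletion(Long offset, Exception exception);
    }

    // One daemon thread stands in for the producer's background I/O thread.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-io");
        t.setDaemon(true);
        return t;
    });
    private final AtomicLong nextOffset = new AtomicLong();

    // Like send(record): simply delegates with a null callback.
    public CompletableFuture<Long> send(String value) {
        return send(value, null);
    }

    // Returns immediately; the "network" work and the callback run on ioThread.
    public CompletableFuture<Long> send(String value, Callback callback) {
        CompletableFuture<Long> result = new CompletableFuture<>();
        ioThread.execute(() -> {
            Long offset = null;
            Exception error = null;
            if (value == null) {
                error = new IllegalArgumentException("this toy broker rejects null values");
            } else {
                offset = nextOffset.getAndIncrement();
            }
            if (callback != null) {
                try {
                    callback.onCompletion(offset, error);
                } catch (RuntimeException callbackFailure) {
                    // A failing callback must not prevent the future from completing.
                }
            }
            if (error == null) {
                result.complete(offset);
            } else {
                result.completeExceptionally(error);
            }
        });
        return result;
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }

    public static void main(String[] args) throws Exception {
        try (ToyAsyncProducer producer = new ToyAsyncProducer()) {
            CompletableFuture<Long> first = producer.send("a");
            CompletableFuture<Long> second = producer.send("b", (offset, e) -> {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("The offset of the record we just sent is: " + offset);
                }
            });
            // get() blocks, turning the asynchronous send into a synchronous one
            System.out.println("first record offset: " + first.get());
            second.get();
        }
    }
}
```

Invoking the callback before completing the future means anyone blocked in `get()` also sees the callback's side effects; the two printed lines in `main` may appear in either order because they come from different threads.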

Interceptors (ProducerInterceptor)

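public interface ProducerInterceptor<K, V> extends Configurable {
  /**
  * Called from send() before the record is serialized and assigned a partition
  */
  public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
  /**
  * Called when the server returns a result (success or error) for a sent record,
  * or when the send fails before the record reaches the server
  */
  public void onAcknowledgement(RecordMetadata metadata, Exception exception);

  /**
  * Called when the interceptor is closed
  */
  public void close();
}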
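A typical implementation rewrites the record in onSend and collects statistics in onAcknowledgement. The JDK-only sketch below mirrors the three methods with a hypothetical `SimpleInterceptor` interface and a minimal immutable `SimpleRecord`; a real interceptor would implement `ProducerInterceptor<K, V>` from kafka-clients, return a new `ProducerRecord`, and also implement `configure(Map<String, ?>)` inherited from `Configurable`.

```java
import java.util.concurrent.atomic.AtomicLong;

public class InterceptorSketch {

    // Hypothetical, minimal immutable record: topic, key, value.
    static final class SimpleRecord {
        final String topic;
        final String key;
        final String value;

        SimpleRecord(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical mirror of ProducerInterceptor's three callbacks.
    interface SimpleInterceptor {
        SimpleRecord onSend(SimpleRecord record);
        void onAcknowledgement(Long offset, Exception exception);
        void close();
    }

    // Tags every value with a prefix and counts acknowledged vs failed sends.
    static class TaggingCountingInterceptor implements SimpleInterceptor {
        final AtomicLong acked = new AtomicLong();
        final AtomicLong failed = new AtomicLong();

        @Override
        public SimpleRecord onSend(SimpleRecord record) {
            // Treat records as immutable: return a modified copy instead of mutating the input.
            return new SimpleRecord(record.topic, record.key, "[tagged] " + record.value);
        }

        @Override
        public void onAcknowledgement(Long offset, Exception exception) {
            // In Kafka this generally runs on the background I/O thread, so keep it cheap and thread-safe.
            if (exception == null) {
                acked.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
        }

        @Override
        public void close() {
            System.out.println("acked=" + acked.get() + ", failed=" + failed.get());
        }
    }

    public static void main(String[] args) {
        TaggingCountingInterceptor interceptor = new TaggingCountingInterceptor();
        SimpleRecord out = interceptor.onSend(new SimpleRecord("my-topic", "1", "hello"));
        System.out.println(out.value);                                   // prints [tagged] hello
        interceptor.onAcknowledgement(0L, null);
        interceptor.onAcknowledgement(null, new RuntimeException("timeout"));
        interceptor.close();                                             // prints acked=1, failed=1
    }
}
```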

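Each producer can be given one or more interceptors, which let the client intercept, and possibly modify, records before they are sent. Interceptors are configured through Properties (several class names can be listed, separated by commas):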

Properties props = new Properties();
...
props.put("interceptor.classes", "your.interceptor.class.name");

The KafkaProducer constructor then reads this setting and creates the interceptor instances:

public class KafkaProducer<K, V> implements Producer<K, V> {
  // ... other class members
  private final ProducerInterceptors<K, V> interceptors;

  // Producer constructor
  KafkaProducer(ProducerConfig config,
                  Serializer<K> keySerializer,
                  Serializer<V> valueSerializer,
                  Metadata metadata,
                  KafkaClient kafkaClient) {
      // ... other steps omitted; userProvidedConfigs (the user-supplied settings) is defined among them
      // Get the interceptor instances from the config; the config itself is built from the Properties
      List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ProducerInterceptor.class);
      this.interceptors = new ProducerInterceptors<>(interceptorList);
  }
}
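getConfiguredInstances turns the comma-separated class names in interceptor.classes into objects, in the order they are listed, and calls configure(...) on those that are Configurable. The JDK-only sketch below imitates that with reflection; `loadInterceptors`, `Named`, `ConfigurableLike`, and the two sample classes are hypothetical, and error handling is reduced to wrapping reflection failures in an `IllegalArgumentException`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InterceptorLoading {

    // Hypothetical stand-ins for ProducerInterceptor and Configurable.
    interface Named {
        String name();
    }

    interface ConfigurableLike {
        void configure(Map<String, ?> configs);
    }

    public static class AuditInterceptor implements Named {
        public String name() { return "audit"; }
    }

    public static class MetricsInterceptor implements Named, ConfigurableLike {
        String clientId = "unset";

        public String name() { return "metrics(" + clientId + ")"; }

        public void configure(Map<String, ?> configs) {
            Object id = configs.get("client.id");
            if (id != null) clientId = id.toString();
        }
    }

    // Instantiate each listed class in order, then configure it if it supports that.
    static List<Named> loadInterceptors(Map<String, ?> configs) {
        List<Named> result = new ArrayList<>();
        Object classes = configs.get("interceptor.classes");
        if (classes == null) return result;
        for (String className : classes.toString().split(",")) {
            String trimmed = className.trim();
            if (trimmed.isEmpty()) continue;
            try {
                Object instance = Class.forName(trimmed).getDeclaredConstructor().newInstance();
                if (instance instanceof ConfigurableLike) {
                    ((ConfigurableLike) instance).configure(configs);
                }
                result.add((Named) instance);
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException("Cannot load interceptor " + trimmed, e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("client.id", "demo");
        // getName() returns the binary name of a nested class, which Class.forName accepts.
        props.put("interceptor.classes",
                AuditInterceptor.class.getName() + ", " + MetricsInterceptor.class.getName());
        for (Named interceptor : loadInterceptors(props)) {
            System.out.println(interceptor.name()); // prints audit, then metrics(demo)
        }
    }
}
```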

Once the interceptors are set up, they are invoked from the send method:

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

ProducerInterceptors

ProducerInterceptors is a container class that wraps multiple interceptors; its onSend method is called by the producer's send method.

import java.io.Closeable;
import java.util.List;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A container that holds the list {@link org.apache.kafka.clients.producer.ProducerInterceptor}
* and wraps calls to the chain of custom interceptors.
*/
public class ProducerInterceptors<K, V> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
    private final List<ProducerInterceptor<K, V>> interceptors;

    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
        this.interceptors = interceptors;
    }

    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
        ProducerRecord<K, V> interceptRecord = record;
        // Run each interceptor's onSend in order, passing it the previous result
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptRecord = interceptor.onSend(interceptRecord);
            } catch (Exception e) {
                // do not propagate interceptor exception, log and continue calling other interceptors
                // be careful not to throw exception from here
                if (record != null)
                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
                else
                    log.warn("Error executing interceptor onSend callback", e);
            }
        }
        return interceptRecord;
    }

    /**
    * Called in two cases:
    * 1. when the server has acknowledged a sent record and returned a response;
    * 2. when sending fails before the record reaches the server (see onSendError below).
    **/
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.onAcknowledgement(metadata, exception);
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }

    /**
    * Before sending, the producer has to build several pieces of information, and any of these steps may throw;
    * the producer calls this method when that happens. interceptTopicPartition holds the topic and partition and is
    * built by the producer, but if the exception occurs before it is built, it is null, so the topic and partition
    * are taken from the record instead.
    **/
    public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                if (record == null && interceptTopicPartition == null) {
                    interceptor.onAcknowledgement(null, exception);
                } else {
                    if (interceptTopicPartition == null) {
                        interceptTopicPartition = new TopicPartition(record.topic(),
                                record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition());
                    }
                    interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
                                    RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception);
                }
            } catch (Exception e) {
                // do not propagate interceptor exceptions, just log
                log.warn("Error executing interceptor onAcknowledgement callback", e);
            }
        }
    }

    // close() is required by Closeable; this version closes every interceptor and logs, rather than propagates, failures
    @Override
    public void close() {
        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
            try {
                interceptor.close();
            } catch (Exception e) {
                log.error("Failed to close producer interceptor", e);
            }
        }
    }
}
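The fallback in onSendError comes down to one decision: use the TopicPartition if the producer already built it; otherwise derive it from the record, with -1 (the value of `RecordMetadata.UNKNOWN_PARTITION`) when the record names no partition; and report -1 as the offset. The JDK-only sketch below reproduces just that decision. `Rec`, `TopicPart`, `Placeholder`, and `placeholderFor` are hypothetical cut-down names, and returning `null` stands for the branch where the real code calls `onAcknowledgement(null, exception)`.

```java
public class SendErrorPlaceholder {

    static final int UNKNOWN_PARTITION = -1; // same value as RecordMetadata.UNKNOWN_PARTITION
    static final long UNKNOWN_OFFSET = -1L;  // onSendError also passes -1 as the offset

    // Hypothetical, cut-down versions of ProducerRecord, TopicPartition and RecordMetadata.
    static final class Rec {
        final String topic;
        final Integer partition; // null when the caller did not choose a partition

        Rec(String topic, Integer partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    static final class TopicPart {
        final String topic;
        final int partition;

        TopicPart(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    static final class Placeholder {
        final String topic;
        final int partition;
        final long offset;

        Placeholder(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return "topic=" + topic + ", partition=" + partition + ", offset=" + offset;
        }
    }

    // Returns null where the real code calls onAcknowledgement(null, exception).
    static Placeholder placeholderFor(Rec record, TopicPart known) {
        if (record == null && known == null) {
            return null;
        }
        TopicPart tp = known;
        if (tp == null) {
            // The failure happened before the TopicPartition was built: rebuild it from the record.
            tp = new TopicPart(record.topic,
                    record.partition == null ? UNKNOWN_PARTITION : record.partition);
        }
        return new Placeholder(tp.topic, tp.partition, UNKNOWN_OFFSET);
    }

    public static void main(String[] args) {
        System.out.println(placeholderFor(new Rec("orders", null), null)); // topic=orders, partition=-1, offset=-1
        System.out.println(placeholderFor(new Rec("orders", 2), null));    // topic=orders, partition=2, offset=-1
        System.out.println(placeholderFor(null, null));                    // null
    }
}
```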

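Note that if an interceptor throws an exception, the program does not stop; only a WARN-level log entry is written. The interceptor chain does not stop either: it moves on to the next interceptor, which receives the record returned by the last interceptor that succeeded (or the original record if none did). If the next interceptor depends on the result of the one that failed, the final data may therefore be incorrect.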
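This behaviour can be reproduced without Kafka. In the sketch below the interceptors are hypothetical `UnaryOperator<String>` functions and records are reduced to strings; `runChain` follows the same control flow as the loop in `ProducerInterceptors.onSend`. The second interceptor throws, its exception is recorded instead of propagated, and the third interceptor receives the output of the first one, the last interceptor that succeeded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

public class InterceptorChainDemo {

    // Same control flow as ProducerInterceptors.onSend: call each interceptor in order,
    // record (instead of propagate) any exception, and keep the last successful result.
    static String runChain(List<UnaryOperator<String>> chain, String record, List<String> warnings) {
        String intercepted = record;
        for (UnaryOperator<String> interceptor : chain) {
            try {
                intercepted = interceptor.apply(intercepted);
            } catch (Exception e) {
                // Stands in for log.warn(...): the chain keeps going.
                warnings.add("Error executing interceptor onSend callback: " + e.getMessage());
            }
        }
        return intercepted;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> chain = Arrays.<UnaryOperator<String>>asList(
                value -> value + "|a",                                      // succeeds
                value -> { throw new IllegalStateException("b failed"); },  // always throws
                value -> value + "|c");                                     // receives the output of "a"
        List<String> warnings = new ArrayList<>();
        System.out.println(runChain(chain, "msg", warnings)); // prints msg|a|c
        System.out.println(warnings); // prints [Error executing interceptor onSend callback: b failed]
    }
}
```

If the third interceptor had expected the value produced by the second, it would silently work on the wrong input, which is exactly the risk described above.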
