Kafka Producer: Synchronous and Asynchronous Sending, Transactions, and Idempotence in Practice

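This blog series distills cases drawn from real commercial environments and gives practical guidance for commercial Spark applications; please keep following it. Copyright notice: this Spark commercial application series belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to study it.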

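1 I Am Safe

Why say such a thing? Because of a lingering unease. Kafka's Producer is thread-safe, so you can use it from multiple threads with complete confidence.

The official KafkaProducer Javadoc goes further: sharing a single producer instance across threads will generally be faster than having multiple instances.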
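Because the producer is thread-safe, one instance can be handed to many worker threads. The minimal sketch below shows that pattern. `StubProducer` is a hypothetical stand-in for KafkaProducer so the example runs without a broker; with the real client you would create one KafkaProducer and pass it to every worker in the same way.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for KafkaProducer: like the real client,
// a single instance may be called from many threads at once.
class StubProducer {
    private final ConcurrentLinkedQueue<String> sent = new ConcurrentLinkedQueue<>();

    void send(String topic, String value) {
        sent.add(topic + ":" + value); // thread-safe append, no external locking
    }

    int sentCount() {
        return sent.size();
    }
}

class SharedProducerDemo {
    // Shares ONE producer among `threads` workers, each sending `perThread` records.
    static int run(int threads, int perThread) {
        StubProducer producer = new StubProducer();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    producer.send("my-topic", id + "-" + i);
                }
            });
        }
        pool.shutdown(); // no new tasks; already submitted ones still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS); // wait for all workers
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return producer.sentCount();
    }
}
```

For example, `SharedProducerDemo.run(4, 100)` should report 400 records, all sent through the same instance.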

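2 Producer Message Sending Flow

  • Step 1: wrap the message in a ProducerRecord.
  • Step 2: the partitioner routes the record to one partition of the topic. If the record names a partition explicitly, that partition is used; if it has a key, the default partitioner hashes the key (murmur2) modulo the number of partitions, so records with the same key always land in the same partition; if there is no key, messages are spread evenly across all partitions.
  • Step 3: once the partition is chosen, the producer sends the record to that partition's leader replica, after which the replica synchronization mechanism takes over.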
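The routing rule in step 2 can be sketched as follows. This is a simplified model, not Kafka's DefaultPartitioner: the class `SimplePartitioner` is hypothetical, `Arrays.hashCode` stands in for murmur2, and null keys use plain round-robin (newer clients use sticky partitioning instead, which still balances load over time).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of partition selection; NOT Kafka's DefaultPartitioner.
class SimplePartitioner {
    private final AtomicInteger roundRobin = new AtomicInteger(0);

    int partition(Integer explicitPartition, String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition; // the caller chose the partition
        }
        if (key == null) {
            // No key: spread records evenly by cycling through partitions.
            return Math.floorMod(roundRobin.getAndIncrement(), numPartitions);
        }
        // Keyed record: same key -> same partition. Kafka hashes the
        // serialized key with murmur2; Arrays.hashCode is a stand-in here.
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }
}
```

`Math.floorMod` keeps the result non-negative even when the hash or the counter is negative, which plain `%` would not.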

3 Official Producer Examples

3.1 Fire and Forget (the don't-care approach)

  • Send the record and never look at the result.
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 100; i++) {
        // Fire and forget: the returned Future is discarded, so failures go unnoticed
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    }

    producer.close(); // blocks until previously sent records have been handled

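3.2 Asynchronous Send with a Callback (non-blocking)

  • KafkaProducer's send method is asynchronous: it returns a Java Future immediately, which the caller can use later to obtain the send result.
  • Fully non-blocking usage can make use of the Callback parameter to provide a callback that will be invoked when the request is complete.

  • RecordMetadata and Exception are never both null: when the send succeeds, the Exception is null; when it fails, the Exception is set and the metadata is null (newer clients may pass a partially filled metadata object instead), so always check the Exception first.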

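    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Assumes `producer` is an existing KafkaProducer<byte[], byte[]>
    // and `key` / `value` are byte[] payloads.
    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("the-topic", key, value);

    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null) {
                e.printStackTrace(); // the send failed
            } else {
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
            }
        }
    });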
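The "result or exception, never both" contract of onCompletion has a close analogue in the JDK. The sketch below uses `CompletableFuture.whenComplete`, which likewise hands its callback either a value or a Throwable. This is an analogy only, not the Kafka API; `fakeSend` is a hypothetical stand-in for `producer.send`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Analogy only: whenComplete receives (result, error) with exactly one non-null,
// mirroring Callback.onCompletion(metadata, exception).
class AsyncSendDemo {
    // Hypothetical "send" that completes with an offset, or fails.
    static CompletableFuture<Long> fakeSend(boolean fail) {
        CompletableFuture<Long> f = new CompletableFuture<>();
        if (fail) {
            f.completeExceptionally(new RuntimeException("broker unavailable"));
        } else {
            f.complete(42L);
        }
        return f;
    }

    static String sendAndDescribe(boolean fail) {
        AtomicReference<String> outcome = new AtomicReference<>();
        // The future is already complete, so the callback runs immediately on
        // this thread; with a real async send it would run later (Kafka runs
        // callbacks on the producer's I/O thread, so keep them short).
        fakeSend(fail).whenComplete((offset, error) -> {
            if (error != null) {
                outcome.set("failed: " + error.getMessage());
            } else {
                outcome.set("offset " + offset);
            }
        });
        return outcome.get();
    }
}
```

`sendAndDescribe(false)` yields `"offset 42"`, and `sendAndDescribe(true)` yields `"failed: broker unavailable"`.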

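3.3 Synchronous Send (blocking)

  • producer.send(record) returns a Future; calling Future.get() blocks with no time limit until the result comes back. If the send failed, get() throws an ExecutionException that wraps the cause.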

    RecordMetadata metadata = producer.send(record).get(); // blocks; throws ExecutionException if the send failed
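A synchronous send is just an asynchronous send followed by get(). To avoid waiting forever, the standard `Future.get(timeout, unit)` overload can bound the wait. The sketch below assumes a hypothetical `fakeSend` in place of `producer.send(record)` so it runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncSendDemo {
    // Hypothetical stand-in for producer.send(record): completes after delayMs.
    static Future<Long> fakeSend(long delayMs) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs); // simulated broker round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 7L; // simulated offset
        });
    }

    // Blocks for at most timeoutMs instead of waiting indefinitely with get().
    static long sendSync(long delayMs, long timeoutMs) throws TimeoutException {
        Future<Long> future = fakeSend(delayMs);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // The send itself failed; the real cause is e.getCause().
            throw new IllegalStateException("send failed", e.getCause());
        }
    }
}
```

A TimeoutException here only means the caller stopped waiting; with a real producer the record may still be delivered afterwards.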

3.4 Transactional Send (atomicity and idempotence)

  • From Kafka 0.11, the KafkaProducer supports two additional modes: the idempotent producer and the transactional producer. The idempotent producer strengthens Kafka's delivery semantics from at least once to exactly once delivery. In particular producer retries will no longer introduce duplicates. The transactional producer allows an application to send messages to multiple partitions (and topics!) atomically.

  • To enable idempotence, the enable.idempotence configuration must be set to true. If set, the retries config will default to Integer.MAX_VALUE and the acks config will default to all. There are no API changes for the idempotent producer, so existing applications will not need to be modified to take advantage of this feature. Since Kafka 3.0, enable.idempotence defaults to true.
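Why retries stop producing duplicates can be shown with a toy model of broker-side de-duplication. It is a simplified sketch, not the broker's implementation: it assumes each producer has an id and numbers its records per partition with consecutive sequence numbers, and that the partition remembers the last sequence it appended. The class `ToyIdempotentPartition` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of idempotent de-duplication on one partition (simplified).
class ToyIdempotentPartition {
    private final Map<Long, Integer> lastSequence = new HashMap<>();
    private int appended = 0;

    /** Returns true if the record was appended, false if it was a duplicate retry. */
    boolean append(long producerId, int sequence) {
        int last = lastSequence.getOrDefault(producerId, -1);
        if (sequence <= last) {
            return false; // already written: a retry of an earlier record, drop it
        }
        if (sequence != last + 1) {
            // A gap in sequence numbers; roughly what Kafka reports as
            // OutOfOrderSequenceException.
            throw new IllegalStateException("out of order sequence " + sequence);
        }
        lastSequence.put(producerId, sequence);
        appended++;
        return true;
    }

    int appendedCount() {
        return appended;
    }
}
```

If the acknowledgement for sequence 0 is lost and the producer resends it, the second `append(1L, 0)` returns false and the log still holds one copy.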

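    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.StringSerializer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-transactional-id");
    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

    producer.initTransactions();

    try {
        producer.beginTransaction();
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        // We can't recover from these exceptions, so our only option is to close the producer and exit.
        producer.close();
    } catch (KafkaException e) {
        // For all other exceptions, just abort the transaction and try again.
        producer.abortTransaction();
    }
    producer.close();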
  • As is hinted at in the example, there can be only one open transaction per producer. All messages sent between the beginTransaction() and commitTransaction() calls will be part of a single transaction. When the transactional.id is specified, all messages sent by the producer must be part of a transaction.
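The all-or-nothing visibility of a transaction can be illustrated with a toy model. It is deliberately simplified: the real broker writes transactional records to the log immediately and appends commit or abort markers, and consumers with isolation.level=read_committed skip records from aborted transactions. This hypothetical `ToyTransactionalLog` simply buffers records and drops them on abort.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model of transactional visibility, NOT how the broker stores data.
class ToyTransactionalLog {
    private final List<String> committed = new ArrayList<>();
    private List<String> pending = null; // non-null while a transaction is open

    void beginTransaction() {
        if (pending != null) {
            throw new IllegalStateException("only one open transaction per producer");
        }
        pending = new ArrayList<>();
    }

    void send(String record) {
        requireOpen(); // a transactional producer may only send inside a transaction
        pending.add(record);
    }

    void commitTransaction() {
        requireOpen();
        committed.addAll(pending); // all records become visible together
        pending = null;
    }

    void abortTransaction() {
        requireOpen();
        pending = null; // none of the records become visible
    }

    // What a read_committed consumer would observe.
    List<String> readCommitted() {
        return Collections.unmodifiableList(committed);
    }

    private void requireOpen() {
        if (pending == null) {
            throw new IllegalStateException("no open transaction");
        }
    }
}
```

An aborted batch leaves `readCommitted()` unchanged, while a committed batch appears as a whole.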

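3.5 Retriable Exceptions (subclasses of RetriableException)

  • LeaderNotAvailableException: the partition's leader replica is unavailable. This is often a transient condition caused by a leader election, and a few retries usually resolve it.
  • NotControllerException: the controller is the broker that manages cluster-wide partition state, such as electing each partition's leader and tracking replica assignments. This error can likewise be caused by an ongoing controller election.
  • NetworkException: caused by a transient network failure.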

3.6 Non-Retriable Exceptions

  • SerializationException: serialization of the record failed.
  • RecordTooLargeException: the message exceeds the maximum allowed size.

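3.7 Handling the Two Kinds of Exceptions Differently

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.RetriableException;

    // Assumes `producer` and `record` exist as in section 3.2.
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e == null) {
                // Normal path: the record was written
                System.out.println("The offset of the record we just sent is: " + metadata.offset());
            } else if (e instanceof RetriableException) {
                // Retriable: transient (e.g. leader election, network blip). The producer's own
                // `retries` are already used up here, so resend or park the record for later.
            } else {
                // Non-retriable: resending the same record will fail again; log it or alert.
            }
        }
    });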
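An application-level retry that honors the distinction above can be sketched as a bounded loop: retry only transient failures, fail fast on everything else. To keep it runnable without Kafka, `TransientSendException` is a hypothetical stand-in for RetriableException, and `RetryPolicy.callWithRetry` is an illustrative helper, not part of any library.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.kafka.common.errors.RetriableException.
class TransientSendException extends RuntimeException {
    TransientSendException(String msg) {
        super(msg);
    }
}

class RetryPolicy {
    // Runs `attempt` up to maxAttempts times, retrying only transient failures;
    // any other exception propagates immediately without a retry.
    static <T> T callWithRetry(Supplier<T> attempt, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        TransientSendException last = null;
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                return attempt.get();
            } catch (TransientSendException e) {
                last = e; // retriable: remember it and try again
                if (i < maxAttempts) {
                    sleep(backoffMs * i); // linear backoff between attempts
                }
            }
        }
        throw last; // retries exhausted
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

A non-retriable error such as an oversized record is attempted exactly once, while a transient leader-election error gets up to `maxAttempts` tries.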

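3.8 Closing the Producer Gracefully

  • producer.close(): waits until all previously sent records have been handled, then exits gracefully.
  • producer.close(timeout): waits at most the given timeout for outstanding requests; if they have not completed by then, it closes forcibly and fails any unsent records. (Kafka 2.0 and later take a Duration; older clients use close(long, TimeUnit).)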
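The "drain, wait up to a timeout, then force" behavior of close(timeout) follows a familiar JDK pattern. The sketch below shows the same idea with an ExecutorService. It is an analogy, not Kafka code; the helper `GracefulClose.closeWithTimeout` is hypothetical.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Analogy only: graceful shutdown with a deadline, then a forced stop.
class GracefulClose {
    // Returns how many queued tasks were abandoned by the forced shutdown.
    static int closeWithTimeout(ExecutorService pool, long timeoutMs) {
        pool.shutdown(); // accept no new work, let queued work finish
        try {
            if (pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                return 0; // everything finished in time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Runnable> dropped = pool.shutdownNow(); // force: abandon the rest
        return dropped.size();
    }
}
```

With a fast workload the call returns 0; with slow tasks and a short timeout, the tasks still waiting in the queue are dropped, much as close(timeout) fails records it could not send in time.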

4 Summary

To prove that technology is only a thin sheet of window paper, I will take Kafka apart down to the last detail.

Qin Kaixin, Shenzhen, 2018
