消息隊列的消費冪等性如何保證

什麼是冪等?

任意屢次執行所產生的影響均與一次執行的影響相同就能夠稱爲冪等java

什麼是消息冪等?

當出現消費者對某條消息重複消費的狀況時,重複消費的結果與消費一次的結果是相同的,而且屢次消費並未對業務系統產生任何負面影響git

爲何咱們要保證冪等性,不保證冪等性,會不會有問題?

這個問題其實無法準確回答。回答這個問題的根源得從業務場景上進行分析。好比正常業務狀況下,咱們是不容許同個訂單重複支付,這種業務場景咱們就須要確保冪等性。再好比日誌記錄,這種業務場景,咱們可能就不須要作冪等判斷。github

所以是否要保證冪等性,得基於業務進行考量redis

消息隊列的消費冪等性如何保證?

無法保證。前面說了要保證冪等性,得基於業務場景進行考量。消息隊列他自己就不是給你用來作業務冪等性用的。若是你要實現業務冪等性,靠消息隊列是無法幫你完成的,你本身得根據自身業務場景,來實現冪等。spring

經常使用的業務冪等性保證方法

一、利用數據庫的惟一約束實現冪等

好比將訂單表中的訂單編號設置爲惟一索引,建立訂單時,根據訂單編號就能夠保證冪等數據庫

二、去重表

這個方案本質也是根據數據庫的惟一性約束來實現。其實現大致思路是:首先在去重表上建惟一索引,其次操做時把業務表和去重表放在同個本地事務中,若是出現重現重複消費,數據庫會拋惟一約束異常,操做就會回滾apache

三、利用redis的原子性

每次操做都直接set到redis裏面,而後將redis數據定時同步到數據庫中bootstrap

四、多版本(樂觀鎖)控制

此方案多用於更新的場景下。其實現的大致思路是:給業務數據增長一個版本號屬性,每次更新數據前,比較當前數據的版本號是否和消息中的版本一致,若是不一致則拒絕更新數據,更新數據的同時將版本號+1springboot

五、狀態機機制

此方案多用於更新且業務場景存在多種狀態流轉的場景服務器

六、token機制

生產者發送每條數據的時候,增長一個全局惟一的id,這個id一般是業務的惟一標識,好比訂單編號。在消費端消費時,則驗證該id是否被消費過,若是還沒消費過,則進行業務處理。處理結束後,在把該id存入redis,同時設置狀態爲已消費。若是已經消費過了,則不進行處理。

演示

例子使用springboot2加kafka來演示一下使用token機制如何實現消費端冪等

一、application.yml
spring:
  redis:
    host: localhost
    port: 6379
    # 鏈接超時時間(毫秒)
    timeout: 10000
    jedis:
      pool:
        # 鏈接池中的最大空閒鏈接
        max-idle: 8
        # 鏈接池中的最小空閒鏈接
        min-idle: 10
        # 鏈接池最大鏈接數(使用負值表示沒有限制)
        max-active: 100
        # 鏈接池最大阻塞等待時間(使用負值表示沒有限制)
        max-wait: -1
    password:
  kafka:
    # 以逗號分隔的地址列表,用於創建與Kafka集羣的初始鏈接(kafka 默認的端口號爲9092)
    bootstrap-servers: localhost:9092
    producer:
      # 發生錯誤後,消息重發的次數。
      retries: 0
      #當有多個消息須要被髮送到同一個分區時,生產者會把它們放在同一個批次裏。該參數指定了一個批次可使用的內存大小,按照字節數計算。
      batch-size: 16384
      # 設置生產者內存緩衝區的大小。
      buffer-memory: 33554432
      # 鍵的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: com.github.lybgeek.kafka.serialization.ObjectSerializer
      # acks=0 : 生產者在成功寫入消息以前不會等待任何來自服務器的響應。
      # acks=1 : 只要集羣的首領節點收到消息,生產者就會收到一個來自服務器成功響應。
      # acks=all :只有當全部參與複製的節點所有收到消息時,生產者纔會收到一個來自服務器的成功響應。
      acks: 1
    consumer:
      # 自動提交的時間間隔 在spring boot 2.X 版本中這裏採用的是值的類型爲Duration 須要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的狀況下該做何處理:
      # latest(默認值)在偏移量無效的狀況下,消費者將從最新的記錄開始讀取數據(在消費者啓動以後生成的記錄)
      # earliest :在偏移量無效的狀況下,消費者將從起始位置讀取分區的記錄
      auto-offset-reset: earliest
      # 是否自動提交偏移量,默認值是true,爲了不出現重複數據和數據丟失,能夠把它設置爲false,而後手動提交偏移量
      enable-auto-commit: false
      # 鍵的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: com.github.lybgeek.kafka.serialization.ObjectDeserializer
    listener:
      # 在偵聽器容器中運行的線程數。
      concurrency: 1
      #listner負責ack,每調用一次,就當即commit
      ack-mode: manual_immediate
二、實現kafka的自定義序列和反序列

:kakfa默認的序列化和反序列方式是StringSerializer和StringDeserializer。咱們要改形成支持對象的序列化和反序列化

a、序列化

public class ObjectSerializer implements Serializer<Object> {


    @Override
    public byte[] serialize(String topic, Object object) {
        return BeanUtils.serialize(object);
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

b、反序列化

public class ObjectDeserializer implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] bytes) {
        return BeanUtils.deserialize(bytes);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }
}
三、消息對象
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class MessageDTO<T> implements Serializable {

    private String messageId;


    private T data;
}
四、生產者

:本例子簡單模擬生產者屢次生產同個消息,進而達到屢次消費的效果

@Slf4j
@Component
public class KafkaProducer implements CommandLineRunner {


    @Autowired
    private KafkaTemplate kafkaTemplate;

    private int threadNum = 2;

    private ExecutorService executorService = Executors.newFixedThreadPool(threadNum);

    private CountDownLatch countDownLatch = new CountDownLatch(threadNum);


    @Override
    public void run(String... args) throws Exception {
          send();
    }


    private void send(){
        for(int i = 0; i < threadNum; i++){
            executorService.submit(()->{
                try {
                    countDownLatch.await();
                } catch (InterruptedException e) {
                   log.error(e.getMessage(),e);
                }
                String messageId = "b14701b8-4b08-4bbd-8a4e-70f76a432e99";

                MessageDTO messageDTO = MessageDTO.builder().messageId(messageId).data("hello").build();
                kafkaTemplate.send(Constant.TOPIC,messageDTO);
            });

            countDownLatch.countDown();
        }

    }
}
五、消費者
@Component
@Slf4j
public class KafkaConsumer {

    @Autowired
    private RedisUtils redisUtils;

    @KafkaListener(id = "msgId",topics = {Constant.TOPIC})
    public void receive(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){

        boolean isRepeateConsume = checkRepeateConsume(record.value().getMessageId());
        if(isRepeateConsume){
            log.error("重複消費。。。。");
            //手工確認
            ack.acknowledge();
            return;
        }


       doBiz(record,ack);
    }

    private boolean checkRepeateConsume(String messageId){
        Object consumeStatus = redisUtils.get(messageId);
        /**
         * 一、若是redis存在消息ID,且消費狀態爲已消費,則說明是重複消費,此時消費端丟棄該消息
         */
        if(Objects.nonNull(consumeStatus) && "已消費".equals(consumeStatus.toString())){
           // log.error("重複消費。。。。");
            return true;
        }

        /**
         * 二、若是redis不存在消息id,或者狀態不是已消費,則從業務方面進行判重
         *
         *  業務判重的能夠考慮以下方法:
         *  若是該業務是存在狀態流轉,則採用狀態機策略進行判重。
         *  若是該業務不是狀態流轉類型,則在新增時,根據業務設置一個惟一的屬性,好比根據訂單編號的惟一性;
         *  更新時,能夠採用多版本策略,在須要更新的業務表上加上版本號
         */
        return checkRepeateByBiz(messageId);
    }



    /**
     * 模擬業務消費
     * @param messageDTO
     * @param ack
     */
    private void doBiz(ConsumerRecord<String, MessageDTO<String>> record,Acknowledgment ack){
        System.out.println("------模擬業務處理-----------");
        System.out.println("--------執行業務處理:"+record.value()+"------------");
        System.out.println("--------------一、業務處理完畢-----------");
        try {
            redisUtils.setEx(record.value().getMessageId(), "已消費",12, TimeUnit.HOURS);
            System.out.println("-------------二、業務處理完畢後,把全局ID存入redis,並設置值爲已消費");
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("----------三、業務處理完畢後,消費端手工確認");
        //手工確認
        ack.acknowledge();

    }

}
六、效果
2020-08-09 16:25:32.701  INFO 9552 --- [    msgId-0-C-1] io.lettuce.core.KqueueProvider           : Starting without optional kqueue library
------模擬業務處理-----------
--------執行業務處理:MessageDTO(messageId=b14701b8-4b08-4bbd-8a4e-70f76a432e99, data=hello)------------
--------------一、業務處理完畢-----------
-------------二、業務處理完畢後,把全局ID存入redis,並設置值爲已消費
----------三、業務處理完畢後,消費端手工確認
2020-08-09 16:25:36.021 ERROR 9552 --- [    msgId-0-C-1] c.g.l.kafka.consumer.KafkaConsumer       : 重複消費。。。。

總結

消息隊列無法幫你作到消費端的冪等性,消費端的冪等性得基於業務場景進行實現。不過消息隊列必須得保證消息不能丟,至少保證被消費一次,否則消息都丟了,沒數據搞啥業務冪等。在實現消費端處理業務時,要確保消費端是採用手工確認應答機制,而不是自動應答機制。這樣可以確保消費端一旦業務處理失敗,生產者還能再次發送同個消息給消費端

demo連接

https://github.com/lyb-geek/s...
相關文章
相關標籤/搜索