spring boot / cloud (十九) 併發消費消息,如何保證入庫的數據是最新的?

spring boot / cloud (十九) 併發消費消息,如何保證入庫的數據是最新的?

消息中間件在解決異步處理,模塊間解耦和,和高流量場景的削峯,等狀況下有着很普遍的應用 .java

本文將跟你們一塊兒討論如下其中的異常場景,如題.git

場景

在實際工做中,你們可能也都遇到過這樣的需求 :spring

如 : 系統A中的某些重要的數據,想在每次數據變動的時候,將當前最新的數據備份下來,固然,這個備份的動做不能影響當前數據變動的進程.sql

也更不指望由於備份的操做,影響當前進程的性能.數據庫

分析

這是一個比較常見的,能夠異步處理的需求,業務數據變動 和 數據備份 之間並無強一致性的要求,大體的架構以下:springboot

併發消費消息

producer做爲消息產生者,會經過指定的交換機(exchange)和路由鍵(routingkey),將消息傳輸到指定的隊列(queue)中,一般producer也會有多個節點網絡

consume做爲消息的消費者,會依次從隊列(queue)中拿到消息,進行業務處理,最終將數據寫入數據庫中,而且爲了更快速的消費消息,consume一般會部署多個節點,而且每一個節點中也會有多個線程同時消費消息多線程

queue做爲消息隊列,保證了消息被消費的時序性,以及惟一性(一條消息只能被消費一次)架構

dlxQueue做爲死信隊列,當queue中的的消息沒法被正常消費時,當重處理N次後,將會被放入死信隊列,並有專門的consume來消費和處理,好比:通知相關人員進行人工干預,等.併發

問題

producer會源源不斷的產生消息,有新的數據,也有更新老的數據,

而consume則是拿到消息,作insert或者update的操做.

可是因爲consume是多線程併發消費消息的,那麼就會出現當前線程拿到的消息並不是最新版本的消息,若是這個時候進行了update操做的話,頗有可能會覆蓋掉已是最新版本的數據了

如 : 當前數據庫裏的數據爲1,業務操做先是將1改成了2,而後立刻的又將2改成了3,這兩個操做時間很是接近,幾乎是同時,而後產生的消息也幾乎同時的進入了消息中間件,

可是在queue裏依然有前後,2在前3在後(queue機制保證),那麼這個時候,consume來消費了,因爲consume是多線程的,因此,2和3會被分配到兩條線程中同時被處理

這時,若是2的這條線程先結束,3的這條線程後結束,那麼則數據正常,最終數據被更新成3

可是,若是3的這條線程先結束了,2的這條線程是後結束的,那麼,最新的數據就會被老數據覆蓋掉

這種狀況顯然是不知足需求記錄當前最新的數據的,

而且這種狀況很容易發生,雖然queue裏保證了消息的前後,以及惟一性,可是消息被consume在線程中消費確實同時處理的

髒讀的問題

一般以上這種狀況,網絡上的一些解決方案,都是在數據中加入版本(version)的概念來解決,本文也是(上文說起的1,2,3,其實就是版本的概念).

一般網絡上的描述是,在update的時候,根據數據庫中的最新版本號,若是當前消息的版本號小於數據庫最新的版本號,則放棄更新,大於,則更新

這一段邏輯很簡單,可是也很容易產生誤解,最大的問題在於得到最新版本號,在多線程環境下,數據庫的髒讀也是蠻嚴重的,髒讀的存在,致使你查詢出來的數據並不是是最新的數據

如 : 上面的一個場景,數據庫中最新的版本號是1,有版本號2和3兩個消息是即將要消費的,按照上面的邏輯,處理程序應該先查數據庫,拿到當前最新的版本.

這個時候,兩條線程查詢到的結果有可能都是1,這時2>1,而且3>1,兩條線程依然都會執行,一樣的 :

若是2的這條線程先結束,3的這條線程後結束,那麼則數據正常,最終數據被更新成3

若是3的這條線程先結束了,2的這條線程是後結束的,那麼,最新的數據就會被老數據覆蓋掉

一樣達到想要的效果

如何保證入庫的數據是最新的?

其實要實現很簡單,首先,要知道,對於同一行數據,sql的執行也是有前後順序的,其實到底更新爲2的sql語句先執行,仍是更新爲3的sql語句先執行,並不重要

重要的是,將判斷版本號的語句放入更新條件中進行判斷.

例子 : 一樣是上面的場景,數據庫中的版本爲1,這時2和3同時更新,誰先結束,誰也不知道,也沒法控制(其實有辦法,可是損失性能,當前場景須要的是高效)

可是咱們能夠在條件中加入"version < 2"這樣的條件

SQL語句樣例 :

UPDATE TEST_MSG
SET
    VERSION          = #{version},
    DATA             = #{data},
    LAST_UPDATE_DATE = #{lastUpdatedDate}
WHERE BUSINESS_KEY = #{businessKey} AND VERSION  <  #{version}

這樣的話,不管那條線程先結束,都不會影響最終的結果

若是,2先結束,3線程的條件爲2 < 3,條件成立,數據將會被更新爲3

若是,3先結束,2線程的條件爲3 < 2,條件不成立,數據則不會更新(因爲是在sql執行過程當中判斷,因此這裏不存在髒讀的狀況)

這樣就能知足記錄當前最新的數據的需求了

實現 (springboot使用rabbitmq的例子)

spring.rabbitmq.host=itkk.org
spring.rabbitmq.port=5672
spring.rabbitmq.username=dev_udf-sample
spring.rabbitmq.password=1qazxsw2
spring.rabbitmq.virtual-host=/dev_udf-sample
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.listener.simple.retry.enabled=true
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.max-concurrency=10

以上配置文件配置了rabbitmq的鏈接,也指定了消費者監聽器的併發數量(5)和最大併發數量(10),而且開啓了重試,重試失敗的消息會被流轉到死信隊裏裏

@Configuration
public class UdfServiceADemoConfig {

    public static final String EXCHANGE_ADEMO_TEST1 = "exchange.ademo.test1";

    public static final String QUEUE_ADEMO_TEST1_CONSUME1 = "queue.ademo.test1.consume1";

    public static final String ROUTINGKEY_ADEMO_TEST1_TESTMSG = "routingkey.ademo.test1.testmsg";

    @Bean
    public DirectExchange exchangeAdemoTest1() {
        return new DirectExchange(EXCHANGE_ADEMO_TEST1, true, true);
    }

    @Bean
    public Queue queueAdemoTest1Consume1() {
        return new Queue(QUEUE_ADEMO_TEST1_CONSUME1, true, false, true);
    }

    @Bean
    public Binding queueAdemoTest1Consume1Binding() {
        return new Binding(QUEUE_ADEMO_TEST1_CONSUME1, 
        Binding.DestinationType.QUEUE, EXCHANGE_ADEMO_TEST1, 
        ROUTINGKEY_ADEMO_TEST1_TESTMSG, null);
    }
}

exchangeAdemoTest1方法定義了一個交換機,而且是自動刪除的

queueAdemoTest1Consume1定義了一個消費者隊列,也是自動刪除的

queueAdemoTest1Consume1Binding將上面定義的交換機和消費者綁定起來,並設定了路由鍵(routingkey)

public class TestMsg implements Serializable {
    /**
     * 描述 : id
     */
    private static final long serialVersionUID = 1L;

    /**
     * msgId
     */
    private String msgId = UUID.randomUUID().toString();

    /**
     * businessKey
     */
    private String businessKey;

    /**
     * version
     */
    private long version;

    /**
     * data
     */
    private String data;

    /**
     * lastUpdatedDate
     */
    private Date lastUpdatedDate;
}

以上定義了消息的格式,主要的字段就是businessKey和version,分別用來肯定惟一的業務數據和版本的判斷

@Autowired
    private AmqpTemplate amqpTemplate;

    @Scheduled(fixedRate = SCHEDULED_FIXEDRATE)
    public void send1() {
        this.send();
    }
    
    public void send2() {
        .....
    }

    /**
     * send
     */
    private void send() {
        final int numA = 1000;
        int a = (int) (Math.random() * numA);
        long b = (long) (Math.random() * numA);
        TestMsg testMsg = new TestMsg();
        testMsg.setBusinessKey(Integer.toString(a));
        testMsg.setVersion(b);
        testMsg.setData(UUID.randomUUID().toString());
        testMsg.setLastUpdatedDate(new Date());
        amqpTemplate.convertAndSend(UdfServiceADemoConfig.EXCHANGE_ADEMO_TEST1, 
        UdfServiceADemoConfig.ROUTINGKEY_ADEMO_TEST1_TESTMSG, testMsg);
    }

以上定義了用於作測試的消息發送方,使用計劃任務,按期的向交換機中寫入數據,能夠定義多個計劃任務,增長同一時間消息產生的數量

@RabbitListener(queues = UdfServiceADemoConfig.QUEUE_ADEMO_TEST1_CONSUME1)
    public void consume1(TestMsg testMsg) {
        if (testMsgRespository.count(testMsg.getBusinessKey()) > 0) {
            int row = testMsgRespository.update(testMsg);
            log.info("update row = {}", row);
        } else {
            try {
                int row = testMsgRespository.insert(testMsg);
                log.info("insert row = {}", row);
            } catch (Exception e) {
                //進行異常判斷,肯定是主鍵衝突錯誤
                int row = testMsgRespository.update(testMsg);
                log.info("update row = {}", row);
            }
        }
        try {
            final long time = 5L;
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

以上定義了消息消費的方法,此方法是多線程執行的,大概邏輯是,先count數據庫,判斷數據是否存在,若是存在,則update數據,若是不存在,則insert數據

可是count也有可能存在髒讀的狀況,因此insert操做有可能會由於主鍵重複而失敗,這時,會捕獲到異常,經過異常判斷,肯定是主鍵衝突錯誤後(樣例代碼中省略了),在進行update操做

這裏的update操做,則是上文提到的採用"version < #{updateVersion}"的方法進行更新,保證了將最新的數據更新到數據庫中

最後的線程休眠,是爲了模擬處理時間,以便形成更多的併發狀況

int count(@Param("businessKey") String businessKey);


    int insert(TestMsg testMsg);


    int update(TestMsg testMsg);

    <select id="count" resultType="int">
        SELECT COUNT(*)
        FROM TEST_MSG
        WHERE BUSINESS_KEY = #{businessKey}
    </select>

    <insert id="insert">
        INSERT INTO TEST_MSG
        (
            BUSINESS_KEY,
            VERSION,
            DATA,
            LAST_UPDATE_DATE
        )
        VALUES
            (
                #{businessKey},
                #{version},
                #{data},
                #{lastUpdatedDate}
            )
    </insert>

    <update id="update">
        UPDATE TEST_MSG
        SET
            VERSION          = #{version},
            DATA             = #{data},
            LAST_UPDATE_DATE = #{lastUpdatedDate}
        WHERE BUSINESS_KEY = #{businessKey} AND VERSION <![CDATA[ < ]]> #{version}
    </update>

以上爲相關的sqlmap定義以及mapper接口的定義

CREATE TABLE TEST_MSG
(
  BUSINESS_KEY     VARCHAR(100) NOT NULL
    PRIMARY KEY,
  VERSION          BIGINT       NOT NULL,
  DATA             VARCHAR(100) NOT NULL,
  LAST_UPDATE_DATE DATETIME     NOT NULL
)
  COMMENT 'TEST_MSG';

以上爲表結構的定義

結束

這樣,啓動應用,觀察數據庫表更新狀況,會發現,數據的版本只會有增加的,不會存在下降的

那麼,咱們這實現了本文中開頭提到的需求了.

而且也瞭解了在springboot中如何使用rabbitmq發送和消費消息了.

關於本文內容 , 歡迎你們的意見跟建議

代碼倉庫 (博客配套代碼)


想得到最快更新,請關注公衆號

想得到最快更新,請關注公衆號

相關文章
相關標籤/搜索