消息中間件在解決異步處理,模塊間解耦和,和高流量場景的削峯,等狀況下有着很普遍的應用 .java
本文將跟你們一塊兒討論如下其中的異常場景,如題.git
在實際工做中,你們可能也都遇到過這樣的需求 :spring
如 : 系統A中的某些重要的數據,想在每次數據變動的時候,將當前最新的數據備份下來,固然,這個備份的動做不能影響當前數據變動的進程.sql
也更不指望由於備份的操做,影響當前進程的性能.數據庫
這是一個比較常見的,能夠異步處理的需求,業務數據變動 和 數據備份 之間並無強一致性的要求,大體的架構以下:springboot
producer做爲消息產生者,會經過指定的交換機(exchange)和路由鍵(routingkey),將消息傳輸到指定的隊列(queue)中,一般producer也會有多個節點網絡
consume做爲消息的消費者,會依次從隊列(queue)中拿到消息,進行業務處理,最終將數據寫入數據庫中,而且爲了更快速的消費消息,consume一般會部署多個節點,而且每一個節點中也會有多個線程同時消費消息多線程
queue做爲消息隊列,保證了消息被消費的時序性,以及惟一性(一條消息只能被消費一次)架構
dlxQueue做爲死信隊列,當queue中的的消息沒法被正常消費時,當重處理N次後,將會被放入死信隊列,並有專門的consume來消費和處理,好比:通知相關人員進行人工干預,等.併發
producer會源源不斷的產生消息,有新的數據,也有更新老的數據,
而consume則是拿到消息,作insert或者update的操做.
可是因爲consume是多線程併發消費消息的,那麼就會出現當前線程拿到的消息並不是最新版本的消息,若是這個時候進行了update操做的話,頗有可能會覆蓋掉已是最新版本的數據了
如 : 當前數據庫裏的數據爲1,業務操做先是將1改成了2,而後立刻的又將2改成了3,這兩個操做時間很是接近,幾乎是同時,而後產生的消息也幾乎同時的進入了消息中間件,
可是在queue裏依然有前後,2在前3在後(queue機制保證),那麼這個時候,consume來消費了,因爲consume是多線程的,因此,2和3會被分配到兩條線程中同時被處理
這時,若是2的這條線程先結束,3的這條線程後結束,那麼則數據正常,最終數據被更新成3
可是,若是3的這條線程先結束了,2的這條線程是後結束的,那麼,最新的數據就會被老數據覆蓋掉
這種狀況顯然是不知足需求記錄當前最新的數據的,
而且這種狀況很容易發生,雖然queue裏保證了消息的前後,以及惟一性,可是消息被consume在線程中消費確實同時處理的
一般以上這種狀況,網絡上的一些解決方案,都是在數據中加入版本(version)的概念來解決,本文也是(上文說起的1,2,3,其實就是版本的概念).
一般網絡上的描述是,在update的時候,根據數據庫中的最新版本號,若是當前消息的版本號小於數據庫最新的版本號,則放棄更新,大於,則更新
這一段邏輯很簡單,可是也很容易產生誤解,最大的問題在於得到最新版本號,在多線程環境下,數據庫的髒讀也是蠻嚴重的,髒讀的存在,致使你查詢出來的數據並不是是最新的數據
如 : 上面的一個場景,數據庫中最新的版本號是1,有版本號2和3兩個消息是即將要消費的,按照上面的邏輯,處理程序應該先查數據庫,拿到當前最新的版本.
這個時候,兩條線程查詢到的結果有可能都是1,這時2>1,而且3>1,兩條線程依然都會執行,一樣的 :
若是2的這條線程先結束,3的這條線程後結束,那麼則數據正常,最終數據被更新成3
若是3的這條線程先結束了,2的這條線程是後結束的,那麼,最新的數據就會被老數據覆蓋掉
一樣達到想要的效果
其實要實現很簡單,首先,要知道,對於同一行數據,sql的執行也是有前後順序的,其實到底更新爲2的sql語句先執行,仍是更新爲3的sql語句先執行,並不重要
重要的是,將判斷版本號的語句放入更新條件中進行判斷.
例子 : 一樣是上面的場景,數據庫中的版本爲1,這時2和3同時更新,誰先結束,誰也不知道,也沒法控制(其實有辦法,可是損失性能,當前場景須要的是高效)
可是咱們能夠在條件中加入"version < 2"這樣的條件
SQL語句樣例 :
UPDATE TEST_MSG SET VERSION = #{version}, DATA = #{data}, LAST_UPDATE_DATE = #{lastUpdatedDate} WHERE BUSINESS_KEY = #{businessKey} AND VERSION < #{version}
這樣的話,不管那條線程先結束,都不會影響最終的結果
若是,2先結束,3線程的條件爲2 < 3,條件成立,數據將會被更新爲3
若是,3先結束,2線程的條件爲3 < 2,條件不成立,數據則不會更新(因爲是在sql執行過程當中判斷,因此這裏不存在髒讀的狀況)
這樣就能知足記錄當前最新的數據的需求了
spring.rabbitmq.host=itkk.org spring.rabbitmq.port=5672 spring.rabbitmq.username=dev_udf-sample spring.rabbitmq.password=1qazxsw2 spring.rabbitmq.virtual-host=/dev_udf-sample spring.rabbitmq.template.retry.enabled=true spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.concurrency=5 spring.rabbitmq.listener.simple.max-concurrency=10
以上配置文件配置了rabbitmq的鏈接,也指定了消費者監聽器的併發數量(5)和最大併發數量(10),而且開啓了重試,重試失敗的消息會被流轉到死信隊裏裏
@Configuration public class UdfServiceADemoConfig { public static final String EXCHANGE_ADEMO_TEST1 = "exchange.ademo.test1"; public static final String QUEUE_ADEMO_TEST1_CONSUME1 = "queue.ademo.test1.consume1"; public static final String ROUTINGKEY_ADEMO_TEST1_TESTMSG = "routingkey.ademo.test1.testmsg"; @Bean public DirectExchange exchangeAdemoTest1() { return new DirectExchange(EXCHANGE_ADEMO_TEST1, true, true); } @Bean public Queue queueAdemoTest1Consume1() { return new Queue(QUEUE_ADEMO_TEST1_CONSUME1, true, false, true); } @Bean public Binding queueAdemoTest1Consume1Binding() { return new Binding(QUEUE_ADEMO_TEST1_CONSUME1, Binding.DestinationType.QUEUE, EXCHANGE_ADEMO_TEST1, ROUTINGKEY_ADEMO_TEST1_TESTMSG, null); } }
exchangeAdemoTest1方法定義了一個交換機,而且是自動刪除的
queueAdemoTest1Consume1定義了一個消費者隊列,也是自動刪除的
queueAdemoTest1Consume1Binding將上面定義的交換機和消費者綁定起來,並設定了路由鍵(routingkey)
public class TestMsg implements Serializable { /** * 描述 : id */ private static final long serialVersionUID = 1L; /** * msgId */ private String msgId = UUID.randomUUID().toString(); /** * businessKey */ private String businessKey; /** * version */ private long version; /** * data */ private String data; /** * lastUpdatedDate */ private Date lastUpdatedDate; }
以上定義了消息的格式,主要的字段就是businessKey和version,分別用來肯定惟一的業務數據和版本的判斷
@Autowired private AmqpTemplate amqpTemplate; @Scheduled(fixedRate = SCHEDULED_FIXEDRATE) public void send1() { this.send(); } public void send2() { ..... } /** * send */ private void send() { final int numA = 1000; int a = (int) (Math.random() * numA); long b = (long) (Math.random() * numA); TestMsg testMsg = new TestMsg(); testMsg.setBusinessKey(Integer.toString(a)); testMsg.setVersion(b); testMsg.setData(UUID.randomUUID().toString()); testMsg.setLastUpdatedDate(new Date()); amqpTemplate.convertAndSend(UdfServiceADemoConfig.EXCHANGE_ADEMO_TEST1, UdfServiceADemoConfig.ROUTINGKEY_ADEMO_TEST1_TESTMSG, testMsg); }
以上定義了用於作測試的消息發送方,使用計劃任務,按期的向交換機中寫入數據,能夠定義多個計劃任務,增長同一時間消息產生的數量
@RabbitListener(queues = UdfServiceADemoConfig.QUEUE_ADEMO_TEST1_CONSUME1) public void consume1(TestMsg testMsg) { if (testMsgRespository.count(testMsg.getBusinessKey()) > 0) { int row = testMsgRespository.update(testMsg); log.info("update row = {}", row); } else { try { int row = testMsgRespository.insert(testMsg); log.info("insert row = {}", row); } catch (Exception e) { //進行異常判斷,肯定是主鍵衝突錯誤 int row = testMsgRespository.update(testMsg); log.info("update row = {}", row); } } try { final long time = 5L; Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } }
以上定義了消息消費的方法,此方法是多線程執行的,大概邏輯是,先count數據庫,判斷數據是否存在,若是存在,則update數據,若是不存在,則insert數據
可是count也有可能存在髒讀的狀況,因此insert操做有可能會由於主鍵重複而失敗,這時,會捕獲到異常,經過異常判斷,肯定是主鍵衝突錯誤後(樣例代碼中省略了),在進行update操做
這裏的update操做,則是上文提到的採用"version < #{updateVersion}"的方法進行更新,保證了將最新的數據更新到數據庫中
最後的線程休眠,是爲了模擬處理時間,以便形成更多的併發狀況
int count(@Param("businessKey") String businessKey); int insert(TestMsg testMsg); int update(TestMsg testMsg); <select id="count" resultType="int"> SELECT COUNT(*) FROM TEST_MSG WHERE BUSINESS_KEY = #{businessKey} </select> <insert id="insert"> INSERT INTO TEST_MSG ( BUSINESS_KEY, VERSION, DATA, LAST_UPDATE_DATE ) VALUES ( #{businessKey}, #{version}, #{data}, #{lastUpdatedDate} ) </insert> <update id="update"> UPDATE TEST_MSG SET VERSION = #{version}, DATA = #{data}, LAST_UPDATE_DATE = #{lastUpdatedDate} WHERE BUSINESS_KEY = #{businessKey} AND VERSION <![CDATA[ < ]]> #{version} </update>
以上爲相關的sqlmap定義以及mapper接口的定義
CREATE TABLE TEST_MSG ( BUSINESS_KEY VARCHAR(100) NOT NULL PRIMARY KEY, VERSION BIGINT NOT NULL, DATA VARCHAR(100) NOT NULL, LAST_UPDATE_DATE DATETIME NOT NULL ) COMMENT 'TEST_MSG';
以上爲表結構的定義
這樣,啓動應用,觀察數據庫表更新狀況,會發現,數據的版本只會有增加的,不會存在下降的
那麼,咱們這實現了本文中開頭提到的需求了.
而且也瞭解了在springboot中如何使用rabbitmq發送和消費消息了.
關於本文內容 , 歡迎你們的意見跟建議
想得到最快更新,請關注公衆號