上一篇介紹了移山(數據遷移平臺)實時數據同步的總體架構;
本文主要介紹移山(數據遷移平臺)實時數據同步是如何保證消息的順序性。
能夠訪問 這裏 查看更多關於 大數據平臺建設的原創文章。
- 消息生產端將消息發送給同一個MQ服務器的同一個分區,而且按順序發送;
- 消費消費端按照消息發送的順序進行消費。
在某些業務功能場景下須要保證消息的發送和接收順序是一致的,不然會影響數據的使用。mysql
移山的實時數據同步使用canal
組件訂閱MySQL數據庫的日誌,並將其投遞至 kafka 中(想了解移山實時同步服務架構設計的能夠點 這裏);
kafka 消費端再根據具體的數據使用場景去處理數據(存入 HBase、MySQL 或直接作實時分析);
因爲binlog 自己是有序的,所以寫入到mq以後也須要保障順序。
實時同步服務消息處理總體流程以下:算法
咱們主要經過如下兩個方面去保障保證消息的順序性。sql
要保證同一個訂單的屢次修改到達 kafka 裏的順序不能亂,能夠在Producer 往 kafka 插入數據時,控制同一個key (能夠採用訂單主鍵key-hash算法來實現)發送到同一 partition,這樣就能保證同一筆訂單都會落到同一個 partition 內。數據庫
canal 目前支持的mq有kafka/rocketmq
,本質上都是基於本地文件的方式來支持了分區級的順序消息的能力。咱們只需在配置 instance 的時候開啓以下配置便可:apache
1> canal.propertiesbootstrap
# leader節點會等待全部同步中的副本確認以後再確認這條記錄是否發送完成 canal.mq.acks = all
備註:服務器
2> instance.properties微信
# 散列模式的分區數 canal.mq.partitionsNum=2 # 散列規則定義 庫名.表名: 惟一主鍵,多個表之間用逗號分隔 canal.mq.partitionHash=test.lyf_canal_test:id
備註:網絡
查看指定topic的指定分區的消息,可使用以下命令:多線程
bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0
將同一個訂單數據經過指定key的方式發送到同一個 partition 能夠解決大部分狀況下的數據亂序問題。
對於一個有着前後順序的消息A、B,正常狀況下應該是A先發送完成後再發送B。可是在異常狀況下:
移山的實時同步服務會在將訂閱到的數據存入HBase以前再加一層亂序處理 。
使用 mysqlbinlog
查看 binlog:
/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001
執行時間和偏移量:
備註:
--base64-output=decode-rows -v
參數來解碼;偏移量:
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test'); Query OK, 1 row affected (0.00 sec) MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2 where id = 13; Query OK, 1 row affected (0.00 sec)
把插入,第一次更新,第二次更新這三次操做產生的 binlog 被 canal server 推送至 kafka 中的消息分別稱爲:消息A,消息B,消息C。
假設因爲不可知的網絡緣由:
咱們利用HBase的特性,將數據主鍵作爲目標表的 rowkey。當 kafka 消費端消費到數據時,亂序處理主要流程(摘自禧雲數芯大數據平臺技術白皮書)以下:
demo的三條消息處理流程以下:
1> 判斷消息A 的主鍵id作爲rowkey在hbase的目標表中不存在,則將消息A的數據直接插入HBase:
2> 消息C 的主鍵id作爲rowkey,已經在目標表中存在,則這時須要拿消息C 的執行時間和表中存儲的執行時間去判斷:
消息C 中的執行時間等於表中存儲的執行時間,則這時須要拿消息C 的偏移量和表中存儲的偏移量去判斷:
3> 消息B 的主鍵id作爲rowkey,已經在目標表中存在,則這時須要拿消息B 的執行時間和表中存儲的執行時間去判斷:
kafka 消費端將消費到的消息進行格式化處理和組裝,並藉助 HBase-client API
來完成對 HBase 表的操做。
1> 使用Put
組裝單行數據
/** * 包名: org.apache.hadoop.hbase.client.Put * hbaseData 爲從binlog訂閱到的數據,經過循環,爲目標HBase表 * 添加rowkey、列簇、列數據。 * 做用:用來對單個行執行加入操做。 */ Put put = new Put(Bytes.toBytes(hbaseData.get("id"))); // hbaseData 爲從binlog訂閱到的數據,經過循環,爲目標HBase表添加列簇和列 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));
2> 使用 checkAndMutate
,更新HBase
表的數據
只有服務端對應rowkey的列數據與預期的值符合指望條件(大於、小於、等於)時,纔會將put操做提交至服務端。
// 若是 update_info(列族) execute_time(列) 不存在值就插入數據,若是存在則返回false boolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info")) .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put); // 若是存在,則去比較執行時間 if (!res1) { // 若是本次傳遞的執行時間大於HBase中的執行時間,則插入put boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"),Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put); // 執行時間相等時,則去比較偏移量,本次傳遞的值大於HBase中的值則插入put if (!res2) { boolean res3 = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"), Bytes.toBytes("execute_position"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put); } }
歡迎你們關注個人微信公衆號閱讀更多文章: