上一篇 介紹了移山(數據遷移平臺)實時數據同步的總體架構;
本文主要介紹移山(數據遷移平臺)實時數據同步是如何保證消息的順序性。node能夠訪問 這裏 查看更多關於大數據平臺建設的原創文章。mysql
消息生產端將消息發送給同一個MQ服務器的同一個分區,而且按順序發送;算法
消費消費端按照消息發送的順序進行消費。sql
在某些業務功能場景下須要保證消息的發送和接收順序是一致的,不然會影響數據的使用。數據庫
移山的實時數據同步使用
canal
組件訂閱MySQL數據庫的日誌,並將其投遞至 kafka 中(想了解移山實時同步服務架構設計的能夠點 這裏);
kafka 消費端再根據具體的數據使用場景去處理數據(存入 HBase、MySQL 或直接作實時分析);
因爲binlog 自己是有序的,所以寫入到mq以後也須要保障順序。apache
假如如今移山建立了一個實時同步任務,而後訂閱了一個業務數據庫的訂單表;bootstrap
上游業務,向訂單表裏插入了一個訂單,而後對該訂單又作了一個更新操做,則 binlog 裏會自動寫入插入操做和更新操做的數據,這些數據會被 canal server 投遞至 kafka broker 裏面;服務器
若是 kafka 消費端先消費到了更新日誌,後消費到插入日誌,則在往目標表裏作操做時就會由於數據缺失致使發生異常。微信
實時同步服務消息處理總體流程以下:網絡
咱們主要經過如下兩個方面去保障保證消息的順序性。
kafka 的同一個 partition 用一個write ahead log組織, 是一個有序的隊列,因此能夠保證FIFO的順序;
所以生產者按照必定的順序發送消息,broker 就會按照這個順序把消息寫入 partition,消費者也會按照相同的順序去讀取消息;
kafka 的每個 partition 不會同時被兩個消費者實例消費,由此能夠保證消息消費的順序性。
要保證同一個訂單的屢次修改到達 kafka 裏的順序不能亂,能夠在Producer 往 kafka 插入數據時,控制同一個key (能夠採用訂單主鍵key-hash算法來實現)發送到同一 partition,這樣就能保證同一筆訂單都會落到同一個 partition 內。
canal 目前支持的mq有kafka/rocketmq
,本質上都是基於本地文件的方式來支持了分區級的順序消息的能力。咱們只需在配置 instance 的時候開啓以下配置便可:
1> canal.properties
# leader節點會等待全部同步中的副本確認以後再確認這條記錄是否發送完成
canal.mq.acks = all
備註:
這樣只要至少有一個同步副本存在,記錄就不會丟失。
2> instance.properties
1 # 散列模式的分區數 2 canal.mq.partitionsNum=2 3 # 散列規則定義 庫名.表名: 惟一主鍵,多個表之間用逗號分隔 4 canal.mq.partitionHash=test.lyf_canal_test:id
備註:
同一條數據的增刪改操做 產生的 binlog 數據都會寫到同一個分區內;
查看指定topic的指定分區的消息,可使用以下命令:
bin/kafka-console-consumer.sh --bootstrap-server serverlist --topic topicname --from-beginning --partition 0
將同一個訂單數據經過指定key的方式發送到同一個 partition 能夠解決大部分狀況下的數據亂序問題。
對於一個有着前後順序的消息A、B,正常狀況下應該是A先發送完成後再發送B。可是在異常狀況下:
A發送失敗了,B發送成功,而A因爲重試機制在B發送完成以後重試發送成功了;
這時對於自己順序爲AB的消息順序變成了BA。
移山的實時同步服務會在將訂閱到的數據存入HBase以前再加一層亂序處理 。
使用 mysqlbinlog
查看 binlog:
/usr/bin/mysqlbinlog --base64-output=decode-rows -v /var/lib/mysql/mysql-bin.000001
執行時間和偏移量:
備註:
每條數據都會有執行時間和偏移量這兩個重要信息,下邊的校驗邏輯核心正是藉助了這兩個值;
執行的sql 語句在 binlog 中是以base64編碼格式存儲的,若是想查看sql 語句,須要加上:--base64-output=decode-rows -v
參數來解碼;
偏移量:
Position 就表明 binlog 寫到這個偏移量的地方,也就是寫了這麼多字節,即當前 binlog 文件的大小;
也就是說後寫入數據的 Position 確定比先寫入數據的 Position 大,所以能夠根據 Position 大小來判斷消息的順序。
MariaDB [test]> insert into lyf_canal_test (name,status,content) values('demo1',1,'demo1 test'); Query OK, 1 row affected (0.00 sec) MariaDB [test]> update lyf_canal_test set name = 'demo update' where id = 13; Query OK, 1 row affected (0.00 sec) Rows matched: 1 Changed: 1 Warnings: 0 MariaDB [test]> update lyf_canal_test set name = 'demo update2',content='second update',status=2 where id = 13; Query OK, 1 row affected (0.00 sec)
把插入,第一次更新,第二次更新這三次操做產生的 binlog 被 canal server 推送至 kafka 中的消息分別稱爲:消息A,消息B,消息C。
消息A:
消息B:
消息C:
假設因爲不可知的網絡緣由:
kafka broker收到的三條消息分別爲:消息A,消息C,消息B;
則 kafka 消費端消費到的這三條消息前後順序就是:消息A,消息C,消息B
這樣就形成了消息的亂序,所以訂閱到的數據在存入目標表前必須得加亂序校驗處理。
咱們利用HBase的特性,將數據主鍵作爲目標表的 rowkey。當 kafka 消費端消費到數據時,亂序處理主要流程(摘自禧雲數芯大數據平臺技術白皮書)以下:
demo的三條消息處理流程以下:
1> 判斷消息A 的主鍵id作爲rowkey在hbase的目標表中不存在,則將消息A的數據直接插入HBase:
2> 消息C 的主鍵id作爲rowkey,已經在目標表中存在,則這時須要拿消息C 的執行時間和表中存儲的執行時間去判斷:
若是消息C 中的執行時間小於表中存儲的執行時間,則證實消息C 是重複消息或亂序的消息,直接丟棄;
消息C 中的執行時間大於表中存儲的執行時間,則直接更新表數據(本demo即符合該種場景):
消息C 中的執行時間等於表中存儲的執行時間,則這時須要拿消息C 的偏移量和表中存儲的偏移量去判斷:
消息C 中的偏移量小於表中存儲的偏移量,則證實消息C 是重複消息,直接丟棄;
消息C 中的偏移量大於等於表中存儲的偏移量,則直接更新表數據。
3> 消息B 的主鍵id作爲rowkey,已經在目標表中存在,則這時須要拿消息B 的執行時間和表中存儲的執行時間去判斷:
因爲消息B中的執行時間小於表中存儲的執行時間(即消息C 的執行時間),所以消息B 直接丟棄。
kafka 消費端將消費到的消息進行格式化處理和組裝,並藉助 HBase-client API
來完成對 HBase 表的操做。
1> 使用Put
組裝單行數據
/** * 包名: org.apache.hadoop.hbase.client.Put * hbaseData 爲從binlog訂閱到的數據,經過循環,爲目標HBase表 * 添加rowkey、列簇、列數據。 * 做用:用來對單個行執行加入操做。 */ Put put = new Put(Bytes.toBytes(hbaseData.get("id"))); // hbaseData 爲從binlog訂閱到的數據,經過循環,爲目標HBase表添加列簇和列 put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(mapKey), Bytes.toBytes(hbaseData.get(mapKey)));
2> 使用 checkAndMutate
,更新HBase
表的數據
只有服務端對應rowkey的列數據與預期的值符合指望條件(大於、小於、等於)時,纔會將put操做提交至服務端。
// 若是 update_info(列族) execute_time(列) 不存在值就插入數據,若是存在則返回false boolean res1 = table.checkAndMutate(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info")) .qualifier(Bytes.toBytes("execute_time")).ifNotExists().thenPut(put); // 若是存在,則去比較執行時間 if (!res1) { // 若是本次傳遞的執行時間大於HBase中的執行時間,則插入put boolean res2 =table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"), Bytes.toBytes("execute_time"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_time")),put); // 執行時間相等時,則去比較偏移量,本次傳遞的值大於HBase中的值則插入put if (!res2) { boolean res3 = table.checkAndPut(Bytes.toBytes(hbaseData.get("id")), Bytes.toBytes("update_info"), Bytes.toBytes("execute_position"), CompareFilter.CompareOp.GREATER, Bytes.toBytes(hbaseData.get("execute_position")),put); } }
目前移山的實時同步服務,kafka 消費端是使用一個線程去消費數據;
若是未來有版本升級需求,將消費端改成多個線程去消費數據時,要考慮到多線程消費時有序的消息會被打亂這種狀況的解決辦法。
歡迎你們關注個人微信公衆號閱讀更多文章: