最近看到一篇文章 《基於Canal與Flink實現數據實時增量同步》 ,主要講解的是基於Flink有關於MySQL Binlog數據採集的方案,看了一下實踐方法和具體代碼操做,感受有一些欠考慮和不足的狀況。
筆者以前有過一些相似的採集工具實踐的總結,可是並無在總體上作出一個系統性的總結,因此我在想,是否是能夠寫一篇我的總結性的文章,把Binlog採集中的問題以及相應的解決方案也進行總結呢?html
可能不少人對於Binlog的認識還不是很充足,會粗淺的認爲:「它不就是MySQL產生的,有固定結構的log嘛,把數據採集過來,而後把它作一下數據落地,它有什麼難的呢?」mysql
的確,它本質上確實就是個log,但是實際上,關於Binlog採集從場景分析,再到技術選型,總體內部有不少鮮爲人知的坑,不要小瞧了它。sql
筆者寫這篇文章,目的是把實際工做中對於Binlog數據採集的開發流程的原則、注意事項、可能存在的問題點展現出來,其中也會有筆者本身的一些我的總結數據採集中的原則,爲你們做參考,都是乾貨哦。數據庫
1、Binlog實時採集總結原則json
首先拋開技術框架的討論,我的總結Binlog日誌的數據採集主要原則:api
-
原則一 :與業務數據解耦數據結構
-
原則二 :與數據結構解耦架構
-
原則三 :數據是可回溯的框架
分別闡述一下這三個原則的具體含義。運維
原則一
在數據採集中,數據落地通常都會使用時間分區進行落地,那就須要咱們肯定一下固定的時間戳做爲時間分區的基礎時間序列。
在這種狀況下看來,業務數據上的時間戳字段,不管是從實際開發中獲取此時間戳的角度,仍是現實表中都會存在這樣的時間戳,都不可能全部表徹底知足。
舉一下反例:
表 :業務時間戳(或事件時間)
table A :create_time,update_time
table B :create_time
table C :create_at
table D :無
像這樣的狀況,理論上能夠經過限制RD和DBA的在設計表時規則化表結構來實現時間戳以及命名的統一化、作限制,可是是在實際工做中,這樣的狀況基本上是作不到的,相信不少讀者也會遇到這樣的狀況。
可能不少作數據採集的同窗會想,咱們能不能要求他們去制定標準呢?
我的的想法是,能夠,可是不能把大數據底層數據採集徹底依靠這樣互相制定的標準。
緣由有如下三點:
-
若是隻是依靠兩個部門或者多個部門制定的口頭的或者書面的標準,卻沒有強制性在coding上面作約束,所有都是人爲在約束的話,後期人員增長,早晚會出問題。
-
大數據部門與後臺部門,在於數據狀況變動的狀況,有時候多是信息延時的,也就是說,有可能在數據落地後發現異常後,才知道後臺部門作出了調整。
-
也是最重要的一點,大數據部門不能要求在底層數據源去要求數據源去適應大數據的採集,這樣要成的後果頗有多是限制後臺部門在開發業務功能上的自由度,這樣的開發流程也是不合理的。
因此若是想要使用惟一固定的時間序列,就要和業務的數據剝離開,咱們想要的時間戳不受業務數據的變更的影響。
原則二
在業務數據庫中,必定會存在表結構變動的問題,絕大部分狀況爲增長列,可是也會存在列重命名、列刪除這類狀況,而其中字段變動的順序是不可控的。
此原則想描述的是,導入到數據倉庫中的表,要適應數據庫表的各類操做,保持其可用性與列數據的正確性。
原則三
此數據可回溯,其中包括兩個方面:
-
數據採集可回溯
-
數據消費落地可回溯
第一個描述的是,在採集binlog採集端,能夠從新按位置採集binlog。
第二個描述的是,在消費binlog落地的一端,能夠重複消費把數據從新落地。
此爲筆者我的總結,不管是選擇什麼樣的技術選型進行組合搭建,這幾點原則是須要具有的。
2、實現方案與具體操做
技術架構 :Debezium + Confluent + Kafka + OSS/S3 + Hive
基於原則一的解決方案
Debezium提供了New Record State Extraction的配置選項,至關於提供了一個transform算子,能夠抽取出binlog中的元數據。
對於0.10版本的配置,能夠抽取table,version,connector,name,ts_ms,db,server_id,file,pos,row等binlog元數據信息。
其中ts_ms爲binlog日誌的產生時間,此爲binlog元數據,能夠應用於全部數據表,並且能夠在徹底對數據表內部結構不瞭解的狀況下,使用此固定時間戳,徹底實現咱們的原則一。
關於Debezium,不一樣版本以前的配置參數有多是不一樣的,若是讀者有須要實踐的話須要在官方文檔上確認相應版本的配置參數。
對於其餘框架,例如市面上用的較多的Canal,或者讀者有本身須要開發數據採集程序的話,binlog的元數據建議所有抽取出來,在此過程以及後續過程當中均可能會被用到。
基於原則二的解決方案
對於Hive ,目前主流的數據存儲格式爲Parquet、ORC、Json、Avro這幾種。
拋開數據存儲的效率討論。
對於前兩種數據格式,爲列存,也就是說,這兩種數據格式的數據讀取,會嚴格依賴於咱們數據表中的數據存儲的順序,這樣的數據格式,是沒法知足數據列靈活增長、刪除等操做的。
Avro格式爲行存,可是它須要依賴於Schema Register服務,考慮Hive的數據表讀取徹底要依賴一個外部服務,風險太高。
最後肯定使用Json格式進行數據存儲,雖然這樣的讀取和存儲效率沒有其餘格式高,可是這樣能夠保證業務數據的任何變動均可以在Hive中讀取出來。
Debezium組件採集binlog的數據就是爲json格式,和預期的設計方案是吻合的,能夠解決原則二帶來的問題。
對於其餘框架,例如市面上用的較多的Canal,能夠設置爲Json數據格式進行傳輸,或者讀者有本身須要開發數據採集程序的話,也是相同的道理。
基於原則三的解決方案
在採集binlog採集端,能夠從新按位置採集binlog。
此方案實現方式在Debezium官方網站上也給出了相應的解決方案,大概描述一下,須要用到Kafkacat工具。
對於每個採集的MySQL實例,建立數據採集任務時,Confluent都會相應的建立connector(也就是採集程序)的採集的元數據的topic,裏面會存儲相應的時間戳、文件位置、以及位置,能夠經過修改此數據,重置採集binlog日誌的位置。
值得注意的是,此操做的時間節點也是有限制的,和MySQL的binlog日誌保存週期有關,因此此方式回溯時,須要確認的是MySQL日誌還存在。
對於重複消費把數據從新落地。
此方案由於基於Kafka,對於Kafka從新制定消費offset消費位點的操做網上有不少方案,此處再也不贅述。
對於讀者本身實現的話,須要確認所選擇的MQ支持此特性就行了。
Frequently Asked Questions:https://debezium.io/documentation/faq/#how_to_change_the_offsets_of_the_source_database
3、不一樣的業務場景
此部分只描述在筆者技術架構下如何實現如下操做,讀者能夠根據本身選擇的技術組件探究不一樣的技術方案。
一、數據庫分庫分表的狀況
基於Debezium的架構,一個Source端只能對應一個MySQL實例進行採集,對於同一實例上的分表狀況,可使用Debezium Topic Routing功能。
在採集過濾binlog時把相應須要採集的表按照正則匹配寫入一個指定的topic中。
在分庫的狀況下,還須要在sink端增長RegexRouter transform算子進行topic間的合併寫入操做。
二、數據增量採集與全量採集
對於採集組件,目前目前的配置都是以增量爲默認,因此不管是選擇Debezium仍是Canal的話,正常配置就好。
可是有些時候會存在須要採集全表的狀況,筆者也給出一下全量的數據採集的方案。
方案一:
Debezium自己自帶了這樣的功能,須要將snapshot.mode參數選型設置爲when_needed,這樣能夠作表的全量採集操做。
官方文檔中,在此處的參數配置有更加細緻的描述。
Snapshots:https://debezium.io/documentation/reference/0.10/connectors/mysql.html#snapshots
方案二:
使用sqoop和增量採集同時使用的方式進行。
此方案適用於表數據已存在不少,而目前binlog數據頻率不頻繁的狀況下,使用此方案。
值得注意的是有兩點:
-
sqoop數據導入落地爲Parquet格式,與增量採集數據合併時,須要作數據格式整合,也就是中間須要有臨時表,經過union all的方式把數據merge到全量表中。
-
sqoop導入的Parquet格式,與Debezium處理某些數據類型時會存在不相同的問題,例如datetime類型,sqoop會導出string,Debezium會轉化爲bigint。
三、離線數據去重條件
數據落地後,經過json表映射出binlog原始數據,那麼問題也就來了,咱們如何找到最新的一條數據呢?
也許咱們能夠簡單的認爲,用咱們剛剛的抽取的那個ts_ms,而後作倒排不就行了嗎?
大部分狀況下這樣作確實是能夠的。可是筆者在實際開發中,發現這樣的狀況是不能知足全部狀況的,由於在binlog中,可能真的會存在ts_ms與PK相同,可是確實不一樣的兩條數據。
那咱們怎麼去解決時間都相同的兩條數據呢?答案就在上文,咱們剛剛建議的把binlog的元數據都抽取出來。
SELECT *
FROM
(
SELECT *,
row_number() over(partition BY t.id ORDER BY t.`__ts_ms` DESC,t.`__file` DESC,cast(t.`__pos` AS int) DESC) AS order_by
FROM test t
WHERE dt='{pt}'
AND hour='{now_hour}'
) t1
WHERE t1.order_by = 1
解釋一下這個sql中row_number的的條件:
-
__ts_ms :爲binlog中的ts_ms,即事件時間。
-
__file :爲binlog此條數據所在file name。
-
__pos :爲binlog中此數據所在文件中的位置,爲數據類型。
這樣的條件組合取出的數據,就是最新的一條。
也許有讀者會問,若是這條數據被刪除了怎麼辦,你這樣取出來的數據不就是錯的了嗎?
這個Debezium也有相應的操做,有相應的配置選項讓你如何選擇處理刪除行爲的binlog數據。
做爲給你們的參考,筆者選擇rewrite的參數配置,這樣在上面的sql最外層只須要判斷 「delete = ’false‘「 就是正確的數據啦。
Debezium:https://debezium.io/documentation/reference/0.10/configuration/event-flattening.html
4、架構上的總結
在技術選型以及總體與細節的架構中,筆者始終堅持一個原則——流程儘可能簡約而不簡單,數據環節越長,出問題的環節就可能越多,對於後期鎖定問題與運維難度也會很高。
因此筆者在技術選型也曾考慮過Flink + Kafka的這種方式,可是基於當時的現狀,筆者並無選擇這樣的技術選型,筆者也闡述一下緣由。
1)筆者的Flink環境沒有作開發平臺化與運維平臺化。
2)場景偏向於數據採集和傳輸,而不是計算,Flink的優點特性並無使用到不少。
3)若是基於一個MySQL實例開發一個Flink程序,使用原生的Flink steaming,作api式的程序開發,若是由於某些表的數據致使程序掛掉,這個實例的數據都沒法採集了,這樣的影響範圍太大。
4)若是基於一個一個表或者經過正則的方式匹配一些表,作一個Flink程序,這樣雖然是保證了靈活度,可是90%的代碼都是冗餘的,並且會有不少任務,浪費資源。
5)最後就是開發和維護效率的問題,若是隻是寫原生的Flink程序的話,後續的累加開發,會把程序變得愈來愈重,可能邏輯也會愈來愈繁瑣。
總結起來,我當時對於Flink的思考,若是Flink沒有作開發和運維監控的平臺化的狀況下,能夠做爲一個臨時方案,可是後期若是一直在這樣一個開發流程下縫縫補補,多人開發下很容易出現問題,或者就是你們都這樣一個程序框架下造輪子,並且越造越慢。並且後期的主要項目方向並無把Flink平臺化提上日程,因此也是考慮了一部分將來的狀況進行的選擇。
所以我的最後肯定技術選型的時候,並無選用Flink。
5、結束語
本文筆者寫得較爲理論化,也是對此場景的一個技術思路方案總結。技術架構上的方案多種多樣,筆者只是選擇了其中一種進行實現,也但願你們有其餘的技術方案或者理論進行交流,煩請指正。
做者丨李楠 來源丨數據倉庫與Python大數據(ID:dw_zzxx) dbaplus社羣歡迎廣大技術人員投稿,投稿郵箱: editor@dbaplus.cn