數棧是雲原生—站式數據中臺PaaS,咱們在github和gitee上有一個有趣的開源項目:FlinkX,FlinkX是一個基於Flink的批流統一的數據同步工具,既能夠採集靜態的數據,也能夠採集實時變化的數據,是全域、異構、批流一體的數據同步引擎。你們喜歡的話請給咱們點個star!star!star!
html
github開源項目:https://github.com/DTStack/flinkxmysql
gitee開源項目:https://gitee.com/dtstack_dev_0/flinkxgit
袋鼠云云原生一站式數據中臺PaaS——數棧,覆蓋了建設數據中心過程當中所須要的各類工具(包括數據開發平臺、數據資產平臺、數據科學平臺、數據服務引擎等),完整覆蓋離線計算、實時計算應用,幫助企業極大地縮短數據價值的萃取過程,提升提煉數據價值的能力。
github
目前,數棧-離線開發平臺(BatchWorks) 中的數據離線同步任務、數棧-實時開發平臺(StreamWorks)中的數據實時採集任務已經統一基於FlinkX來實現。數據的離線採集和實時採集基本的原理的是同樣的,主要的不一樣之處是源頭的流是否有界,因此統一用Flink的Stream API 來實現這兩種數據同步場景,實現數據同步的批流統一。sql
一、斷點續傳數據庫
斷點續傳是指數據同步任務在運行過程當中因各類緣由致使任務失敗,不須要重頭同步數據,只須要從上次失敗的位置繼續同步便可,相似於下載文件時因網絡緣由失敗,不須要從新下載文件,只須要繼續下載就行,能夠大大節省時間和計算資源。斷點續傳是數棧-離線開發平臺(BatchWorks)裏數據同步任務的一個功能,須要結合任務的出錯重試機制才能完成。當任務運行失敗,會在Engine裏進行重試,重試的時候會接着上次失敗時讀取的位置繼續讀取數據,直到任務運行成功爲止。緩存
二、實時採集服務器
實時採集是數棧-實時開發平臺(StreamWorks)裏數據採集任務的一個功能,當數據源裏的數據發生了增刪改操做,同步任務監聽到這些變化,將變化的數據實時同步到目標數據源。除了數據實時變化外,實時採集和離線數據同步的另外一個區別是:實時採集任務是不會中止的,任務會一直監聽數據源是否有變化。這一點和Flink任務是一致的,因此實時採集任務是數棧流計算應用裏的一個任務類型,配置過程和離線計算裏的同步任務基本同樣。網絡
斷點續傳和實時採集都依賴於Flink的Checkpoint機制,因此我們先來簡單瞭解一下。oracle
Checkpoint是Flink實現容錯機制最核心的功能,它可以根據配置週期性地基於Stream中各個Operator的狀態來生成Snapshot,從而將這些狀態數據按期持久化存儲下來,當Flink程序一旦意外崩潰時,從新運行程序時能夠有選擇地從這些Snapshot進行恢復,從而修正由於故障帶來的程序數據狀態中斷。
Checkpoint觸發時,會向多個分佈式的Stream Source中插入一個Barrier標記,這些Barrier會隨着Stream中的數據記錄一塊兒流向下游的各個Operator。當一個Operator接收到一個Barrier時,它會暫停處理Steam中新接收到的數據記錄。由於一個Operator可能存在多個輸入的Stream,而每一個Stream中都會存在對應的Barrier,該Operator要等到全部的輸入Stream中的Barrier都到達。
當全部Stream中的Barrier都已經到達該Operator,這時全部的Barrier在時間上看來是同一個時刻點(表示已經對齊),在等待全部Barrier到達的過程當中,Operator的Buffer中可能已經緩存了一些比Barrier早到達Operator的數據記錄(Outgoing Records),這時該Operator會將數據記錄(Outgoing Records)發射(Emit)出去,做爲下游Operator的輸入,最後將Barrier對應Snapshot發射(Emit)出去做爲這次Checkpoint的結果數據。
一、前提條件
同步任務要支持斷點續傳,對數據源有一些強制性的要求:
1)數據源(這裏特指關係數據庫)中必須包含一個升序的字段,好比主鍵或者日期類型的字段,同步過程當中會使用checkpoint機制記錄這個字段的值,任務恢復運行時使用這個字段構造查詢條件過濾已經同步過的數據,若是這個字段的值不是升序的,那麼任務恢復時過濾的數據就是錯誤的,最終致使數據的缺失或重複;
2)數據源必須支持數據過濾,若是不支持的話,任務就沒法從斷點處恢復運行,會致使數據重複;
3)目標數據源必須支持事務,好比關係數據庫,文件類型的數據源也能夠經過臨時文件的方式支持。
二、任務運行的詳細過程
咱們用一個具體的任務詳細介紹一下整個過程,任務詳情以下:
1)讀取數據
讀取數據時首先要構造數據分片,構造數據分片就是根據通道索引和checkpoint記錄的位置構造查詢sql,sql模板以下:
select * from data_test where id mod ${channel_num}=${channel_index} and id > ${offset}
若是是第一次運行,或者上一次任務失敗時尚未觸發checkpoint,那麼offset就不存在,根據offset和通道能夠肯定具體的查詢sql:
offset存在時
第一個通道:
select * from data_test where id mod 2=0 and id > ${offset_0};
第二個通道:
select * from data_test where id mod 2=1 and id > ${offset_1};
offset不存在時
第一個通道:
select * from data_test where id mod 2=0;
第二個通道:
select * from data_test where id mod 2=1;
數據分片構造好以後,每一個通道就根據本身的數據分片去讀數據了。
2)寫數據
寫數據前會先作幾個操做:
a、檢測 /data_test 目錄是否存在,若是目錄不存在,則建立這個目錄,若是目錄存在,進行2操做;
b、判斷是否是以覆蓋模式寫數據,若是是,則刪除 /data_test目錄,而後再建立目錄,若是不是,則進行3操做;
c、檢測 /data_test/.data 目錄是否存在,若是存在就先刪除,再建立,確保沒有其它任務因異常失敗遺留的髒數據文件;
數據寫入hdfs是單條寫入的,不支持批量寫入。數據會先寫入/data_test/.data/目錄下,數據文件的命名格式爲:
channelIndex.jobId.fileIndex
包含通道索引,jobId,文件索引三個部分。
3)checkpoint觸發時
在FlinkX中「狀態」表示的是標識字段id的值,咱們假設checkpoint觸發時兩個通道的讀取和寫入狀況如圖中所示:
checkpoint觸發後,兩個reader先生成Snapshot記錄讀取狀態,通道0的狀態爲 id=12,通道1的狀態爲 id=11。Snapshot生成以後向數據流裏面插入barrier,barrier隨數據流向Writer。以Writer_0爲例,Writer_0接收Reader_0和Reader_1發來的數據,假設先收到了Reader_0的barrier,這個時候Writer_0中止寫出數據到HDFS,將接收到的數據先放到 InputBuffer裏面,一直等待Reader_1的barrier到達以後再將Buffer裏的數據所有寫出,而後生成Writer的Snapshot,整個checkpoint結束後,記錄的任務狀態爲:
Reader_0:id=12
Reader_1:id=11
Writer_0:id=沒法肯定
Writer_1:id=沒法肯定
任務狀態會記錄到配置的HDFS目錄/flinkx/checkpoint/abc123下。由於每一個Writer會接收兩個Reader的數據,以及各個通道的數據讀寫速率可能不同,因此致使writer接收到的數據順序是不肯定的,可是這不影響數據的準確性,由於讀取數據時只須要Reader記錄的狀態就能夠構造查詢sql,咱們只要確保這些數據真的寫到HDFS就好了。在Writer生成Snapshot以前,會作一系列操做保證接收到的數據所有寫入HDFS:
a、close寫入HDFS文件的數據流,這時候會在/data_test/.data目錄下生成兩個兩個文件:
/data_test/.data/0.abc123.0
/data_test/.data/1.abc123.0
b、將生成的兩個數據文件移動到/data_test目錄下;
c、更新文件名稱模板更新爲:channelIndex.abc123.1;
快照生成後任務繼續讀寫數據,若是生成快照的過程當中有任何異常,任務會直接失敗,這樣此次快照就不會生成,任務恢復時會從上一個成功的快照恢復。
4)任務正常結束
任務正常結束時也會作和生成快照時一樣的操做,close文件流,移動臨時數據文件等。
5)任務異常終止
任務若是異常結束,假設任務結束時最後一個checkpoint記錄的狀態爲:
Reader_0:id=12Reader_1:id=11
那麼任務恢復的時候就會把各個通道記錄的狀態賦值給offset,再次讀取數據時構造的sql爲:
第一個通道:
select * from data_test where id mod 2=0 and id > 12;
第二個通道:
select * from data_test where id mod 2=1 and id > 11;
這樣就能夠從上一次失敗的位置繼續讀取數據了。
三、支持斷點續傳的插件
理論上只要支持過濾數據的數據源,和支持事務的數據源均可以支持斷點續傳的功能,目前FlinkX支持的插件以下:
目前FlinkX支持實時採集的插件有KafKa、binlog插件,binlog插件是專門針對mysql數據庫作實時採集的,若是要支持其它的數據源,只須要把數據打到Kafka,而後再用FlinkX的Kafka插件消費數據便可,好比oracle,只須要使用oracle的ogg將數據打到Kafka。這裏咱們專門講解一下mysql的實時採集插件binlog。
一、binlog
binlog是Mysql sever層維護的一種二進制日誌,與innodb引擎中的redo/undo log是徹底不一樣的日誌;其主要是用來記錄對mysql數據更新或潛在發生更新的SQL語句,並以"事務"的形式保存在磁盤中。
binlog的做用主要有:
1)複製:MySQL Replication在Master端開啓binlog,Master把它的二進制日誌傳遞給slaves並回放來達到master-slave數據一致的目的;
2)數據恢復:經過mysqlbinlog工具恢復數據;
3)增量備份。
二、MySQL 主備複製
有了記錄數據變化的binlog日誌還不夠,咱們還須要藉助MySQL的主備複製功能:主備複製是指 一臺服務器充當主數據庫服務器,另外一臺或多臺服務器充當從數據庫服務器,主服務器中的數據自動複製到從服務器之中。
主備複製的過程:
1)MySQL master 將數據變動寫入二進制日誌( binary log, 其中記錄叫作二進制日誌事件binary log events,能夠經過 show binlog events 進行查看);
2)MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌(relay log);
3)MySQL slave 重放 relay log 中事件,將數據變動反映它本身的數據。
三、寫入Hive
binlog插件能夠監聽多張表的數據變動狀況,解析出的數據中包含表名稱信息,讀取到的數據能夠所有寫入目標數據庫的一張表,也能夠根據數據中包含的表名信息寫入不一樣的表,目前只有Hive插件支持這個功能。Hive插件目前只有寫入插件,功能基於HDFS的寫入插件實現,也就是說從binlog讀取,寫入hive也支持失敗恢復的功能。
寫入Hive的過程:
1)從數據中解析出MySQL的表名,而後根據表名映射規則轉換成對應的Hive表名;
2)檢查Hive表是否存在,若是不存在就建立Hive表;
3)查詢Hive表的相關信息,構造HdfsOutputFormat;
4)調用HdfsOutputFormat將數據寫入HDFS。