文 | 呂鵬 DataPipeline架構師數據庫
進入大數據時代,實時做業有着愈來愈重要的地位。本文將從如下幾個部分進行講解DataPipeline在大數據平臺的實時數據流實踐。緩存
1、企業級數據面臨的主要問題和挑戰網絡
1.數據量不斷攀升多線程
隨着互聯網+的蓬勃發展和用戶規模的急劇擴張,企業數據量也在飛速增加,數據的量以GB爲單位,逐漸的開始以TB/GB/PB/EB,甚至ZB/YB等。同時大數據也在不斷深刻到金融、零售、製造等行業,發揮着愈來愈大的做用。架構
2. 數據質量的要求不斷地提高框架
當前比較流行的AI、數據建模,對數據質量要求高。尤爲在金融領域,對於數據質量的要求是很是高的。工具
3. 數據平臺架構的複雜化oop
企業級應用架構的變化隨着企業規模而變。規模小的企業,用戶少、數據量也小,可能只需一個MySQL就搞能搞;中型企業,隨着業務量的上升,這時候可能須要讓主庫作OLTP,備庫作OLAP;當企業進入規模化,數據量很是大,原有的OLTP可能已經不能知足了,這時候咱們會作一些策略,來保證OLTP和OLAP隔離,業務系統和BI系統分開互不影響,但作了隔離後同時帶來了一個新的困難,數據流的實時同步的需求,這時企業就須要一個可擴展、可靠的流式傳輸工具。post
2、大數據平臺上的實踐案例性能
下圖是一個典型的BI平臺設計場景,以MySQL爲例,DataPipeline是如何實現MySQL的SourceConnector。MySQL做爲Source端時:
使用binlog時須要注意開啓row 模式而且image設置爲 full。
1. MySQL SourceConnector 全量+增量實時同步的實現
下面是具體的實現流程圖,首先開啓repeatable read事務,保證在執行讀鎖以前的數據能夠確實的讀到。而後進行flush table with read lock 操做,添加一個讀鎖,防止這個時候有新的數據進入影響數據的讀取,這時開始一個truncation with snapshot,咱們能夠記錄當前binlog的offset 並標記一個snapshot start,這時的offset 爲增量讀取時開始的offset。當事務開始後能夠進行全量數據的讀取。record marker這時會將生成record 寫到 kafka 中,而後commit 這個事務。當全量數據push完畢後咱們解除讀鎖而且標記snapshot stop,此時全量數據已經都進入kafka了,以後從以前記錄的offset開始增量數據的同步。
2. DataPipeline作了哪些優化工做
1)以往在數據同步環節都分爲全量同步和增量同步,全量同步爲一個批處理。在批處理時咱們都是進行all or nothing的處理,但當大數據狀況下一個批量會佔用至關長的時間,時間越長可靠性就越難保障,因此每每會出現斷掉的狀況,這時一個從新處理會讓不少人崩潰。DataPipeline 解決了這一痛點,經過管理數據傳輸時的position 來作到斷點續傳,這時當一個大規模的數據任務即便發生了意外,也能夠重斷掉的點來繼續以前的任務,大大縮短了同步的時間,提升了同步的效率。
2)在同步多個任務的時候,很難平衡數據傳輸對源端的壓力和目的端的實時性,在大數據量下的傳輸尤爲可以體現,這時DataPipeline 在此作了大量相關測試來優化不一樣的鏈接池,開放數據傳輸效率的自定義化,供客戶針對本身的業務系統定製合適的傳輸任務,對於不一樣種類的數據庫的傳輸進行優化和調整,保證數據傳輸的高效性。
3)自定義異構數據類型的轉化,每每開源類大數據傳輸工具如 sqoop 等,對異構數據類型的支持不夠靈活,種類也不夠齊全。像金融領域中對數據精度要求較高的場景,在傳統數據庫向大數據平臺傳輸時形成的精度丟失是很大的一個問題。DataPipeline 對此作了更多數據類型的支持,好比hive 支持的複雜類型以及 decimal 和 timestamp 等。
3. Sink端之Hive
1)Hive的特性
2)Hive同步的問題
3)KafkaConnect HDFS 的 Hive 同步實踐
4)Recover的機制
recover 是一種恢復的機制,在數據傳輸的階段每每可能出現各類不一樣的問題,如網絡問題等等。當出現問題後咱們須要恢復數據同步,那麼recover是怎麼保證數據正常傳輸不丟失呢?當recover開始的時候,獲取目標文件在hdfs 上的租約,若是這時候須要讀寫的HDFS當前文件是被佔用的,那咱們須要等待它直到能夠獲取到租約。當咱們獲取到租約後就能夠開始讀以前寫入時候的log,若是第一次會建立一個新的log,並標記一個begin,而後記錄了當時的kafka offset。這時候須要清理以前遺留下來的臨時數據,清理掉以後再從新開始同步直到同步結束會標記一個end。若是沒有結束的話就至關於正在進行中,正在進行中每次都會提交當前同步的offset,來保證出現意外後會回滾到以前offset。
5)WAL (Write-Ahead Logging)機制
Write-Ahead Logging機制其實就是核心思想在數據寫入到數據庫以前,它先寫臨時文件,當一個批次結束後,在將這個臨時文件更名爲正式文件,確保每次提交後的正式文件一致性,若是中途出現寫入錯誤將臨時文件刪除從新寫入,至關於一個回滾。hive 的同步主要利用這種實現方式來保證一致性。首先它同步數據寫入到HDFS臨時文件上,確保一個批次的數據正常後再重命名到正式文件當中。正式的文件名會包含kafka offset,例如一個avro 文件的文件名爲 xxxx+001+0020.avro ,這表示當前文件中有offset 1 到 20 的20條數據。
4. Sink端之GreenPlum
GreenPlum,是一個MPP架構的數據倉庫,底層由多個postgres數據庫做爲計算節點,擅長OLAP,做爲BI數據倉庫有着良好的性能。
1)DataPipeline對GreenPlum 同步實踐以及優化策略
➢ 每一個須要同步的表單獨記錄一個offset,當整個任務失敗時能夠分開進行恢復;
➢ 使用一個線程池管理加載數據的線程,每一個同步的表單獨一個線程來進行加載數據,多表同時同步;
➢ 在加載數據的時間裏,提早對kafka進行消費,緩存處理好的一個數據集,當一個線程加載數據結束後立刻開始新的線程加載數據,減小處理加載數據的時間;
同步GreenPlum須要注意:由於是經過copy 寫入文件的,須要文件是結構化數據,典型的是使用CSV,CSV 寫入時需注意spiltquote,escapequote,避免出現數據錯位的現象。update主鍵的問題 , 當源端是update一個主鍵時,同時須要記錄update前的主鍵,並在目標端進行刪除。還有 0 特殊字符的問題,由於核心是用C語言,因此在同步的時候0須要特殊處理掉。
3、DataPipeline將來的工做
1.目前咱們碰到kafka connect rebalance的一些問題,因此咱們對其進行了改造。以往的rebalance機制是假如咱們增長或者刪除一個task,會致使整個集羣rebalance,這樣形成不少無謂的開銷並且頻繁的rebalance 不利於數據同步的任務的穩定。因而咱們將rebalance機制改形成一個黏性的機制:
2.源端的數據一致性,目前經過WAL的機制能夠保證目的端的一致性;
3.大數據量下的同步優化以及提升同步的穩定性。
4、總結
—end—