DataPipeline在大數據平臺的數據流實踐

文 | 呂鵬 DataPipeline架構師數據庫

圖片描述

進入大數據時代,實時做業有着愈來愈重要的地位。本文將從如下幾個部分進行講解DataPipeline在大數據平臺的實時數據流實踐。緩存

1、企業級數據面臨的主要問題和挑戰網絡

1.數據量不斷攀升多線程

隨着互聯網+的蓬勃發展和用戶規模的急劇擴張,企業數據量也在飛速增加,數據的量以GB爲單位,逐漸的開始以TB/GB/PB/EB,甚至ZB/YB等。同時大數據也在不斷深刻到金融、零售、製造等行業,發揮着愈來愈大的做用。架構

2. 數據質量的要求不斷地提高框架

當前比較流行的AI、數據建模,對數據質量要求高。尤爲在金融領域,對於數據質量的要求是很是高的。工具

3. 數據平臺架構的複雜化oop

企業級應用架構的變化隨着企業規模而變。規模小的企業,用戶少、數據量也小,可能只需一個MySQL就搞能搞;中型企業,隨着業務量的上升,這時候可能須要讓主庫作OLTP,備庫作OLAP;當企業進入規模化,數據量很是大,原有的OLTP可能已經不能知足了,這時候咱們會作一些策略,來保證OLTP和OLAP隔離,業務系統和BI系統分開互不影響,但作了隔離後同時帶來了一個新的困難,數據流的實時同步的需求,這時企業就須要一個可擴展、可靠的流式傳輸工具。post

圖片描述

2、大數據平臺上的實踐案例性能

下圖是一個典型的BI平臺設計場景,以MySQL爲例,DataPipeline是如何實現MySQL的SourceConnector。MySQL做爲Source端時:

  • 全量+ 增量;
  • 全量:經過select 方式,將數據加載到kafka中;
  • 增量:實時讀取 binlog的方式;

使用binlog時須要注意開啓row 模式而且image設置爲 full。

1. MySQL SourceConnector 全量+增量實時同步的實現

下面是具體的實現流程圖,首先開啓repeatable read事務,保證在執行讀鎖以前的數據能夠確實的讀到。而後進行flush table with read lock 操做,添加一個讀鎖,防止這個時候有新的數據進入影響數據的讀取,這時開始一個truncation with snapshot,咱們能夠記錄當前binlog的offset 並標記一個snapshot start,這時的offset 爲增量讀取時開始的offset。當事務開始後能夠進行全量數據的讀取。record marker這時會將生成record 寫到 kafka 中,而後commit 這個事務。當全量數據push完畢後咱們解除讀鎖而且標記snapshot stop,此時全量數據已經都進入kafka了,以後從以前記錄的offset開始增量數據的同步。

圖片描述

2. DataPipeline作了哪些優化工做

1)以往在數據同步環節都分爲全量同步和增量同步,全量同步爲一個批處理。在批處理時咱們都是進行all or nothing的處理,但當大數據狀況下一個批量會佔用至關長的時間,時間越長可靠性就越難保障,因此每每會出現斷掉的狀況,這時一個從新處理會讓不少人崩潰。DataPipeline 解決了這一痛點,經過管理數據傳輸時的position 來作到斷點續傳,這時當一個大規模的數據任務即便發生了意外,也能夠重斷掉的點來繼續以前的任務,大大縮短了同步的時間,提升了同步的效率。

2)在同步多個任務的時候,很難平衡數據傳輸對源端的壓力和目的端的實時性,在大數據量下的傳輸尤爲可以體現,這時DataPipeline 在此作了大量相關測試來優化不一樣的鏈接池,開放數據傳輸效率的自定義化,供客戶針對本身的業務系統定製合適的傳輸任務,對於不一樣種類的數據庫的傳輸進行優化和調整,保證數據傳輸的高效性。

3)自定義異構數據類型的轉化,每每開源類大數據傳輸工具如 sqoop 等,對異構數據類型的支持不夠靈活,種類也不夠齊全。像金融領域中對數據精度要求較高的場景,在傳統數據庫向大數據平臺傳輸時形成的精度丟失是很大的一個問題。DataPipeline 對此作了更多數據類型的支持,好比hive 支持的複雜類型以及 decimal 和 timestamp 等。

3. Sink端之Hive

1)Hive的特性

  • Hive內部表和外部表;
  • 依賴HDFS;
  • 支持事務和非事務;
  • 多種壓縮格式;
  • 分區分桶。

2)Hive同步的問題

  • 如何保證明時的寫入?
  • schema change了怎麼辦?
  • 怎麼擴展我想保存的格式?
  • 怎麼實現多種分區方式?
  • 同步中斷了怎麼辦?
  • 如何保證個人數據不丟?

3)KafkaConnect HDFS 的 Hive 同步實踐

  • 使用外表:Hive外部表,可以提升寫入效率,直接寫HDFS,減小IO消耗,而內表會比外表多一次IO;
  • Schema change:目前的作法是目的端根據源端的變化而變化,當有增長列刪除列的狀況,目標端會跟隨源端改動;
  • 目前支持的存儲格式:parquet,avro ,csv
  • 插件化的partitioner,提供多種分區方式,如 Wallclock RecordRecordField:wallclock是使用寫入到hive端時的系統時間,Record使用是讀取時生成record的時間,RecordField是使用用戶自定義的時間戳來定義分區,將來會實現可自定義化的partitioner來知足不一樣的需求;
  • Recover 機制保障中斷後不會丟失數據;
  • 使用WAL (Write-AheadLogging)機制,保證數據目的端數據一致性。

4)Recover的機制

recover 是一種恢復的機制,在數據傳輸的階段每每可能出現各類不一樣的問題,如網絡問題等等。當出現問題後咱們須要恢復數據同步,那麼recover是怎麼保證數據正常傳輸不丟失呢?當recover開始的時候,獲取目標文件在hdfs 上的租約,若是這時候須要讀寫的HDFS當前文件是被佔用的,那咱們須要等待它直到能夠獲取到租約。當咱們獲取到租約後就能夠開始讀以前寫入時候的log,若是第一次會建立一個新的log,並標記一個begin,而後記錄了當時的kafka offset。這時候須要清理以前遺留下來的臨時數據,清理掉以後再從新開始同步直到同步結束會標記一個end。若是沒有結束的話就至關於正在進行中,正在進行中每次都會提交當前同步的offset,來保證出現意外後會回滾到以前offset。

圖片描述

5)WAL (Write-Ahead Logging)機制

Write-Ahead Logging機制其實就是核心思想在數據寫入到數據庫以前,它先寫臨時文件,當一個批次結束後,在將這個臨時文件更名爲正式文件,確保每次提交後的正式文件一致性,若是中途出現寫入錯誤將臨時文件刪除從新寫入,至關於一個回滾。hive 的同步主要利用這種實現方式來保證一致性。首先它同步數據寫入到HDFS臨時文件上,確保一個批次的數據正常後再重命名到正式文件當中。正式的文件名會包含kafka offset,例如一個avro 文件的文件名爲 xxxx+001+0020.avro ,這表示當前文件中有offset 1 到 20 的20條數據。

4. Sink端之GreenPlum

GreenPlum,是一個MPP架構的數據倉庫,底層由多個postgres數據庫做爲計算節點,擅長OLAP,做爲BI數據倉庫有着良好的性能。

1)DataPipeline對GreenPlum 同步實踐以及優化策略

  • greenplum 支持多種數據加載方式,目前咱們使用copy的加載方式;
  • 批量處理提升sink端寫入效率,不進行insert 和 update 的操做,一概使用 delete + copy 的方式批量加載;
  • 多線程加預加載機制:

➢ 每一個須要同步的表單獨記錄一個offset,當整個任務失敗時能夠分開進行恢復;
➢ 使用一個線程池管理加載數據的線程,每一個同步的表單獨一個線程來進行加載數據,多表同時同步;
➢ 在加載數據的時間裏,提早對kafka進行消費,緩存處理好的一個數據集,當一個線程加載數據結束後立刻開始新的線程加載數據,減小處理加載數據的時間;

  • delete + copy的方式能夠保證數據最終一致性;
  • source 端有主鍵的表能夠經過主鍵來合併一個批次須要同步的數據,如一個須要同步的批量數據中包含一條 insert 的數據,後面跟着
    update 該條數據,那就無需同步兩遍,將該數據更新到 update 以後的狀態 copy 到 gp 當中便可。

同步GreenPlum須要注意:由於是經過copy 寫入文件的,須要文件是結構化數據,典型的是使用CSV,CSV 寫入時需注意spiltquote,escapequote,避免出現數據錯位的現象。update主鍵的問題 , 當源端是update一個主鍵時,同時須要記錄update前的主鍵,並在目標端進行刪除。還有 0 特殊字符的問題,由於核心是用C語言,因此在同步的時候0須要特殊處理掉。

3、DataPipeline將來的工做

1.目前咱們碰到kafka connect rebalance的一些問題,因此咱們對其進行了改造。以往的rebalance機制是假如咱們增長或者刪除一個task,會致使整個集羣rebalance,這樣形成不少無謂的開銷並且頻繁的rebalance 不利於數據同步的任務的穩定。因而咱們將rebalance機制改形成一個黏性的機制:

  • 當咱們增長一個新的任務的時候,咱們會檢查全部的worker使用率比較低的,當worker的task比較少,咱們只把它加進比較少的worker就能夠了,也不須要作全量的平衡,固然這時候可能仍是有一些不平衡的資源浪費,這是咱們能夠容忍的,至少比咱們作一次全量的rebalance開銷要小;
  • 假如刪除一個task,以往的機制是刪除一個task的時候也會作全量的Rebalance,新的機制不會觸發rebalance。這時候若是時間長也會形成一個資源不平衡,這是咱們能夠自動化rebalance一下全部的集羣;
  • 假如說集羣的某個節點宕掉了,該節點的task怎麼辦呢?咱們不會立刻就把這個節點上的task分配出去,會先等待10分鐘,由於有的時候它可能只是短暫的鏈接超時,過一段時間後就會恢復,若是根據這個來作一次rebalance,可能這是不太值的。當等待10分鐘後節點仍是沒有恢復,咱們再作rebalance,將宕掉的節點任務分配到其餘節點上;

2.源端的數據一致性,目前經過WAL的機制能夠保證目的端的一致性;

3.大數據量下的同步優化以及提升同步的穩定性。

4、總結

  1. 大數據時代企業數據集成主要面臨各類複雜的架構,應對這些複雜的系統對ETL的要求也愈來愈高。咱們能作的就是須要權衡利弊選取一個符合業務需求的框架;
  2. Kafka Connect 比較適合對數據量大,且有實時性需求的業務;
  3. 基於Kafka Connect 優良特性能夠依據不一樣的數據倉庫特性來提升數據時效性和同步效率;
  4. DataPipeline針對目前企業在大規模實時數據流的痛點,進行了相關的改造和優化,首先數據端到端一致性的保證是幾乎全部企業在數據同步過程當中碰到的,目前已經作到基於kafka connect 的框架中 rebalance 中的優化和改造。

—end—

相關文章
相關標籤/搜索