美團DB數據同步到數據倉庫的架構與實踐

背景

在數據倉庫建模中,未經任何加工處理的原始業務層數據,咱們稱之爲ODS(Operational Data Store)數據。在互聯網企業中,常見的ODS數據有業務日誌數據(Log)和業務DB數據(DB)兩類。對於業務DB數據來講,從MySQL等關係型數據庫的業務數據進行採集,而後導入到Hive中,是進行數據倉庫生產的重要環節。正則表達式

如何準確、高效地把MySQL數據同步到Hive中?通常經常使用的解決方案是批量取數並Load:直連MySQL去Select表中的數據,而後存到本地文件做爲中間存儲,最後把文件Load到Hive表中。這種方案的優勢是實現簡單,可是隨着業務的發展,缺點也逐漸暴露出來:數據庫

  • 性能瓶頸:隨着業務規模的增加,Select From MySQL -> Save to Localfile -> Load to Hive這種數據流花費的時間愈來愈長,沒法知足下游數倉生產的時間要求。
  • 直接從MySQL中Select大量數據,對MySQL的影響很是大,容易形成慢查詢,影響業務線上的正常服務。
  • 因爲Hive自己的語法不支持更新、刪除等SQL原語,對於MySQL中發生Update/Delete的數據沒法很好地進行支持。

爲了完全解決這些問題,咱們逐步轉向CDC (Change Data Capture) + Merge的技術方案,即實時Binlog採集 + 離線處理Binlog還原業務數據這樣一套解決方案。Binlog是MySQL的二進制日誌,記錄了MySQL中發生的全部數據變動,MySQL集羣自身的主從同步就是基於Binlog作的。服務器

本文主要從Binlog實時採集和離線處理Binlog還原業務數據兩個方面,來介紹如何實現DB數據準確、高效地進入數倉。架構

總體架構

圖片0

總體的架構如上圖所示。在Binlog實時採集方面,咱們採用了阿里巴巴的開源項目Canal,負責從MySQL實時拉取Binlog並完成適當解析。Binlog採集後會暫存到Kafka上供下游消費。總體實時採集部分如圖中紅色箭頭所示。負載均衡

離線處理Binlog的部分,如圖中黑色箭頭所示,經過下面的步驟在Hive上還原一張MySQL表:框架

  1. 採用Linkedin的開源項目Camus,負責每小時把Kafka上的Binlog數據拉取到Hive上。
  2. 對每張ODS表,首先須要一次性製做快照(Snapshot),把MySQL裏的存量數據讀取到Hive上,這一過程底層採用直連MySQL去Select數據的方式。
  3. 對每張ODS表,天天基於存量數據和當天增量產生的Binlog作Merge,從而還原出業務數據。

咱們回過頭來看看,背景中介紹的批量取數並Load方案遇到的各類問題,爲何用這種方案能解決上面的問題呢?性能

  • 首先,Binlog是流式產生的,經過對Binlog的實時採集,把部分數據處理需求由天天一次的批處理分攤到實時流上。不管從性能上仍是對MySQL的訪問壓力上,都會有明顯地改善。
  • 第二,Binlog自己記錄了數據變動的類型(Insert/Update/Delete),經過一些語義方面的處理,徹底可以作到精準的數據還原。

Binlog實時採集

對Binlog的實時採集包含兩個主要模塊:一是CanalManager,主要負責採集任務的分配、監控報警、元數據管理以及和外部依賴系統的對接;二是真正執行採集任務的Canal和CanalClient。優化

圖片1

當用戶提交某個DB的Binlog採集請求時,CanalManager首先會調用DBA平臺的相關接口,獲取這一DB所在MySQL實例的相關信息,目的是從中選出最適合Binlog採集的機器。而後把採集實例(Canal Instance)分發到合適的Canal服務器上,即CanalServer上。在選擇具體的CanalServer時,CanalManager會考慮負載均衡、跨機房傳輸等因素,優先選擇負載較低且同地域傳輸的機器。spa

CanalServer收到採集請求後,會在ZooKeeper上對收集信息進行註冊。註冊的內容包括:設計

  • 以Instance名稱命名的永久節點。
  • 在該永久節點下注冊以自身ip:port命名的臨時節點。

這樣作的目的有兩個:

  • 高可用:CanalManager對Instance進行分發時,會選擇兩臺CanalServer,一臺是Running節點,另外一臺做爲Standby節點。Standby節點會對該Instance進行監聽,當Running節點出現故障後,臨時節點消失,而後Standby節點進行搶佔。這樣就達到了容災的目的。
  • 與CanalClient交互:CanalClient檢測到本身負責的Instance所在的Running CanalServer後,便會進行鏈接,從而接收到CanalServer發來的Binlog數據。

對Binlog的訂閱以MySQL的DB爲粒度,一個DB的Binlog對應了一個Kafka Topic。底層實現時,一個MySQL實例下全部訂閱的DB,都由同一個Canal Instance進行處理。這是由於Binlog的產生是以MySQL實例爲粒度的。CanalServer會拋棄掉未訂閱的Binlog數據,而後CanalClient將接收到的Binlog按DB粒度分發到Kafka上。

離線還原MySQL數據

完成Binlog採集後,下一步就是利用Binlog來還原業務數據。首先要解決的第一個問題是把Binlog從Kafka同步到Hive上。

圖片2

Kafka2Hive

整個Kafka2Hive任務的管理,在美團數據平臺的ETL框架下進行,包括任務原語的表達和調度機制等,都同其餘ETL相似。而底層採用LinkedIn的開源項目Camus,並進行了有針對性的二次開發,來完成真正的Kafka2Hive數據傳輸工做。

對Camus的二次開發

Kafka上存儲的Binlog未帶Schema,而Hive表必須有Schema,而且其分區、字段等的設計,都要便於下游的高效消費。對Camus作的第一個改造,即是將Kafka上的Binlog解析成符合目標Schema的格式。

對Camus作的第二個改造,由美團的ETL框架所決定。在咱們的任務調度系統中,目前只對同調度隊列的任務作上下游依賴關係的解析,跨調度隊列是不能創建依賴關係的。而在MySQL2Hive的整個流程中,Kafka2Hive的任務須要每小時執行一次(小時隊列),Merge任務天天執行一次(天隊列)。而Merge任務的啓動必需要嚴格依賴小時Kafka2Hive任務的完成。

爲了解決這一問題,咱們引入了Checkdone任務。Checkdone任務是天任務,主要負責檢測前一天的Kafka2Hive是否成功完成。若是成功完成了,則Checkdone任務執行成功,這樣下游的Merge任務就能夠正確啓動了。

Checkdone的檢測邏輯

Checkdone是怎樣檢測的呢?每一個Kafka2Hive任務成功完成數據傳輸後,由Camus負責在相應的HDFS目錄下記錄該任務的啓動時間。Checkdone會掃描前一天的全部時間戳,若是最大的時間戳已經超過了0點,就說明前一天的Kafka2Hive任務都成功完成了,這樣Checkdone就完成了檢測。

此外,因爲Camus自己只是完成了讀Kafka而後寫HDFS文件的過程,還必須完成對Hive分區的加載才能使下游查詢到。所以,整個Kafka2Hive任務的最後一步是加載Hive分區。這樣,整個任務纔算成功執行。

每一個Kafka2Hive任務負責讀取一個特定的Topic,把Binlog數據寫入original_binlog庫下的一張表中,即前面圖中的original_binlog.db,其中存儲的是對應到一個MySQL DB的所有Binlog。

圖片3

上圖說明了一個Kafka2Hive完成後,文件在HDFS上的目錄結構。假如一個MySQL DB叫作user,對應的Binlog存儲在original_binlog.user表中。ready目錄中,按天存儲了當天全部成功執行的Kafka2Hive任務的啓動時間,供Checkdone使用。每張表的Binlog,被組織到一個分區中,例如userinfo表的Binlog,存儲在table_name=userinfo這一分區中。每一個table_name一級分區下,按dt組織二級分區。圖中的xxx.lzo和xxx.lzo.index文件,存儲的是通過lzo壓縮的Binlog數據。

Merge

Binlog成功入倉後,下一步要作的就是基於Binlog對MySQL數據進行還原。Merge流程作了兩件事,首先把當天生成的Binlog數據存放到Delta表中,而後和已有的存量數據作一個基於主鍵的Merge。Delta表中的數據是當天的最新數據,當一條數據在一天內發生屢次變動時,Delta表中只存儲最後一次變動後的數據。

把Delta數據和存量數據進行Merge的過程當中,須要有惟一鍵來斷定是不是同一條數據。若是同一條數據既出如今存量表中,又出如今Delta表中,說明這一條數據發生了更新,則選取Delta表的數據做爲最終結果;不然說明沒有發生任何變更,保留原來存量表中的數據做爲最終結果。Merge的結果數據會Insert Overwrite到原表中,即圖中的origindb.table

Merge流程舉例

下面用一個例子來具體說明Merge的流程。

圖片4

數據表共id、value兩列,其中id是主鍵。在提取Delta數據時,對同一條數據的屢次更新,只選擇最後更新的一條。因此對id=1的數據,Delta表中記錄最後一條更新後的值value=120。Delta數據和存量數據作Merge後,最終結果中,新插入一條數據(id=4),兩條數據發生了更新(id=1和id=2),一條數據未變(id=3)。

默認狀況下,咱們採用MySQL表的主鍵做爲這一判重的惟一鍵,業務也能夠根據實際狀況配置不一樣於MySQL的惟一鍵。

上面介紹了基於Binlog的數據採集和ODS數據還原的總體架構。下面主要從兩個方面介紹咱們解決的實際業務問題。

實踐一:分庫分表的支持

隨着業務規模的擴大,MySQL的分庫分表狀況愈來愈多,不少業務的分表數目都在幾千個這樣的量級。而通常數據開發同窗須要把這些數據聚合到一塊兒進行分析。若是對每一個分表都進行手動同步,再在Hive上進行聚合,這個成本很難被咱們接受。所以,咱們須要在ODS層就完成分表的聚合。

圖片5

首先,在Binlog實時採集時,咱們支持把不一樣DB的Binlog寫入到同一個Kafka Topic。用戶能夠在申請Binlog採集時,同時勾選同一個業務邏輯下的多個物理DB。經過在Binlog採集層的聚集,全部分庫的Binlog會寫入到同一張Hive表中,這樣下游在進行Merge時,依然只須要讀取一張Hive表。

第二,Merge任務的配置支持正則匹配。經過配置符合業務分表命名規則的正則表達式,Merge任務就能瞭解本身須要聚合哪些MySQL表的Binlog,從而選取相應分區的數據來執行。

這樣經過兩個層面的工做,就完成了分庫分表在ODS層的合併。

這裏面有一個技術上的優化,在進行Kafka2Hive時,咱們按業務分表規則對錶名進行了處理,把物理表名轉換成了邏輯表名。例如userinfo123這張表名會被轉換爲userinfo,其Binlog數據存儲在original_binlog.user表的table_name=userinfo分區中。這樣作的目的是防止過多的HDFS小文件和Hive分區形成的底層壓力。

實踐二:刪除事件的支持

Delete操做在MySQL中很是常見,因爲Hive不支持Delete,若是想把MySQL中刪除的數據在Hive中刪掉,須要採用「迂迴」的方式進行。

對須要處理Delete事件的Merge流程,採用以下兩個步驟:

  • 首先,提取出發生了Delete事件的數據,因爲Binlog自己記錄了事件類型,這一步很容易作到。將存量數據(表A)與被刪掉的數據(表B)在主鍵上作左外鏈接(Left outer join),若是可以所有join到雙方的數據,說明該條數據被刪掉了。所以,選擇結果中表B對應的記錄爲NULL的數據,便是應當被保留的數據。
  • 而後,對上面獲得的被保留下來的數據,按照前面描述的流程作常規的Merge。

圖片6

總結與展望

做爲數據倉庫生產的基礎,美團數據平臺提供的基於Binlog的MySQL2Hive服務,基本覆蓋了美團內部的各個業務線,目前已經可以知足絕大部分業務的數據同步需求,實現DB數據準確、高效地入倉。在後面的發展中,咱們會集中解決CanalManager的單點問題,並構建跨機房容災的架構,從而更加穩定地支撐業務的發展。

本文主要從Binlog流式採集和基於Binlog的ODS數據還原兩方面,介紹了這一服務的架構,並介紹了咱們在實踐中遇到的一些典型問題和解決方案。但願可以給其餘開發者一些參考價值,同時也歡迎你們和咱們一塊兒交流。

招聘

若是你對咱們的工做內容比較感興趣,歡迎發送簡歷給 wangmengmeng05@meituan.com,和咱們一塊兒致力於解決海量數據採集和傳輸的問題中來吧!

相關文章
相關標籤/搜索