實時數據同步方案

一.Flume收集各數據庫日誌,準實時抽取到HDFSmysql

 

    安裝HDP,包含Flumeweb

    方案優勢:sql

        1.配置簡單,不用編程:只要在flume.conf文件中配置source、channel及sink的相關屬性數據庫

        2.採用普通SQL輪詢的方式實現,具備通用性,適用於全部關係庫數據源編程

   方案缺點:json

        1.在源庫上執行了查詢,具備入侵性bootstrap

        2.經過輪詢的方式實現增量,只能作到準實時,並且輪詢間隔越短,對源庫的影響越大緩存

        3.只能識別新增數據,檢測不到刪除與更新併發

        4.要求源庫必須有用於表示增量的字段oracle

 

二.canal

原理:

  1. canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
  2. mysql master收到dump請求,開始推送(slave拉取,不是master主動push給slaves)binary log給slave(也就是canal)
  3. canal解析binary log對象(原始爲byte流)

mysql中須要配置一個用戶,專門提供給canal用

canal開源代碼中發送端僅僅支持mysql,不支持oracle,接收端因爲採用jdbc,mysql、oracle等能夠通吃。

 

 

三.maxwell

    優勢:

  • 支持bootstrap啓動,同步歷史數據
  • 集成kafka,直接將數據落地到kafka
  • 已將binlog中的DML和DDL進行了模式匹配,將其解碼爲有schema的json(有利於後期將其重組爲nosql支持的語言)
    {「database」:」test」,」table」:」e」,」type」:」update」,」ts」:1488857869,」xid」:8924,」commit」:true,」data」:{「id」:1,」m」:5.556666,」torvalds」:null},」old」:{「m」:5.55}}

    缺點:

  • 一個MySQL實例須要對應一個maxwell進程
  • bootstrap的方案使用的是select *

maxwell的配置文件只有一個config.properties,在home目錄。其中除了須要配置mysql master的地址、kafka地址還須要配置一個用於存放maxwell相關信息的mysql地址,maxwell會把讀取binlog關係的信息,如binlog name、position。

 

工具對比

方案對比

  1. 方案1使用阿里開源的Canal進行Mysql binlog數據的抽取,另需開發一個數據轉換工具將從binlog中解析出的數據轉換成自帶schema的json數據並寫入kafka中。而方案2使用maxwell可直接完成對mysql binlog數據的抽取和轉換成自帶schema的json數據寫入到kafka中。
  2. 方案1中不支持表中已存在的歷史數據進行同步,此功能須要開發(若是使用sqoop進行歷史數據同步,不夠靈活,會使結果表與原始表結構相同,有區別於數據交換平臺所需的schema)。方案2提供同步歷史數據的解決方案。
  3. 方案1支持HA部署,而方案2不支持HA

方案1和方案2的區別只在於kafka以前,當數據緩存到kafka以後,須要一個定製的數據路由組件來將自帶schema的數據解析到目標存儲中。
數據路由組件主要負責將kafka中的數據實時讀出,寫入到目標存儲中。(如將全部日誌數據保存到HDFS中,也能夠將數據落地到全部支持jdbc的數據庫,落地到HBase,Elasticsearch等。)

 

maxwell:

    MySQL->Maxwell->Kafka->Flume->HDFS

    寫入HDFS的數據時json的,可能還須要提取只須要的數據,另外,對於update或delete操做目前還不知道要怎麼處理。生產使用難度很大。

 

 

把增量的Log做爲一切系統的基礎。後續的數據使用方,經過訂閱kafka來消費log。

好比:

  • 大數據的使用方能夠將數據保存到Hive表或者Parquet文件給Hive或Spark查詢;
  • 提供搜索服務的使用方能夠保存到Elasticsearch或HBase 中;
  • 提供緩存服務的使用方能夠將日誌緩存到Redis或alluxio中;
  • 數據同步的使用方能夠將數據保存到本身的數據庫中;
  • 因爲kafka的日誌是能夠重複消費的,而且緩存一段時間,各個使用方能夠經過消費kafka的日誌來達到既能保持與數據庫的一致性,也能保證明時性;

爲何使用log和kafka做爲基礎,而不使用Sqoop進行抽取呢? 由於:

 

 

DWS平臺, DWS平臺是有3個子項目組成:

  1. Dbus(數據總線):負責實時將數據從源端實時抽出,並轉換爲約定的自帶schema的json格式數據(UMS 數據),放入kafka中;
  2. Wormhole(數據交換平臺):負責從kafka讀出數據 將數據寫入到目標中;
  3. Swifts(實時計算平臺):負責從kafka中讀出數據,實時計算,並將數據寫回kafka中。

 

圖中:

  • Log extractor和dbus共同完成數據抽取和數據轉換,抽取包括全量和增量抽取。
  • Wormhole能夠將全部日誌數據保存到HDFS中; 還能夠將數據落地到全部支持jdbc的數據庫,落地到HBash,Elasticsearch,Cassandra等;
  • Swifts支持以配置和SQL的方式實現對進行流式計算,包括支持流式join,look up,filter,window aggregation等功能;
  • Dbus web是dbus的配置管理端,rider除了配置管理之外,還包括對Wormhole和Swifts運行時管理,數據質量校驗等。

 

對於增量的log,經過訂閱Canal Server的方式,咱們獲得了MySQL的增量日誌:

  • 按照Canal的輸出,日誌是protobuf格式,開發增量Storm程序,將數據實時轉換爲咱們定義的UMS格式(json格式,稍後我會介紹),並保存到kafka中;
  • 增量Storm程序還負責捕獲schema變化,以控制版本號;
  • 增量Storm的配置信息保存在Zookeeper中,以知足高可用需求。
  • Kafka既做爲輸出結果也做爲處理過程當中的緩衝器和消息解構區。
  • 在考慮使用Storm做爲解決方案的時候,咱們主要是認爲Storm有如下優勢:
  • 技術相對成熟,比較穩定,與kafka搭配也算標準組合;
  • 實時性比較高,可以知足實時性需求;
  • 知足高可用需求;
  • 經過配置Storm併發度,能夠活動性能擴展的能力;

全量抽取

對於流水錶,有增量部分就夠了,可是許多表須要知道最初(已存在)的信息。這時候咱們須要initial load(第一次加載)。

對於initial load(第一次加載),一樣開發了全量抽取Storm程序經過jdbc鏈接的方式,從源端數據庫的備庫進行拉取。initial load是拉所有數據,因此咱們推薦在業務低峯期進行。好在只作一次,不須要天天都作。

全量抽取,咱們借鑑了Sqoop的思想。將全量抽取Storm分爲了2 個部分:

  1. 數據分片
  2. 實際抽取

數據分片須要考慮分片列,按照配置和自動選擇列將數據按照範圍來分片,並將分片信息保存到kafka中。

下面是具體的分片策略:

全量抽取的Storm程序是讀取kafka的分片信息,採用多個併發度並行鏈接數據庫備庫進行拉取。由於抽取的時間可能很長。抽取過程當中將實時狀態寫到Zookeeper中,便於心跳程序監控。

相關文章
相關標籤/搜索