一.Flume收集各數據庫日誌,準實時抽取到HDFSmysql
安裝HDP,包含Flumeweb
方案優勢:sql
1.配置簡單,不用編程:只要在flume.conf文件中配置source、channel及sink的相關屬性數據庫
2.採用普通SQL輪詢的方式實現,具備通用性,適用於全部關係庫數據源編程
方案缺點:json
1.在源庫上執行了查詢,具備入侵性bootstrap
2.經過輪詢的方式實現增量,只能作到準實時,並且輪詢間隔越短,對源庫的影響越大緩存
3.只能識別新增數據,檢測不到刪除與更新併發
4.要求源庫必須有用於表示增量的字段oracle
二.canal
原理:
- canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
- mysql master收到dump請求,開始推送(slave拉取,不是master主動push給slaves)binary log給slave(也就是canal)
- canal解析binary log對象(原始爲byte流)
mysql中須要配置一個用戶,專門提供給canal用
canal開源代碼中發送端僅僅支持mysql,不支持oracle,接收端因爲採用jdbc,mysql、oracle等能夠通吃。
三.maxwell
優勢:
- 支持bootstrap啓動,同步歷史數據
- 集成kafka,直接將數據落地到kafka
- 已將binlog中的DML和DDL進行了模式匹配,將其解碼爲有schema的json(有利於後期將其重組爲nosql支持的語言)
{「database」:」test」,」table」:」e」,」type」:」update」,」ts」:1488857869,」xid」:8924,」commit」:true,」data」:{「id」:1,」m」:5.556666,」torvalds」:null},」old」:{「m」:5.55}}
缺點:
- 一個MySQL實例須要對應一個maxwell進程
- bootstrap的方案使用的是
select *
maxwell的配置文件只有一個config.properties,在home目錄。其中除了須要配置mysql master的地址、kafka地址還須要配置一個用於存放maxwell相關信息的mysql地址,maxwell會把讀取binlog關係的信息,如binlog name、position。
工具對比
方案對比
- 方案1使用阿里開源的Canal進行Mysql binlog數據的抽取,另需開發一個數據轉換工具將從binlog中解析出的數據轉換成自帶schema的json數據並寫入kafka中。而方案2使用maxwell可直接完成對mysql binlog數據的抽取和轉換成自帶schema的json數據寫入到kafka中。
- 方案1中不支持表中已存在的歷史數據進行同步,此功能須要開發(若是使用sqoop進行歷史數據同步,不夠靈活,會使結果表與原始表結構相同,有區別於數據交換平臺所需的schema)。方案2提供同步歷史數據的解決方案。
- 方案1支持HA部署,而方案2不支持HA
方案1和方案2的區別只在於kafka以前,當數據緩存到kafka以後,須要一個定製的數據路由組件來將自帶schema的數據解析到目標存儲中。
數據路由組件主要負責將kafka中的數據實時讀出,寫入到目標存儲中。(如將全部日誌數據保存到HDFS中,也能夠將數據落地到全部支持jdbc的數據庫,落地到HBase,Elasticsearch等。)
maxwell:
MySQL->Maxwell->Kafka->Flume->HDFS
寫入HDFS的數據時json的,可能還須要提取只須要的數據,另外,對於update或delete操做目前還不知道要怎麼處理。生產使用難度很大。
把增量的Log做爲一切系統的基礎。後續的數據使用方,經過訂閱kafka來消費log。
好比:
- 大數據的使用方能夠將數據保存到Hive表或者Parquet文件給Hive或Spark查詢;
- 提供搜索服務的使用方能夠保存到Elasticsearch或HBase 中;
- 提供緩存服務的使用方能夠將日誌緩存到Redis或alluxio中;
- 數據同步的使用方能夠將數據保存到本身的數據庫中;
- 因爲kafka的日誌是能夠重複消費的,而且緩存一段時間,各個使用方能夠經過消費kafka的日誌來達到既能保持與數據庫的一致性,也能保證明時性;
爲何使用log和kafka做爲基礎,而不使用Sqoop進行抽取呢? 由於:
DWS平臺, DWS平臺是有3個子項目組成:
- Dbus(數據總線):負責實時將數據從源端實時抽出,並轉換爲約定的自帶schema的json格式數據(UMS 數據),放入kafka中;
- Wormhole(數據交換平臺):負責從kafka讀出數據 將數據寫入到目標中;
- Swifts(實時計算平臺):負責從kafka中讀出數據,實時計算,並將數據寫回kafka中。
圖中:
- Log extractor和dbus共同完成數據抽取和數據轉換,抽取包括全量和增量抽取。
- Wormhole能夠將全部日誌數據保存到HDFS中; 還能夠將數據落地到全部支持jdbc的數據庫,落地到HBash,Elasticsearch,Cassandra等;
- Swifts支持以配置和SQL的方式實現對進行流式計算,包括支持流式join,look up,filter,window aggregation等功能;
- Dbus web是dbus的配置管理端,rider除了配置管理之外,還包括對Wormhole和Swifts運行時管理,數據質量校驗等。
對於增量的log,經過訂閱Canal Server的方式,咱們獲得了MySQL的增量日誌:
- 按照Canal的輸出,日誌是protobuf格式,開發增量Storm程序,將數據實時轉換爲咱們定義的UMS格式(json格式,稍後我會介紹),並保存到kafka中;
- 增量Storm程序還負責捕獲schema變化,以控制版本號;
- 增量Storm的配置信息保存在Zookeeper中,以知足高可用需求。
- Kafka既做爲輸出結果也做爲處理過程當中的緩衝器和消息解構區。
- 在考慮使用Storm做爲解決方案的時候,咱們主要是認爲Storm有如下優勢:
- 技術相對成熟,比較穩定,與kafka搭配也算標準組合;
- 實時性比較高,可以知足實時性需求;
- 知足高可用需求;
- 經過配置Storm併發度,能夠活動性能擴展的能力;
全量抽取
對於流水錶,有增量部分就夠了,可是許多表須要知道最初(已存在)的信息。這時候咱們須要initial load(第一次加載)。
對於initial load(第一次加載),一樣開發了全量抽取Storm程序經過jdbc鏈接的方式,從源端數據庫的備庫進行拉取。initial load是拉所有數據,因此咱們推薦在業務低峯期進行。好在只作一次,不須要天天都作。
全量抽取,咱們借鑑了Sqoop的思想。將全量抽取Storm分爲了2 個部分:
- 數據分片
- 實際抽取
數據分片須要考慮分片列,按照配置和自動選擇列將數據按照範圍來分片,並將分片信息保存到kafka中。
下面是具體的分片策略:
全量抽取的Storm程序是讀取kafka的分片信息,採用多個併發度並行鏈接數據庫備庫進行拉取。由於抽取的時間可能很長。抽取過程當中將實時狀態寫到Zookeeper中,便於心跳程序監控。