如何基於日誌,同步實現數據的一致性和實時抽取?

1、背景

事情是從公司前段時間的需求提及,你們知道宜信是一家金融科技公司,咱們的不少數據與標準互聯網企業不一樣,大體來講就是:jquery

玩數據的人都知道數據是很是有價值的,而後這些數據是保存在各個系統的數據庫中,如何讓須要數據的使用方獲得一致性、實時的數據呢?git

過去的通用作法有幾種,分別是:github

  • DBA開放各個系統的備庫,在業務低峯期(好比夜間),使用方各自抽取所需數據。因爲抽取時間不一樣,各個數據使用方數據不一致,數據發生衝突,並且重複抽取,相信很多DBA很頭疼這個事情。
  • 公司統一的大數據平臺,經過Sqoop 在業務低峯期到各個系通通一抽取數據, 並保存到Hive表中, 而後爲其餘數據使用方提供數據服務。這種作法解決了一致性問題,但時效性差,基本是T+1的時效。
  • 基於trigger的方式獲取增量變動,主要問題是業務方侵入性大,並且trigger也帶來性能損失。

這些方案都不算完美。咱們在瞭解和考慮了不一樣實現方式後,最後借鑑了 linkedin的思想,認爲要想同時解決數據一致性和實時性,比較合理的方法應該是來自於log。web

(此圖來自:https://www.confluent.io/blog...sql

把增量的Log做爲一切系統的基礎。後續的數據使用方,經過訂閱kafka來消費log。數據庫

好比:json

  • 大數據的使用方能夠將數據保存到Hive表或者Parquet文件給Hive或Spark查詢;
  • 提供搜索服務的使用方能夠保存到Elasticsearch或HBase 中;
  • 提供緩存服務的使用方能夠將日誌緩存到Redis或alluxio中;
  • 數據同步的使用方能夠將數據保存到本身的數據庫中;
  • 因爲kafka的日誌是能夠重複消費的,而且緩存一段時間,各個使用方能夠經過消費kafka的日誌來達到既能保持與數據庫的一致性,也能保證明時性;

爲何使用log和kafka做爲基礎,而不使用Sqoop進行抽取呢? 由於:segmentfault

爲何不使用dual write(雙寫)呢?,請參考https://www.confluent.io/blog...緩存

這裏就很少作解釋了。安全

2、整體架構

因而咱們提出了構建一個基於log的公司級的平臺的想法。

下面解釋一下DWS平臺,DWS平臺是有3個子項目組成:

  • Dbus(數據總線):負責實時將數據從源端實時抽出,並轉換爲約定的自帶schema的json格式數據(UMS 數據),放入kafka中;
  • Wormhole(數據交換平臺):負責從kafka讀出數據 將數據寫入到目標中;
  • Swifts(實時計算平臺):負責從kafka中讀出數據,實時計算,並將數據寫回kafka中。

圖中:

  • Log extractor和dbus共同完成數據抽取和數據轉換,抽取包括全量和增量抽取。
  • Wormhole能夠將全部日誌數據保存到HDFS中; 還能夠將數據落地到全部支持jdbc的數據庫,落地到HBash,Elasticsearch,Cassandra等;
  • Swifts支持以配置和SQL的方式實現對進行流式計算,包括支持流式join,look up,filter,window aggregation等功能;
  • Dbus web是dbus的配置管理端,rider除了配置管理之外,還包括對Wormhole和Swifts運行時管理,數據質量校驗等。

因爲時間關係,我今天主要介紹DWS中的Dbus和Wormhole,在須要的時候附帶介紹一下Swifts。

3、dbus解決方案

3.1 日誌解析

如前面所說,Dbus主要解決的是將日誌從源端實時的抽出。 這裏咱們以MySQL爲例子,簡單說明如何實現。

咱們知道,雖然MySQL InnoDB有本身的log,MySQL主備同步是經過binlog來實現的。以下圖:

圖片來自:https://github.com/alibaba/canal

而binlog有三種模式:

  • Row 模式:日誌中會記錄成每一行數據被修改的形式,而後在slave端再對相同的數據進行修改。
  • Statement 模式: 每一條會修改數據的sql都會記錄到 master的bin-log中。slave在複製的時候SQL進程會解析成和原來master端執行過的相同的SQL來再次執行。
  • Mixed模式: MySQL會根據執行的每一條具體的sql語句來區分對待記錄的日誌形式,也就是在Statement和Row之間選擇一種。

他們各自的優缺點以下:

此處來自:http://www.jquerycn.cn/a_13625

因爲statement 模式的缺點,在與咱們的DBA溝經過程中瞭解到,實際生產過程當中都使用row 模式進行復制。這使得讀取全量日誌成爲可能。

一般咱們的MySQL佈局是採用 2個master主庫(vip)+ 1個slave從庫 + 1個backup容災庫 的解決方案,因爲容災庫一般是用於異地容災,實時性不高也不便於部署。

爲了最小化對源端產生影響,顯然咱們讀取binlog日誌應該從slave從庫讀取。

讀取binlog的方案比較多,github上很多,參考https://github.com/search?utf...。最終咱們選用了阿里的canal作位日誌抽取方。

Canal最先被用於阿里中美機房同步, canal原理相對比較簡單:

  • Canal模擬MySQL Slave的交互協議,假裝本身爲MySQL Slave,向MySQL Slave發送dump協議
  • MySQL master收到dump請求,開始推送binary log給Slave(也就是canal)
  • Canal解析binary log對象(原始爲byte流)


圖片來自:https://github.com/alibaba/canal

3.2 解決方案

Dbus 的MySQL版主要解決方案以下:

對於增量的log,經過訂閱Canal Server的方式,咱們獲得了MySQL的增量日誌:

  • 按照Canal的輸出,日誌是protobuf格式,開發增量Storm程序,將數據實時轉換爲咱們定義的UMS格式(json格式,稍後我會介紹),並保存到kafka中;
  • 增量Storm程序還負責捕獲schema變化,以控制版本號;
  • 增量Storm的配置信息保存在Zookeeper中,以知足高可用需求。
  • Kafka既做爲輸出結果也做爲處理過程當中的緩衝器和消息解構區。

在考慮使用Storm做爲解決方案的時候,咱們主要是認爲Storm有如下優勢:

  • 技術相對成熟,比較穩定,與kafka搭配也算標準組合;
  • 實時性比較高,可以知足實時性需求;
  • 知足高可用需求;
  • 經過配置Storm併發度,能夠活動性能擴展的能力;

3.3 全量抽取

對於流水錶,有增量部分就夠了,可是許多表須要知道最初(已存在)的信息。這時候咱們須要initial load(第一次加載)。

對於initial load(第一次加載),一樣開發了全量抽取Storm程序經過jdbc鏈接的方式,從源端數據庫的備庫進行拉取。initial load是拉所有數據,因此咱們推薦在業務低峯期進行。好在只作一次,不須要天天都作。

全量抽取,咱們借鑑了Sqoop的思想。將全量抽取Storm分爲了2 個部分:

  • 數據分片
  • 實際抽取

數據分片須要考慮分片列,按照配置和自動選擇列將數據按照範圍來分片,並將分片信息保存到kafka中。

下面是具體的分片策略:

全量抽取的Storm程序是讀取kafka的分片信息,採用多個併發度並行鏈接數據庫備庫進行拉取。由於抽取的時間可能很長。抽取過程當中將實時狀態寫到Zookeeper中,便於心跳程序監控。

3.4 統一消息格式

不管是增量仍是全量,最終輸出到kafka中的消息都是咱們約定的一個統一消息格式,稱爲UMS(unified message schema)格式。

以下圖所示:

消息中schema部分,定義了namespace 是由 類型+數據源名+schema名+表名+版本號+分庫號+分表號 可以描述整個公司的全部表,經過一個namespace就能惟必定位。

  • _ums_op_ 代表數據的類型是I(insert),U(update),D(刪除);
  • _ums_ts_ 發生增刪改的事件的時間戳,顯然新的數據發生的時間戳更新;
  • _ums_id_ 消息的惟一id,保證消息是惟一的,但這裏咱們保證了消息的前後順序(稍後解釋);

payload是指具體的數據,一個json包裏面能夠包含1條至多條數據,提升數據的有效載荷。

UMS中支持的數據類型,參考了Hive類型並進行簡化,基本上包含了全部數據類型。

3.5 全量和增量的一致性

在整個數據傳輸中,爲了儘可能的保證日誌消息的順序性,kafka咱們使用的是1個partition的方式。在通常狀況下,基本上是順序的和惟一的。

可是咱們知道寫kafka會失敗,有可能重寫,Storm也用重作機制,所以,咱們並不嚴格保證exactly once和徹底的順序性,但保證的是at least once。

所以_ums_id_變得尤其重要。

對於全量抽取,_ums_id_是惟一的,從zk中每一個併發度分別取不一樣的id片區,保證了惟一性和性能,填寫負數,不會與增量數據衝突,也保證他們是早於增量消息的。

對於增量抽取,咱們使用的是MySQL的日誌文件號 + 日誌偏移量做爲惟一id。Id做爲64位的long整數,高7位用於日誌文件號,低12位做爲日誌偏移量。

例如:000103000012345678。 103 是日誌文件號,12345678 是日誌偏移量。

這樣,從日誌層面保證了物理惟一性(即使重作也這個id號也不變),同時也保證了順序性(還能定位日誌)。經過比較_ums_id_ 消費日誌就能經過比較_ums_id_知道哪條消息更新。

其實_ums_ts_與_ums_id_意圖是相似的,只不過有時候_ums_ts_可能會重複,即在1毫秒中發生了多個操做,這樣就得靠比較_ums_id_了。

3.6 心跳監控和預警

整個系統涉及到數據庫的主備同步,Canal Server,多個併發度Storm進程等各個環節。

所以對流程的監控和預警就尤其重要。

經過心跳模塊,例如每分鐘(可配置)對每一個被抽取的表插入一條心態數據並保存發送時間,這個心跳錶也被抽取,跟隨着整個流程下來,與被同步表在實際上走相同的邏輯(由於多個併發的的Storm可能有不一樣的分支),當收到心跳包的時候,即使沒有任何增刪改的數據,也能證實整條鏈路是通的。

Storm程序和心跳程序將數據發送公共的統計topic,再由統計程序保存到influxdb中,使用grafana進行展現,就能夠看到以下效果:

圖中是某業務系統的實時監控信息。上面是實時流量狀況,下面是實時延時狀況。能夠看到,實時性仍是很不錯的,基本上1~2秒數據就已經到末端kafka中。

Granfana提供的是一種實時監控能力。

若是出現延時,則是經過dbus的心跳模塊發送郵件報警或短信報警。

3.7 實時脫敏

考慮到數據安全性,對於有脫敏需求的場景,Dbus的全量storm和增量storm程序也完成了實時脫敏的功能。脫敏方式有3種:

總結一下:簡單的說,Dbus就是將各類源的數據,實時的導出,並以UMS的方式提供訂閱, 支持實時脫敏,實際監控和報警。

4、Wormhole解決方案

說完Dbus,該說一下Wormhole,爲何兩個項目不是一個,而要經過kafka來對接呢?

其中很大一個緣由就是解耦,kafka具備自然的解耦能力,程序直接能夠經過kafka作異步的消息傳遞。Dbus和Wornhole內部也使用了kafka作消息傳遞和解耦。

另一個緣由就是,UMS是自描述的,經過訂閱kafka,任何有能力的使用方來直接消費UMS來使用。

雖然UMS的結果能夠直接訂閱,但還須要開發的工做。Wormhole解決的是:提供一鍵式的配置,將kafka中的數據落地到各類系統中,讓沒有開發能力的數據使用方經過wormhole來實現使用數據。

如圖所示,Wormhole 能夠將kafka中的UMS 落地到各類系統,目前用的最多的HDFS,JDBC的數據庫和HBase。

在技術棧上, wormhole選擇使用spark streaming來進行。

在Wormhole中,一條flow是指從一個namaspace從源端到目標端。一個spark streaming服務於多條flow。

選用Spark的理由是很充分的:

  • Spark自然的支持各類異構存儲系統;
  • 雖然Spark Stream比Storm延時稍差,但Spark有着更好的吞吐量和更好的計算性能;
  • Spark在支持並行計算方面有更強的靈活性;
  • Spark提供了一個技術棧內解決Sparking Job,Spark Streaming,Spark SQL的統一功能,便於後期開發;

這裏補充說一下Swifts的做用:

  • Swifts的本質是讀取kafka中的UMS數據,進行實時計算,將結果寫入到kafka的另一個topic。
  • 實時計算能夠是不少種方式:好比過濾filter,projection(投影),lookup, 流式join window aggregation,能夠完成各類具備業務價值的流式實時計算。

Wormhole和Swifts對好比下:

4.1 落HDFS

經過Wormhole Wpark Streaming程序消費kafka的UMS,首先UMS log能夠被保存到HDFS上。

kafka通常只保存若干天的信息,不會保存所有信息,而HDFS中能夠保存全部的歷史增刪改的信息。這就使得不少事情變爲可能:

  • 經過重放HDFS中的日誌,咱們可以還原任意時間的歷史快照。
  • 能夠作拉鍊表,還原每一條記錄的歷史信息,便於分析;
  • 當程序出現錯誤是,能夠經過回灌(backfill),從新消費消息,從新造成新的快照。

能夠說HDFS中的日誌是不少的事情基礎。

介於Spark原生對parquet支持的很好,Spark SQL可以對Parquet提供很好的查詢。UMS落地到HDFS上是保存到Parquet文件中的。Parquet的內容是全部log的增刪改信息以及_ums_id_,_ums_ts_都存下來。

Wormhole spark streaming根據namespace 將數據分佈存儲到不一樣的目錄中,即不一樣的表和版本放在不一樣目錄中。

因爲每次寫的Parquet都是小文件,你們知道HDFS對於小文件性能並很差,所以另外還有一個job,天天定時將這些的Parquet文件進行合併成大文件。

每一個Parquet文件目錄都帶有文件數據的起始時間和結束時間。這樣在回灌數據時,能夠根據選取的時間範圍來決定須要讀取哪些Parquet文件,沒必要讀取所有數據。

4.2 插入或更新數據的冪等性

經常咱們遇到的需求是,將數據通過加工落地到數據庫或HBase中。那麼這裏涉及到的一個問題就是,什麼樣的數據能夠被更新到數據?

這裏最重要的一個原則就是數據的冪等性。

不管是遇到增刪改任何的數據,咱們面臨的問題都是:

  • 該更新哪一行;
  • 更新的策略是什麼。

對於第一個問題,其實就須要定位數據要找一個惟一的鍵,常見的有:

  • 使用業務庫的主鍵;
  • 由業務方指定幾個列作聯合惟一索引;

對於第二個問題,就涉及到_ums_id_了,由於咱們已經保證了_ums_id_大的值更新,所以在找到對應數據行後,根據這個原則來進行替換更新。

之因此要軟刪除和加入_is_active_列,是爲了這樣一種狀況:

若是已經插入的_ums_id_比較大,是刪除的數據(代表這個數據已經刪除了), 若是不是軟刪除,此時插入一個_ums_id_小的數據(舊數據),就會真的插入進去。

這就致使舊數據被插入了。不冪等了。因此被刪除的數據依然保留(軟刪除)是有價值的,它能被用於保證數據的冪等性。

4.3 HBase的保存

插入數據到Hbase中,至關要簡單一些。不一樣的是HBase能夠保留多個版本的數據(固然也能夠只保留一個版本)默認是保留3個版本;

所以插入數據到HBase,須要解決的問題是:

  • 選擇合適的rowkey:Rowkey的設計是能夠選的,用戶能夠選擇源表的主鍵,也能夠選擇若干列作聯合主鍵。
  • 選擇合適的version:使用_ums_id_+ 較大的偏移量(好比100億) 做爲row的version。

Version的選擇頗有意思,利用_ums_id_的惟一性和自增性,與version自身的比較關係一致:即version較大等價於_ums_id_較大,對應的版本較新。

從提升性能的角度,咱們能夠將整個Spark Streaming的Dataset集合直接插入到HBase,不須要比較。讓HBase基於version自動替咱們判斷哪些數據能夠保留,哪些數據不須要保留。

Jdbc的插入數據:插入數據到數據庫中,保證冪等的原理雖然簡單,要想提升性能在實現上就變得複雜不少,總不能一條一條的比較而後在插入或更新。

咱們知道Spark的RDD/dataset都是以集合的方式來操做以提升性能,一樣的咱們須要以集合操做的方式實現冪等性。

具體思路是:

  • 首先根據集合中的主鍵到目標數據庫中查詢,獲得一個已有數據集合;
  • 與dataset中的集合比較,分出兩類:

A:不存在的數據,即這部分數據insert就能夠;

B:存在的數據,比較_ums_id_, 最終只將哪些_ums_id_更新較大row到目標數據庫,小的直接拋棄。

使用Spark的同窗都知道,RDD/dataset都是能夠partition的,可使用多個worker並進行操做以提升效率。

在考慮併發狀況下,插入和更新均可能出現失敗,那麼還有考慮失敗後的策略。

好比:由於別的worker已經插入,那麼由於惟一性約束插入失敗,那麼須要改成更新,還要比較_ums_id_看是否可以更新。

對於沒法插入其餘狀況(好比目標系統有問題),Wormhole還有重試機制。插入到其餘存儲中的就很少介紹了,總的原則是:根據各自存儲自身特性,設計基於集合的,併發的插入數據實現。這些都是Wormhole爲了性能而作的努力,使用Wormhole的用戶沒必要關心 。

5、運用案例

5.1 實時營銷

說了那麼多,DWS有什麼實際運用呢?下面我來介紹某系統使用DWS實現了的實時營銷。

如上圖所示:

系統A的數據都保存到本身的數據庫中,咱們知道,宜信提供不少金融服務,其中包括借款,而借款過程當中很重要的就是信用審覈。

借款人須要提供證實具備信用價值的信息,好比央行徵信報告,是具備最強信用數據的數據。 而銀行流水,網購流水也是具備較強的信用屬性的數據。

借款人經過Web或手機APP在系統A中填寫信用信息時,可能會某些緣由沒法繼續,雖然可能這個借款人是一個優質潛在客戶,但之前因爲沒法或好久才能知道這個信息,因此實際上這樣的客戶是流失了。

應用了DWS之後,借款人已經填寫的信息已經記錄到數據庫中,並經過DWS實時的進行抽取、計算和落地到目標庫中。根據對客戶的打分,評價出優質客戶。而後馬上將這個客戶的信息輸出到客服系統中。

客服人員在很短的時間(幾分鐘之內)就經過打電話的方式聯繫上這個借款人(潛客),進行客戶關懷,將這個潛客轉換爲真正的客戶。咱們知道借款是有時效性的,若是時間過久就沒有價值了。

若是沒有實時抽取/計算/落庫的能力,那麼這一切都沒法實現。

5.2 實時報表系統

另一個實時報表的應用以下:

咱們數據使用方的數據來自多個系統,之前是經過T+1的方式得到報表信息,而後指導次日的運營,這樣時效性不好。

經過DWS,將數據從多個系統中實時抽取,計算和落地,並提供報表展現,使得運營能夠及時做出部署和調整,快速應對。

6、總結

  • DWS技術上基於主流實時流式大數據技術框架,高可用大吞吐強水平擴容,低延遲高容錯最終一致。
  • DWS能力上支持異構多源多目標系統,支持多數據格式(結構化半結構化非結構化數據)和實時技術能力。
  • DWS將三個子項目合併做爲一個平臺推出,使得咱們具有了實時的能力, 驅動各類實時場景應用。
  • 適合場景包括:實時同步/實時計算/實時監控/實時報表/實時分析/實時洞察/實時管理/實時運營/實時決策

做者:王東

7月25日晚8點,線上直播,【AI中臺——智能聊天機器人平臺】,點擊瞭解詳情。

來源:宜信技術學院

相關文章
相關標籤/搜索