拆解大數據總線平臺DBus的系統架構

拓展閱讀:mysql

如何基於日誌,同步實現數據的一致性和實時抽取?git

快速部署DBus體驗實時數據流計算github

Dbus所支持兩類數據源的實現原理與架構拆解。正則表達式

大致來講,Dbus支持兩類數據源:sql

  • RDBMS數據源
  • 日誌類數據源

1、RMDBMS類數據源的實現

以mysql爲例子. 分爲三個部分:數據庫

  • 日誌抽取模塊
  • 增量轉換模塊
  • 全量拉取模塊

1.1 日誌抽取模塊(Extractor)

mysql 日誌抽取模塊由兩部分構成:json

  • canal server:負責從mysql中抽取增量日誌。
  • mysql-extractor storm程序:負責將增量日誌輸出到kafka中,過濾不須要的表數據,保證at least one和高可用。

咱們知道,雖然mysql innodb有本身的log,mysql主備同步是經過binlog來實現的。而binlog同步有三種模式:Row 模式,Statement 模式,Mixed模式。由於statement模式有各類限制,一般生產環境都使用row模式進行復制,使得讀取全量日誌成爲可能。架構

一般咱們的mysql佈局是採用 2個master主庫(vip)+ 1個slave從庫 + 1個backup容災庫 的解決方案,因爲容災庫一般是用於異地容災,實時性不高也不便於部署。併發

爲了最小化對源端產生影響,咱們讀取binlog日誌從slave從庫讀取。oracle

讀取binlog的方案比較多,DBus也是站在巨人的肩膀上,對於Mysql數據源使用阿里巴巴開源的Canal來讀取增量日誌。這樣作的好處是:

  • 不用重複開發避免重複造輪子
  • 享受canal升級帶來的好處

關於Canal的介紹可參考:https://github.com/alibaba/canal/wiki/Introduction 因爲canal用戶抽取權限比較高,通常canal server節點也能夠由DBA組來維護。

日誌抽取模塊的主要目標是將數據從canal server中讀出,儘快落地到第一級kafka中,避免數據丟失(畢竟長時間不讀日誌數據,可能日誌會滾到好久之前,可能會被DBA刪除),所以須要避免作過多的事情,主要就作一下數據拆包工做防止數據包過大。

從高可用角度考慮,在使用Canal抽取過程當中,採用的基於zookeeper的Canal server高可用模式,不存在單點問題,日誌抽取模塊extractor也使用storm程序,一樣也是高可用架構。

不一樣數據源有不一樣的日誌抽取方式,好比oracle,mongo等都有相應的日誌抽取程序。

DBus日誌抽取模塊獨立出來是爲了兼容這些不一樣數據源的不一樣實現方式。

1.2 增量轉換模塊(Stream)

增量數據處理模塊,根據不一樣的數據源類型的格式進行轉換和處理。

1)分發模塊dispatcher

  • 未來自數據源的日誌按照不一樣的schema分發到不一樣topic上。這樣作的目的
  • 是爲了數據隔離(由於通常不一樣的shema對應不一樣的數據庫)
  • 是爲了分離轉換模塊的計算壓力,由於轉換模塊計算量比較大,能夠部署多個,每一個schema一個提升效率。

2)轉換模塊appender

  • 實時數據格式轉換:Canal數據是protobuf格式,須要轉換爲咱們約定的UMS格式,生成惟一標識符ums_id和ums_ts等;
  • 捕獲元數據版本變動:好比表加減列,字段變動等,維護版本信息,發出通知觸發告警
  • 實時數據脫敏:根據須要對指定列進行脫敏,例如替換爲***,MD5加鹽等。
  • 響應拉全量事件:當收到拉全量請求時爲了保證數據的相應順序行,會暫停拉增量數據,等全量數據完成後,再繼續。
  • 監控數據:分發模塊和轉換模塊都會響應心跳event,統計每一張表在兩次心跳中的數據和延時狀況,發送到statistic做爲監控數據使用。
  • 分發模塊和轉換模塊都會相應相關reload通知事件從Mgr庫和zk上進行加載配置操做。

1.3 全量拉取模塊(FullPuller)

全量拉取可用於初始化加載(Initial load), 數據從新加載,實現上咱們借鑑了sqoop的思想。將全量過程分爲了2 個部分:

1)數據分片

分片讀取max,min,count等信息,根據片大小計算分片數,生成分片信息保存在split topic中。下面是具體的分片策略:

以實際的經驗,對於mysql InnDB,只有使用主鍵索引進行分片,才能高效。由於mysql innDB的主鍵列與數據存儲順序一致。

2)實際拉取

每一個分片表明一個小任務,由拉取轉換模塊經過多個併發度的方式鏈接slave從庫進行拉取。 拉取完成狀況寫到zookeeper中,便於監控。

全量拉取對源端數據庫是有必定壓力的,咱們作法是:

  • 從slave從庫拉取數據
  • 控制併發度6~8
  • 推薦在業務低峯期進行

全量拉取不是常常發生的,通常作初始化拉取一次,或者在某種狀況下須要全量時能夠觸發一次。

1.3 全量和增量的一致性

在整個數據傳輸中,爲了儘可能的保證日誌消息的順序性,kafka咱們使用的是1個partition的方式。在通常狀況下,基本上是順序的和惟一的。 但若是出現寫kafka異步寫入部分失敗, storm也用重作機制,所以,咱們並不嚴格保證exactly once和徹底的順序性,但保證的是at least once。

所以ums_id_變得尤其重要。 對於全量抽取,ums_id是一個值,該值爲全量拉取event的ums_id號,表示該批次的全部數據是一批的,由於數據都是不一樣的能夠共享一個ums_id_號。ums_uid_流水號從zk中生成,保證了數據的惟一性。 對於增量抽取,咱們使用的是 mysql的日誌文件號 + 日誌偏移量做爲惟一id。Id做爲64位的long整數,高6位用於日誌文件號,低13位做爲日誌偏移量。 例如:000103000012345678。 103 是日誌文件號,12345678 是日誌偏移量。 這樣,從日誌層面保證了物理惟一性(即使重作也這個id號也不變),同時也保證了順序性(還能定位日誌)。經過比較ums_id_就能知道哪條消息更新。

ums_ts_的價值在於從時間維度上能夠準確知道event發生的時間。好比:若是想獲得一個某時刻的快照數據。能夠經過ums_ts 來知道截斷時間點。

2、日誌類數據源的實現

業界日誌收集、結構化、分析工具方案不少,例如:Logstash、Filebeat、Flume、Fluentd、Chukwa. scribe、Splunk等,各有所長。在結構化日誌這個方面,大多采用配置正則表達式模板:用於提取日誌中模式比較固定、通用的部分,例如日誌時間、日誌類型、行號等。對於真正的和業務比較相關的信息,這邊部分是最重要的,稱爲message部分,咱們但願使用可視化的方式來進行結構化。

例如:對於下面所示的類log4j的日誌:

若是用戶想將上述數據轉換爲以下的結構化數據信息:

咱們稱這樣的日誌爲「數據日誌」

DBUS設計的數據日誌同步方案以下:

  • 日誌抓取端採用業界流行的組件(例如Logstash、Flume、Filebeat等)。一方面便於用戶和業界統一標準,方便用戶的整合;另外一方面也避免無謂的重造輪子。抓取數據稱爲原始數據日誌(raw data log)放進Kafka中,等待處理。
  • **提供可視化界面,配置規則來結構化日誌。**用戶可配置日誌來源和目標。同一個日誌來源能夠輸出到多個目標。每一條「日誌源-目標」線,中間數據通過的規則處理用戶根據本身的需求來自由定義。最終輸出的數據是結構化的,即:有schema約束,能夠理解爲相似數據庫中的表。
  • 所謂規則,在DBUS中,即**「規則算子」**。DBUS設計了豐富易用的過濾、拆分、合併、替換等算子供用戶使用。用戶對數據的處理可分多個步驟進行,每一個步驟的數據處理結果可即時查看、驗證;可重複使用不一樣算子,直到轉換、裁剪獲得本身須要的數據。
  • 將配置好的規則算子組運用到執行引擎中,對目標日誌數據進行預處理,造成結構化數據,輸出到Kafka,供下游數據使用方使用。

系統流程圖以下所示:

根據配置,咱們支持同一條原始日誌,能提取爲一個表數據,或者能夠提取爲多個表數據。

每一個表是結構化的,知足相同的schema。

  • 每一個表是一個規則 算子組的合集,能夠配置1個到多個規則算子組
  • 每一個規則算子組,由一組規則算子組合而成

拿到一條原始數據日誌, 它最終應該屬於哪張表呢?

每條日誌須要與規則算子組進行匹配:

  • 符合條件的進入規則算子組的,最終被規則組轉換爲結構化的表數據。
  • 不符合的嘗試下一個規則算子組。
  • 都不符合的,進入unknown_table表。

2.1 規則算子

規則算子是對數據進行過濾、加工、轉換的基本單元。常見的規則算子以下:

算子之間是獨立的,經過組合不一樣的算子達到更復雜的功能,對算子進行迭代使用最終達到對任意數據進行加工的目的。

咱們試圖使得算子儘可能知足正交性或易用性(雖然正則表達式很強大,但咱們仍然開發一些簡單算子例如trim算子來完成簡單功能,以知足易用性)。

3、UMS統一消息格式

不管是增量、全量仍是日誌,最終輸出到結果kafka中的消息都是咱們約定的統一消息格式,稱爲UMS(unified message schema)格式。以下圖所示:

3.1 Protocol

數據的類型,被UMS的版本號

3.2 schema

1)namespace 由:類型. 數據源名.schema名 .表名.表版本號. 分庫號 .分表號 組成,可以描述全部表。

例如:mysql.db1.schema1.testtable.5.0.0

2)fields是字段名描述。

  • ums_id_ 消息的惟一id,保證消息是惟一的
  • ums_ts_ canal捕獲事件的時間戳;
  • ums_op_ 代表數據的類型是I (insert),U (update),B (before Update),D(delete)
  • ums_uid_ 數據流水號,惟一值

3)payload是指具體的數據。

一個json包裏面能夠包含1條至多條數據,提升數據的有效載荷。

4、心跳監控和預警

RDBMS類系統涉及到數據庫的主備同步,日誌抽取,增量轉換等多個模塊等。

日誌類系統涉及到日誌抽取端,日誌轉換模模塊等。

如何知道系統正在健康工做,數據是否可以實時流轉? 所以對流程的監控和預警就尤其重要。

4.1 對於RDBMS類系統

心跳模塊從dbusmgr庫中得到須要監控的表列表,以固定頻率(好比每分鐘)向源端dbus庫的心跳錶插入心跳數據(該數據中帶有發送時間),該心跳錶也做爲增量數據被實時同步出來,而且與被同步表走相同的邏輯和線程(爲了保證順序性,當遇到多併發度時是sharding by table的,心跳數據與table數據走一樣的bolt),這樣當收到心跳數據時,即使沒有任何增刪改的數據,也能證實整條鏈路是通的。

增量轉換模塊和心跳模塊在收到心跳包數據後,就會發送該數據到influxdb中做爲監控數據,經過grafana進行展現。 心跳模塊還會監控延時狀況,根據延時狀況給以報警。

4.2 對於日誌類系統

從源端就會自動產生心跳包,相似RDBMS系統,將心跳包經過抽取模塊,和算子轉換模塊同步到末端,由心跳模塊負責監控和預警。

來源:宜信技術學院

相關文章
相關標籤/搜索