大數據總線平臺架構

寫在前面

研發團隊,研發規模發展到必定階段,各類平臺化,中臺化的方案就走上了日程。見多了業務架構的平臺化方案,今天咱們來拆解下數據總線平臺的架構。mysql

數據總線平臺架構

數據平臺的數據源主要來自於兩個渠道:nginx

  1. 關係數據庫
  2. 日誌數據

先看一張通用的數據總線平臺架構圖:正則表達式

數據採集

關係數據庫源數據採集,通常採用模擬mysql的slave方式接收binlog信息以實現數據抽取,同時須要對日誌信息進行信息轉換,轉換後數據入kafka進行平滑流控傳輸,下游消費者進行數據消費,寫入數據管理平臺。redis

日誌數據來自於各類中間件數據,好比redis日誌,nginx日誌,rpc日誌,es日誌,文件系統日誌等,經過filebeat或者socket方式到服務器節點agent,經過agent採集並統一發往kafka系統,以後寫入數據管理平臺。sql

關係數據庫採集

採集流程分爲三個部分:數據庫

  1. 日誌抽取模塊
  2. 增量轉換模塊
  3. 全量拉取模塊

日誌抽取模塊由兩部分組成:json

  • canal server:負責從mysql拉取增量日誌
  • mysql-extractor storm:負責將增量日誌輸出到kafka,過濾掉不須要的表數據,保證at least one和高可用

mysql主備是經過binlog實現的。binlog同步有三種模式:服務器

  • Row模式
  • Statement模式
  • Mixed模式

通常採用Row模式進行復制,能夠讀取全量日誌。架構

部署上能夠採用2個master(vip)+1個slave+1個backup做爲容災,讀取binlog日誌從slave讀取。併發

binlog採集工具比較多,有dbus和阿里的canal均可以進行增量數據讀取。

日誌抽取模塊將目標數據從canal server讀取,放到kafka中。

能夠基於zk的canal server高可用模式,不出現單點問題,日誌抽取模塊能夠用storm程序,一樣作好高可用。

增量日誌抽取流程以下:

分發模塊:

  • 未來自數據源的日誌按照不一樣的schema分發到不一樣topic上,這樣爲了數據隔離,通常不一樣到schema對應不一樣的數據庫
  • 同時爲了分離轉換模塊的計算的壓力,轉換模塊計算量較大,能夠多節點部署,每一個schema一個,以便提高效率

轉換模塊:

  • 實時數據格式轉換模塊,canal數據是pb編碼格式,須要轉換成業務要求的格式,並生成相關id信息
  • 實時數據脫敏,對指定列信息進行脫敏,編碼,加鹽等
  • 響應全量事件的能力,當收到須要響應全量數據的需求時,爲保證數據順序,會暫停拉取增量數據,等全量完成以後,再繼續
  • 監控數據,分發模塊和轉換模塊都會響應event,統計每一張表在兩次心跳中的數據和延遲狀況,發到統計系統進行監控數據
  • 分發模塊和轉換模塊,均可以執行reload事件,對zk上的源數據進行加載配置

全量拉取:

全量拉取借鑑sqoop思想,整個全量過程分爲兩個部分:

  1. 數據分片
  2. 實際拉取

數據分片

分片獲取max,min,count等信息,根據片大小計算分片數,生成分片信息保存到split topic中。

關係數據採用主鍵索引進行分片,高效,且主鍵和數據存儲順序一致。

實際拉取

每一個分片表明一個小任務,由拉取轉換模塊經過多個併發度到方式鏈接slave從庫拉取,完成狀況彙報到zk中,便於監控。

因爲全量拉取對於源數據庫有必定的壓力,作法以下:

  1. 從slave從庫拉取數據
  2. 併發度6~8
  3. 推薦在業務低峯期進行

全量拉取不常常發生,通常作初始化拉取一次,某種狀況下須要全量時能夠觸發一次。

一致性處理

爲保證日誌消息順序性,kafka咱們使用一個partition方式,基本上順序的和惟一的。若是出現寫kafka異步寫入失敗,storm有重作機制,所以並非嚴格保證exactly once和徹底順序性,保證的是at least once。

所以ums_id_變得尤其重要。 對於全量抽取,ums_id是一個值,該值爲全量拉取event的ums_id號,表示該批次的全部數據是一批的,由於數據都是不一樣的能夠共享一個ums_id_號。ums_uid_流水號從zk中生成,保證了數據的惟一性。 對於增量抽取,咱們使用的是 mysql的日誌文件號 + 日誌偏移量做爲惟一id。Id做爲64位的long整數,高6位用於日誌文件號,低13位做爲日誌偏移量。 例如:000103000012345678。 103 是日誌文件號,12345678 是日誌偏移量。 這樣,從日誌層面保證了物理惟一性(即使重作也這個id號也不變),同時也保證了順序性(還能定位日誌)。經過比較ums_id_就能知道哪條消息更新。

ums_ts_的價值在於從時間維度上能夠準確知道event發生的時間。好比:若是想獲得一個某時刻的快照數據。能夠經過ums_ts 來知道截斷時間點。

日誌數據採集

業界日誌收集、結構化、分析工具方案不少,例如:Logstash、Filebeat、Flume、Fluentd、Chukwa. scribe、Splunk等,各有所長。

在結構化日誌這個方面,大多采用配置正則表達式模板:用於提取日誌中模式比較固定、通用的部分,例如日誌時間、日誌類型、行號等。對於真正的和業務比較相關的信息,這邊部分是最重要的,稱爲message部分,咱們但願使用可視化的方式來進行結構化。

log4j的日誌以下:

若是想將上述日誌轉換成結構化數據:

DBUS設計的數據日誌同步方案以下:

日誌抓取端採用業界流行的組件(例如Logstash、Flume、Filebeat等)。一方面便於用戶和業界統一標準,方便用戶的整合;另外一方面也避免無謂的重造輪子。抓取數據稱爲原始數據日誌(raw data log)放進Kafka中,等待處理。

提供可視化界面,配置規則來結構化日誌。用戶可配置日誌來源和目標。同一個日誌來源能夠輸出到多個目標。每一條「日誌源-目標」線,中間數據通過的規則處理用戶根據本身的需求來自由定義。最終輸出的數據是結構化的,即:有schema約束,能夠理解爲相似數據庫中的表。

所謂規則,在DBUS中,即「規則算子」。DBUS設計了豐富易用的過濾、拆分、合併、替換等算子供用戶使用。用戶對數據的處理可分多個步驟進行,每一個步驟的數據處理結果可即時查看、驗證;可重複使用不一樣算子,直到轉換、裁剪獲得本身須要的數據。

將配置好的規則算子組運用到執行引擎中,對目標日誌數據進行預處理,造成結構化數據,輸出到Kafka,供下游數據使用方使用。

流程以下:

根據配置,咱們支持同一條原始日誌,能提取爲一個表數據,或者能夠提取爲多個表數據。

每一個表是結構化的,知足相同的schema。

每一個表是一個規則 算子組的合集,能夠配置1個到多個規則算子組 每一個規則算子組,由一組規則算子組合而成 拿到一條原始數據日誌, 它最終應該屬於哪張表呢?

每條日誌須要與規則算子組進行匹配:

符合條件的進入規則算子組的,最終被規則組轉換爲結構化的表數據。 不符合的嘗試下一個規則算子組。 都不符合的,進入unknown_table表。

自定義統一消息格式

不管是增量、全量仍是日誌,最終輸出到結果kafka中的消息都是咱們約定的統一消息格式,稱爲UMS(unified message schema)格式。以下圖所示:

Protocol

數據的類型,被UMS的版本號

schema 1)namespace 由:類型. 數據源名.schema名 .表名.表版本號. 分庫號 .分表號 組成,可以描述全部表。

例如:mysql.db1.schema1.testtable.5.0.0

2)fields是字段名描述。

ums_id_ 消息的惟一id,保證消息是惟一的 ums_ts_ canal捕獲事件的時間戳; ums_op_ 代表數據的類型是I (insert),U (update),B (before Update),D(delete) ums_uid_ 數據流水號,惟一值 3)payload是指具體的數據。

一個json包裏面能夠包含1條至多條數據,提升數據的有效載荷。

心跳監控和預警

RDBMS類系統涉及到數據庫的主備同步,日誌抽取,增量轉換等多個模塊等。

日誌類系統涉及到日誌抽取端,日誌轉換模模塊等。

如何知道系統正在健康工做,數據是否可以實時流轉? 所以對流程的監控和預警就尤其重要。

對於RDBMS類系統

心跳模塊從dbusmgr庫中得到須要監控的表列表,以固定頻率(好比每分鐘)向源端dbus庫的心跳錶插入心跳數據(該數據中帶有發送時間),該心跳錶也做爲增量數據被實時同步出來,而且與被同步表走相同的邏輯和線程(爲了保證順序性,當遇到多併發度時是sharding by table的,心跳數據與table數據走一樣的bolt),這樣當收到心跳數據時,即使沒有任何增刪改的數據,也能證實整條鏈路是通的。

增量轉換模塊和心跳模塊在收到心跳包數據後,就會發送該數據到influxdb中做爲監控數據,經過grafana進行展現。 心跳模塊還會監控延時狀況,根據延時狀況給以報警。

對於日誌類系統

從源端就會自動產生心跳包,相似RDBMS系統,將心跳包經過抽取模塊,和算子轉換模塊同步到末端,由心跳模塊負責監控和預警。

相關文章
相關標籤/搜索