研發團隊,研發規模發展到必定階段,各類平臺化,中臺化的方案就走上了日程。見多了業務架構的平臺化方案,今天咱們來拆解下數據總線平臺的架構。mysql
數據平臺的數據源主要來自於兩個渠道:nginx
先看一張通用的數據總線平臺架構圖:正則表達式
關係數據庫源數據採集,通常採用模擬mysql的slave方式接收binlog信息以實現數據抽取,同時須要對日誌信息進行信息轉換,轉換後數據入kafka進行平滑流控傳輸,下游消費者進行數據消費,寫入數據管理平臺。redis
日誌數據來自於各類中間件數據,好比redis日誌,nginx日誌,rpc日誌,es日誌,文件系統日誌等,經過filebeat或者socket方式到服務器節點agent,經過agent採集並統一發往kafka系統,以後寫入數據管理平臺。sql
採集流程分爲三個部分:數據庫
日誌抽取模塊由兩部分組成:json
mysql主備是經過binlog實現的。binlog同步有三種模式:服務器
通常採用Row模式進行復制,能夠讀取全量日誌。架構
部署上能夠採用2個master(vip)+1個slave+1個backup做爲容災,讀取binlog日誌從slave讀取。併發
binlog採集工具比較多,有dbus和阿里的canal均可以進行增量數據讀取。
日誌抽取模塊將目標數據從canal server讀取,放到kafka中。
能夠基於zk的canal server高可用模式,不出現單點問題,日誌抽取模塊能夠用storm程序,一樣作好高可用。
增量日誌抽取流程以下:
分發模塊:
轉換模塊:
全量拉取:
全量拉取借鑑sqoop思想,整個全量過程分爲兩個部分:
數據分片
分片獲取max,min,count等信息,根據片大小計算分片數,生成分片信息保存到split topic中。
關係數據採用主鍵索引進行分片,高效,且主鍵和數據存儲順序一致。
實際拉取
每一個分片表明一個小任務,由拉取轉換模塊經過多個併發度到方式鏈接slave從庫拉取,完成狀況彙報到zk中,便於監控。
因爲全量拉取對於源數據庫有必定的壓力,作法以下:
全量拉取不常常發生,通常作初始化拉取一次,某種狀況下須要全量時能夠觸發一次。
一致性處理
爲保證日誌消息順序性,kafka咱們使用一個partition方式,基本上順序的和惟一的。若是出現寫kafka異步寫入失敗,storm有重作機制,所以並非嚴格保證exactly once和徹底順序性,保證的是at least once。
所以ums_id_變得尤其重要。 對於全量抽取,ums_id是一個值,該值爲全量拉取event的ums_id號,表示該批次的全部數據是一批的,由於數據都是不一樣的能夠共享一個ums_id_號。ums_uid_流水號從zk中生成,保證了數據的惟一性。 對於增量抽取,咱們使用的是 mysql的日誌文件號 + 日誌偏移量做爲惟一id。Id做爲64位的long整數,高6位用於日誌文件號,低13位做爲日誌偏移量。 例如:000103000012345678。 103 是日誌文件號,12345678 是日誌偏移量。 這樣,從日誌層面保證了物理惟一性(即使重作也這個id號也不變),同時也保證了順序性(還能定位日誌)。經過比較ums_id_就能知道哪條消息更新。
ums_ts_的價值在於從時間維度上能夠準確知道event發生的時間。好比:若是想獲得一個某時刻的快照數據。能夠經過ums_ts 來知道截斷時間點。
業界日誌收集、結構化、分析工具方案不少,例如:Logstash、Filebeat、Flume、Fluentd、Chukwa. scribe、Splunk等,各有所長。
在結構化日誌這個方面,大多采用配置正則表達式模板:用於提取日誌中模式比較固定、通用的部分,例如日誌時間、日誌類型、行號等。對於真正的和業務比較相關的信息,這邊部分是最重要的,稱爲message部分,咱們但願使用可視化的方式來進行結構化。
log4j的日誌以下:
若是想將上述日誌轉換成結構化數據:
DBUS設計的數據日誌同步方案以下:
日誌抓取端採用業界流行的組件(例如Logstash、Flume、Filebeat等)。一方面便於用戶和業界統一標準,方便用戶的整合;另外一方面也避免無謂的重造輪子。抓取數據稱爲原始數據日誌(raw data log)放進Kafka中,等待處理。
提供可視化界面,配置規則來結構化日誌。用戶可配置日誌來源和目標。同一個日誌來源能夠輸出到多個目標。每一條「日誌源-目標」線,中間數據通過的規則處理用戶根據本身的需求來自由定義。最終輸出的數據是結構化的,即:有schema約束,能夠理解爲相似數據庫中的表。
所謂規則,在DBUS中,即「規則算子」。DBUS設計了豐富易用的過濾、拆分、合併、替換等算子供用戶使用。用戶對數據的處理可分多個步驟進行,每一個步驟的數據處理結果可即時查看、驗證;可重複使用不一樣算子,直到轉換、裁剪獲得本身須要的數據。
將配置好的規則算子組運用到執行引擎中,對目標日誌數據進行預處理,造成結構化數據,輸出到Kafka,供下游數據使用方使用。
流程以下:
根據配置,咱們支持同一條原始日誌,能提取爲一個表數據,或者能夠提取爲多個表數據。
每一個表是結構化的,知足相同的schema。
每一個表是一個規則 算子組的合集,能夠配置1個到多個規則算子組 每一個規則算子組,由一組規則算子組合而成 拿到一條原始數據日誌, 它最終應該屬於哪張表呢?
每條日誌須要與規則算子組進行匹配:
符合條件的進入規則算子組的,最終被規則組轉換爲結構化的表數據。 不符合的嘗試下一個規則算子組。 都不符合的,進入unknown_table表。
不管是增量、全量仍是日誌,最終輸出到結果kafka中的消息都是咱們約定的統一消息格式,稱爲UMS(unified message schema)格式。以下圖所示:
Protocol
數據的類型,被UMS的版本號
schema 1)namespace 由:類型. 數據源名.schema名 .表名.表版本號. 分庫號 .分表號 組成,可以描述全部表。
例如:mysql.db1.schema1.testtable.5.0.0
2)fields是字段名描述。
ums_id_ 消息的惟一id,保證消息是惟一的 ums_ts_ canal捕獲事件的時間戳; ums_op_ 代表數據的類型是I (insert),U (update),B (before Update),D(delete) ums_uid_ 數據流水號,惟一值 3)payload是指具體的數據。
一個json包裏面能夠包含1條至多條數據,提升數據的有效載荷。
RDBMS類系統涉及到數據庫的主備同步,日誌抽取,增量轉換等多個模塊等。
日誌類系統涉及到日誌抽取端,日誌轉換模模塊等。
如何知道系統正在健康工做,數據是否可以實時流轉? 所以對流程的監控和預警就尤其重要。
對於RDBMS類系統
心跳模塊從dbusmgr庫中得到須要監控的表列表,以固定頻率(好比每分鐘)向源端dbus庫的心跳錶插入心跳數據(該數據中帶有發送時間),該心跳錶也做爲增量數據被實時同步出來,而且與被同步表走相同的邏輯和線程(爲了保證順序性,當遇到多併發度時是sharding by table的,心跳數據與table數據走一樣的bolt),這樣當收到心跳數據時,即使沒有任何增刪改的數據,也能證實整條鏈路是通的。
增量轉換模塊和心跳模塊在收到心跳包數據後,就會發送該數據到influxdb中做爲監控數據,經過grafana進行展現。 心跳模塊還會監控延時狀況,根據延時狀況給以報警。
對於日誌類系統
從源端就會自動產生心跳包,相似RDBMS系統,將心跳包經過抽取模塊,和算子轉換模塊同步到末端,由心跳模塊負責監控和預警。