拓展閱讀:mysql
快速部署DBus體驗實時數據流計算github
Dbus所支持兩類數據源的實現原理與架構拆解。正則表達式
大致來講,Dbus支持兩類數據源:sql
以mysql爲例子. 分爲三個部分:數據庫
mysql 日誌抽取模塊由兩部分構成:json
咱們知道,雖然mysql innodb有本身的log,mysql主備同步是經過binlog來實現的。而binlog同步有三種模式:Row 模式,Statement 模式,Mixed模式。由於statement模式有各類限制,一般生產環境都使用row模式進行復制,使得讀取全量日誌成爲可能。架構
一般咱們的mysql佈局是採用 2個master主庫(vip)+ 1個slave從庫 + 1個backup容災庫 的解決方案,因爲容災庫一般是用於異地容災,實時性不高也不便於部署。併發
爲了最小化對源端產生影響,咱們讀取binlog日誌從slave從庫讀取。oracle
讀取binlog的方案比較多,DBus也是站在巨人的肩膀上,對於Mysql數據源使用阿里巴巴開源的Canal來讀取增量日誌。這樣作的好處是:
關於Canal的介紹可參考:https://github.com/alibaba/canal/wiki/Introduction 因爲canal用戶抽取權限比較高,通常canal server節點也能夠由DBA組來維護。
日誌抽取模塊的主要目標是將數據從canal server中讀出,儘快落地到第一級kafka中,避免數據丟失(畢竟長時間不讀日誌數據,可能日誌會滾到好久之前,可能會被DBA刪除),所以須要避免作過多的事情,主要就作一下數據拆包工做防止數據包過大。
從高可用角度考慮,在使用Canal抽取過程當中,採用的基於zookeeper的Canal server高可用模式,不存在單點問題,日誌抽取模塊extractor也使用storm程序,一樣也是高可用架構。
不一樣數據源有不一樣的日誌抽取方式,好比oracle,mongo等都有相應的日誌抽取程序。
DBus日誌抽取模塊獨立出來是爲了兼容這些不一樣數據源的不一樣實現方式。
增量數據處理模塊,根據不一樣的數據源類型的格式進行轉換和處理。
1)分發模塊dispatcher
2)轉換模塊appender
全量拉取可用於初始化加載(Initial load), 數據從新加載,實現上咱們借鑑了sqoop的思想。將全量過程分爲了2 個部分:
1)數據分片
分片讀取max,min,count等信息,根據片大小計算分片數,生成分片信息保存在split topic中。下面是具體的分片策略:
以實際的經驗,對於mysql InnDB,只有使用主鍵索引進行分片,才能高效。由於mysql innDB的主鍵列與數據存儲順序一致。
2)實際拉取
每一個分片表明一個小任務,由拉取轉換模塊經過多個併發度的方式鏈接slave從庫進行拉取。 拉取完成狀況寫到zookeeper中,便於監控。
全量拉取對源端數據庫是有必定壓力的,咱們作法是:
全量拉取不是常常發生的,通常作初始化拉取一次,或者在某種狀況下須要全量時能夠觸發一次。
在整個數據傳輸中,爲了儘可能的保證日誌消息的順序性,kafka咱們使用的是1個partition的方式。在通常狀況下,基本上是順序的和惟一的。 但若是出現寫kafka異步寫入部分失敗, storm也用重作機制,所以,咱們並不嚴格保證exactly once和徹底的順序性,但保證的是at least once。
所以ums_id_變得尤其重要。 對於全量抽取,ums_id是一個值,該值爲全量拉取event的ums_id號,表示該批次的全部數據是一批的,由於數據都是不一樣的能夠共享一個ums_id_號。ums_uid_流水號從zk中生成,保證了數據的惟一性。 對於增量抽取,咱們使用的是 mysql的日誌文件號 + 日誌偏移量做爲惟一id。Id做爲64位的long整數,高6位用於日誌文件號,低13位做爲日誌偏移量。 例如:000103000012345678。 103 是日誌文件號,12345678 是日誌偏移量。 這樣,從日誌層面保證了物理惟一性(即使重作也這個id號也不變),同時也保證了順序性(還能定位日誌)。經過比較ums_id_就能知道哪條消息更新。
ums_ts_的價值在於從時間維度上能夠準確知道event發生的時間。好比:若是想獲得一個某時刻的快照數據。能夠經過ums_ts 來知道截斷時間點。
業界日誌收集、結構化、分析工具方案不少,例如:Logstash、Filebeat、Flume、Fluentd、Chukwa. scribe、Splunk等,各有所長。在結構化日誌這個方面,大多采用配置正則表達式模板:用於提取日誌中模式比較固定、通用的部分,例如日誌時間、日誌類型、行號等。對於真正的和業務比較相關的信息,這邊部分是最重要的,稱爲message部分,咱們但願使用可視化的方式來進行結構化。
例如:對於下面所示的類log4j的日誌:
若是用戶想將上述數據轉換爲以下的結構化數據信息:
咱們稱這樣的日誌爲「數據日誌」
DBUS設計的數據日誌同步方案以下:
系統流程圖以下所示:
根據配置,咱們支持同一條原始日誌,能提取爲一個表數據,或者能夠提取爲多個表數據。
每一個表是結構化的,知足相同的schema。
拿到一條原始數據日誌, 它最終應該屬於哪張表呢?
每條日誌須要與規則算子組進行匹配:
規則算子是對數據進行過濾、加工、轉換的基本單元。常見的規則算子以下:
算子之間是獨立的,經過組合不一樣的算子達到更復雜的功能,對算子進行迭代使用最終達到對任意數據進行加工的目的。
咱們試圖使得算子儘可能知足正交性或易用性(雖然正則表達式很強大,但咱們仍然開發一些簡單算子例如trim算子來完成簡單功能,以知足易用性)。
不管是增量、全量仍是日誌,最終輸出到結果kafka中的消息都是咱們約定的統一消息格式,稱爲UMS(unified message schema)格式。以下圖所示:
數據的類型,被UMS的版本號
1)namespace 由:類型. 數據源名.schema名 .表名.表版本號. 分庫號 .分表號 組成,可以描述全部表。
例如:mysql.db1.schema1.testtable.5.0.0
2)fields是字段名描述。
3)payload是指具體的數據。
一個json包裏面能夠包含1條至多條數據,提升數據的有效載荷。
RDBMS類系統涉及到數據庫的主備同步,日誌抽取,增量轉換等多個模塊等。
日誌類系統涉及到日誌抽取端,日誌轉換模模塊等。
如何知道系統正在健康工做,數據是否可以實時流轉? 所以對流程的監控和預警就尤其重要。
心跳模塊從dbusmgr庫中得到須要監控的表列表,以固定頻率(好比每分鐘)向源端dbus庫的心跳錶插入心跳數據(該數據中帶有發送時間),該心跳錶也做爲增量數據被實時同步出來,而且與被同步表走相同的邏輯和線程(爲了保證順序性,當遇到多併發度時是sharding by table的,心跳數據與table數據走一樣的bolt),這樣當收到心跳數據時,即使沒有任何增刪改的數據,也能證實整條鏈路是通的。
增量轉換模塊和心跳模塊在收到心跳包數據後,就會發送該數據到influxdb中做爲監控數據,經過grafana進行展現。 心跳模塊還會監控延時狀況,根據延時狀況給以報警。
從源端就會自動產生心跳包,相似RDBMS系統,將心跳包經過抽取模塊,和算子轉換模塊同步到末端,由心跳模塊負責監控和預警。
來源:宜信技術學院