導讀:數據總線DBus的整體架構中主要包括六大模塊,分別是:日誌抓取模塊、增量轉換模塊、全量抽取程序、日誌算子處理模塊、心跳監控模塊、Web管理模塊。六大模塊各自的功能相互鏈接,構成DBus的工做原理:經過讀取RDBMS增量日誌的方式來實時獲取增量數據日誌(支持全量拉取);基於Logstash,flume,filebeat等抓取工具來實時得到數據,以可視化的方式對數據進行結構化輸出。本文主要介紹的是DBus中基於可視化配置的日誌結構化轉換實現的部分。html
DBus能夠對接多種log數據源,例如:Logstash、Flume、Filebeat等。上述組件都是業界比較流行的日誌抓取工具,一方面便於用戶和業界統一標準,方便用戶技術方案的整合;另外一方面也避免了無謂的重複造輪子。抓取的數據咱們稱爲原始數據日誌(raw data log),由抓取組件將其寫入Kafka中,等待DBus後續處理。python
用戶可自定義配置日誌源和目標端。同一個日誌源的數據能夠輸出到多個目標端。每一條「日誌源-目標端」線,用戶能夠根據本身的須要來配置相應的過濾規則。通過規則算子處理後的日誌是結構化的,即:有schema約束,相似於數據庫中的表。git
DBus設計了豐富易用的算子,用於對數據進行定製化操做。用戶對數據的處理可分爲多個步驟進行,每一個步驟的數據處理結果可即時查看、驗證;而且可重複使用不一樣算子,直到轉換、裁剪出本身須要的數據。github
將配置好的規則算子組應用到執行引擎中,對目標日誌數據進行預處理,造成結構化數據,輸出到Kafka,供下游數據使用方使用。系統流程圖以下所示:正則表達式
根據DBus log設計原則,同一條原始日誌,能夠被提取到一個或多個表中。每一個表是結構化的,知足相同的schema約束。數據庫
對於任意一條原始數據日誌(raw data log),它應該屬於哪張表呢?segmentfault
假如用戶定義了若干張邏輯表(T1,T2…),用於抽取不一樣類型的日誌,那麼,每條日誌須要與規則算子組進行匹配:數組
例如,對於同一條應用日誌,其可能屬於不止一個規則組或Table,而在咱們定義的規則組或Table中,只要其知足過濾條件,該應用日誌就能夠被規則組提取,即保證了同一條應用日誌能夠同屬於不一樣的規則組或Table。架構
規則算子是對數據進行過濾、加工、轉換的基本單元。常見的規則算子如上圖所示。工具
算子之間具備獨立性,算子之間能夠任意組合使用,從而能夠實現許多複雜的、高級的功能,經過對算子進行迭代使用,最終能夠實現對任意數據進行加工的目的。用戶能夠開發自定義算子,算子的開發很是容易,用戶只要遵循基本接口原則,就能夠開發任意的算子。
以DBus集羣環境爲例,DBus集羣中有兩臺機器(即master-slave)部署了心跳程序,用於監控、統計、預警等,心跳程序會產生一些應用日誌,這些應用日誌中包含各種事件信息,假如咱們想要對這些日誌進行分類處理並結構化到數據庫中,咱們就能夠採用DBus log程序對日誌進行處理。
DBus能夠接入多種數據源(Logstash、Flume、Filebeat等),此處以Logstash爲例來講明如何接入DBus的監控和報警日誌數據。
因爲在dbus-n2和dbus-n3兩臺機器上分別存在監控和預警日誌,爲此咱們分別在兩臺機器上部署了Logstash程序。心跳數據由Logstash自帶的心跳插件產生,其做用是便於DBus對數據進行統計和輸出,以及對源端日誌抽取端(此處爲Logstash)進行預警(對於Flume和Filebeat來講,由於它們沒有心跳插件,因此須要額外爲其定時產生心跳數據)。Logstash程序寫入到Kafka中的數據中既有普通格式的數據,同時也有心跳數據。
這裏不僅是侷限於2臺部署有Logstash程序的機器,DBus對Logstash數量不作限制,好比應用日誌分佈在幾十上百臺機器上,只須要在每臺機器上部署Logstash程序,並將數據統一抽取到同一個Kafka Topic中,DBus就可以對全部主機的數據進行數據處理、監控、預警、統計等。
在啓動Logstash程序後,咱們就能夠從topic : heartbeat_log_logstash中讀取數據,數據樣例以下:
1)心跳數據
2)普通日誌數據
接下來,咱們只須要在DBus Web中配置相應的規則就能夠對數據進行處理了。
首先新建一個邏輯表sink_info_table,該表用來抽取sink事件的日誌信息,而後配置該表的規則組(一個或多個,但全部的規則組過濾後的數據須要知足相同schema特性),heartbeat_log_logstash做爲原始數據topic,咱們能夠實時的對數據進行可視化操做配置(所見即所得,即席驗證)。
1)讀取原始數據日誌
能夠看到由Logstash預先提取已經包含了log4j的基本信息,例如path、@timestamp、level等。可是數據日誌的詳細信息在字段log中。因爲不一樣的數據日誌輸出是不同的,所以能夠看到log列數據是不一樣的。
2)提取感興趣的列
假如咱們對timestamp、log 等原始信息感興趣,那麼能夠添加一個toIndex算子,來提取這些字段:
這裏須要指出,咱們考慮使用數組下標方式,是有緣由的:
所以後續操做所有基於數組下標方式訪問。
執行規則,就能夠看到被提取後的字段狀況:
3)過濾須要的數據
在這個例子中,咱們只對含有「Sink to influxdb OK!」的數據感興趣。所以添加一個filter算子,提取第7列中包含」Sink to influxdb OK!」內容的行數據:
執行後,只有符合條件的日誌行數據纔會存在。
4)對特定列進行提取
添加一個select算子,咱們對第1和3列的內容感興趣,因此對這兩列進行提取。
執行select算子,數據中就會只含有第1和3列了。
5)以正則表達式的方式處理數據
咱們想從第1列的數據中提取符合特定正則表達式的值,使用regexExtract算子對數據進行過濾。正則表達式以下:http_code=(d*).*type=(.*),ds=(.*),schema=(.*),table=(.*)s.*errorCount=(d*),用戶能夠寫自定義的正則表達式。
執行後,就會獲取正則表達式執行後的數據。
6)選擇輸出列
最後咱們把感興趣的列進行輸出,使用saveAs算子, 指定列名和類型,方便於保存在關係型數據庫中。
執行saveAs算子後,這就是處理好的最終輸出數據樣本。
保存上一步配置好的規則組,日誌數據通過DBus執行算子引擎,就能夠生成相應的結構化數據了。目前根據項目實際,DBus輸出的數據是UMS格式,若是不想使用UMS,能夠通過簡單的開發,實現定製化。
注:UMS是DBus定義並使用的、通用的數據交換格式,是標準的JSON。其中同時包含了schema和數據信息。更多UMS介紹請參考DBus開源項目主頁的介紹。開源地址:https://github.com/bridata/dbus
如下是測試案例,輸出的結構化UMS數據的樣例:
爲了便於掌握數據抽取、規則匹配、監控預警等狀況,咱們提供了日誌數據抽取的可視化實時監控界面,以下圖所示,可隨時瞭解如下信息:
監控信息中包含了來自集羣內各臺主機的監控信息,以主機IP(或域名)對數據分別進行監控、統計和預警等。
監控中還有一張表叫作_unkown_table_ 代表全部沒有被匹配上的數據條數。例如:Logstash抓取的日誌中有5種不一樣事件的日誌數據,咱們只捕獲了其中3種事件,其它沒有被匹配上的數據,所有在_unkown_table_計數中。
DBus一樣能夠接入Flume、Filebeat、UMS等數據源,只須要稍做配置,就能夠實現相似於對Logstash數據源一樣的處理效果,更多關於DBus對log的處理說明,請參考:
應用日誌通過DBus處理後,將原始數據日誌轉換爲告終構化數據,輸出到Kafka中提供給下游數據使用方進行使用,好比經過Wormhole將數據落入數據庫等。具體如何將DBus與Wormhole結合起來使用,請參考:如何設計實時數據平臺(技術篇)。
做者:仲振林