[譯] Introducing Complex Event Processing (CEP) with Apache Flink

原文連接html

正文

隨着傳感網絡的普及,智能設備持續收集着愈來愈多的數據,分析近乎實時,不斷增加的數據流是一個巨大的挑戰。快速應對變化趨勢、交付最新的 BI 應用會成爲一個公司成敗的關鍵因素。其中關鍵問題就是數據流的事件模型檢測。git

Complex event processing (CEP) 要處理的就是在持續事件中匹配模式的問題。匹配結果一般就是:從輸入事件中提取的復瑣事件。傳統 DBMSs 在固定數據上執行查詢,而 CEP 在存儲的 query 上執行(譯者注:某個範圍)。全部不相關的數據會當即丟棄,因爲 CEP 查詢都是在一個無限的數據流中,這樣的優點顯而易見。更重要的是,輸入實時被處理,系統一旦收到某一個序列的全部數據,結果就會被輸出。CEP 所以有着很是高效的實時分析能力。github

由此,CEP 的處理範式吸引了不少技術人員興趣,有着普遍的應用場景。值得注意的是,CEP 如今用在了金融應用,例如:股票市場趨勢、信用卡欺詐檢測。還有基於 RFID 的追蹤和監控,例如:庫房小偷檢測。CEP 還能夠被用於基於用戶可疑行爲的網絡入侵檢測。正則表達式

Apache Flink 有着天生的真正的流處理能力,具備低延遲、高吞吐量的特性,和 CEP 簡直絕配。所以,Flink 社區在 Flink 1.0 引入了第一個版本的 CEP library。接下來咱們會使用一個數據中心監控的案例介紹其使用。apache

cep-monitoring

假設這樣一個場景:數據中心有不少機架,每個機架都有功率和溫度監控。監控設備會不斷產生功率和溫度事件。基於這些監控事件數據流,咱們想要檢測出可能要過熱的機架,從而調整負載和降溫。小程序

針對這種場景,咱們採起兩階段處理方法。首先,監控溫度事件,當檢測到連續兩個超過閾值的溫度事件,即生成一個當前平均溫度的警告(warning),溫度報警不必定意味着過熱。可是若是看到兩個連續的升溫警告事件,則生成機架過熱報警(alert)。此時,須要採起措施冷卻機架。服務器

首先,定義來源的監控事件流,每個 message 都包含來源 rack ID(機架 ID)。溫度事件包含當前溫度,功率事件包含當前電壓。咱們把事件模型定義爲 POJOs.網絡

public abstract class MonitoringEvent {
        private int rackID;
        ...
    }
    
    public class TemperatureEvent extends MonitoringEvent {
        private double temperature;
        ...
    }
    
    public class PowerEvent extends MonitoringEvent {
        private double voltage;
        ...
    }

咱們可使用 Flink 的 connector(好比:Kafka, RabbitMQ 等),生成 DataStream<MonitoringEvent> inputEventStream 給 Flink 的 CEP 算子提供輸入。首先,咱們須要定義檢測溫度警告的事件模式 (pattern),CEP library 提供了很是直觀的 Pattern API 來定義複雜的模式。spa

每一個模式都包含了一個能夠定義過濾 (filter) 條件的事件序列。模式 (pattern) 的第一個事件一般都命名爲"First Event"。code

Pattern.<MonitoringEvent>begin("First Event");

這句話會匹配每個輸入的監控事件(monitoring event),而咱們只須要溫度大於必定閾值的溫度事件(TemperatureEvents),因此咱們須要添加 subtype 和 where 語句限制。

Pattern.<MonitoringEvent>begin("First Event")
        .subtype(TemperatureEvent.class)
        .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD);

以前說:對於同一個機架,當看到兩個連續的高溫事件(超過閾值)就產生一個溫度報警(TemperatureWarning),Pattern API 提供了 next 調用方法,來添加事件到模式定義中。next 添加的事件發生時間必須緊跟着第一個匹配事件以後,才能觸發整個模式的匹配。

Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .next("Second Event")
    .subtype(TemperatureEvent.class)
    .where(evt -> evt.getTemperature() >= TEMPERATURE_THRESHOLD)
    .within(Time.seconds(10));

最後模式的定義包含有一個 within 的 API 調用,用來定義兩個連續 TemperatureEvents 必須在 10s 內發生才能匹配。時間基於 time characteristic 設置,能夠是:處理時間、輸入時間或者事件時間。(譯者注 Event Time / Processing Time / Ingestion Time 解釋)

定義好事件模型以後,能夠將其應用到輸入數據流中。

PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
        inputEventStream.keyBy("rackID"),
        warningPattern);

因爲告警是針對單個機架的告警,必須使用 keyBy 經過 rackID 字段對輸入事件流分流。即匹配出的事件都是同一個機架的。

PatternStream<MonitoringEvent> 能夠訪問匹配的事件序列。經過使用 select API 能夠訪問其上數據,給 select API 傳入 PatternSelectFunction,PatternSelectFunction 會在每個匹配上的事件序列上執行。事件序列經過 Map<String, MonitoringEvent> 訪問,MonitoringEvent 經過以前分配的事件名稱來定位。這裏咱們經過 select function 針對每個匹配的模式產生一個 TemperatureWarning 事件。

public class TemperatureWarning {
        private int rackID;
        private double averageTemperature;
        ...
    }
    
    DataStream<TemperatureWarning> warnings = tempPatternStream.select(
        (Map<String, MonitoringEvent> pattern) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("First Event");
            TemperatureEvent second = (TemperatureEvent) pattern.get("Second Event");
    
            return new TemperatureWarning(
                first.getRackID(), 
                (first.getTemperature() + second.getTemperature()) / 2);
        }
    );

如今咱們從原始監控事件流(monitoring event stream)生成了一個復瑣事件流 DataStream<TemperatureWarning> 警告。這個復瑣事件流能夠再次被用做其餘復瑣事件處理的輸入。當同一個機架產生兩個連續升溫警告時,咱們使用 TemperatureWarnings 來生成 TemperatureAlerts。TemperatureAlerts 定義以下:

public class TemperatureAlert {
        private int rackID;
        ...
    }

首先定義報警事件

Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("First Event")
        .next("Second Event")
        .within(Time.seconds(20));

定義描述了在 20s 內有兩個 TemperatureWarnings 事件,而且第一個事件名稱爲 "First Event",緊接着的第二個爲 「Second Event」。這來了個事件都沒有 where 語句,由於須要訪問兩個事件才能判斷溫度時候增加。所以,下面咱們須要在 select 語句中使用 filter 條件來提取。這裏咱們只是生成了 PatternStream。

PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
        warnings.keyBy("rackID"),
        alertPattern);

一樣,咱們須要 keyBy 對輸入的告警數據流針對同一個機架進行分流。而後使用 flatSelect 方法訪問匹配的事件序列,當判斷溫度上升時生成 TemperatureAlert 告警。

DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
        (Map<String, TemperatureWarning> pattern, Collector<TemperatureAlert> out) -> {
            TemperatureWarning first = pattern.get("First Event");
            TemperatureWarning second = pattern.get("Second Event");
    
            if (first.getAverageTemperature() < second.getAverageTemperature()) {
                out.collect(new TemperatureAlert(first.getRackID()));
            }
        });

DataStream<TemperatureAlert> 告警是針對同一個機架的數據流,基於這個數據咱們如今能夠調整負載和降溫。源代碼地址(譯者注:注意閱讀 readme)

總結:

本文描述了使用 Flink CEP library 能夠很容易處理事件流。咱們經過數據中心的監控和報警案例,完成了服務器機架過熱報警的小程序。
將來 Flink 社區會持續擴展 CEP library 的功能和表述能力。接下來的 road map 是支持類正則表達式的模式實現,包括 *, 上下限制和否認。此外,還計劃容許 where 語句訪問以前匹配的事件字段。這個特性可讓咱們提早刪除不須要的事件序列。

閱讀材料:

本內容爲譯者添加

相關文章
相關標籤/搜索