Fink| CEP

 

什麼是復瑣事件CEP?

一個或多個由簡單事件構成的事件流經過必定的規則匹配,而後輸出用戶想獲得的數據,知足規則的復瑣事件。java

特徵:

  • 目標:從有序的簡單事件流中發現一些高階特徵apache

  • 輸入:一個或多個由簡單事件構成的事件流架構

  • 處理:識別簡單事件之間的內在聯繫,多個符合必定規則的簡單事件構成復瑣事件ide

  • 輸出:知足規則的復瑣事件spa

           

CEP架構

            

 

CEP用於分析低延遲、頻繁產生的不一樣來源的事件流。CEP能夠幫助在複雜的、不相關的事件流中找出有意義的模式和複雜的關係,以接近實時或準實時的得到通知並阻止一些行爲。scala

CEP支持在流上進行模式匹配,根據模式的條件不一樣,分爲連續的條件或不連續的條件;模式的條件容許有時間的限制,當在條件範圍內沒有達到知足的條件時,會致使模式匹配超時。設計

CEP的工做流圖:3d

             

看起來很簡單,可是它有不少不一樣的功能:code

  1. 輸入的流數據,儘快產生結果對象

  2. 2個event流上,基於時間進行聚合類的計算

  3. 提供實時/準實時的警告和通知

  4. 多樣的數據源中產生關聯並分析模式

  5. 高吞吐、低延遲的處理

市場上有多種CEP的解決方案,例如Spark、Samza、Beam等,但他們都沒有提供專門的library支持。可是Flink提供了專門的CEP library

Flink CEP

Flink爲CEP提供了專門的Flink CEP library,它包含以下組件:

  1. Event Stream

  2. pattern定義

  3. pattern檢測

  4. 生成Alert

  

 

首先,開發人員要在DataStream流上定義出模式條件,以後Flink CEP引擎進行模式檢測,必要時生成告警。

爲了使用Flink CEP,咱們須要導入依賴:

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-cep_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>

Event Streams

咱們首先須要爲Stream Event設計java pojo,可是注意,因爲要對event對象進行對比,因此咱們須要重寫hashCode()方法和equals()方法。下面進行監控溫度事件流。

建立抽象類MonitoringEvent,重寫hashCode()和equals()方法;再建立POJO:TemperatureEvent,一樣重寫hashCode()和equals()方法:

MonitoringEvent:

 TemperatureEvent:

 建立env,建立source:

Pattern API

每一個Pattern都應該包含幾個步驟,或者叫作state。從一個state到另外一個state,一般咱們須要定義一些條件,例以下列的代碼:

DataStream<Event> input = ...

Pattern<Event, ?> pattern = Pattern.begin("start").where(evt -> evt.getId() == 42)
    // next表示"middle"事件緊跟着"start"事件發生
    .next("middle").subtype(SubEvent.class).where(subEvt -> subEvt.getVolume() >= 10.0)
    // followedBy表示"end"事件不必定緊跟着"middle"事件發生
    .followedBy("end").where(evt -> evt.getName().equals("end"));

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

DataStream<Alert> result = patternStream.select(pattern -> {
    return createAlertFrom(pattern);
});

例子:

Flink-CEP:
登陸事件檢測:同一用戶連續兩次登陸失敗,報警; LoginEvent Stream
--> key by -->userid:LoginEvent Stream --> pattern matching -->select(定義報警輸出) -->LoginWarning Stream -->print()
溫度事件檢測:同一機箱連續兩次溫度超標,報警; TemperatureEvent Stream
--> pattern matching(filter>27℃) --> select(定義報警輸出) --> Alert Stream -->print()

 

異常檢測:機箱溫度檢測

需求:同一個機箱連續兩次溫度超標,報警

拓展需求:鍋爐房溫度檢測;信用卡反欺詐:連續大額消費;反做弊:同一個用戶短期內連續登錄失敗 

  • flink cep

  • pattern定義

  • pattern匹配

  • select選出匹配到的事件,報警

public class CEPExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<TemperatureEvent> inputEventStream = env.fromElements( //不是DataStreamSource類型
                new TemperatureEvent("xyz", 22.0),
                new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1),
                new TemperatureEvent("xyz", 22.2), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.3), new TemperatureEvent("xyz", 22.1),
                new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz", 22.7),
                new TemperatureEvent("xyz", 27.0));
        // 定義Pattern,檢查10秒鐘內溫度是否高於26度
        Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("start") //加泛型
                .subtype(TemperatureEvent.class)
                .where(new SimpleCondition<TemperatureEvent>() {
                    private static final long serialVersionUID = 1L;
                    @Override
                    public boolean filter(TemperatureEvent value) throws Exception {
                        if (value.getTemperature() >= 26) {
                            return true;
                        }
                        return false;
                    }
                }).within(Time.seconds(10));

        //匹配pattern並select事件,符合條件的發生警告,即輸出
        //Alert類屬性message,表示在知足必定的pattern條件後,須要告警的內容:
        DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern) //DataStream類型的
                .select(new PatternSelectFunction<TemperatureEvent, Alert>() {
                    @Override
                    public Alert select(Map<String, List<TemperatureEvent>> pattern) throws Exception {
                        return new Alert("Temperature Rise Detected: " + pattern.get("start").get(0).getTemperature() +
                                " on machine name: " + pattern.get("start").get(0).getMachineName());
                    }
                });
        patternStream.print();
        env.execute();
    }

}

 

 登陸事件異常檢測:同一個用戶連續兩次登錄失敗,報警

  • flink cep

  • pattern定義

  • pattern匹配

  • select輸出報警事件

//需求: 若是同一個userid在三秒以內連續兩次登錄失敗,報警。
public class FlinkLoginFail {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 這裏mock了事件流,這個事件流通常從Kafka過來
        DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList(  //自定義一個pojo類:userId、ip、type
                new LoginEvent("1", "192.168.0.1", "fail"),
                new LoginEvent("1", "192.168.0.2", "fail"),
                new LoginEvent("1", "192.168.0.3", "fail"),
                new LoginEvent("2", "192.168.10.10", "success")
        ));//不用DataStreamSource,使用DataStream

        Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("start")
                //泛型類或泛型接口上的泛型形參是不能用於靜態成員的,那麼當靜態方法須要用到泛型時,只能用泛型方法。
                .where(new IterativeCondition<LoginEvent>() { // 模式開始事件的匹配條件爲事件類型爲fail, 爲迭代條件
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                        return loginEvent.getType().equals("fail");
                    }
                }).next("next")
                .where(new IterativeCondition<LoginEvent>() { // 事件的匹配條件爲事件類型爲fail
                    @Override
                    public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                        return loginEvent.getType().equals("fail");
                    }
                }).within(Time.seconds(3));// 要求緊鄰的兩個事件發生的時間間隔不能超過3秒鐘

        // 以userid分組,造成keyedStream,而後進行模式匹配   ::方法引用
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);

        DataStream<LoginWarning> loginFailDataStream = patternStream.select((Map<String, List<LoginEvent>> pattern) -> {
            List<LoginEvent> first = pattern.get("start");
            List<LoginEvent> second = pattern.get("next");
            return new LoginWarning(first.get(0).getUserId(), first.get(0).getIp(), first.get(0).getType());
        });//不用SingleOutputStreamOperator類型的,使用

        loginFailDataStream.print();
        env.execute();

    }

}
相關文章
相關標籤/搜索