【翻譯】FlinkCEP-Flink的復瑣事件處理

本文翻譯自官網:FlinkCEP - Complex event processing for Flink  html

FlinkCEP是在Flink之上實現的復瑣事件處理(CEP)庫。 它使您能夠檢測無窮無盡的事件流中的事件模式,從而有機會掌握數據中的重要信息。java

本頁描述Flink CEP中可用的API調用。 咱們首先介紹模式API,該API容許您指定要在流中檢測的模式,而後介紹如何檢測和處理匹配的事件序列。 而後,咱們介紹CEP庫在處理事件時間的延遲時所作的假設,以及如何將做業從較舊的Flink版本遷移到Flink-1.3。 apache

入門

若是您想直接使用,請設置一個Flink程序,並將FlinkCEP依賴項添加到您項目的pom.xml中。c#

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-cep-scala_2.11</artifactId>
  <version>1.9.0</version>
</dependency>

Info: FlinkCEP不是二進制分發包的一部分。 在此處查看如何與它連接以執行集羣。api

如今,您能夠開始使用Pattern API編寫第一個CEP程序。markdown

 注意:您要向其應用模式匹配的DataStream中的事件必須實現適當的equals()和hashCode()方法,由於FlinkCEP使用它們來比較和匹配事件。ide

val input: DataStream[Event] = ...

val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
  .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0)
  .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(input, pattern)

val result: DataStream[Alert] = patternStream.process(
    new PatternProcessFunction[Event, Alert]() {
        override def processMatch(
              `match`: util.Map[String, util.List[Event]],
              ctx: PatternProcessFunction.Context,
              out: Collector[Alert]): Unit = {
            out.collect(createAlertFrom(pattern))
        }
    })

模式API

模式API容許您定義要從輸入流中提取的複雜模式序列。函數

每一個複雜模式序列都包含多個簡單模式,即尋找具備相同屬性的單個事件的模式。 從如今開始,咱們將稱這些簡單模式爲模式,以及咱們在流中搜索的最終複雜模式序列,即模式序列。 您能夠將模式序列視爲此類模式的圖,其中根據用戶指定的條件(例如,從一個模式到下一個模式)的轉換。 event.getName()。equals(「 end」)。 匹配是一系列輸入事件,經過一系列有效的模式轉換來訪問複雜模式圖的全部模式。oop

注意:每一個模式都必須具備惟一的名稱,稍後您將使用該名稱來標識匹配的事件。 ui

注意:模式名稱不能包含字符":" 

在本節的其他部分,咱們將首先描述如何定義單個模式,而後如何將單個模式組合爲複雜模式

單個模式

 模式能夠是單例或循環模式。 單例模式接受一個事件,而循環模式能夠接受多個事件。 在模式匹配符號中,模式「 a b + c?d」(或「 a」,後跟一個或多個「 b」,可選地後跟「 c」,後跟「 d」),a,c ?和d是單例模式,而b +是循環的模式。 默認狀況下,模式是單例模式,您可使用「量詞」將其轉換爲循環模式。 每一個模式均可以具備一個或多個條件,基於該條件能夠接受事件。

量詞

 在FlinkCEP中,您可使用如下方法指定循環模式:pattern.oneOrMore(),用於指望一個或多個給定事件發生的模式(例如,前面提到的b +); 和pattern.times(#ofTimes),用於指望在給定類型的事件中出現特定次數的模式,例如 4次;pattern.times(#fromTimes,#toTimes)用於指望在給定類型的事件中出現特定的最小發生次數和最大發生次數的模式,例如 2-4

您可使用pattern.greedy()方法使循環模式變得貪婪(注:儘量的多匹配),但仍沒法使組模式變得貪婪。 您可使用pattern.optional()方法將全部模式(是否循環)設爲可選。

對於名爲start的模式,如下是有效的量詞:

 // expecting 4 occurrences
 start.times(4)

 // expecting 0 or 4 occurrences
 start.times(4).optional()

 // expecting 2, 3 or 4 occurrences
 start.times(2, 4)

 // expecting 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).greedy()

 // expecting 0, 2, 3 or 4 occurrences
 start.times(2, 4).optional()

 // expecting 0, 2, 3 or 4 occurrences and repeating as many as possible
 start.times(2, 4).optional().greedy()

 // expecting 1 or more occurrences
 start.oneOrMore()

 // expecting 1 or more occurrences and repeating as many as possible
 start.oneOrMore().greedy()

 // expecting 0 or more occurrences
 start.oneOrMore().optional()

 // expecting 0 or more occurrences and repeating as many as possible
 start.oneOrMore().optional().greedy()

 // expecting 2 or more occurrences
 start.timesOrMore(2)

 // expecting 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).greedy()

 // expecting 0, 2 or more occurrences
 start.timesOrMore(2).optional()

 // expecting 0, 2 or more occurrences and repeating as many as possible
 start.timesOrMore(2).optional().greedy()

條件

 對於每種模式,您均可以指定一個傳入事件必須知足的條件才能被「接受」到模式中,例如 其值應大於5,或大於先前接受的事件的平均值。 您能夠經過pattern.where(),pattern.or()或pattern.until()方法在事件屬性上指定條件。 這些能夠是IterativeConditions或SimpleConditions。

迭代條件:這是最通用的條件類型。 這樣,您能夠根據先前接受的事件的屬性或部分事件的統計信息來指定接受後續事件的條件。

如下是一個迭代條件的代碼,若是名稱以「 foo」開頭,而且該模式先前接受的事件的價格加上當前價格不超過5.0 ,則接受下一個事件 爲「中間」的模式的事件。 迭代條件可能很強大,尤爲是與循環模式(例如循環模式)結合使用時 一個或多個()。

middle.oneOrMore()
    .subtype(classOf[SubEvent])
    .where(
        (value, ctx) => {
            lazy val sum = ctx.getEventsForPattern("middle").map(_.getPrice).sum
            value.getName.startsWith("foo") && sum + value.getPrice < 5.0
        }
    )

注意:調用ctx.getEventsForPattern(...)會找到給定潛在匹配項的全部先前接受的事件。 該操做的成本可能會有所不一樣,所以在實施時,請儘可能減小其使用。

所描述的上下文也使事件時間特性具備一種訪問方式。 有關更多信息,請參見時間上下文

簡單條件:這種類型的條件擴展了前面提到的IterativeCondition類,並僅基於事件自己的屬性來決定是否接受事件。

start.where(event => event.getName.startsWith("foo"))

最後,您還能夠經過pattern.subtype(subClass)方法將接受事件的類型限制爲初始事件類型的子類型(此處爲Event)。

start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)

組合條件:如上所示,您能夠將子類型條件與其餘條件組合。 這適用於全部條件。 您能夠經過順序調用where()任意組合條件。 最終結果將是各個條件結果的邏輯與。

要使用OR合併條件,可使用or()方法,以下所示。

pattern.where(event => ... /* some condition */).or(event => ... /* or condition */)

中止條件:若是是循環模式(oneOrMore()和oneOrMore()。optional()),您還能夠指定中止條件,例如 接受值大於5的事件,直到值的總和小於50。

爲了更好地理解它,請看如下示例。

  • "(a+ until b)"(一個或多個"a"直到"b")的模式

  • 一系列傳入事件 "a1" "c" "a2" "b" "a3"

  • 該庫將輸出結果:{a1 a2} {a1} {a2} {a3}

如您所見{a1 a2 a3}{a2 a3}因爲中止條件而未返回。

Pattern Operation Description
where(condition)

定義當前模式的條件。 爲了匹配模式,事件必須知足條件。 多個連續的where()子句致使其條件與:

pattern.where(event => ... /* some condition */)
or(condition)

添加與現有條件進行「或」運算的新條件。 一個事件只有經過至少一個條件才能匹配模式:

pattern.where(event => ... /* some condition */) .or(event => ... /* alternative condition */)
until(condition)

指定循環模式的中止條件。 意味着若是發生符合給定條件的事件,則模式中將再也不接受任何事件.

僅與oneOrMore()結合使用

注意:它容許在基於事件的狀況下爲相應的模式清除狀態.

pattern.oneOrMore().until(event => ... /* some condition */)
subtype(subClass)

定義當前模式的子類型條件。 若是事件屬於此子類型,則該事件只能與該模式匹配:

pattern.subtype(classOf[SubEvent])
oneOrMore()

指定此模式指望至少發生一次匹配事件.

默認狀況下,使用寬鬆的內部連續性(在後續事件之間)。 有關內部連續性的更多信息,請參見連續.

注意:建議使用until()或within()來啓用清除狀態

pattern.oneOrMore()
timesOrMore(#times)

指定此模式指望至少出現 #times 次匹配事件.

默認狀況下,使用寬鬆的內部連續性(在後續事件之間)。 有關內部連續性的更多信息,請參見連續.

pattern.timesOrMore(2)
times(#ofTimes)

指定此模式指望發生匹配事件的確切次數.

默認狀況下,使用寬鬆的內部連續性(在後續事件之間)。 有關內部連續性的更多信息,請參見連續.

pattern.times(2)
times(#fromTimes, #toTimes)

指定此模式指望匹配事件的#fromTimes和#toTimes之間發生.

默認狀況下,使用寬鬆的內部連續性(在後續事件之間)。 有關內部連續性的更多信息,請參見連續.

pattern.times(2, 4)
optional()

指定此模式是可選的,即它可能根本不會發生。 這適用於全部上述量詞.

pattern.oneOrMore().optional()
greedy()

指定此模式爲貪婪模式,即將重複儘量多的匹配。 這僅適用於量詞,目前不支持分組模式.

pattern.oneOrMore().greedy()

 

組合模式

既然您已經瞭解了單個模式的描述,那麼如今該看看如何將它們組合成完整的模式序列了。

模式序列必須從初始模式開始,以下所示:

val start : Pattern[Event, _] = Pattern.begin("start")

接下來,能夠經過在模式序列之間指定所需的連續性條件將更多模式附加到模式序列中FlinkCEP支持如下事件之間的連續性形式:

  1. 嚴格連續性:指望全部匹配事件嚴格地一個接一個地出現,而中間沒有任何不匹配事件。

  2. 寬鬆連續性:忽略在匹配事件之間出現的不匹配事件。

  3. 非肯定性寬鬆連續性:進一步的寬鬆連續性,容許其餘匹配忽略某些匹配事件。

要在連續模式之間應用它們,可使用:

  1. next()嚴格的
  2. followedBy()爲寬鬆的
  3. followedByAny(),用於不肯定的寬鬆鄰接

  1. notNext(),若是您不但願某個事件類型直接跟隨另外一個事件
  2. notFollowedBy(),若是您不但願事件類型介於其餘兩個事件類型之間。

注意:模式序列不能以 notFollowedBy() 結尾

注意:NOT模式不能在可選模式以前

// strict contiguity
val strict: Pattern[Event, _] = start.next("middle").where(...)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)

// NOT pattern with strict contiguity
val strictNot: Pattern[Event, _] = start.notNext("not").where(...)

// NOT pattern with relaxed contiguity
val relaxedNot: Pattern[Event, _] = start.notFollowedBy("not").where(...)

寬鬆連續性意味着僅將匹配第一個後續匹配事件,而對於不肯定的寬鬆連續性,將針對同一開始發出多個匹配項。例如,"a b"給定事件序列的pattern "a", "c", "b1", "b2"將給出如下結果:

  1. "a"和"b"之間的嚴格鄰接:{}(不匹配),「 a」以後的「 c」會致使「 a」被丟棄

  2. "a""b"之間寬鬆連續性{a b1}做爲輕鬆的連續性被視爲「跳過不匹配的事件,直到下一個匹配的一個」。

  3. 「 a」和「 b」之間的不肯定肯定的寬鬆連續性:{a b1},{a b2},由於這是最通用的形式

也能夠定義時間約束以使模式有效。例如,您能夠經過pattern.within()方法定義一個模式應該在10秒內發生處理和事件時間都支持時間模式

注意:模式序列只能具備一個時間約束。若是在不一樣的單獨模式上定義了多個這樣的約束,那麼將應用最小的約束。

next.within(Time.seconds(10))

循環模式內的連續性

您能夠在循環模式中應用與上一節中討論的相同的鄰接條件。 連續性將應用於接受到這種模式的元素之間。 爲了舉例說明上述內容,請使用輸入「 a」的模式序列「 a b + c」(「 a」,而後是一個或多個「 b」的任意(非肯定性寬鬆)序列,後跟「 c」) 「,」 b1「,」 d1「,」 b2「,」 d2「,」 b3「,」 c「將具備如下結果:

  1. 嚴格連續性:{a b3 c} –在「 b1」以後的「 d1」致使「 b1」被丟棄,因爲「 d2」,對於「 b2」也同樣

  2. 寬鬆的連續性:{a b1 c},{a b1 b2 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c}-忽略「 d」

  3. 非肯定性寬鬆的連續性:{a b1 c},{a b1 b2 c},{a b1 b3 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c}-請注意{ a b1 b3 c},這是「 b」之間的鬆弛連續性的結果

對於循環模式(例如oneOrMore()times()),默認設置爲寬鬆連續性若是要嚴格鄰接,則必須使用consecutive()調用顯式指定它,若是要 非肯定性寬鬆鄰接,則可使用該allowCombinations()調用。

Pattern Operation Description
consecutive()

與oneOrMore()和times()結合使用,並在匹配事件之間施加嚴格的連續性,

即任何不匹配的元素都會破壞匹配(如next()所示).

若是未應用,則使用寬鬆的連續性(如followBy()所示).

E.g. a pattern like:

Pattern.begin("start").where(_.getName().equals("c")) .followedBy("middle").where(_.getName().equals("a")) .oneOrMore().consecutive() .followedBy("end1").where(_.getName().equals("b"))

將爲輸入序列生成如下匹配項:C D A1 A2 A3 D A4 B

連續應用: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}

沒有連續應用: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

allowCombinations()

與oneOrMore()和times()結合使用,並在匹配事件之間施加不肯定的寬鬆連續性(如followByAny()中所示).

若是未應用,則使用寬鬆的連續性(如followBy()所示).

E.g. a pattern like:

Pattern.begin("start").where(_.getName().equals("c")) .followedBy("middle").where(_.getName().equals("a")) .oneOrMore().allowCombinations() .followedBy("end1").where(_.getName().equals("b"))

將爲輸入序列生成如下匹配項: C D A1 A2 A3 D A4 B

啓用組合: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B},

{C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}

沒有啓用組合: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}

模式組

 也能夠將模式序列定義爲begin,followedBy,followedByAny和next的條件。 模式序列在邏輯上將被視爲匹配條件,而且將返回GroupPattern,而且能夠應用oneOrMore(),times(#ofTimes),times(#fromTimes,#toTimes),optional(),continuous(), allowCombinations()到GroupPattern。

val start: Pattern[Event, _] = Pattern.begin(
    Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
)

// strict contiguity
val strict: Pattern[Event, _] = start.next(
    Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)

// relaxed contiguity
val relaxed: Pattern[Event, _] = start.followedBy(
    Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()

// non-deterministic relaxed contiguity
val nonDetermin: Pattern[Event, _] = start.followedByAny(
    Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()

 

Pattern Operation Description
begin(#name)

定義開始模式:

val start = Pattern.begin[Event]("start")
begin(#pattern_sequence)

定義開始模式:

val start = Pattern.begin( Pattern.begin[Event]("start").where(...).followedBy("middle").where(...) )
next(#name)

追加一個新模式。 匹配事件必須直接接在先前的匹配事件以後(嚴格連續):

val next = start.next("middle")
next(#pattern_sequence)

追加一個新模式。 一系列匹配事件必須直接接續先前的匹配事件(嚴格連續):

val next = start.next( Pattern.begin[Event]("start").where(...).followedBy("middle").where(...) )
followedBy(#name)

追加一個新模式。 匹配事件和上一個匹配事件之間可能會發生其餘事件(寬鬆的連續性):

val followedBy = start.followedBy("middle")
followedBy(#pattern_sequence)

追加一個新模式。 在一系列匹配事件和上一個匹配事件之間可能會發生其餘事件(寬鬆的連續性):

val followedBy = start.followedBy( Pattern.begin[Event]("start").where(...).followedBy("middle").where(...) )
followedByAny(#name)

追加一個新模式。 在匹配事件和先前的匹配事件之間可能會發生其餘事件,

而且將爲每一個替代匹配事件顯示替代匹配(不肯定性寬鬆鄰接):

val followedByAny = start.followedByAny("middle")
followedByAny(#pattern_sequence)

追加一個新模式。 在匹配事件序列和先前的匹配事件之間可能會發生其餘事件,

而且將爲匹配事件的每一個替代序列(非肯定性寬鬆連續性)提供替代匹配:

val followedByAny = start.followedByAny( Pattern.begin[Event]("start").where(...).followedBy("middle").where(...) )
notNext()

附加新的否認模式。 匹配(否認)事件必須直接繼承先前的匹配事件(嚴格連續性)才能放棄部分匹配:

val notNext = start.notNext("not")
notFollowedBy()

附加新的否認模式。 即便在匹配(負)事件和上一個匹配事件(寬鬆的鄰接)之間發生其餘事件,部分匹配事件序列也將被丟棄:

val notFollowedBy = start.notFollowedBy("not")
within(time)

定義事件序列與模式匹配的最大時間間隔。 若是未完成的事件序列超過此時間,則將其丟棄:

pattern.within(Time.seconds(10))

匹配後跳過策略 

對於給定的模式,能夠將同一事件分配給多個成功的匹配。 要控制將事件分配給多少個匹配項,您須要指定一個名爲AfterMatchSkipStrategy的跳過策略。 跳過策略有五種類型,列出以下:

  • NO_SKIP:將發出全部可能的匹配項。
  • SKIP_TO_NEXT:丟棄以同一事件開始的全部部分匹配,發出的匹配從事件開始。
  • SKIP_PAST_LAST_EVENT:丟棄匹配開始後但結束以前,開始的全部匹配。
  • SKIP_TO_FIRST:丟棄在匹配開始以後但在PatternName的第一個事件發生以前開始的全部匹配
  • SKIP_TO_LAST:丟棄在匹配開始以後但在PatternName的最後一個事件發生以前開始的全部匹配

請注意,在使用SKIP_TO_FIRST和SKIP_TO_LAST跳過策略時,還應指定有效的PatternName。

例如,對於給定的模式b + c和數據流b1 b2 b3 c,這四種跳過策略之間的差別以下:

 

Skip Strategy Result Description
NO_SKIP b1 b2 b3 c
b2 b3 c
b3 c
找到匹配的b1 b2 b3 c後,匹配過程將不會丟棄任何結果。
SKIP_TO_NEXT b1 b2 b3 c
b2 b3 c
b3 c

找到匹配的b1 b2 b3 c以後,匹配過程將不會丟棄任何結果,

由於沒有其餘匹配能夠從b1開始.

SKIP_PAST_LAST_EVENT b1 b2 b3 c 找到匹配的b1 b2 b3 c後,匹配過程將丟棄全部開始的部分匹配.
SKIP_TO_FIRST[b] b1 b2 b3 c
b2 b3 c
b3 c

找到匹配的b1 b2 b3 c以後,匹配過程將嘗試丟棄b1以前開始的全部部分匹配,

但沒有此類匹配。

所以,什麼都不會被丟棄.

SKIP_TO_LAST[b] b1 b2 b3 c
b3 c

找到匹配的b1 b2 b3 c以後,匹配過程將嘗試丟棄b3以前開始的全部部分匹配。

有這樣的一次匹配 b2 b3 c

 再看看另外一個示例,以更好地瞭解NO_SKIP和SKIP_TO_FIRST之間的區別:模式:(a | b | c)(b | c)c + .greedy d和序列:a b c1 c2 c3 d而後結果將是:

 

Skip Strategy Result Description
NO_SKIP a b c1 c2 c3 d
b c1 c2 c3 d
c1 c2 c3 d
找到匹配a b c1 c2 c3 d後,匹配過程將不會丟棄任何結果.
SKIP_TO_FIRST[c*] a b c1 c2 c3 d
c1 c2 c3 d

在找到與b c1 c2 c3 d匹配以後,匹配過程將丟棄在c1以前開始的全部部分匹配。

有一個這樣的匹配b c1 c2 c3 d.

 爲了更好地理解NO_SKIP和SKIP_TO_NEXT之間的區別,請看如下示例:模式:a b +和序列:a b1 b2 b3而後結果將是:

 

Skip Strategy Result Description
NO_SKIP a b1
a b1 b2
a b1 b2 b3
找到匹配的b1以後,匹配過程將不會丟棄任何結果.
SKIP_TO_NEXT a b1

找到匹配的b1以後,匹配過程將丟棄全部從a開始的部分匹配。

這意味着既不能生成b1 b2也不能生成b1 b2 b3.

 要指定要使用的跳過策略,只需經過調用如下內容來建立AfterMatchSkipStrategy:

Function Description
AfterMatchSkipStrategy.noSkip() Create a NO_SKIP skip strategy
AfterMatchSkipStrategy.skipToNext() Create a SKIP_TO_NEXT skip strategy
AfterMatchSkipStrategy.skipPastLastEvent() Create a SKIP_PAST_LAST_EVENT skip strategy
AfterMatchSkipStrategy.skipToFirst(patternName) 使用引用的模式名稱patternName建立一個SKIP_TO_FIRST跳過策略
AfterMatchSkipStrategy.skipToLast(patternName) 使用引用的模式名稱patternName建立一個SKIP_TO_LAST跳過策略

而後經過調用將跳過策略應用於模式:

val skipStrategy = ...
Pattern.begin("patternName", skipStrategy)

注意:對於SKIP_TO_FIRST / LAST,有兩個選項能夠處理在沒有元素映射到指定變量時的狀況。 默認狀況下,將使用NO_SKIP策略。 另外一個選擇是在這種狀況下引起異常。 能夠經過如下方式啓用此選項:

AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()

檢測模式

 在指定了所需的模式序列以後,是時候將其應用於輸入流以檢測潛在的匹配了。 要針對模式序列運行事件流,必須建立一個PatternStream。 給定一個輸入流輸入,一個模式模式和一個可選的比較器比較器(用於在EventTime的狀況下對具備相同時間戳的事件或在同一時刻到達的事件進行排序),您能夠經過調用如下代碼來建立PatternStream:

val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
var comparator : EventComparator[Event] = ... // optional

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)

輸入流能夠是鍵的,也能夠是非鍵的,具體取決於您的用例。

注意:將模式應用於非鍵控流將致使並行度等於1的做業。

從模式中選擇

 得到PatternStream以後,能夠將轉換應用於檢測到的事件序列。 建議的實現方式是經過PatternProcessFunction。

PatternProcessFunction具備一個processMatch方法,每一個匹配事件序列都會調用該方法。 它以Map <String,List <IN >>的形式接收匹配,其中鍵是模式序列中每一個模式的名稱,值是該模式全部可接受事件的列表(IN是您的類型 輸入元素)。 給定模式的事件按時間戳排序。 返回每一個模式的接受事件列表的緣由是,當使用循環模式(例如oneToMany()和times())時,給定模式可能會接受多個事件。

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        IN startEvent = match.get("start").get(0);
        IN endEvent = match.get("end").get(0);
        out.collect(OUT(startEvent, endEvent));
    }
}

PatternProcessFunction容許訪問Context對象。 有了它,就能夠訪問與時間相關的特徵,例如currentProcessingTime或當前匹配的時間戳(這是分配給匹配的最後一個元素的時間戳)。 有關更多信息,請參見時間上下文。 經過這種狀況,還能夠將結果發送到副輸出。

處理超時的部分模式

 只要某個模式具備經過inner關鍵字附加的窗口長度,則可能會丟棄部分事件序列,由於它們超過了窗口長度。 要對超時的部分匹配採起行動,可使用TimedOutPartialMatchHandler接口。 該接口應該以混合樣式使用。 這意味着您還可使用PatternProcessFunction實現此接口。 TimedOutPartialMatchHandler提供了額外的processTimedOutMatch方法,將爲每一個超時的部分匹配調用該方法。

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        ...
    }

    @Override
    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
        IN startEvent = match.get("start").get(0);
        ctx.output(outputTag, T(startEvent));
    }
}

注意:processTimedOutMatch不能訪問主輸出。 可是,仍然能夠經過Context對象經過側面輸出發出結果。

Convenience API

 前面提到的PatternProcessFunction是在Flink 1.8中引入的,從那時起,它是與匹配項進行交互的推薦方法。 仍然可使用老式的API,例如select / flatSelect,該API在內部將轉換爲PatternProcessFunction。

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val outputTag = OutputTag[String]("side-output")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.flatSelect(outputTag){
    (pattern: Map[String, Iterable[Event]], timestamp: Long, out: Collector[TimeoutEvent]) =>
        out.collect(TimeoutEvent())
} {
    (pattern: mutable.Map[String, Iterable[Event]], out: Collector[ComplexEvent]) =>
        out.collect(ComplexEvent())
}

val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(outputTag)

CEP庫中的時間 

處理事件時間的延遲

 在CEP中,處理元素的順序很重要。 爲了確保在事件時間內工做時按正確的順序處理元素,將傳入的元素最初放在緩衝區中,在該緩衝區中,元素根據其時間戳按升序排序,而且當水印到達時,該緩衝區中的全部元素 小於水印的時間戳被處理。 這意味着水印之間的元素按事件時間順序進行處理。

注意:在事件時間內工做時,cep 庫假定水印正確無誤。

爲了確保跨水印的元素按事件時間順序進行處理,Flink的CEP庫假定水印是正確的,並將其時間戳小於最後看到的水印的時間戳視爲晚元素。 後期元素不會進一步處理。 另外,您能夠指定sideOutput標籤來收集在最後看到的水印以後出現的後期元素,您能夠像這樣使用它。

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)

val lateDataOutputTag = OutputTag[String]("late-data")

val result: SingleOutputStreamOperator[ComplexEvent] = patternStream
      .sideOutputLateData(lateDataOutputTag)
      .select{
          pattern: Map[String, Iterable[ComplexEvent]] => ComplexEvent()
      }

val lateData: DataStream[String] = result.getSideOutput(lateDataOutputTag)

Time context

 在PatternProcessFunction和IterativeCondition中,用戶能夠訪問實現TimeContext的上下文,以下所示:

/**
 * Enables access to time related characteristics such as current processing time or timestamp of
 * currently processed element. Used in {@link PatternProcessFunction} and
 * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
 */
@PublicEvolving
public interface TimeContext {

    /**
     * Timestamp of the element currently being processed.
     *
     * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this
     * will be set to the time when event entered the cep operator.
     */
    long timestamp();

    /** Returns the current processing time. */
    long currentProcessingTime();
}

此上下文使用戶能夠訪問已處理事件的時間特徵(在IterativeCondition狀況下爲傳入記錄,在PatternProcessFunction狀況下爲匹配)。 調用TimeContext#currentProcessingTime始終會爲您提供當前處理時間的值,而且此調用應優先於例如 調用System.currentTimeMillis()。

若是使用TimeContext#timestamp(),則返回值等於使用EventTime時分配的時間戳。 在ProcessingTime中,該時間等於所述事件進入cep運算符的時間點(或在PatternProcessFunction的狀況下生成匹配項的時間點)。 這意味着該值在對該方法的屢次調用中將保持一致。

Examples

如下示例在事件的鍵控數據流上檢測模式的開始,中間(名稱=「錯誤」)->結束(名稱=「嚴重」)。 這些事件由其ID進行鍵控,而且有效模式必須在10秒內發生。 整個處理過程隨事件時間而定。

val env : StreamExecutionEnvironment = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val input : DataStream[Event] = ...

val partitionedInput = input.keyBy(event => event.getId)

val pattern = Pattern.begin[Event]("start")
  .next("middle").where(_.getName == "error")
  .followedBy("end").where(_.getName == "critical")
  .within(Time.seconds(10))

val patternStream = CEP.pattern(partitionedInput, pattern)

val alerts = patternStream.select(createAlert(_))

遷移到1.4+

在Flink-1.4中,刪除了CEP庫與<= Flink 1.2的向後兼容性。 不幸的是,不可能恢復曾經使用1.2.x運行的CEP做業。

遷移到1.3.x

 Flink-1.3中的CEP庫附帶了許多新功能,這些新功能致使API發生了一些更改。 在這裏,咱們描述了您須要對舊的CEP做業進行的更改,以便可以使用Flink-1.3運行它們。 進行這些更改並從新編譯做業後,您將可以從做業的舊版本獲取的保存點恢復執行,即無需從新處理過去的數據。

所需的更改是:

  1. 更改條件(where(...)子句中的條件)以擴展SimpleCondition類,而不是實現FilterFunction接口。

  2. 更改做爲select(...)和flatSelect(...)方法的參數提供的函數,以獲取與每一個模式關聯的事件列表(Java中爲List,Scala中爲Iterable)。 這是由於經過添加循環模式,多個輸入事件能夠匹配單個(循環)模式。

  3. Flink 1.1和1.2中的followedBy()隱含了不肯定的寬鬆連續性(請參見此處)。 在Flink 1.3中,此更改已更改,而且followBy()表示寬鬆鄰接,而若是須要非肯定性寬鬆鄰接,則應使用followByAny()。

歡迎關注Flink菜鳥公衆號,會不按期更新Flink(開發技術)相關的推文

相關文章
相關標籤/搜索