本文根據Apache Flink 實戰&進階篇系列直播課程整理而成,由哈囉出行大數據實時平臺資深開發劉博分享。經過一些簡單的實際例子,從概念原理,到如何使用,再到功能的擴展,但願可以給打算使用或者已經使用的同窗一些幫助。數據庫
主要的內容分爲以下三個部分:服務器
CEP的意思是復瑣事件處理,例如:起牀-->洗漱-->吃飯-->上班等一系列串聯起來的事件流造成的模式稱爲CEP。若是發現某一次起牀後沒有刷牙洗臉亦或是吃飯就直接上班,就能夠把這種非正常的事件流匹配出來進行分析,看看今天是否是起晚了。網絡
下圖中列出了幾個例子:架構
Flink CEP內部是用NFA(非肯定有限自動機)來實現的,由點和邊組成的一個狀態圖,以一個初始狀態做爲起點,通過一系列的中間狀態,達到終態。點分爲起始狀態、中間狀態、最終狀態三種,邊分爲take、ignore、proceed三種。運維
下面以一個打車的例子來展現狀態是如何流轉的,規則見下圖所示。函數
以乘客制定行程做爲開始,匹配乘客的下單事件,若是這個訂單超時尚未被司機接單的話,就把行程事件和下單事件做爲結果集往下游輸出。大數據
假如消息到來順序爲:行程-->其餘-->下單-->其餘。優化
狀態流轉以下:阿里雲
一、開始時狀態處於行程狀態,即等待用戶制定行程。spa
二、當收到行程事件時,匹配行程狀態的條件,把行程事件放到結果集中,經過take邊將狀態往下轉移到下單狀態。
三、因爲下單狀態上有一條ignore邊,因此能夠忽略收到的其餘事件,直到收到下單事件時將其匹配,放入結果集中,而且將當前狀態往下轉移到超時未接單狀態。這時候結果集當中有兩個事件:制定行程事件和下單事件。
四、超時未接單狀態時,若是來了一些其餘事件,一樣能夠被ignore邊忽略,直到超時事件的觸發,將狀態往下轉移到最終狀態,這時候整個模式匹配成功,最終將結果集中的制定行程事件和下單事件輸出到下游。
上面是一個匹配成功的例子,若是是不成功的例子會怎麼樣?
假如當狀態處於超時未接單狀態時,收到了一個接單事件,那麼就不符合超時未被接單的觸發條件,此時整個模式匹配失敗,以前放入結果集中的行程事件和下單事件會被清理。
本節將詳細介紹Flink CEP的程序結構以及API。
主要分爲兩部分:定義事件模式和匹配結果處理。
官方示例以下:
程序結構分爲三部分:首先須要定義一個模式(Pattern),即第2行代碼所示,接着把定義好的模式綁定在DataStream上(第25行),最後就能夠在具備CEP功能的DataStream上將匹配的結果進行處理(第27行)。
下面對關鍵部分作詳細講解:
定義模式:上面示例中,分爲了三步,首先匹配一個ID爲42的事件,接着匹配一個體積大於等於10的事件,最後等待收到一個name等於end的事件。
匹配結果輸出:此部分,須要重點注意select函數(第30行,注:本文基於Flink 1.7版本)裏邊的Map類型的pattern參數,Key是一個pattern的name,它的取值是模式定義中的Begin節點start,或者是接下來next裏面的middle,或者是第三個步驟的end。後面的map中的value是每一步發生的匹配事件。因在每一步中是可使用循環屬性的,能夠匹配發生屢次,因此map中的value是匹配發生屢次的全部事件的一個集合。
上圖中,藍色方框表明的是一個個單獨的模式;淺黃色的橢圓表明的是這個模式上能夠添加的屬性,包括模式能夠發生的循環次數,或者這個模式是貪婪的仍是可選的;橘色的橢圓表明的是模式間的關係,定義了多個模式之間是怎麼樣串聯起來的。經過定義模式,添加相應的屬性,將多個模式串聯起來三步,就能夠構成了一個完整的Flink CEP程序。
下面是示例代碼:
定義模式主要有以下5個部分組成:
pattern:前一個模式
next/followedBy/...:開始一個新的模式
start:模式名稱
where:模式的內容
filter:核心處理邏輯
接下來介紹一下怎樣設置模式的屬性。模式的屬性主要分爲循環屬性和可選屬性。
循環屬性能夠定義模式匹配發生固定次數(times),匹配發生一次以上(oneOrMore),匹配發生屢次以上。(timesOrMore)。
可選屬性能夠設置模式是貪婪的(greedy),即匹配最長的串,或設置爲可選的(optional),有則匹配,無則忽略。
因爲模式的匹配事件存放在狀態中進行管理,因此須要設置一個全局的有效期(within)。 若不指定有效期,匹配事件會一直保存在狀態中不會被清除。至於有效期能開多大,要依據具體使用場景和數據量來衡量,關鍵要看匹配的事件有多少,隨着匹配的事件增多,新到達的消息遍歷以前的匹配事件會增長CPU、內存的消耗,而且隨着狀態變大,數據傾斜也會愈來愈嚴重。
主要分爲三種:嚴格連續性(next/notNext),寬鬆連續性(followedBy/notFollowedBy),和非肯定寬鬆連續性(followedByAny)。
三種模式匹配的差異見下表所示:
總結以下:
除了前面提到的模式定義和模式間的聯繫,還能夠把相連的多個模式組合在一塊兒當作一個模式組,相似於視圖,能夠在這個模式視圖上進行相關操做。
上圖這個例子裏面,首先匹配了一個登陸事件,而後接下來匹配瀏覽,下單,購買這三個事件反覆發生三次的用戶。
若是沒有模式組的話,代碼裏面瀏覽,下單,購買要寫三次。有了模式組,只需把瀏覽,下單,購買這三個事件當作一個模式組,把相應的屬性加上times(3)就能夠了。
處理匹配的結果主要有四個接口: PatternFlatSelectFunction,PatternSelectFunction,PatternFlatTimeoutFunction和PatternTimeoutFunction。
從名字上能夠看出,輸出能夠分爲兩類:select和flatSelect指定輸出一條仍是多條,timeoutFunction和不帶timeout的Function指定可不能夠對超時事件進行旁路輸出。
下圖是輸出的綜合示例代碼:
當一個事件到來時,若是這個事件同時符合多個輸出的結果集,那麼這個事件是如何保存的?
Flink CEP經過Dewey計數法在多個結果集共享同一個事件副本,以實現對事件副本進行資源共享。
本章主要介紹一些Flink CEP的擴展,講述如何作到超時機制的精確管理,以及規則的動態加載與更新。
原生Flink CEP中超時觸發的功能能夠經過within+outputtag結合來實現,可是在複雜的場景下處理存在問題,以下圖所示,在下單事件後還有一個預付款事件,想要獲得下單而且預付款後超時未被接單的訂單,該如何表示呢?
參照下單後超時未被接單的作法,把下單而且預付款後超時未被接單規則表示爲下單.followedBy(預付款).followedBy(接單).within(time),那麼這樣實現會存在問題嗎?
這種作法的計算結果是會存在髒數據的,由於這個規則不只匹配到了下單而且預付款後超時未被接單的訂單(想要的結果),一樣還匹配到了只有下單行爲後超時未被接單的訂單(髒數據,沒有預付款)。緣由是由於超時within是控制在整個規則上,而不是某一個狀態節點上,因此不論當前的狀態是處在哪一個狀態節點,超時後都會被旁路輸出。
那麼就須要考慮可否經過時間來直接對狀態轉移作到精確的控制,而不是經過規則超時這種曲線救國的方式。 因而乎,在經過消息觸發狀態的轉移以外,須要增長經過時間觸發狀態的轉移的支持。要實現此功能,須要在原來的狀態以及狀態轉移中,增長時間屬性的概念。以下圖所示,經過wait算子來獲得waiting狀態,而後在waiting狀態上設置一個十秒的時間屬性以定義一個十秒的時間窗口。
wait算子對應NFA中的ignore狀態,將在沒有到達時間窗口結束時間時自旋,在ComputationState中記錄wait的開始時間,在NFA的doProcess中,將到來的數據與waiting狀態處理,若是到了waiting的結束時間,則進行狀態轉移。
上圖中紅色方框中爲waiting狀態設置了兩條ignore邊:
1.waitingStatus.addIgnore(lastSink,waitingCondition),waitingCondition中的邏輯是獲取當前的時間(支持事件時間),判斷有沒有超過設置的waiting閾值,若是超過就把狀態向後轉移。
2.waitingStatus.addIgnore(waitingCondition),waitingCondition中若是未達到設置的waiting閾值,就會自旋在當前的waiting狀態不變。
線上運行的CEP中確定常常遇到規則變動的狀況,若是每次變動時都將任務重啓、從新發布是很是不優雅的。尤爲在營銷或者風控這種對實時性要求比較高的場景,若是規則窗口過長(一兩個星期),狀態過大,就會致使重啓時間延長,期間就會形成一些想要處理的異常行爲不能及時發現。
那麼要怎麼樣作到規則的動態更新和加載呢?
梳理一下總體架構,Flink CEP是運行在Flink Job裏的,而規則庫是放在外部存儲中的。首先,須要在運行的Job中能及時發現外部存儲中規則的變化,即須要在Job中提供訪問外部庫的能力。 其次,須要將規則庫中變動的規則動態加載到CEP中,即把外部規則的描述解析成Flink CEP所能識別的pattern結構體。最後,把生成的pattern轉化成NFA,替換歷史NFA,這樣對新到來的消息,就會使用新的規則進行匹配。
下圖就是一個支持將外部規則動態注入、更新的接口。
這個接口裏面主要實現了四個方法:
新規則動態加載到Flink CEP的Job中,替換掉原來的NFA以後,還須要對歷史匹配的結果集進行清理。在AbstractKeyedCEPPatternOperator中實現刷新NFA,注意,歷史狀態是否須要清理和業務相關:
使用Flink CEP,熟知其原理是很重要的,特別是NFA的狀態轉移流程,而後再去看源碼中的狀態圖的構建就會很清晰了。
本文做者:巴蜀真人
本文爲阿里雲內容,未經容許不得轉載。