Flink中的CEP復瑣事件處理 (源碼分析)

其實CEP復瑣事件處理,簡單來講你能夠用經過相似正則表達式的方式去表示你的邏輯,表現能力很是的強,用過的人都知道html

開篇先偷一張圖,總體瞭解FlinkCEP中的  一種重要的圖  NFAjava

FlinkCEP在運行時會將用戶的邏輯轉化成這樣的一個NFA Graph (nfa對象)正則表達式

graph 中包含狀態(Flink中State對象),以及鏈接狀態的邊(Flink中StateTransition對象)源碼分析

當從一個State跳變到另外一個State時須要經過一條邊StateTransition,這條邊中包含一個Condition對象包含了用戶的邏輯就是咱們用戶代碼中.where()中返回Boolean的方法spa

也就是說Condition對象中包含是否能夠完成狀態跳變的條件,A狀態要跳變到B狀態就必須知足鏈接AB的邊中的條件(邊StateTransition對象屬於B state)設計

其中邊StateTransition分爲三種3d

  take: 狀態知足跳變條件後直接跳變到B狀態htm

  ignore: 狀態知足跳變條件之後又回到原來狀態,狀態保持不變對象

  process: 這條邊能夠忽略也能夠不忽略blog

後面源碼分析的時候能夠看到他們之間的區別

接着從源碼來看一下如何用這個NFA圖實現Flink中的CEP復瑣事件處理的

由於CEP在Flink中被設計成算子的一種而不是單獨的計算引擎,因此直接找到CepOperator.java中

來看一下它的初始化Open()

這裏看到有一個NFAFactory的工廠建立了一個NFA,這裏的這個工廠是在Driver端經過用戶編寫的代碼返回的Patten對象轉換獲得的,也就是用戶env.exection()的時候解析的,工廠對象還包含了用戶全部的State集合

繼續,在createNFA()方法中

 將工廠中的全部頂點也就是狀態States放到了NFA對象的一個Map中

 Key爲這個States的Name(其實就是用戶代碼中的.next("Name"))

接着看CepOperator.java中接收到數據processElement()方法作了什麼

 這裏是處理時間的,這裏其實就是直接執行了,這裏就不看了,直接看事件時間是如何處理的

先是取出數據的事件時間,判斷是否是小於當前水印了,小於這條數據就證實遲到過久了,若是有側輸出丟給側輸出處理,沒有就直接丟棄了,和WindowOperater同樣

而後看saveRegisterWatermarkTimer()方法

將 (當前水印+1) 註冊成了一個定時器timer用於觸發計算,和window原理同樣(不知道的能夠看看前面的文章)

這裏主要是由於窗口是一批一批觸發而CEP須要逐個觸發,因此用(當前水印+1)當作定時器,也就是說只要水印往前推動了就觸發推動這段時間的全部計算

而後bufferEvent()將這條數據加入到了一個Queue中

如今來看觸發計算的具體邏輯

來到onEventTime()方法中

先是拿到一個用時間排序的優先隊列PriorityQueue裏面就是排序的事件時間

getNFAState()這裏比較重要,這裏經過nfa獲得了一個nfaState具體來看一下

這裏這個NFAstate會初始化,NFAstate裏面包含了一個ComputationState的queue,主要目的是用於每條數據來的時候都會去遍歷這個queue,看這條數據是否能匹配上裏面的state若是匹配上了就更新下一個準備匹配的狀態

這裏就知道他爲何NFAstate初始化的時候會把用戶全部的State中能夠做爲開始start的狀態放queue了吧

由於一開始沒數據,當來數據的時候我要判斷這條數據是否是屬於我CEP的Begin頭,這個state也就是咱們用戶的begin()方法,因此才把全部的能夠做爲開始的狀態都放到這個PartialMatches這個queue中去,這個PartialMatches後面計算的時候會用到,注意

NFAState的初始化就講完了

繼續,回處處理邏輯

而後根據事件時間做爲key拉取前面將數據放入的那個queue中數據,返回的是一個List包含這個事件時間的全部數據

而後排序,這裏是二次排序,第一次排序是用的事件時間,二次排序排的是同一時間的數據按什麼順序處理

而後這裏ProcessEvent()方法就是具體執行的邏輯了,這裏同時會把剛剛初始化好的NFAState傳遞進去

 一開始會獲取一個共享的緩衝區主要是爲了減少CEP重複數據存儲的內存佔用,這裏不講了由於CEP論文裏面有,比較複雜

這裏process()方法就是具體邏輯了,返回了一個map這個map包含了process()方法這條數據匹配成功結束的數據也就是結果,而processMatchedSequences(patterns, timestamp)就是執行用戶的.select()邏輯了

既然這裏就獲得了CEP匹配的結果,來看下具體計算邏輯nfa.process()

這裏又初始化兩個優先隊列

分別用於

  newPartialMatches  裝nfa匹配到一半沒有結束數據,也就是半匹配,

  potentialMatches     裝成功匹配完成的數據,用於返回,調用用戶的方法去處理結果

接着

這裏就直接去初始化好的NFAState中拿剛剛的那個PartialMatches,而且遍歷它,經過傳入這個computeNextStates()方法,用於判斷這條數據是否能夠知足這個ComputationState完成匹配

注意! 一開始時初始化裏面只有全部可做爲CEP匹配頭的ComputationState,可想而知當後面匹配上了之後確定會更新這個用於看數據是否匹配的queue

 

這裏就能夠知道了整個CEP的處理方式了:  

   一開始會把全部能夠做爲CEP匹配頭的狀態State先放入queue,每來一條數據就會遍歷queue中全部state,看這條數據是否能能匹配上,能匹配上就在queue中加入下一個用於匹配的狀態,  用於看下一條數據可否繼續匹配上

   好比一個正則"abc"用於CEP匹配 當來了一條a數據,就匹配上CEP頭了,會把b state加入queue中,接着來了一條b數據,又繼續匹配上了,又把c state加入queue 直到來了一條c數據整個就匹配完成,返回結果

 

   總結 : 處理過程就是兩步

        1.來一條數據,遍歷queue中全部state,看哪些state能匹配上就匹配

        2.根據1的結果更新queue,用於下一條數據的匹配 

    

而判斷是否能匹配上就是這個computerNextStates()方法中

先把這個狀態state壓棧

從棧中取state遍歷它全部的邊 StateTransitions

調用用戶的方法看是否能知足邊條件,也就是說是否能跳變到這個狀態

當知足時,會根據邊

  ignore: 啥都不作

  take:       加入結果集中

  process:  又把這個狀態的下一個狀態state壓棧了,繼續循環處理

 

結果返回這條數據匹配上的狀態們,因而

遍歷全部匹配上的狀態得結果集,會把匹配上的狀態的下一個(target)用於匹配的狀態加進queue去

若是是結束,默認NFAstate中是有一個自帶"&end"的結束state

遍歷全部完成的狀態,當匹配上最後一個狀態時就是上面說的「&end」就證實完成了,丟到完成queue中

當匹配失敗了就清空狀態

當匹配上了但尚未結束就丟到半匹配queue

接着

會先執行跳過策略把結果篩選一遍

而後

就是用咱們前面說的那個半匹配queue了,用它又繼續更新了NFAState中的PartialMatches了

下一條數據來了之後就會用遍歷這個新queue集合來判斷是否能夠繼續匹配了

而後返回此次匹配成功的數據,調用用戶select方法處理結果了

原文出處:https://www.cnblogs.com/ljygz/p/11978386.html

相關文章
相關標籤/搜索