從版本1.5.0開始,Apache Flink具備一種稱爲廣播狀態的新型狀態。 在這篇文章中,咱們解釋了廣播狀態是什麼,並展現瞭如何將其應用於評估事件流上的動態模式的應用程序的示例。 咱們將引導您完成開發步驟和代碼,以實現此應用程序。html
什麼是廣播狀態apache
廣播狀態能夠用於以特定的方式組合和聯合兩個事件流。第一個事件流被廣播給算子的全部並行實例,這些實例將他們維持在狀態中。 其它事件流將不會被廣播,可是會被髮給同一個算子的個別實例,並與廣播流事件一塊兒處理。新的廣播狀態很是適合須要加入低吞吐量和高吞吐量流或須要動態更新其處理邏輯的應用程序。咱們將使用後一個用例的具體示例來解釋廣播狀態,並在本文的其他部分更詳細地展現其API。併發
廣播狀態的動態模式評估ide
想象一下一個電子商務網站捕獲全部用戶的交互做爲用戶行爲流。運營該網站的公司對於分析交互以增長收入,改善用戶體驗,以及檢測和防止惡意行爲很感興趣。該網站實現了一個流應用程序,用於檢測用戶事件流上的模式。可是,公司但願每次模式更改時都避免修改和從新部署應用程序。相反,應用程序在從模式流接收新行爲時獲取第二個模式流並更新其活動模式。在下文中,咱們將逐步討論此應用程序,並展現它如何利用Apache Flink中的廣播狀態功能。函數
咱們的示例應用程序獲取了兩個數據流。第一個流在網站上提供用戶操做,並在上圖的左上方顯示。用戶交互事件包括操做的類型(用戶登陸,用戶註銷,添加到購物車或完成支付)和用戶的ID,他們都被各類顏色進行編碼。在咱們的圖示中的用戶動做事件流包含用戶1001的註銷動做,其後是用戶1003的支付完成事件,以及用戶1002的「添加到購物車」動做。網站
第二個流的操做模式將會經過應用進行評估。模式由兩個連續的動做組成。this
在上圖中,模式流包含如下兩個:編碼
模式#1:用戶登陸並當即註銷並無瀏覽電子商務網站上的其餘頁面。lua
模式#2:用戶將項目添加到購物車並在不完成購買的狀況下注銷。設計
這些模式有助於企業更好地分析用戶行爲,檢測惡意行爲並改善網站體驗。例如,若是項目被添加到購物車而沒有後續購買,網站團隊能夠採起適當的措施來更好地瞭解用戶未完成購買的緣由並啓動特定程序以改善網站環境( 如提供折扣,限時免費送貨優惠等)。
在右側,該圖顯示了一個算子的三個並行任務,即侵入模式和用戶操做流,評估操做流上的模式,並在下游發出模式匹配。爲了簡單起見,在咱們例子中的算子僅僅評估具備兩個後續操做的單個模式。當從模式流接收到新模式時,當前活動模式會被替換。實質上,這個算子還能夠同時評估更復雜的模式或多個模式,這些模式能夠單獨添加或移除。
咱們將描述匹配應用程序的模式如何處理用戶操做和模式流。
首先一個模式被髮送給一個算子。這個模式將會被廣播給全部算子的三個並行任務。任務將會將這個模式存儲在廣播狀態中。因爲廣播狀態只應使用廣播數據進行更新,所以全部任務的狀態始終預期相同。
接下來,第一個用戶的操做將會根據用戶的id進行分區,而且會被髮送到相應算子的任務中。這個分區可以確保同一個用戶的全部操做都會被同一個任務處理。上圖顯示了該算子處理了第一個模式和前三個操做事件後應用程序的狀態。
當一個任務收到了一個新的用戶操做,它會經過查看用戶的最新和先前操做來評估當前活動的模式。對於每一個用戶,算子會將先前的操做儲存在key state中。因爲上圖中的任務到目前爲止僅僅收到了每一個用戶的一個操做(咱們剛剛啓動了應用程序),所以不須要評估該模式。最後,存儲在key state中的用戶的先前操做將會被更新爲最新動做,以便可以在同一用戶的下一個動做到達時查找它。
在前三個動做被處理以後,下一個事件(用戶1001的註銷操做)是被髮送處處理用戶1001的事件的任務。當用戶獲取動做時,它從廣播狀態和用戶1001的先前動做中查找當前模式。模式匹配兩個動做以後,任務提交模式匹配事件。 最後,任務經過使用最新操做覆蓋上一個事件來更新其key state。
當一個新模式到達模式流時,它被廣播到全部任務,而且每一個任務經過用新模式替換當前模式來更新其廣播狀態。
一旦廣播狀態被一種新的模式更新後,匹配邏輯可以如先前那樣繼續,換句話說,用戶的操做事件將會按key進行分區,而且由負責的任務進行評估。
如何使用廣播狀態實現應用程序?
到目前爲止,咱們在概念上討論了該應用程序並解釋了它如何使用廣播狀態來評估事件流上的動態模式。 接下來,咱們將展現如何使用Flink的DataStream API和廣播狀態功能實現示例應用程序。
讓咱們從應用程序的輸入數據開始。 咱們有兩個數據流,操做和模式。 在這一點上,咱們並不關心流來自何處。 能夠從Apache Kafka或Kinesis或任何其餘系統獲取流。 動做和模式是擁有兩個字段的Pojos:
DataStream<Action> actions = ??? DataStream<Pattern> patterns = ???
動做和模式是擁有兩個字段的Pojos
Action: Long userId, String action
Pattern: String firstAction, String secondAction
做爲第一步,咱們將操做流上的userId屬性。
KeyedStream<Action, Long> actionsByUser = actions .keyBy((KeySelector<Action, Long>) action -> action.userId);
接下來,我將開始嘗試廣播狀態。廣播狀態通常以MapState爲表明,這是Flink提供的最通用的狀態原語。
MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
因爲咱們的應用程序一次僅評估和存儲單個Pattern,所以咱們將廣播狀態配置爲具備鍵類型Void和值類型Pattern的MapState。 Pattern始終存儲在MapState中,並將null做爲鍵。
BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);
使用MapStateDescriptor
做爲廣播狀態,咱們對模式流應用broadcast()轉換並接收廣播流bcedPatterns
。
DataStream<Tuple2<Long, Pattern>> matches = actionsByUser .connect(bcedPatterns) .process(new PatternEvaluator());
在咱們得到keyed actionsByUser流和廣播的bcedPatterns流以後,咱們鏈接兩個流並在鏈接的流上應用PatternEvaluator。 PatternEvaluator是一個實現KeyedBroadcastProcessFunction接口的自定義函數。 它應用咱們以前討論過的模式匹配邏輯,併發出包含用戶ID和匹配模式的Tuple2 <Long,Pattern>記錄。
public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> { // handle for keyed state (per user) ValueState<String> prevActionState; // broadcast state descriptor MapStateDescriptor<Void, Pattern> patternDesc; @Override public void open(Configuration conf) { // initialize keyed state prevActionState = getRuntimeContext().getState( new ValueStateDescriptor<>("lastAction", Types.STRING)); patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)); } /** * Called for each user action. * Evaluates the current pattern against the previous and * current action of the user. */ @Override public void processElement( Action action, ReadOnlyContext ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception { // get current pattern from broadcast state Pattern pattern = ctx .getBroadcastState(this.patternDesc) // access MapState with null as VOID default value .get(null); // get previous action of current user from keyed state String prevAction = prevActionState.value(); if (pattern != null && prevAction != null) { // user had an action before, check if pattern matches if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(action.action)) { // MATCH out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern)); } } // update keyed state and remember action for next pattern evaluation prevActionState.update(action.action); } /** * Called for each new pattern. * Overwrites the current pattern with the new pattern. */ @Override public void processBroadcastElement( Pattern pattern, Context ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception { // store the new pattern by updating the broadcast state BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc); // storing in MapState with null as VOID default value bcState.put(null, pattern); } }
KeyedBroadcastProcessFunction接口提供了三種處理記錄和發出結果的方法。
processBroadcastElement()
被廣播流上的每一個記錄調用。在咱們的 PatternEvaluator
函數中, 咱們簡單的使用null
健將接收到的 Pattern
記錄放入廣播狀態(記住,咱們只在MapState
中存儲單個模式)。processElement()
被 keyed stream上的每條記錄調用。 它提供對廣播狀態的只讀訪問,以防止經過函數的並行實例修改不一樣廣播狀態中的結果。PatternEvaluator
的processElement()
方法從廣播狀態檢索當前模式,從keyed狀態檢索用戶的上一個動做。若是兩個都存在,它將會檢查以前的模式和如今操做是否和模式匹配,若是相匹配,將會發送匹配的記錄。最後,它會更新當前用戶操做的keyed state。onTimer()
將會在先前註冊的計時器觸發時被調用。定時器能夠在processElement
方法中註冊,並用於執行計算或未來清理狀態。爲了保持代碼的簡潔,在咱們的示例中沒有實現該方法。可是,當用戶在一段時間內未處於活動狀態時,它可用於刪除用戶的最後一個操做,以免因爲非活動用戶而致使狀態增加您可能已經注意到KeyedBroadcastProcessFunction的處理方法的上下文對象。
上下文對象提供對其餘功能的訪問,例如:
TimerService
,能夠訪問記錄的時間戳,當前的水印,以及哪些能夠註冊定時器,processElement()
中可用)和, processBroadcastElement()
中可用)KeyedBroadcastProcessFunction能夠像任何其餘ProcessFunction同樣徹底訪問Flink狀態和時間功能,所以可用於實現複雜的應用程序邏輯。廣播狀態被設計爲一種適用於不一樣場景和用例的通用功能。雖然咱們只討論了一個至關簡單且受限制的應用程序,但您能夠經過多種方式使用廣播狀態來實現應用程序的要求。
結論
在這篇博文中,咱們向您介紹了一個示例應用程序,以解釋Apache Flink的廣播狀態以及它如何用於評估事件流上的動態模式。 咱們還討論了API並展現了咱們的示例應用程序的源代碼。
咱們邀請您查看此功能的文檔,並經過咱們的郵件列表提供反饋或建議以進一步改進。
原文連接:https://flink.apache.org/2019/06/26/broadcast-state.html