從1.5.0開始,Flink提供了一種新的State類型,稱爲Broadcast State。在這篇文章中,咱們將解釋什麼是Broadcast State,並展現如何將其應用於評估事件流上的動態模式的應用的示例。咱們將向您介紹處理步驟和源代碼,以實現此應用。html
Broadcast State可用於以特定方式組合和聯合處理兩個事件流。第一個流的事件被廣播到一個算子的全部並行實例,該算子將它們保存爲狀態。另外一個流的事件不廣播,而是發送給同一個算子的單個實例,並與廣播流的事件一塊兒處理。對於須要鏈接低吞吐量和高吞吐量流或須要動態更新處理邏輯的應用來講,新的broadcast state很是適合。咱們將使用一個具體示例來解釋broadcast state,並在本文的其他部分更詳細地展現其API。java
想象一下,一個電子商務網站捕獲全部用戶的交互做爲用戶行爲流。運營網站的公司有興趣分析交互,以增長收入,改善用戶體驗,並檢測和防止惡意行爲。該網站實現了一個流應用,該應用檢測用戶事件流上的模式。可是,公司但願避免每次模式改變時修改和從新部署應用。相反,當應用接收到來自模式流的新模式時,它會攝取第二個模式流並更新其活動模式。在下面,咱們將逐步討論這個應用,並展現它如何利用Flink中的broadcast state特性。apache
咱們的示例應用包含兩個數據流。第一個流提供用戶在網站上的行爲,如上圖的左上方所示。用戶交互事件包括行爲的類型(用戶登陸、用戶註銷、添加到購物車或完成支付)和用戶的id,該id由顏色編碼(不一樣顏色表明不一樣用戶)。咱們插圖中的用戶行爲事件流包含用戶1001的註銷行爲,用戶1003的支付行爲和用戶1002的加入購物車行爲。微信
第二個流提供應用將評估的行爲模式。模式由兩個連續的行爲組成。在上面的圖中,模式流包含如下兩個:併發
這些模式有助於企業更好地分析用戶行爲、檢測惡意行爲和改善網站體驗。例如,若是項目被添加到購物車而沒有後續購買,網站團隊能夠採起適當的行動,以更好地理解用戶不完成購買的緣由,並啓動特定的程序來提升網站轉化(例如提供折扣碼、限時免費送貨優惠等)。ide
在右側,該圖顯示了算子的三個並行任務,它們攝取模式和用戶行爲流,評估行爲流上的模式,並向下遊發出模式匹配。爲了簡單起見,在咱們的示例中,算子只計算一個模式,只包含兩個後續行爲。當從模式流接收到新模式時,替換當前活動的模式。原則上,還能夠實現一個算子來同時評估更復雜的模式或多個模式,這些模式能夠單獨添加或刪除。函數
咱們將描述模式匹配應用如何處理用戶行爲流和模式流。網站
首先,將模式發送給算子。該模式被廣播到算子的全部三個並行任務。任務將模式存儲在其broadcast state中。因爲broadcast state只應該使用廣播數據更新,因此全部任務的狀態都是相同的。this
接下來,在用戶id上對第一個用戶行爲進行分區,並將其發送給下游算子。分區確保同一個用戶的全部行爲都由同一個任務處理。上圖顯示了應用在第一個模式以後的狀態,以及算子消耗了前三個行爲事件。編碼
當任務接收到新的用戶行爲時,它會經過查看用戶的最新和先前行爲來評估當前活動模式。對於每一個用戶,運算符將前面的操爲存儲在keyed state。因爲圖中的任務到目前爲止只接收到每一個用戶的一個行爲(咱們剛剛啓動應用),所以不須要對模式進行評估。最後,處於用戶keyed state的前一個行爲被更新爲最新的行爲,以便可以在同一用戶的下一個行爲到達時查找它。
在處理前三個行爲以後,下一個事件(用戶1001的註銷行爲)被髮送處處理用戶1001的事件的任務。當任務接收到行爲時,它從broadcast state和用戶1001的前一個行爲中查找當前模式。因爲模式與兩個行爲匹配,任務將發出模式匹配事件。最後,該任務經過使用最新行爲覆蓋前一個事件來更新其keyed state。
當一個新模式到達模式流時,它將被廣播到全部任務,每一個任務經過用新模式替換當前模式來更新其broadcast state。
一旦用新模式更新broadcast state,匹配邏輯就會像之前同樣繼續,即用戶行爲事件按key進行分區,並由負責的任務進行評估。
到目前爲止,咱們從概念上討論了這個應用,並解釋了它如何使用broadcast state來評估事件流上的動態模式。接下來,咱們將展現如何使用Flink的Datastream API和broadcast state特性來實現示例應用。
讓咱們從應用的輸入數據開始。咱們有兩個數據流,行爲流和模式流。在這一點上,咱們並不關流從何而來。這些流多是從Kafka、Kinesis或任何其餘系統中攝取的。行爲和模式是Pojos,每一個字段有兩個:
DataStream<Action> actions = ???
DataStream<Pattern> patterns = ???
複製代碼
Action
和Pattern
Pojos有兩個字段:
Action: Long userId, String action
Pattern: String firstAction, String secondAction
第一步,咱們在流上使用userId
屬性進行keyBy操做。
KeyedStream<Action, Long> actionsByUser = actions
.keyBy((KeySelector<Action, Long>) action -> action.userId);
複製代碼
接下來,咱們準備broadcast state。broadcast state始終表示爲MapState,這是Flink提供的最通用的狀態原語。
MapStateDescriptor<Void, Pattern> bcStateDescriptor =
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
複製代碼
因爲咱們一次僅評估和存儲單個模式,咱們將broadcast state配置爲具備鍵類型Void和值類型Pattern的MapState。模式始終存儲在MapState中,並將null做爲鍵。
BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);
複製代碼
對於broadcast state應該使用MapStateDescriptor
,咱們在patterns流上調用broadcast()
方法將它轉換爲BroadcastStream流bcedPatterns.
DataStream<Tuple2<Long, Pattern>> matches = actionsByUser
.connect(bcedPatterns)
.process(new PatternEvaluator());
複製代碼
咱們獲得了keyed以後的actionsByUser流與廣播流bcedPatterns,咱們調用connect()
方法將他們鏈接在一塊兒而後在流上應用PatternEvaluator
。PatternEvaluator
實現了KeyedBroadcastProcessFunction
接口。它應用咱們前面討論過的模式匹配邏輯,併發送包含用戶id和匹配模式的記錄的Tuple2<Long, Pattern>
。
public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {
// handle for keyed state (per user)
ValueState<String> prevActionState;
// broadcast state descriptor
MapStateDescriptor<Void, Pattern> patternDesc;
@Override
public void open(Configuration conf) {
// initialize keyed state
prevActionState = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastAction", Types.STRING));
patternDesc =
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
}
/** * Called for each user action. * Evaluates the current pattern against the previous and * current action of the user. */
@Override
public void processElement( Action action, ReadOnlyContext ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception {
// get current pattern from broadcast state
Pattern pattern = ctx
.getBroadcastState(this.patternDesc)
// access MapState with null as VOID default value
.get(null);
// get previous action of current user from keyed state
String prevAction = prevActionState.value();
if (pattern != null && prevAction != null) {
// user had an action before, check if pattern matches
if (pattern.firstAction.equals(prevAction) &&
pattern.secondAction.equals(action.action)) {
// MATCH
out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
}
}
// update keyed state and remember action for next pattern evaluation
prevActionState.update(action.action);
}
/** * Called for each new pattern. * Overwrites the current pattern with the new pattern. */
@Override
public void processBroadcastElement( Pattern pattern, Context ctx, Collector<Tuple2<Long, Pattern>> out) throws Exception {
// store the new pattern by updating the broadcast state
BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
// storing in MapState with null as VOID default value
bcState.put(null, pattern);
}
}
複製代碼
這個KeyedBroadcastProcessFunction
接口提供了處理記錄和發出結果的三種方法。
processBroadcastElement()
: 在廣播流的每一個記錄調進來的時候用。在PatternEvaluator
函數,咱們簡單地將接收到的Pattern
使用null
鍵(記住,咱們只在MapState
).processElement()
: 在keyed stream的每一個記錄進來的時候調用。它提供對Broadcast State的只讀訪問,以防止對跨函數並行實例的不一樣broadcast state的修改。這PatternEvaluator的processElement()
方法從broadcast state檢索當前模式,從keyed state檢索用戶的先前行爲。若是二者都存在,它將檢查前面和當前的行爲是否與模式匹配,若是匹配話,它會發出模式匹配記錄。最後,它將keyed state更新爲當前用戶行爲。onTimer()
: 在以前註冊過的計時器觸發時調用。計時器能夠在processElement
方法中註冊,用於執行計算或清除未來的狀態。爲了保持代碼的簡潔性咱們沒有在咱們的示例中實現這個方法。可是,當用戶在一段時間內沒有活動時,可使用它來刪除用戶的最後一個行爲,以免因爲不活動的用戶而致使state的增加。你可能已經注意到KeyedBroadcastProcessFunction
的process方法。context 對象容許使用其餘功能,如:
TimerService
,它容許訪問記錄的時間戳、當前watermark,而且能夠註冊計時器processElement()
方法中可用),以及一種將函數應用於每一個註冊key的keyed state的方法(僅在processBroadcastElement()
方法中可用)這個KeyedBroadcastProcessFunction
就像其餘ProcessFunction同樣徹底能夠訪問Flink中的state和時間特性,所以能夠用來實現複雜的邏輯。broadcast state被設計成一個通用的特性,能夠適應不一樣的場景和用例。雖然咱們只討論了一個至關簡單和受限的應用,但您能夠經過多種方式使用broadcast state來實現應用的需求。
在這篇文章中,咱們向您介紹了一個示例應用,以解釋Flink的broadcast state是什麼,以及如何使用它來評估事件流上的動態模式。咱們還討論了API,並展現了示例應用的源碼。
歡迎關注個人微信公衆號