業務目標python
// 開始和上一個業務同樣,建立cleanMapFun來提取須要的數據屬性,這裏只須要時間戳、roomid和userid三個屬性 // 第二個功能:先計算每5分鐘各房間的人數,這樣能同時爲總人數的計算進行預聚合。這裏直接利用ProcessWindowFunction進行計算,而不是先aggregate後再進行process計算。這裏想的是因爲窗口變小,因此積聚的數據可能不會太多,並且獲得數據後一次性計算要更快。 SingleOutputStreamOperator<Tuple3<Long, Integer, Set<Long>>> visitorsPerRoom = cleanStream .keyBy(OperationRecord::getRoomid) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new ProcessWindowFunction<OperationRecord, Tuple3<Long, Integer, Set<Long>>, Integer, TimeWindow>() { @Override public void process(Integer integer, Context context, Iterable<OperationRecord> elements, Collector<Tuple3<Long, Integer, Set<Long>>> out) { int key = 0; HashSet<Long> set = new HashSet<>(); Iterator<OperationRecord> iter = elements.iterator(); if (iter.hasNext()) { OperationRecord next = iter.next(); key = next.getRoomid(); set.add(next.getUserid()); } while (iter.hasNext()) { set.add(iter.next().getUserid()); } out.collect(new Tuple3<>(context.window().getStart(), key, set)); } }); // 第一個功能實現,全網觀看人數。因爲通過了上一步的預聚合,這裏就能夠直接用windowAll來聚合了。 //固然,若是全網人多確實不少,那麼下面實現並不可行,畢竟set會變得很大。更可行的方法在後面的第二種思路。 //另外,下面是一種連續窗口的實現,即上一步[00:00:00~00:05:00)的結果會被髮到這裏的[00:00:00~00:05:00)窗口 SingleOutputStreamOperator<Tuple2<Long, Integer>> totalVisit = visitorsPerRoom .windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new ProcessAllWindowFunction<Tuple3<Long, Integer, Set<Long>>, Tuple2<Long, Integer>, TimeWindow>() { @Override public void process(Context context, Iterable<Tuple3<Long, Integer, Set<Long>>> elements, Collector<Tuple2<Long, Integer>> out) throws Exception { HashSet<Long> set = new HashSet<>(); Iterator<Tuple3<Long, Integer, Set<Long>>> iter = elements.iterator(); while (iter.hasNext()) { set.addAll(iter.next().f2); } out.collect(new Tuple2<>(context.window().getStart(), set.size())); } }); // 在實現第三個功能前先進行一下數據清洗。 SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> visitPerRoom = visitorsPerRoom .map(new MapFunction<Tuple3<Long, Integer, Set<Long>>, Tuple3<Long, Integer, Integer>>() { @Override public Tuple3<Long, Integer, Integer> map(Tuple3<Long, Integer, Set<Long>> elem) { return new Tuple3<>(elem.f0, elem.f1, elem.f2.size()); } }); // 第三個功能,觀看人數最多的前10個房間 SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> topnRoom = visitPerRoom .windowAll(TumblingEventTimeWindows.of(Time.minutes(5))) .aggregate(new TopK2AllAggFunc(), new ProcessAllWindowFunction<Integer[][], Tuple3<Long, Integer, Integer>, TimeWindow>() { @Override public void process(Context context, Iterable<Integer[][]> elements, Collector<Tuple3<Long, Integer, Integer>> out) { Iterator<Integer[][]> iter = elements.iterator(); while (iter.hasNext()) { Integer[][] next = iter.next(); for (Integer[] room2visit : next) { out.collect(new Tuple3<>(context.window().getStart(), room2visit[0], room2visit[1])); } } } }); // TopK2AllAggFunc的中間結果這裏利用優先隊列來存儲。 public class TopK2AllAggFunc implements AggregateFunction<Tuple3<Long, Integer, Integer>, PriorityQueue<Integer[]>, Integer[][]> { /** * 0 => timestamp * 1 => roomid * 2 => num of visitor */ @Override public PriorityQueue<Integer[]> createAccumulator() { return new PriorityQueue<>(10, Comparator.comparing((elem) -> elem[1])); } @Override public PriorityQueue<Integer[]> add(Tuple3<Long, Integer, Integer> value, PriorityQueue<Integer[]> accumulator) { if (accumulator.size() < 10) { Integer[] room2visit = new Integer[2]; room2visit[0] = value.f1; room2visit[1] = value.f2; accumulator.add(room2visit); } else { Integer[] tmp = accumulator.poll(); if (tmp[1] < value.f2) { Integer[] room2visit = new Integer[2]; room2visit[0] = value.f1; room2visit[1] = value.f2; accumulator.add(room2visit); } else accumulator.add(tmp); } return accumulator; } @Override public Integer[][] getResult(PriorityQueue<Integer[]> accumulator) { List<Integer[]> list = new ArrayList<>(10); list.addAll(accumulator); Integer[][] res = new Integer[list.size()][2]; res = list.toArray(res); return res; } @Override public PriorityQueue<Integer[]> merge(PriorityQueue<Integer[]> acc1, PriorityQueue<Integer[]> acc2) { List<Integer[]> list = new ArrayList<>(10); list.addAll(acc2); Integer[][] acc2list = new Integer[list.size()][2]; acc2list = list.toArray(acc2list); for (int i = 0; i < acc2list.length; i++) { Integer[] curArr = acc2list[i]; if (acc1.size() < 10) { acc1.add(curArr); } else { Integer[] tmp = acc1.poll(); if (tmp[1] < curArr[1]) { acc1.add(curArr); } else acc1.add(tmp); } } return acc1; } } // 第四個功能,分類別的top10實現。這裏須要引入外部的維度數據,給每一個房間加上類別標籤。 // 這個維度表預先存儲在redis中,經過自定義sourcefunction來獲取,並利用broadcast來讓這個表存儲在每一個operator中,這樣就相似spark中的廣播變量了,每條須要處理的數據都能獲取到這個表的數據,實現以下。 DataStreamSource<String> room2cat = env.addSource(new MyRedisSource()); room2cat.name("RedisSource"); MapStateDescriptor<Integer, String> roomId2catDescriptor = new MapStateDescriptor<Integer, String>( "RoomId2catBroadcastState", BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); BroadcastStream<String> bc = room2cat.broadcast(roomId2catDescriptor); SingleOutputStreamOperator<Tuple5<Long, Integer, String, Integer, Integer>> top2cat = visitPerRoom.connect(bc) .process(new Room2CatBCFunc()) .keyBy(elem -> elem.f1) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .process(new TopK2CatProcFunc()); public class Room2CatBCFunc extends BroadcastProcessFunction<Tuple3<Long, Integer, Integer>, String, Tuple3<Integer, String, Integer>> { /** * 0 => timestamp 去掉 * 1 => roomid * 2 => room_cat_name * 3 => num of visitors */ private static final MapStateDescriptor<Integer, String> roomId2catDescriptor = new MapStateDescriptor<>( "RoomId2catBroadcastState", BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); @Override public void processElement(Tuple3<Long, Integer, Integer> value, ReadOnlyContext ctx, Collector<Tuple3<Integer, String, Integer>> out) throws Exception { ReadOnlyBroadcastState<Integer, String> bcState = ctx.getBroadcastState(roomId2catDescriptor); String cat = bcState.get(value.f1); if (cat != null) { out.collect(new Tuple3<>(value.f1, cat, value.f2)); } else out.collect(new Tuple3<>(value.f1, "UNK", value.f2)); } @Override public void processBroadcastElement(String value, Context ctx, Collector<Tuple3<Integer, String, Integer>> out) throws Exception { String[] split = value.split("="); ctx.getBroadcastState(roomId2catDescriptor) .put(Integer.parseInt(split[0]), split[1]); } } public class TopK2CatProcFunc extends ProcessWindowFunction<Tuple3<Integer, String, Integer>, Tuple5<Long, Integer, String, Integer, Integer>, String, TimeWindow> { /** * input * 0 => roomid * 1 => room_cat_name * 2 => room_visitors_cnt * <p> * output * 0 => timestamp * 1 => roomid * 2 => room_cat_name * 3 => room_visitors_cnt * 4 => rangking */ @Override public void process(String s, Context context, Iterable<Tuple3<Integer, String, Integer>> elements, Collector<Tuple5<Long, Integer, String, Integer, Integer>> out) { PriorityQueue<Tuple3<Integer, String, Integer>> pq = new PriorityQueue<>(10, Comparator.comparing(e -> e.f2)); elements.forEach(elem -> { if (pq.size() < 10) { pq.add(elem); } else { Tuple3<Integer, String, Integer> tmp = pq.poll(); if (tmp.f2 < elem.f2) { pq.add(elem); } else { pq.add(tmp); } } }); List<Tuple3<Integer, String, Integer>> list = new ArrayList<>(10); list.addAll(pq); Tuple3<Integer, String, Integer>[] top10 = (Tuple3<Integer, String, Integer>[])new Object[10]; top10 = list.toArray(top10); Arrays.sort(top10, Comparator.comparingLong(elem -> - elem.f2)); for (int i = 0; i < top10.length; i++) { Tuple3<Integer, String, Integer> cur = top10[i]; Tuple5<Long, Integer, String, Integer, Integer> res = new Tuple5<>(context.window().getStart(), cur.f0, cur.f1, cur.f2, i + 1); out.collect(res); } } }
第二部分的DAG以下,圖標不能移動只能將就一下了。git
寫入Elasticsearch的代碼都是一個樣式,因此在這裏統一放出。redis
ArrayList<HttpHost> httpHosts = new ArrayList<>(); httpHosts.add(new HttpHost("localhost", 9200, "http")); ElasticsearchSink.Builder<Tuple5<Long, Integer, String, Integer, Integer>> esSinkBuilder4 = new ElasticsearchSink.Builder<>(httpHosts, new ElasticsearchSinkFunction<Tuple5<Long, Integer, String, Integer, Integer>>() { public IndexRequest createIndexRequest(Tuple5<Long, Integer, String, Integer, Integer> element) { /** * 0 => timestamp * 1 => roomid * 2 => category_name, * 3 => app_room_user_cnt, * 4 => rangking */ Map<String, Object> json = new HashMap<>(); json.put("timestamp", element.f0); json.put("roomid", element.f1); json.put("cat", element.f2); json.put("roomuser", element.f3); json.put("rank", element.f4); String date = INDEX_FORMAT.format(element.f0); // 惟一id String id = Long.toString(element.f0) + element.f2 + element.f4; return Requests.indexRequest() // index 按天來劃分 .index("digital_operation_cattop10-" + date) .type("cattop10") .id(id) .source(json); } @Override public void process(Tuple5<Long, Integer, String, Integer, Integer> element, RuntimeContext ctx, RequestIndexer indexer) { indexer.add(createIndexRequest(element)); } } ); //設置批量寫數據的緩衝區大小,實際工做中的時間這個值須要調大一些 esSinkBuilder4.setBulkFlushMaxActions(100); top2cat.addSink(esSinkBuilder4.build()).name("ElasticsearchSink_digital_operation_cattop10");
上面實現計算全站觀看人數統計時提到,若是數據量過大,用一個set是很差去重的。其實也能夠直接把每一個set的size加總。(同一個user同時觀看幾個主播的狀況應該很少吧)若是確實要全局去重,能夠嘗試下面結合timer的process function來模仿window計算。但在離線測試時結果會受各類因素影響,詳細看後面的總結。json
下面只展現計算每分鐘各房間的人數,全站人數能夠模仿這種方法,利用mapstate進行最終的加總。數據結構
// 計算每分鐘各房間的人數 SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> visitorsPerRoom = cleanStream .keyBy(OperationRecord::getRoomid) .process(new DistinctVisitorsProcFunc()); // 下面利用mapstate進行統計,這個state可以存儲到rockdb,因此可以接受更大量的數據。 public class DistinctVisitorsProcFunc extends KeyedProcessFunction<Integer, OperationRecord, Tuple3<Long, Integer, Integer>> { // 存儲當前roomid的unique visitors MapState<Long, Void> uniqueVisitorState = null; private static FastDateFormat TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS"); @Override public void open(Configuration parameters) throws Exception { MapStateDescriptor<Long, Void> uniqueVisitorStateDescriptor = new MapStateDescriptor<>( "UniqueVisitorState", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.VOID_TYPE_INFO); uniqueVisitorState = getRuntimeContext().getMapState(uniqueVisitorStateDescriptor); } @Override public void processElement(OperationRecord value, Context ctx, Collector<Tuple3<Long, Integer, Integer>> out) throws Exception { // 第一個條件針對第一條數據,實際中能夠省去,忽略第一條數據 if (ctx.timerService().currentWatermark() == Long.MIN_VALUE || value.getTimestamp() >= ctx.timerService().currentWatermark() - 10000L) { if (!uniqueVisitorState.contains(value.getUserid())) { uniqueVisitorState.put(value.getUserid(), null); } // 一分鐘登記一次 long time = (ctx.timestamp() / 60000 + 1) * 60000; ctx.timerService().registerEventTimeTimer(time); } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<Long, Integer, Integer>> out) throws Exception { int cnt = 0; List<Long> arr = new ArrayList<>(); for (Map.Entry<Long, Void> entry : uniqueVisitorState.entries()) { cnt++; arr.add(entry.getKey()); } // roomid及其在線人數 out.collect(new Tuple3<>(timestamp, ctx.getCurrentKey(), cnt)); // 本房間在線人數列表,經過sideoutput放出 OutputTag<List<Long>> outputTag = DigitalOperationMain.getOutputTag(); ctx.output(outputTag, arr); uniqueVisitorState.clear(); } }
編寫前,在肯定好輸入輸出後還要對整體實現有一個較爲詳細的規劃,好比用什麼函數實現什麼功能,有些實現能夠結合一次處理。多線程
每一個功能的實現要寫一個後立刻檢查,省得把錯誤或不合理的代碼運用到後面的功能。併發
flink離線測試的結果會受到下面幾個因素的影響(若是用windowfunction,那通常不會有影響,有問題的是和ontime相關的操做。但onetime真正實時處理的結果通常不會有問題,這主要是由於真正實時處理時每條數據的時間跨度不會很大,即連續數據的時間戳相差很小,而若是有點大,那這段時間的空檔也足夠超過buffer的timeout,從而flink能在這段空隙間完成watermark的更新)。app
時間語意:elasticsearch
process time:每條數據被打上到達flink時的時間戳,不考慮數據到達的順序,沒有遲到數據。若是source的parallelism爲1,且數據發送順序不變,那麼結果是肯定的、可復現的。但若是source的parallelism不是1,致使數據順序不肯定,那麼結果則是不肯定的。
event:每條數據被打上自身的時間戳(避免map後丟失了時間戳屬性),並利用這些時間戳來推進watermark,此時須要考慮數據到達的順序,結果通常是肯定的、可復現的(使用process function 的 ontime除外)。
理解watermark的前進須要先理解Periodic Watermarks和Punctuated Watermarks。
Periodic經過實現AssignerWithPeriodicWatermarks
來抽取數據時間戳和產生watermark,它會週期性地調用getCurrentWatermark
檢查watermark是否須要前進,經過env.getConfig().setAutoWatermarkInterval(0L);
配置。
注意是檢查,並不表明調用getCurrentWatermark
後watermark就會前進,取決於具體實現。例如BoundedOutOfOrdernessTimestampExtractor
的實現就是按照currentMaxTimestamp - maxOutOfOrderness
來設置watermark的,因此若是有一條比後來數據提前不少的數據出現,即其時間戳比其餘數據大不少,那麼watermark也會有一段時間中止不前。
若是setAutoWatermarkInterval
設置過大,在數據亂序嚴重的狀況,如未排序的離線數據,會出現大量大於watermark的數據進入flink,但watermark並不前進,由於還沒到下一個檢查週期。另外,即使把它設置得足夠小,它也不可能像Punctuated那樣作到緊跟在每條數據後面,它須要等一批數據(buffer)處理完後才能調用。
Punctuated經過AssignerWithPunctuatedWatermarks
實現,與前者的不一樣是,它會針對每一條數據都會調用checkAndGetNextWatermark
來檢查是否須要產生新的watermark。
注意新的watermark會跟在當前數據的後面(watermark自己就是一條含有時間戳的數據),因此會發如今後續operator計算中,即使watermark更新了,也只是前面的operator更新了,後面的尚未更新。
併發度:測試先用1個併發度比較好理解。
窗口函數和結合timer的process function(第二種思路)
先建立index templates,這裏針對「分類目主播Top10」功能的templates進行展現,由於上面的elasticsearch展現的也是這個功能。注意下面編號部分要與前面的elasticsearchsink一致。
PUT _template/digital_operation_cattop10_template { "index_patterns": ["digital_operation_cattop10-*"], // 1 "settings": { "number_of_shards": 3, "number_of_replicas": 1 }, "mappings": { "cattop10": { // 2 "properties": { "roomid": { // 3 "type": "integer" }, "cat": { // 4 "type": "keyword" }, "roomuser": { // 5 "type": "integer" }, "rank": { // 6 "type": "integer" }, "timestamp": { // 7 "type": "date", "format": "epoch_millis" } } } } }
視頻核心指標監控
人均卡頓次數有點多,這與模擬數據有關,大部分結果都用python的pandas進行了的驗證。
直播數字化運營