基於Flink的視頻直播案例(下)

直播數字化運營

業務目標python

  • 全站觀看直播總人數以及走勢
  • 房間直播總人數以及走勢
  • 熱門直播房間及主播Top10,分類目主播Top10
// 開始和上一個業務同樣,建立cleanMapFun來提取須要的數據屬性,這裏只須要時間戳、roomid和userid三個屬性

// 第二個功能:先計算每5分鐘各房間的人數,這樣能同時爲總人數的計算進行預聚合。這裏直接利用ProcessWindowFunction進行計算,而不是先aggregate後再進行process計算。這裏想的是因爲窗口變小,因此積聚的數據可能不會太多,並且獲得數據後一次性計算要更快。
SingleOutputStreamOperator<Tuple3<Long, Integer, Set<Long>>> visitorsPerRoom = cleanStream
        .keyBy(OperationRecord::getRoomid)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .process(new ProcessWindowFunction<OperationRecord, Tuple3<Long, Integer, Set<Long>>, Integer,
                TimeWindow>() {
            @Override
            public void process(Integer integer, Context context, Iterable<OperationRecord> elements,
                                Collector<Tuple3<Long, Integer, Set<Long>>> out) {
                int key = 0;
                HashSet<Long> set = new HashSet<>();
                Iterator<OperationRecord> iter = elements.iterator();
                if (iter.hasNext()) {
                    OperationRecord next = iter.next();
                    key = next.getRoomid();
                    set.add(next.getUserid());
                }
                while (iter.hasNext()) {
                    set.add(iter.next().getUserid());
                }
                out.collect(new Tuple3<>(context.window().getStart(), key, set));
            }
        });

// 第一個功能實現,全網觀看人數。因爲通過了上一步的預聚合,這裏就能夠直接用windowAll來聚合了。
//固然,若是全網人多確實不少,那麼下面實現並不可行,畢竟set會變得很大。更可行的方法在後面的第二種思路。
//另外,下面是一種連續窗口的實現,即上一步[00:00:00~00:05:00)的結果會被髮到這裏的[00:00:00~00:05:00)窗口
SingleOutputStreamOperator<Tuple2<Long, Integer>> totalVisit = visitorsPerRoom
           .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
           .process(new ProcessAllWindowFunction<Tuple3<Long, Integer, Set<Long>>, Tuple2<Long, Integer>,
                   TimeWindow>() {
               @Override
               public void process(Context context, Iterable<Tuple3<Long, Integer, Set<Long>>> elements,
                                   Collector<Tuple2<Long, Integer>> out) throws Exception {
                   HashSet<Long> set = new HashSet<>();
                   Iterator<Tuple3<Long, Integer, Set<Long>>> iter = elements.iterator();
                   while (iter.hasNext()) {
                       set.addAll(iter.next().f2);
                   }
                   out.collect(new Tuple2<>(context.window().getStart(), set.size()));
               }
           });

// 在實現第三個功能前先進行一下數據清洗。
SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> visitPerRoom = visitorsPerRoom
        .map(new MapFunction<Tuple3<Long, Integer, Set<Long>>, Tuple3<Long, Integer, Integer>>() {
            @Override
            public Tuple3<Long, Integer, Integer> map(Tuple3<Long, Integer, Set<Long>> elem) {
                return new Tuple3<>(elem.f0, elem.f1, elem.f2.size());
            }
        });

// 第三個功能,觀看人數最多的前10個房間
SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> topnRoom = visitPerRoom
        .windowAll(TumblingEventTimeWindows.of(Time.minutes(5)))
        .aggregate(new TopK2AllAggFunc(), new ProcessAllWindowFunction<Integer[][],
                Tuple3<Long, Integer, Integer>, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<Integer[][]> elements,
                                Collector<Tuple3<Long, Integer, Integer>> out) {
                Iterator<Integer[][]> iter = elements.iterator();
                while (iter.hasNext()) {
                    Integer[][] next = iter.next();
                    for (Integer[] room2visit : next) {
                        out.collect(new Tuple3<>(context.window().getStart(), room2visit[0], room2visit[1]));
                    }
                }
            }
        });
// TopK2AllAggFunc的中間結果這裏利用優先隊列來存儲。
public class TopK2AllAggFunc implements AggregateFunction<Tuple3<Long, Integer, Integer>, PriorityQueue<Integer[]>,
        Integer[][]> {

    /**
     * 0 => timestamp
     * 1 => roomid
     * 2 => num of visitor
     */

    @Override
    public PriorityQueue<Integer[]> createAccumulator() {
        return new PriorityQueue<>(10, Comparator.comparing((elem) -> elem[1]));
    }

    @Override
    public PriorityQueue<Integer[]> add(Tuple3<Long, Integer, Integer> value, PriorityQueue<Integer[]> accumulator) {

        if (accumulator.size() < 10) {
            Integer[] room2visit = new Integer[2];
            room2visit[0] = value.f1;
            room2visit[1] = value.f2;
            accumulator.add(room2visit);
        } else {
            Integer[] tmp = accumulator.poll();
            if (tmp[1] < value.f2) {
                Integer[] room2visit = new Integer[2];
                room2visit[0] = value.f1;
                room2visit[1] = value.f2;
                accumulator.add(room2visit);
            } else accumulator.add(tmp);
        }
        
        return accumulator;
    }

    @Override
    public Integer[][] getResult(PriorityQueue<Integer[]> accumulator) {

        List<Integer[]> list = new ArrayList<>(10);
        list.addAll(accumulator);
        Integer[][] res = new Integer[list.size()][2];
        res = list.toArray(res);

        return res;
    }

    @Override
    public PriorityQueue<Integer[]> merge(PriorityQueue<Integer[]> acc1, PriorityQueue<Integer[]> acc2) {

        List<Integer[]> list = new ArrayList<>(10);
        list.addAll(acc2);
        Integer[][] acc2list = new Integer[list.size()][2];
        acc2list = list.toArray(acc2list);

        for (int i = 0; i < acc2list.length; i++) {
            Integer[] curArr = acc2list[i];
            if (acc1.size() < 10) {
                acc1.add(curArr);
            } else {
                Integer[] tmp = acc1.poll();
                if (tmp[1] < curArr[1]) {
                    acc1.add(curArr);
                } else acc1.add(tmp);
            }
        }

        return acc1;
    }
}

// 第四個功能,分類別的top10實現。這裏須要引入外部的維度數據,給每一個房間加上類別標籤。
// 這個維度表預先存儲在redis中,經過自定義sourcefunction來獲取,並利用broadcast來讓這個表存儲在每一個operator中,這樣就相似spark中的廣播變量了,每條須要處理的數據都能獲取到這個表的數據,實現以下。
DataStreamSource<String> room2cat = env.addSource(new MyRedisSource());
room2cat.name("RedisSource");

MapStateDescriptor<Integer, String> roomId2catDescriptor =
        new MapStateDescriptor<Integer, String>(
                "RoomId2catBroadcastState",
                BasicTypeInfo.INT_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

BroadcastStream<String> bc = room2cat.broadcast(roomId2catDescriptor);

SingleOutputStreamOperator<Tuple5<Long, Integer, String, Integer, Integer>> top2cat = visitPerRoom.connect(bc)
        .process(new Room2CatBCFunc())
        .keyBy(elem -> elem.f1)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .process(new TopK2CatProcFunc());

public class Room2CatBCFunc extends BroadcastProcessFunction<Tuple3<Long, Integer, Integer>, String,
        Tuple3<Integer, String, Integer>> {

    /**
     * 0 => timestamp 去掉
     * 1 => roomid
     * 2 => room_cat_name
     * 3 => num of visitors
     */

    private static final MapStateDescriptor<Integer, String> roomId2catDescriptor =
            new MapStateDescriptor<>(
                    "RoomId2catBroadcastState",
                    BasicTypeInfo.INT_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public void processElement(Tuple3<Long, Integer, Integer> value, ReadOnlyContext ctx,
                               Collector<Tuple3<Integer, String, Integer>> out) throws Exception {
        ReadOnlyBroadcastState<Integer, String> bcState = ctx.getBroadcastState(roomId2catDescriptor);
        String cat = bcState.get(value.f1);
        if (cat != null) {
            out.collect(new Tuple3<>(value.f1, cat, value.f2));
        } else out.collect(new Tuple3<>(value.f1, "UNK", value.f2));
    }

    @Override
    public void processBroadcastElement(String value, Context ctx,
                                        Collector<Tuple3<Integer, String, Integer>> out) throws Exception {
        String[] split = value.split("=");
        ctx.getBroadcastState(roomId2catDescriptor)
                .put(Integer.parseInt(split[0]), split[1]);
    }
}

public class TopK2CatProcFunc extends ProcessWindowFunction<Tuple3<Integer, String, Integer>,
        Tuple5<Long, Integer, String, Integer, Integer>, String, TimeWindow> {


    /**
     * input
     * 0 => roomid
     * 1 => room_cat_name
     * 2 => room_visitors_cnt
     * <p>
     * output
     * 0 => timestamp
     * 1 => roomid
     * 2 => room_cat_name
     * 3 => room_visitors_cnt
     * 4 => rangking
     */

    @Override
    public void process(String s, Context context, Iterable<Tuple3<Integer, String, Integer>> elements,
                        Collector<Tuple5<Long, Integer, String, Integer, Integer>> out) {
        PriorityQueue<Tuple3<Integer, String, Integer>> pq = new PriorityQueue<>(10, Comparator.comparing(e -> e.f2));
        elements.forEach(elem -> {
            if (pq.size() < 10) {
                pq.add(elem);
            } else {
                Tuple3<Integer, String, Integer> tmp = pq.poll();
                if (tmp.f2 < elem.f2) {
                    pq.add(elem);
                } else {
                    pq.add(tmp);
                }
            }
        });

        List<Tuple3<Integer, String, Integer>> list = new ArrayList<>(10);
        list.addAll(pq);
        Tuple3<Integer, String, Integer>[] top10 = (Tuple3<Integer, String, Integer>[])new Object[10];
        top10 = list.toArray(top10);

        Arrays.sort(top10, Comparator.comparingLong(elem -> - elem.f2));

        for (int i = 0; i < top10.length; i++) {
            Tuple3<Integer, String, Integer> cur = top10[i];
            Tuple5<Long, Integer, String, Integer, Integer> res = new Tuple5<>(context.window().getStart(), cur.f0, cur.f1, cur.f2, i + 1);
            out.collect(res);
        }
    }
}

第二部分的DAG以下,圖標不能移動只能將就一下了。git

結果寫入Elasticsearch

寫入Elasticsearch的代碼都是一個樣式,因此在這裏統一放出。redis

ArrayList<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http"));
ElasticsearchSink.Builder<Tuple5<Long, Integer, String, Integer, Integer>> esSinkBuilder4 =
        new ElasticsearchSink.Builder<>(httpHosts,
                new ElasticsearchSinkFunction<Tuple5<Long, Integer, String, Integer, Integer>>() {
                    public IndexRequest createIndexRequest(Tuple5<Long, Integer, String, Integer, Integer> element) {
 
                        /**
                         * 0 => timestamp
                         * 1 => roomid
                         * 2 => category_name,
                         * 3 => app_room_user_cnt,
                         * 4 => rangking
                         */
                        Map<String, Object> json = new HashMap<>();
                        json.put("timestamp", element.f0);
                        json.put("roomid", element.f1);
                        json.put("cat", element.f2);
                        json.put("roomuser", element.f3);
                        json.put("rank", element.f4);
                        String date = INDEX_FORMAT.format(element.f0);
 
                        // 惟一id
                        String id = Long.toString(element.f0) + element.f2 + element.f4;
 
                        return Requests.indexRequest()
                                // index 按天來劃分
                                .index("digital_operation_cattop10-" + date)
                                .type("cattop10")
                                .id(id)
                                .source(json);
                    }

                    @Override
                    public void process(Tuple5<Long, Integer, String, Integer, Integer> element,
                                        RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(createIndexRequest(element));
                    }
                }
        );
//設置批量寫數據的緩衝區大小,實際工做中的時間這個值須要調大一些
esSinkBuilder4.setBulkFlushMaxActions(100);
top2cat.addSink(esSinkBuilder4.build()).name("ElasticsearchSink_digital_operation_cattop10");

第二種思路

上面實現計算全站觀看人數統計時提到,若是數據量過大,用一個set是很差去重的。其實也能夠直接把每一個set的size加總。(同一個user同時觀看幾個主播的狀況應該很少吧)若是確實要全局去重,能夠嘗試下面結合timer的process function來模仿window計算。但在離線測試時結果會受各類因素影響,詳細看後面的總結。json

下面只展現計算每分鐘各房間的人數,全站人數能夠模仿這種方法,利用mapstate進行最終的加總。數據結構

// 計算每分鐘各房間的人數
SingleOutputStreamOperator<Tuple3<Long, Integer, Integer>> visitorsPerRoom = cleanStream
       .keyBy(OperationRecord::getRoomid)
       .process(new DistinctVisitorsProcFunc());

// 下面利用mapstate進行統計,這個state可以存儲到rockdb,因此可以接受更大量的數據。
public class DistinctVisitorsProcFunc extends KeyedProcessFunction<Integer, OperationRecord, Tuple3<Long, Integer, Integer>> {

    // 存儲當前roomid的unique visitors
    MapState<Long, Void> uniqueVisitorState = null;

    private static FastDateFormat TIME_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSS");

    @Override
    public void open(Configuration parameters) throws Exception {

        MapStateDescriptor<Long, Void> uniqueVisitorStateDescriptor =
                new MapStateDescriptor<>(
                        "UniqueVisitorState",
                        BasicTypeInfo.LONG_TYPE_INFO,
                        BasicTypeInfo.VOID_TYPE_INFO);
        uniqueVisitorState = getRuntimeContext().getMapState(uniqueVisitorStateDescriptor);

    }

    @Override
    public void processElement(OperationRecord value, Context ctx, Collector<Tuple3<Long, Integer, Integer>> out) throws
            Exception {
        // 第一個條件針對第一條數據,實際中能夠省去,忽略第一條數據
        if (ctx.timerService().currentWatermark() == Long.MIN_VALUE ||
                value.getTimestamp() >= ctx.timerService().currentWatermark() - 10000L) {
            if (!uniqueVisitorState.contains(value.getUserid())) {
                uniqueVisitorState.put(value.getUserid(), null);
            }
            // 一分鐘登記一次
            long time = (ctx.timestamp() / 60000 + 1) * 60000;
            ctx.timerService().registerEventTimeTimer(time);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<Long, Integer, Integer>> out) throws Exception {
        int cnt = 0;
        List<Long> arr = new ArrayList<>();
        for (Map.Entry<Long, Void> entry : uniqueVisitorState.entries()) {
            cnt++;
            arr.add(entry.getKey());
        }

        // roomid及其在線人數
        out.collect(new Tuple3<>(timestamp, ctx.getCurrentKey(), cnt));

        // 本房間在線人數列表,經過sideoutput放出
        OutputTag<List<Long>> outputTag = DigitalOperationMain.getOutputTag();
        ctx.output(outputTag, arr);

        uniqueVisitorState.clear();
    }
}

Flink實現總結

  • 編寫前,在肯定好輸入輸出後還要對整體實現有一個較爲詳細的規劃,好比用什麼函數實現什麼功能,有些實現能夠結合一次處理。多線程

  • 每一個功能的實現要寫一個後立刻檢查,省得把錯誤或不合理的代碼運用到後面的功能。併發

  • flink離線測試的結果會受到下面幾個因素的影響(若是用windowfunction,那通常不會有影響,有問題的是和ontime相關的操做。但onetime真正實時處理的結果通常不會有問題,這主要是由於真正實時處理時每條數據的時間跨度不會很大,即連續數據的時間戳相差很小,而若是有點大,那這段時間的空檔也足夠超過buffer的timeout,從而flink能在這段空隙間完成watermark的更新)。app

    • 時間語意:elasticsearch

      • process time:每條數據被打上到達flink時的時間戳,不考慮數據到達的順序,沒有遲到數據。若是source的parallelism爲1,且數據發送順序不變,那麼結果是肯定的、可復現的。但若是source的parallelism不是1,致使數據順序不肯定,那麼結果則是不肯定的。

      • event:每條數據被打上自身的時間戳(避免map後丟失了時間戳屬性),並利用這些時間戳來推進watermark,此時須要考慮數據到達的順序,結果通常是肯定的、可復現的(使用process function 的 ontime除外)。

        理解watermark的前進須要先理解Periodic Watermarks和Punctuated Watermarks。

        • Periodic經過實現AssignerWithPeriodicWatermarks來抽取數據時間戳和產生watermark,它會週期性地調用getCurrentWatermark檢查watermark是否須要前進,經過env.getConfig().setAutoWatermarkInterval(0L);配置。

          注意是檢查,並不表明調用getCurrentWatermark後watermark就會前進,取決於具體實現。例如BoundedOutOfOrdernessTimestampExtractor的實現就是按照currentMaxTimestamp - maxOutOfOrderness來設置watermark的,因此若是有一條比後來數據提前不少的數據出現,即其時間戳比其餘數據大不少,那麼watermark也會有一段時間中止不前。

          若是setAutoWatermarkInterval設置過大,在數據亂序嚴重的狀況,如未排序的離線數據,會出現大量大於watermark的數據進入flink,但watermark並不前進,由於還沒到下一個檢查週期。另外,即使把它設置得足夠小,它也不可能像Punctuated那樣作到緊跟在每條數據後面,它須要等一批數據(buffer)處理完後才能調用。

        • Punctuated經過AssignerWithPunctuatedWatermarks實現,與前者的不一樣是,它會針對每一條數據都會調用checkAndGetNextWatermark來檢查是否須要產生新的watermark。

        • 注意新的watermark會跟在當前數據的後面(watermark自己就是一條含有時間戳的數據),因此會發如今後續operator計算中,即使watermark更新了,也只是前面的operator更新了,後面的尚未更新。

    • 併發度:測試先用1個併發度比較好理解。

    • 窗口函數和結合timer的process function(第二種思路)

      • window函數主要有三種,reduce、aggregate和processwindow,前二者只存儲一個數據,本質分別是ReducingState和AggregatingState,都相似於ListState。因爲不須要存儲整個窗口的數據,而是每當數據到達時就進行聚合,因此比processwindow更有效率地利用內存。但reduce的限制是input和output須要相同類型,而aggregate能夠不一樣,但存儲中間結果的數據結構須要比較普通的,好比Tuple。曾經嘗使用Tuple2<Long, HashSet>來存儲中間結果來實現去重,結果報錯,但若是直接存HashSet應該沒問題。而後是processwindow,操做比較簡單,由於數據都存儲好了,只等待調用iterator來取。此時新建HashSet也能實現去重,這種方法的不足就是flink須要存儲觸發processwindow前的全部數據,並且這個processwindow須要一次性處理完數據,這個計算過程沒有checkpoint。不過若是窗口跨度不大,影響應該不大。
      • 結合timer的process function一樣能夠實現相似window的功能,並且可以使用state,進而也能實現更多功能,好比針對某個用戶,若是訪問次數達到閾值別不在處理其數據,又或者多維度去重等。但這種函數比window更復雜。在利用它實現相似窗口計算的功能時會出現一些瑕疵。正如上面所說,產生新watermark的數據會在此watermark的前面。假設設定了1分鐘的timer來實現1分鐘的窗口計算,那麼若是有兩條連續數據被這個timer的時間戳分開,那麼後面一條數據也會被算進這個「窗口」,這是由於後面數據更新的時間戳跟在它的後面,在觸發timer計算前這條數據已經被處理了。若是在多線程環境下,瑕疵會更多,例如一堆數據被處理了,但依然沒有觸發ontimer。這是由於processelement和ontimer並不是連續執行,若是時間變化加大的數據被分到同一buffer,那麼就可能遇處處理的數據已經跨過1小時,但設定1分鐘後觸發的timer並無被調用的狀況。不過這通常只出現非生產環境下,由於在buffer不大,timeout不長的狀況下,很難會出現一個buffer有這麼大的時間跨度,更通常的狀況是處理完一個buffer,watermark才前進1s。固然,若是須要極爲嚴格的處理,window函數就不會出現這種狀況,由於window函數會根據數據的時間戳進行劃分。
      • 整體來講window更容易實現,特別在簡單聚合方面效率還很好。而結合timer的process function比較複雜,雖然模擬窗口計算可能會有瑕疵,但若是不要求絕對精確,那麼其複雜聚合的效率應該比window好(涉及processwindow的)。

Elasticsearch部分

先建立index templates,這裏針對「分類目主播Top10」功能的templates進行展現,由於上面的elasticsearch展現的也是這個功能。注意下面編號部分要與前面的elasticsearchsink一致。

PUT _template/digital_operation_cattop10_template
{
    "index_patterns": ["digital_operation_cattop10-*"], // 1
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings": {
        "cattop10": { // 2
            "properties": {
                "roomid": { // 3
                    "type": "integer"
                },
                "cat": { // 4
                    "type": "keyword"
                },
                "roomuser": { // 5
                    "type": "integer"
                },
                "rank": { // 6
                    "type": "integer"
                },
                "timestamp": { // 7
                    "type": "date",
                    "format": "epoch_millis"
                }
            }
        }
    }
}

Kibana部分

視頻核心指標監控

人均卡頓次數有點多,這與模擬數據有關,大部分結果都用python的pandas進行了的驗證。

直播數字化運營

相關文章
相關標籤/搜索