Flink 實戰 : 統計網站PV,UV

Flink 實戰:統計網站PV,UV

PV,UV

  • PV(Page View) : 頁面點擊次數
  • UV(User View): 獨立用戶訪問次數

假定需求以下,每間隔1分鐘,統計過去5分鐘的UV,PV。很容易想到,經過數據庫的count,以及count distinct能夠得出正確結果。在大數據量下,傳統數據庫或者HADOOP(hbase...)的count效率都不高。若是數據是增量的,那麼流式計算每每能提供更高的吞吐和更低的延時。html

接下來經過使用Flink實現這個功能,並借這個案例描述一些Flink的基本概念。若是對其餘流式計算框架有所瞭解,能夠發現許多東西是互通的。java

Window

很容易理解,在這個案例中,咱們須要在內存中緩存5分鐘的數據,時間往前推移到一分鐘的時候,統計一次,而且清理數據。數據庫

Flink提供了多種窗口,能夠按需選擇。apache

Event Time

考慮到網絡的延遲和數據的亂序,不能簡單的使用Flink的系統時間作統計。例如14:25分的數據可能在14:27分纔到系統中,若是直接按Flink系統時間,即會影響14:20~14:25這段時間的計算結果,同時也會影響14:25~14:30的計算結果。windows

在Flink中,有如下三種時間特徵,查看詳細說明緩存

  • Processing time:Operator處理數據的時間。
  • Event time : 事件發生時間。
  • Ingestion time:被Flink攝入的時間。

在統計PV UV時,咱們須要根據用戶訪問的時間,因此使用Event Time。網絡

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

接着,咱們須要告知Flink記錄的真實時間timestamp,以及觸發window計算的watermark。在Flink中經過實現接口AssignerWithPeriodicWatermarks來完成。app

考慮到數據可能亂序,選擇BoundedOutOfOrdernessTimestampExtractor:框架

long MAX_EVENT_DELAY = 3500;
      BoundedOutOfOrdernessTimestampExtractor<String> assigner = new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(MAX_EVENT_DELAY)) {
            @Override
            public long extractTimestamp(String element) {
                VisitEvent visitEvent = null;
                try {
                    visitEvent = objectMapper.readValue(element, VisitEvent.class);
                    return visitEvent.getVisitTime();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return Instant.now().toEpochMilli();
            }
        };

上面的一些代碼主要用來作時間的處理,真實的計算經過window來完成,代碼以下。ide

int[] arr = {0,2};
        FlinkKafkaConsumerBase<String> consumerWithEventTime = myConsumer.assignTimestampsAndWatermarks(assigner);
        TypeInformation<Tuple3<String, VisitEvent, String>> typeInformation = TypeInformation.of(new TypeHint<Tuple3<String, VisitEvent, String>>() {});
        DataStreamSource<String> dataStreamByEventTime = env.addSource(consumerWithEventTime);
        SingleOutputStreamOperator<UrlVisitBy> uvCounter = dataStreamByEventTime
                .map(str->objectMapper.readValue(str,VisitEvent.class))
                .map(visitEvent-> new Tuple3<>(visitEvent.getVisitUrl(), visitEvent,visitEvent.getVisitUserId()))
                .returns(typeInformation)
                .keyBy(arr)
                .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1),Time.hours(-8)))
                .allowedLateness(Time.minutes(1))
                .process(new ProcessWindowFunction<Tuple3<String, VisitEvent, String>, UrlVisitBy, Tuple, TimeWindow>() {
                    @Override
                    public void process(Tuple tuple, Context context, Iterable<Tuple3<String, VisitEvent, String>> elements, Collector<UrlVisitBy> out) throws Exception {
                        long count = 0;
                        Tuple2<String,String> tuple2 = null;
                        if (tuple instanceof Tuple2){
                            tuple2 = (Tuple2) tuple;
                        }
                        for (Tuple3<String, VisitEvent, String> element : elements) {
                            count++;
                        };
                        TimeWindow window = context.window();
                        out.collect(new UrlVisitBy(window.getStart(),window.getEnd(),tuple2.f0,count,tuple2.f1));
                    }
                });
        uvCounter.print();

建議使用returns(typeInformation)

因爲JDK默認的編譯器在編譯過程當中會擦除泛型信息,這樣Flink在執行的時候沒法獲取足夠的信息來推斷真實類型,那麼可能會碰到這樣的錯誤「The generic type parameters of 'XXX' are missing」。

如今只有Eclipse JDT compiler在編譯後能夠保留足夠的信息,可是它限制了開發者只能使用Eclipse編譯以及調試。另外因爲兼容性問題,Eclipse對Flink的支持並不友好。官方推薦使用Intelij idea。

爲了擺脫編譯器的限制,Flink採用了TypeInfomation告知Flink真實類型。

相關文章
相關標籤/搜索