假定需求以下,每間隔1分鐘,統計過去5分鐘的UV,PV。很容易想到,經過數據庫的count,以及count distinct能夠得出正確結果。在大數據量下,傳統數據庫或者HADOOP(hbase...)的count效率都不高。若是數據是增量的,那麼流式計算每每能提供更高的吞吐和更低的延時。html
接下來經過使用Flink實現這個功能,並借這個案例描述一些Flink的基本概念。若是對其餘流式計算框架有所瞭解,能夠發現許多東西是互通的。java
很容易理解,在這個案例中,咱們須要在內存中緩存5分鐘的數據,時間往前推移到一分鐘的時候,統計一次,而且清理數據。數據庫
Flink提供了多種窗口,能夠按需選擇。apache
考慮到網絡的延遲和數據的亂序,不能簡單的使用Flink的系統時間作統計。例如14:25分的數據可能在14:27分纔到系統中,若是直接按Flink系統時間,即會影響14:20~14:25這段時間的計算結果,同時也會影響14:25~14:30的計算結果。windows
在Flink中,有如下三種時間特徵,查看詳細說明:緩存
在統計PV UV時,咱們須要根據用戶訪問的時間,因此使用Event Time。網絡
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
接着,咱們須要告知Flink記錄的真實時間timestamp,以及觸發window計算的watermark。在Flink中經過實現接口AssignerWithPeriodicWatermarks來完成。app
考慮到數據可能亂序,選擇BoundedOutOfOrdernessTimestampExtractor:框架
long MAX_EVENT_DELAY = 3500; BoundedOutOfOrdernessTimestampExtractor<String> assigner = new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(MAX_EVENT_DELAY)) { @Override public long extractTimestamp(String element) { VisitEvent visitEvent = null; try { visitEvent = objectMapper.readValue(element, VisitEvent.class); return visitEvent.getVisitTime(); } catch (IOException e) { e.printStackTrace(); } return Instant.now().toEpochMilli(); } };
上面的一些代碼主要用來作時間的處理,真實的計算經過window來完成,代碼以下。ide
int[] arr = {0,2}; FlinkKafkaConsumerBase<String> consumerWithEventTime = myConsumer.assignTimestampsAndWatermarks(assigner); TypeInformation<Tuple3<String, VisitEvent, String>> typeInformation = TypeInformation.of(new TypeHint<Tuple3<String, VisitEvent, String>>() {}); DataStreamSource<String> dataStreamByEventTime = env.addSource(consumerWithEventTime); SingleOutputStreamOperator<UrlVisitBy> uvCounter = dataStreamByEventTime .map(str->objectMapper.readValue(str,VisitEvent.class)) .map(visitEvent-> new Tuple3<>(visitEvent.getVisitUrl(), visitEvent,visitEvent.getVisitUserId())) .returns(typeInformation) .keyBy(arr) .window(SlidingProcessingTimeWindows.of(Time.minutes(5), Time.minutes(1),Time.hours(-8))) .allowedLateness(Time.minutes(1)) .process(new ProcessWindowFunction<Tuple3<String, VisitEvent, String>, UrlVisitBy, Tuple, TimeWindow>() { @Override public void process(Tuple tuple, Context context, Iterable<Tuple3<String, VisitEvent, String>> elements, Collector<UrlVisitBy> out) throws Exception { long count = 0; Tuple2<String,String> tuple2 = null; if (tuple instanceof Tuple2){ tuple2 = (Tuple2) tuple; } for (Tuple3<String, VisitEvent, String> element : elements) { count++; }; TimeWindow window = context.window(); out.collect(new UrlVisitBy(window.getStart(),window.getEnd(),tuple2.f0,count,tuple2.f1)); } }); uvCounter.print();
因爲JDK默認的編譯器在編譯過程當中會擦除泛型信息,這樣Flink在執行的時候沒法獲取足夠的信息來推斷真實類型,那麼可能會碰到這樣的錯誤「The generic type parameters of 'XXX' are missing」。
如今只有Eclipse JDT compiler在編譯後能夠保留足夠的信息,可是它限制了開發者只能使用Eclipse編譯以及調試。另外因爲兼容性問題,Eclipse對Flink的支持並不友好。官方推薦使用Intelij idea。
爲了擺脫編譯器的限制,Flink採用了TypeInfomation告知Flink真實類型。