在上一篇入門教程中,咱們已經可以快速構建一個基礎的 Flink 程序了。本文會一步步地帶領你實現一個更復雜的 Flink 應用程序:實時熱門商品。在開始本文前咱們建議你先實踐一遍上篇文章,由於本文會沿用上文的my-flink-project項目框架。java
經過本文你將學到:git
「實時熱門商品」的需求,咱們能夠將「實時熱門商品」翻譯成程序員更好理解的需求:每隔5分鐘輸出最近一小時內點擊量最多的前 N 個商品。將這個需求進行分解咱們大概要作這麼幾件事情:程序員
這裏咱們準備了一份淘寶用戶行爲數據集(來自阿里雲天池公開數據集,特別感謝)。本數據集包含了淘寶上某一天隨機一百萬用戶的全部行爲(包括點擊、購買、加購、收藏)。數據集的組織形式和MovieLens-20M相似,即數據集的每一行表示一條用戶行爲,由用戶ID、商品ID、商品類目ID、行爲類型和時間戳組成,並以逗號分隔。關於數據集中每一列的詳細描述以下:github
列名稱 | 說明 |
---|---|
用戶ID | 整數類型,加密後的用戶ID |
商品ID | 整數類型,加密後的商品ID |
商品類目ID | 整數類型,加密後的商品所屬類目ID |
行爲類型 | 字符串,枚舉類型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’) |
時間戳 | 行爲發生的時間戳,單位秒 |
你能夠經過下面的命令下載數據集到項目的 resources 目錄下:數據結構
$ cd my-flink-project/src/main/resources $ curl https://raw.githubusercontent.com/wuchong/my-flink-project/master/src/main/resources/UserBehavior.csv > UserBehavior.csv
這裏是否使用 curl 命令下載數據並不重要,你也可使用 wget 命令或者直接訪問連接下載數據。關鍵是,將數據文件保存到項目的 resources 目錄下,方便應用程序訪問。併發
在 src/main/java/myflink 下建立 HotItems.java 文件:app
package myflink; public class HotItems { public static void main(String[] args) throws Exception { } }
與上文同樣,咱們會一步步往裏面填充代碼。第一步仍然是建立一個 StreamExecutionEnvironment,咱們把它添加到 main 函數中。框架
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 爲了打印到控制檯的結果不亂序,咱們配置全局的併發爲1,這裏改變併發對結果正確性沒有影響 env.setParallelism(1);
在數據準備章節,咱們已經將測試的數據集下載到本地了。因爲是一個csv文件,咱們將使用 CsvInputFormat 建立模擬數據源。curl
注:雖然一個流式應用應該是一個一直運行着的程序,須要消費一個無限數據源。可是在本案例教程中,爲了省去構建真實數據源的繁瑣,咱們使用了文件來模擬真實數據源,這並不影響下文要介紹的知識點。這也是一種本地驗證 Flink 應用程序正確性的經常使用方式。
咱們先建立一個 UserBehavior 的 POJO 類(全部成員變量聲明成public即是POJO類),強類型化後能方便後續的處理。ide
/** 用戶行爲數據結構 **/ public static class UserBehavior { public long userId; // 用戶ID public long itemId; // 商品ID public int categoryId; // 商品類目ID public String behavior; // 用戶行爲, 包括("pv", "buy", "cart", "fav") public long timestamp; // 行爲發生的時間戳,單位秒 }
接下來咱們就能夠建立一個 PojoCsvInputFormat 了, 這是一個讀取 csv 文件並將每一行轉成指定 POJO
類型(在咱們案例中是 UserBehavior)的輸入器。
// UserBehavior.csv 的本地文件路徑 URL fileUrl = HotItems2.class.getClassLoader().getResource("UserBehavior.csv"); Path filePath = Path.fromLocalFile(new File(fileUrl.toURI())); // 抽取 UserBehavior 的 TypeInformation,是一個 PojoTypeInfo PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class); // 因爲 Java 反射抽取出的字段順序是不肯定的,須要顯式指定下文件中字段的順序 String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"}; // 建立 PojoCsvInputFormat PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);
下一步咱們用 PojoCsvInputFormat 建立輸入源。
DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);
這就建立了一個 UserBehavior 類型的 DataStream。
當咱們說「統計過去一小時內點擊量」,這裏的「一小時」是指什麼呢? 在 Flink 中它能夠是指 ProcessingTime ,也能夠是 EventTime,由用戶決定。
在本案例中,咱們須要統計業務時間上的每小時的點擊量,因此要基於 EventTime 來處理。那麼若是讓 Flink 按照咱們想要的業務時間來處理呢?這裏主要有兩件事情要作。
第一件是告訴 Flink 咱們如今按照 EventTime 模式進行處理,Flink 默認使用 ProcessingTime 處理,因此咱們要顯式設置下。
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
第二件事情是指定如何得到業務時間,以及生成 Watermark。Watermark 是用來追蹤業務事件的概念,能夠理解成 EventTime 世界中的時鐘,用來指示當前處理到什麼時刻的數據了。因爲咱們的數據源的數據已經通過整理,沒有亂序,即事件的時間戳是單調遞增的,因此能夠將每條數據的業務時間就當作 Watermark。這裏咱們用 AscendingTimestampExtractor 來實現時間戳的抽取和 Watermark 的生成。
注:真實業務場景通常都是存在亂序的,因此通常使用 BoundedOutOfOrdernessTimestampExtractor。
DataStream<UserBehavior> timedData = dataSource .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() { @Override public long extractAscendingTimestamp(UserBehavior userBehavior) { // 原始數據單位秒,將其轉成毫秒 return userBehavior.timestamp * 1000; } });
這樣咱們就獲得了一個帶有時間標記的數據流了,後面就能作一些窗口的操做。
在開始窗口操做以前,先回顧下需求「每隔5分鐘輸出過去一小時內點擊量最多的前 N 個商品」。因爲原始數據中存在點擊、加購、購買、收藏各類行爲的數據,可是咱們只須要統計點擊量,因此先使用 FilterFunction 將點擊行爲數據過濾出來。
DataStream<UserBehavior> pvData = timedData .filter(new FilterFunction<UserBehavior>() { @Override public boolean filter(UserBehavior userBehavior) throws Exception { // 過濾出只有點擊的數據 return userBehavior.behavior.equals("pv"); } });
因爲要每隔5分鐘統計一次最近一小時每一個商品的點擊量,因此窗口大小是一小時,每隔5分鐘滑動一次。即分別要統計 [09:00, 10:00), [09:05, 10:05), [09:10, 10:10)… 等窗口的商品點擊量。是一個常見的滑動窗口需求(Sliding Window)。
DataStream<ItemViewCount> windowedData = pvData .keyBy("itemId") .timeWindow(Time.minutes(60), Time.minutes(5)) .aggregate(new CountAgg(), new WindowResultFunction());
咱們使用.keyBy("itemId")對商品進行分組,使用.timeWindow(Time size, Time slide)對每一個商品作滑動窗口(1小時窗口,5分鐘滑動一次)。而後咱們使用 .aggregate(AggregateFunction af, WindowFunction wf) 作增量的聚合操做,它能使用AggregateFunction提早聚合掉數據,減小 state 的存儲壓力。較之.apply(WindowFunction wf)會將窗口中的數據都存儲下來,最後一塊兒計算要高效地多。aggregate()方法的第一個參數用於
這裏的CountAgg實現了AggregateFunction接口,功能是統計窗口中的條數,即遇到一條數據就加一。
/** COUNT 統計的聚合函數實現,每出現一條記錄加一 */ public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> { @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserBehavior userBehavior, Long acc) { return acc + 1; } @Override public Long getResult(Long acc) { return acc; } @Override public Long merge(Long acc1, Long acc2) { return acc1 + acc2; } }
.aggregate(AggregateFunction af, WindowFunction wf) 的第二個參數WindowFunction將每一個 key每一個窗口聚合後的結果帶上其餘信息進行輸出。咱們這裏實現的WindowResultFunction將主鍵商品ID,窗口,點擊量封裝成了ItemViewCount進行輸出。
/** 用於輸出窗口的結果 */ public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> { @Override public void apply( Tuple key, // 窗口的主鍵,即 itemId TimeWindow window, // 窗口 Iterable<Long> aggregateResult, // 聚合函數的結果,即 count 值 Collector<ItemViewCount> collector // 輸出類型爲 ItemViewCount ) throws Exception { Long itemId = ((Tuple1<Long>) key).f0; Long count = aggregateResult.iterator().next(); collector.collect(ItemViewCount.of(itemId, window.getEnd(), count)); } } /** 商品點擊量(窗口操做的輸出類型) */ public static class ItemViewCount { public long itemId; // 商品ID public long windowEnd; // 窗口結束時間戳 public long viewCount; // 商品的點擊量 public static ItemViewCount of(long itemId, long windowEnd, long viewCount) { ItemViewCount result = new ItemViewCount(); result.itemId = itemId; result.windowEnd = windowEnd; result.viewCount = viewCount; return result; } }
如今咱們獲得了每一個商品在每一個窗口的點擊量的數據流。
爲了統計每一個窗口下最熱門的商品,咱們須要再次按窗口進行分組,這裏根據ItemViewCount中的windowEnd進行keyBy()操做。而後使用 ProcessFunction 實現一個自定義的 TopN 函數 TopNHotItems 來計算點擊量排名前3名的商品,並將排名結果格式化成字符串,便於後續輸出。
DataStream<String> topItems = windowedData .keyBy("windowEnd") .process(new TopNHotItems(3)); // 求點擊量前3名的商品
ProcessFunction 是 Flink 提供的一個 low-level API,用於實現更高級的功能。它主要提供了定時器 timer 的功能(支持EventTime或ProcessingTime)。本案例中咱們將利用 timer 來判斷什麼時候收齊了某個 window 下全部商品的點擊量數據。因爲 Watermark 的進度是全局的,
在 processElement 方法中,每當收到一條數據(ItemViewCount),咱們就註冊一個 windowEnd+1 的定時器(Flink 框架會自動忽略同一時間的重複註冊)。windowEnd+1 的定時器被觸發時,意味着收到了windowEnd+1的 Watermark,即收齊了該windowEnd下的全部商品窗口統計值。咱們在 onTimer() 中處理將收集的全部商品及點擊量進行排序,選出 TopN,並將排名信息格式化成字符串後進行輸出。
這裏咱們還使用了 ListState 來存儲收到的每條 ItemViewCount 消息,保證在發生故障時,狀態數據的不丟失和一致性。ListState 是 Flink 提供的相似 Java List 接口的 State API,它集成了框架的 checkpoint 機制,自動作到了 exactly-once 的語義保證。
/** 求某個窗口中前 N 名的熱門點擊商品,key 爲窗口時間戳,輸出爲 TopN 的結果字符串 */ public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> { private final int topSize; public TopNHotItems(int topSize) { this.topSize = topSize; } // 用於存儲商品與點擊數的狀態,待收齊同一個窗口的數據後,再觸發 TopN 計算 private ListState<ItemViewCount> itemState; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); // 狀態的註冊 ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>( "itemState-state", ItemViewCount.class); itemState = getRuntimeContext().getListState(itemsStateDesc); } @Override public void processElement( ItemViewCount input, Context context, Collector<String> collector) throws Exception { // 每條數據都保存到狀態中 itemState.add(input); // 註冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的全部商品數據 context.timerService().registerEventTimeTimer(input.windowEnd + 1); } @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { // 獲取收到的全部商品點擊量 List<ItemViewCount> allItems = new ArrayList<>(); for (ItemViewCount item : itemState.get()) { allItems.add(item); } // 提早清除狀態中的數據,釋放空間 itemState.clear(); // 按照點擊量從大到小排序 allItems.sort(new Comparator<ItemViewCount>() { @Override public int compare(ItemViewCount o1, ItemViewCount o2) { return (int) (o2.viewCount - o1.viewCount); } }); // 將排名信息格式化成 String, 便於打印 StringBuilder result = new StringBuilder(); result.append("====================================\n"); result.append("時間: ").append(new Timestamp(timestamp-1)).append("\n"); for (int i=0;i<topSize;i++) { ItemViewCount currentItem = allItems.get(i); // No1: 商品ID=12224 瀏覽量=2413 result.append("No").append(i).append(":") .append(" 商品ID=").append(currentItem.itemId) .append(" 瀏覽量=").append(currentItem.viewCount) .append("\n"); } result.append("====================================\n\n"); out.collect(result.toString()); } }
最後一步咱們將結果打印輸出到控制檯,並調用env.execute執行任務。
topItems.print(); env.execute("Hot Items Job");
直接運行 main 函數,就能看到不斷輸出的每一個時間點的熱門商品ID。
本文的完整代碼能夠經過 GitHub 訪問到。本文經過實現一個「實時熱門商品」的案例,學習和實踐了 Flink 的多個核心概念和 API 用法。包括 EventTime、Watermark 的使用,State 的使用,Window API 的使用,以及 TopN 的實現。但願本文能加深你們對 Flink 的理解,幫助你們解決實戰上遇到的問題。
整代碼請移步 GitHub 訪問 : https://github.com/wuchong/my-flink-project/blob/master/src/main/java/myflink/HotItems.java
本文爲雲棲社區原創內容,未經容許不得轉載。