Fink| 實時熱門商品

 

HotNItems程序員

  拓展需求:實時統計雙十一下單量,實時統計成交額,實時查看鍋爐溫度變化曲線,每一個5分鐘看一下過去一個小時溫度變化曲線,apache

  涉及到的技術點:sliding window、Watermark、event time數據結構

  用到的算子或者說叫鏈式調用:keyby、timeWindow、aggregate、assignTimestampsAndWatermarks、filter、processFunction底層API併發

 PopularPlacesToEsapp

  框架:flume -> Kafka、flink、es、kibana框架

  涉及到的技術點:sliding window、watermark、event timeide

  用到的算子:keyby、filter、apply、map、timeWindow函數

實現一個「實時熱門商品」的需求,咱們能夠將「實時熱門商品」翻譯成程序員更好理解的需求:ui

  每隔5分鐘輸出最近一小時內點擊量最多的前N個商品。將這個需求進行分解咱們大概要作這麼幾件事情:this

 

  • 抽取出業務時間戳,告訴Flink框架基於業務時間作窗口

  • 過濾出點擊行爲數據

  • 按一小時的窗口大小,每5分鐘統計一次,作滑動窗口聚合(Sliding Window)

  • 按每一個窗口聚合,輸出每一個窗口中點擊量前N名的商品

public class HotItems {
    public static void main(String[] args) throws Exception {
        //建立執行環境 execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 告訴系統按照 EventTime 處理
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // 爲了打印到控制檯的結果不亂序,咱們配置全局的併發爲1,改變併發對結果正確性沒有影響
        //env.setMaxParallelism(1);
        //Caused by: org.apache.flink.runtime.JobException: Vertex Split Reader: Custom File source -> Timestamps/Watermarks -> Filter's parallelism (8) is higher than the max parallelism (1). Please lower the parallelism or increase the max parallelism.
        env.setParallelism(1);
        // UserBehavior.csv 的本地文件路徑, 在 resources 目錄下
        URL fileURL = HotItems.class.getClassLoader().getResource("UserBehavior.csv");
        Path filePath = Path.fromLocalFile(new File(fileURL.toURI())); //拋出異常URISyntaxException

        // 抽取 UserBehavior 的 TypeInformation,是一個 PojoTypeInfo ???
        PojoTypeInfo<UserBehavior> pojoType = (PojoTypeInfo<UserBehavior>) TypeExtractor.createTypeInfo(UserBehavior.class);//TypeInformation<UserBehavior>
        String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};

        //建立PojoCsvInputFormat
        PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);
        // 建立數據源,獲得 UserBehavior 類型的 DataStream
        env.createInput(csvInput, pojoType)
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
                    @Override
                    public long extractAscendingTimestamp(UserBehavior userBehavior) {
                        return userBehavior.timestamp * 1000;// 原始數據單位秒,將其轉成毫秒
                    }
                }).filter(new FilterFunction<UserBehavior>() { // 過濾出只有點擊的數據
            @Override
            public boolean filter(UserBehavior userBehavior) throws Exception {
                return userBehavior.behavior.equals("pv");
            }
        }).keyBy("itemId")// 咱們使用.keyBy("itemId")對商品進行分組聚合
                // 使用.timeWindow(Time size, Time slide)對每一個商品作滑動窗口(1小時窗口,5分鐘滑動一次)。
                .timeWindow(Time.minutes(60), Time.minutes(5)) //別導錯包了
                .aggregate(new CountAgg(), new WindowResultFunction())
                //CountAgg統計窗口中的條數; 商品ID,窗口,點擊量封裝成了ItemViewCount進行輸出
                .keyBy("windowEnd")
                .process(new TopNHotItems(3)).print();

        env.execute("Hot Items job");

    }
    /**
     * 求某個窗口中前 N 名的熱門點擊商品,key 爲窗口時間戳,輸出爲 TopN 的結果字符串
     */
    public static class TopNHotItems extends KeyedProcessFunction<Tuple, ItemViewCount, String> {
        private final int topSize;
        public TopNHotItems(int topSize) throws Exception {
            this.topSize = topSize;
        }

        // 用於存儲商品與點擊數的狀態,待收齊同一個窗口的數據後,再觸發 TopN 計算
        private ListState<ItemViewCount> itemState;

        /*
         * 這裏咱們還使用了ListState<ItemViewCount>來存儲收到的每條ItemViewCount消息,
         * 保證在發生故障時,狀態數據的不丟失和一致性。
         * ListState是Flink提供的相似Java List接口的State API,
         * 它集成了框架的checkpoint機制,自動作到了exactly-once的語義保證。*/
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ListStateDescriptor<ItemViewCount> itemsStateDesc = new ListStateDescriptor<>(
                    "itemState-state",//狀態的描述符
                    ItemViewCount.class);//存儲的類型
            //從運行時上下文獲取
            itemState = getRuntimeContext().getListState(itemsStateDesc);
        }
        /*         * ProcessFunction是Flink提供的一個low-level API,用於實現更高級的功能。
         * 它主要提供了定時器timer的功能(支持EventTime或ProcessingTime)。
         * 本案例中咱們將利用timer來判斷什麼時候收齊了某個window下全部商品的點擊量數據。
         * 因爲Watermark的進度是全局的,在processElement方法中,每當收到一條數據(ItemViewCount),咱們就註冊一個windowEnd+1的定時器(Flink框架會自動忽略同一時間的重複註冊)。
         * windowEnd+1的定時器被觸發時,意味着收到了windowEnd+1的Watermark,即收齊了該windowEnd下的全部商品窗口統計值。
         * 咱們在onTimer()中處理將收集的全部商品及點擊量進行排序,選出TopN,並將排名信息格式化成字符串後進行輸出。*/

        @Override
        public void processElement(ItemViewCount input, Context context, Collector<String> collector) throws Exception {
            // 每條數據都保存到狀態中
            itemState.add(input);
            // 註冊 windowEnd+1 的 EventTime Timer, 當觸發時,說明收齊了屬於windowEnd窗口的全部商品數據
            context.timerService().registerEventTimeTimer(input.windowEnd + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // 獲取收到的全部商品點擊量
            List<ItemViewCount> allItems = new ArrayList<>();
            for (ItemViewCount item : itemState.get()) {
                allItems.add(item);
            }
            // 提早清除狀態中的數據,釋放空間
            itemState.clear();
            // 按照點擊量從大到小排序
            allItems.sort(new Comparator<ItemViewCount>() {
                @Override
                public int compare(ItemViewCount o1, ItemViewCount o2) {
                    return (int) (o2.viewCount - o1.viewCount);
                }
            });
            // 將排名信息格式化成 String, 便於打印
            StringBuilder result = new StringBuilder();
            result.append("===========================\n");
            result.append("時間:").append(new Timestamp(timestamp - 1)).append("\n");
            for (int i = 0; i < allItems.size() && i < topSize; i++) {
                ItemViewCount currentItem = allItems.get(i);
                // No1:  商品ID=12224  瀏覽量=2413
                result.append("No").append(i).append(":")
                        .append(" 商品ID=").append(currentItem.itemId)
                        .append(" 瀏覽量=").append(currentItem.viewCount)
                        .append("\n");
            }
            result.append("==========================\n\n");
            // 控制輸出頻率,模擬實時滾動結果
            Thread.sleep(1000);
            out.collect(result.toString());
            //super.onTimer(timestamp, ctx, out);
        }
    }

    /** 用於輸出窗口的結果 */
    /** 將每一個key每一個窗口聚合後的結果帶上其餘信息進行輸出。*/
    /**
     * 咱們這裏實現的WindowResultFunction將主鍵商品ID,窗口,點擊量封裝成了ItemViewCount進行輸出。
     */

    private static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key,  // 窗口的主鍵,即 itemId
                          TimeWindow window, // 窗口
                          Iterable<Long> aggregateResult,// 聚合函數的結果,即 count 值
                          Collector<ItemViewCount> collector) // 輸出類型爲 ItemViewCount
        {
            Long itemId = ((Tuple1<Long>) key).f0;
            Long count = aggregateResult.iterator().next();
            collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));

        }

    }

    /**
     * 商品點擊量(窗口操做的輸出類型)
     */
    public static class ItemViewCount {  //public
        public long itemId;     // 商品ID
        public long windowEnd;  // 窗口結束時間戳
        public long viewCount;  // 商品的點擊量

        public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
            ItemViewCount result = new ItemViewCount();
            result.itemId = itemId;
            result.windowEnd = windowEnd;
            result.viewCount = viewCount;
            return result;
        }
    }

    /** COUNT 統計的聚合函數實現,每出現一條記錄加一 */
    /** 接口: AggregateFunction(in, acc, out) */
    /**
     * 這裏的CountAgg實現了AggregateFunction接口,功能是統計窗口中的條數,即遇到一條數據就加一。
     */
    public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {

        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(UserBehavior userBehavior, Long acc) {
            return acc + 1;
        }

        @Override
        public Long getResult(Long acc) {
            return acc;
        }

        @Override
        public Long merge(Long acc1, Long acc2) {
            return acc1 + acc2;
        }
    }

    /**
     * 用戶行爲數據結構
     **/
    public static class UserBehavior {
        public long userId;         // 用戶ID
        public long itemId;         // 商品ID
        public int categoryId;      // 商品類目ID
        public String behavior;     // 用戶行爲, 包括("pv", "buy", "cart", "fav")
        public long timestamp;      // 行爲發生的時間戳,單位秒


    }
}
===========================
時間:2017-11-26 09:05:00.0
No0: 商品ID=5051027 瀏覽量=3
No1: 商品ID=3493253 瀏覽量=3
No2: 商品ID=4261030 瀏覽量=3
==========================


===========================
時間:2017-11-26 09:10:00.0
No0: 商品ID=812879 瀏覽量=5
No1: 商品ID=2600165 瀏覽量=4
No2: 商品ID=2828948 瀏覽量=4
==========================


===========================
時間:2017-11-26 09:15:00.0
No0: 商品ID=812879 瀏覽量=7
No1: 商品ID=138964 瀏覽量=5
No2: 商品ID=4568476 瀏覽量=5
==========================
相關文章
相關標籤/搜索