Flink Real-Time Computing in Practice (8): Flink Big Data Case Study 1

1. Design of the Flink real-time big data processing solution


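The whole solution uses Canal + the Kafka connector + Protobuf to synchronize and ingest the data, and a Flink service performs the real-time statistics over the various kinds of business data.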

2. Hot-selling product statistics

  • Function

    Compute statistics on hot-selling products over a one-day period, refreshing the results every 3 seconds.

  • Core code

    Main logic:

        /**
         * Runs the Flink job.
         * @throws Exception if the job cannot be built or executed
         */
        private void executeFlinkTask() throws Exception {
    
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 2. Configure the Kafka connection
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            properties.setProperty("group.id", "fink_group");
    
            // 3. Create the Kafka consumer
            FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                    "order_binlog",             // source topic
                    new SimpleStringSchema(),   // deserialization schema
                    properties);
    
            // For debugging: consume again from the earliest available record
            kafkaConsumer.setStartFromEarliest();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            // 4. Read from the Kafka source
            DataStreamSource<String> socketStr = env.addSource(kafkaConsumer);
    
            // 5. Filter and transform the data
            socketStr.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);
                    String isDDL = jsonObject.get("isDdl").getAsString();
                    String type = jsonObject.get("type").getAsString();
                    // Keep only non-DDL operations that insert new rows
                    return "false".equalsIgnoreCase(isDDL) && "INSERT".equalsIgnoreCase(type);
                }
            }).flatMap(new FlatMapFunction<String, Order>() {
                @Override
                public void flatMap(String value, Collector<Order> out) throws Exception {
                    // Read the "data" array from the JSON message
                    JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");
                    // Convert each entry of "data" into a Java object
                    for (int i = 0; i < dataArray.size(); i++) {
                        JsonObject jsonObject = dataArray.get(i).getAsJsonObject();
                        Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class);
                        System.out.println("order => " + order);
                        out.collect(order);
                    }
                }
            })
            // Use the order execution time as the event time, with no out-of-orderness allowance
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(Order element) {
                    return element.getExecTime();
                }
            })
            // Group by product, then aggregate over a 24-hour window that slides every 3 seconds
            .keyBy(Order::getGoodsId)
            .timeWindow(Time.hours(24), Time.seconds(3))
            .aggregate(new TotalAmount(), new AmountWindow())
            // Regroup by window end so that all products of one window are ranked together
            .keyBy(HotOrder::getTimeWindow)
            .process(new TopNHotOrder());
    
            // 6. Execute the job
            env.execute("job");
        }
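The main logic uses a 24-hour window that slides every 3 seconds. As a rough illustration of what that implies, the plain-Java sketch below lists the window start times that contain a single event timestamp, using the usual epoch-aligned assignment rule with no offset. The class and method names are hypothetical and the code does not depend on Flink; it only mirrors the arithmetic.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of how a sliding window assigner maps one timestamp to windows. */
class SlidingWindowSketch {

    /** Returns the start timestamps (ms) of every window [start, start + size) containing the timestamp. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Latest slide-aligned start that is <= timestamp (assumes a non-negative timestamp, no offset).
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60 * 1000; // 24 hours in milliseconds
        long slide = 3_000L;              // 3 seconds in milliseconds
        List<Long> starts = windowStarts(1_600_000_000_000L, size, slide);
        System.out.println("windows per element: " + starts.size());
    }
}
```

With these settings every order falls into 86,400 / 3 = 28,800 overlapping windows, so the incremental aggregation in TotalAmount matters: each window only needs one accumulator per product rather than the raw orders.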

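Accumulating the sales amount of hot-selling products:

    /**
     * Product amount accumulator
     */
    private static class TotalAmount implements AggregateFunction<Order, Order, Order> {
        @Override
        public Order createAccumulator() {
            Order order = new Order();
            order.setTotalAmount(0L);
            return order;
        }

        /**
         * Adds one order to the running total sales amount of the product.
         * @param value the incoming order
         * @param accumulator the running total for this product and window
         * @return the updated accumulator
         */
        @Override
        public Order add(Order value, Order accumulator) {
            accumulator.setGoodsId(value.getGoodsId());
            accumulator.setGoodsName(value.getGoodsName());
            accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));
            return accumulator;
        }

        @Override
        public Order getResult(Order accumulator) {
            return accumulator;
        }

        @Override
        public Order merge(Order a, Order b) {
            // Combine two partial accumulators instead of returning null
            if (a.getGoodsName() == null) {
                a.setGoodsId(b.getGoodsId());
                a.setGoodsName(b.getGoodsName());
            }
            a.setTotalAmount(a.getTotalAmount() + b.getTotalAmount());
            return a;
        }
    }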
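An AggregateFunction folds each record into an accumulator with add, and merge is expected to combine two partial accumulators into one. The following plain-Java sketch, with hypothetical Trade and Acc classes standing in for the order and its running total, shows the property a merge implementation should satisfy: merging two partial totals gives the same result as adding every record to a single accumulator.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of the add/merge contract of an incremental aggregator. */
class AmountAccumulatorSketch {

    /** Hypothetical stand-in for an order: unit price and traded volume. */
    static final class Trade {
        final long price;
        final long volume;
        Trade(long price, long volume) { this.price = price; this.volume = volume; }
    }

    /** Mutable accumulator holding the running total amount. */
    static final class Acc {
        long total;
    }

    /** Folds one trade into the accumulator. */
    static Acc add(Trade trade, Acc acc) {
        acc.total += trade.price * trade.volume;
        return acc;
    }

    /** Combines two partial accumulators into a new one. */
    static Acc merge(Acc a, Acc b) {
        Acc merged = new Acc();
        merged.total = a.total + b.total;
        return merged;
    }

    static long sumSequential(List<Trade> trades) {
        Acc acc = new Acc();
        for (Trade t : trades) add(t, acc);
        return acc.total;
    }

    static long sumMerged(List<Trade> left, List<Trade> right) {
        Acc a = new Acc();
        for (Trade t : left) add(t, a);
        Acc b = new Acc();
        for (Trade t : right) add(t, b);
        return merge(a, b).total;
    }

    public static void main(String[] args) {
        List<Trade> trades = Arrays.asList(new Trade(10, 2), new Trade(5, 3), new Trade(7, 1));
        System.out.println(sumSequential(trades));
        System.out.println(sumMerged(trades.subList(0, 1), trades.subList(1, 3)));
    }
}
```

Both calls print the same total. Sliding windows normally do not trigger merge, but implementing it keeps the accumulator usable with merging windows such as session windows.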

Converting the aggregated result of each window into the object used for ranking:

    /**
     * Converts the per-window aggregate of a hot-selling product into a HotOrder.
     */
    private static class AmountWindow implements WindowFunction<Order, HotOrder, Long, TimeWindow> {

        @Override
        public void apply(Long goodsId, TimeWindow window, Iterable<Order> input, Collector<HotOrder> out) throws Exception {
            // The input holds the single pre-aggregated result produced by TotalAmount
            Order order = input.iterator().next();
            out.collect(new HotOrder(goodsId, order.getGoodsName(), order.getTotalAmount(), window.getEnd()));
        }
    }

Ranking logic for hot-selling products:

    /**
     * Ranks the hot-selling products of each window.
     */
    private static class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> {

        private ListState<HotOrder> orderState;

        @Override
        public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception {
            // Buffer the record in the state list
            orderState.add(value);
            // Register an event-time timer at the end of the window
            ctx.timerService().registerEventTimeTimer(value.getTimeWindow());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            List<HotOrder> orderList = new ArrayList<>();
            for (HotOrder order : orderState.get()) {
                orderList.add(order);
            }
            // Sort by total sales amount, descending
            orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed());
            orderState.clear();
            // Write the results to Elasticsearch
            HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository");
            StringBuilder strBuf = new StringBuilder();
            for (HotOrder order : orderList) {
                order.setId(order.getGoodsId());
                order.setCreateDate(new Date(order.getTimeWindow()));
                hotOrderRepository.save(order);
                strBuf.append(order).append("\n");
                System.out.println("result => " + order);
            }
            out.collect(strBuf.toString());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotOrder>("hot-order", HotOrder.class));
        }
    }
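The ranking step above sorts every product of a window by total amount and stores all of them. If only the first N entries are wanted, the sorted list can be truncated before it is written out. The sketch below shows that idea in plain Java; the Item class and the topN helper are hypothetical and are not part of the original job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch: sort by total amount (descending) and keep the first N entries. */
class TopNSketch {

    /** Hypothetical stand-in for a per-window product total. */
    static final class Item {
        final long goodsId;
        final long totalAmount;
        Item(long goodsId, long totalAmount) { this.goodsId = goodsId; this.totalAmount = totalAmount; }
    }

    static List<Item> topN(List<Item> items, int n) {
        // Copy first so the caller's list is left untouched.
        List<Item> sorted = new ArrayList<>(items);
        sorted.sort(Comparator.comparingLong((Item i) -> i.totalAmount).reversed());
        // Keep at most n entries; fewer if the window saw fewer products.
        return new ArrayList<>(sorted.subList(0, Math.min(n, sorted.size())));
    }

    public static void main(String[] args) {
        List<Item> items = Arrays.asList(
                new Item(1, 300), new Item(2, 900), new Item(3, 500), new Item(4, 100));
        for (Item item : topN(items, 2)) {
            System.out.println(item.goodsId + " => " + item.totalAmount);
        }
    }
}
```

In the Flink job the same truncation would sit in onTimer, right after the sort and before the loop that saves to Elasticsearch.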

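3. Regional hot-selling product statistics (multi-dimensional conditions)

  • Function

    Compute statistics on hot-selling products per region (for example by province or city) over a one-day period, refreshing the results every 3 seconds.

  • Core code

    Main logic: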

        /**
         * Runs the Flink job.
         * @throws Exception if the job cannot be built or executed
         */
        private void executeFlinkTask() throws Exception {
    
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 2. Configure the Kafka connection
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            properties.setProperty("group.id", "fink_group");
    
            // 3. Create the Kafka consumer for orders
            FlinkKafkaConsumer<String> orderKafkaConsumer = new FlinkKafkaConsumer<>(
                    "order_binlog",             // source topic
                    new SimpleStringSchema(),   // deserialization schema
                    properties);
    
            // For debugging: consume again from the earliest available record
            orderKafkaConsumer.setStartFromEarliest();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            // 4. Create the Kafka consumer for order addresses
            FlinkKafkaConsumer<String> addressKafkaConsumer = new FlinkKafkaConsumer<>(
                    "orderAddress_binlog",      // source topic
                    new SimpleStringSchema(),   // deserialization schema
                    properties);
    
            // For debugging: consume again from the earliest available record
            addressKafkaConsumer.setStartFromEarliest();
    
            // 5. Read from the Kafka sources (orders and addresses)
            DataStreamSource<String> orderStream = env.addSource(orderKafkaConsumer);
            DataStreamSource<String> addressStream = env.addSource(addressKafkaConsumer);
    
            // 6. Filter and transform the order data
            DataStream<Order> orderDataStream = orderStream.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);
                    String isDDL = jsonObject.get("isDdl").getAsString();
                    String type = jsonObject.get("type").getAsString();
                    // Keep only non-DDL operations that insert new rows
                    return "false".equalsIgnoreCase(isDDL) && "INSERT".equalsIgnoreCase(type);
                }
            }).flatMap(new FlatMapFunction<String, Order>() {
                @Override
                public void flatMap(String value, Collector<Order> out) throws Exception {
                    // Read the "data" array from the JSON message
                    JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");
                    // Convert each entry of "data" into a Java object
                    for (int i = 0; i < dataArray.size(); i++) {
                        JsonObject jsonObject = dataArray.get(i).getAsJsonObject();
                        Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class);
                        System.out.println("order => " + order);
                        out.collect(order);
                    }
                }
            })
            // Use the order execution time as the event time, with no out-of-orderness allowance
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(Order element) {
                    return element.getExecTime();
                }
            });
    
            // 7. Filter and transform the address data
            DataStream<OrderAddress> orderAddressDataStream = addressStream.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);
                    String isDDL = jsonObject.get("isDdl").getAsString();
                    String type = jsonObject.get("type").getAsString();
                    // Keep only non-DDL operations that insert new rows
                    return "false".equalsIgnoreCase(isDDL) && "INSERT".equalsIgnoreCase(type);
                }
            }).flatMap(new FlatMapFunction<String, OrderAddress>() {
                @Override
                public void flatMap(String value, Collector<OrderAddress> out) throws Exception {
                    // Read the "data" array from the JSON message
                    JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");
                    // Convert each entry of "data" into a Java object
                    for (int i = 0; i < dataArray.size(); i++) {
                        JsonObject jsonObject = dataArray.get(i).getAsJsonObject();
                        OrderAddress orderAddress = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, OrderAddress.class);
                        System.out.println("orderAddress => " + orderAddress);
                        out.collect(orderAddress);
                    }
                }
            })
            // Use the address execution time as the event time, with no out-of-orderness allowance
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderAddress>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(OrderAddress element) {
                    return element.getExecTime();
                }
            });
    
            // 8. Join the order stream with the address stream
            orderDataStream.join(orderAddressDataStream).where(new KeySelector<Order, Object>() {
                @Override
                public Object getKey(Order value) throws Exception {
                    return value.getId();
                }
            }).equalTo(new KeySelector<OrderAddress, Object>() {
                @Override
                public Object getKey(OrderAddress value) throws Exception {
                    return value.getOrderId();
                }
            })
            // The join window (2 seconds) is kept shorter than the 3-second slide of the window below
            .window(TumblingEventTimeWindows.of(Time.seconds(2)))
            .apply(new JoinFunction<Order, OrderAddress, JoinOrderAddress>() {
    
                @Override
                public JoinOrderAddress join(Order first, OrderAddress second) throws Exception {
                    return JoinOrderAddress.build(first, second);
                }
            })
            // Re-assign event time on the joined records from their execution time
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JoinOrderAddress>(Time.seconds(0)) {
                @Override
                public long extractTimestamp(JoinOrderAddress element) {
                    return element.getExecTime();
                }
            })
            // 9. Group by province and product ID
            .keyBy(new KeySelector<JoinOrderAddress, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> getKey(JoinOrderAddress value) throws Exception {
                    return Tuple2.of(value.getProvince(), value.getGoodsId());
                }
            })
            // Aggregate over a 24-hour window that slides every 3 seconds
            .timeWindow(Time.hours(24), Time.seconds(3))
            .aggregate(new TotalAmount(), new AmountWindow())
            // Regroup by window end so that all results of one window are ranked together
            .keyBy(HotDimensionOrder::getTimeWindow)
            .process(new TopNDimensionOrder());
    
            // 10. Execute the job
            env.execute("job");
        }
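The join above pairs an order with its address only when both records carry the same key and their event times fall into the same 2-second tumbling window. The plain-Java sketch below reproduces just that matching rule, assuming epoch-aligned windows with no offset; the class and method names are hypothetical.

```java
/** Plain-Java sketch of the matching rule of a tumbling-window join. */
class TumblingJoinSketch {

    /** Start of the tumbling window that contains the (non-negative) timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    /** Two records join only if the keys match and both timestamps share one window. */
    static boolean joins(long orderId, long orderTs, long addressOrderId, long addressTs, long sizeMs) {
        return orderId == addressOrderId
                && windowStart(orderTs, sizeMs) == windowStart(addressTs, sizeMs);
    }

    public static void main(String[] args) {
        long size = 2_000L; // 2-second tumbling window
        System.out.println(joins(1, 1_000, 1, 1_900, size)); // same window
        System.out.println(joins(1, 1_900, 1, 2_100, size)); // straddles the 2,000 ms boundary
    }
}
```

The second call shows the case worth watching: an order at 1,900 ms and its address at 2,100 ms are only 200 ms apart, yet they land in different windows and would not be joined. The order and address rows therefore need execution times that fall into the same window.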

Product amount accumulator:

  /**
   * Product amount accumulator
   */
  private static class TotalAmount implements AggregateFunction<JoinOrderAddress, JoinOrderAddress, JoinOrderAddress> {
      @Override
      public JoinOrderAddress createAccumulator() {
          JoinOrderAddress order = new JoinOrderAddress();
          order.setTotalAmount(0L);
          return order;
      }

      /**
       * Adds one joined order to the running total sales amount.
       * @param value the incoming joined order
       * @param accumulator the running total for this province, product and window
       * @return the updated accumulator
       */
      @Override
      public JoinOrderAddress add(JoinOrderAddress value, JoinOrderAddress accumulator) {
          accumulator.setGoodsId(value.getGoodsId());
          accumulator.setGoodsName(value.getGoodsName());
          accumulator.setProvince(value.getProvince());
          accumulator.setCity(value.getCity());
          accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));
          return accumulator;
      }

      @Override
      public JoinOrderAddress getResult(JoinOrderAddress accumulator) {
          return accumulator;
      }

      @Override
      public JoinOrderAddress merge(JoinOrderAddress a, JoinOrderAddress b) {
          // Combine two partial accumulators instead of returning null
          if (a.getGoodsName() == null) {
              a.setGoodsId(b.getGoodsId());
              a.setGoodsName(b.getGoodsName());
              a.setProvince(b.getProvince());
              a.setCity(b.getCity());
          }
          a.setTotalAmount(a.getTotalAmount() + b.getTotalAmount());
          return a;
      }
  }
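Grouping by (province, product ID) and then summing price times volume is the core of the regional statistic. Leaving windows aside, the same computation can be sketched in plain Java with a map keyed by the two fields. The Sale class and the "province|goodsId" string key are illustrative assumptions, not the types used in the job.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch: total sales amount per (province, product) pair. */
class RegionGroupingSketch {

    /** Hypothetical stand-in for a joined order/address record. */
    static final class Sale {
        final String province;
        final long goodsId;
        final long price;
        final long volume;
        Sale(String province, long goodsId, long price, long volume) {
            this.province = province;
            this.goodsId = goodsId;
            this.price = price;
            this.volume = volume;
        }
    }

    static Map<String, Long> totalsByProvinceAndGoods(List<Sale> sales) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Sale s : sales) {
            // Composite key built from both grouping dimensions.
            String key = s.province + "|" + s.goodsId;
            totals.merge(key, s.price * s.volume, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Sale> sales = Arrays.asList(
                new Sale("Guangdong", 1, 10, 2),
                new Sale("Guangdong", 1, 10, 3),
                new Sale("Hunan", 1, 10, 1));
        System.out.println(totalsByProvinceAndGoods(sales));
    }
}
```

Note that the key contains only the province and the product, so a per-group field such as the city can only reflect one of the records in the group.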

Converting the aggregated result of each window into the object used for ranking:

  private static class AmountWindow implements WindowFunction<JoinOrderAddress, HotDimensionOrder, Tuple2<String, Long>, TimeWindow> {

      @Override
      public void apply(Tuple2<String, Long> key, TimeWindow window, Iterable<JoinOrderAddress> input, Collector<HotDimensionOrder> out) throws Exception {
          // The input holds the single pre-aggregated result produced by TotalAmount
          JoinOrderAddress order = input.iterator().next();
          out.collect(new HotDimensionOrder(order, window.getEnd()));
      }
  }

Ranking hot-selling products by region:

  private static class TopNDimensionOrder extends KeyedProcessFunction<Long, HotDimensionOrder, String> {

      private ListState<HotDimensionOrder> orderState;

      @Override
      public void processElement(HotDimensionOrder value, Context ctx, Collector<String> out) throws Exception {
          // Buffer the record in the state list
          orderState.add(value);
          // Register an event-time timer at the end of the window
          ctx.timerService().registerEventTimeTimer(value.getTimeWindow());
      }

      @Override
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
          List<HotDimensionOrder> orderList = new ArrayList<>();
          for (HotDimensionOrder order : orderState.get()) {
              orderList.add(order);
          }
          // Sort by province, then by total sales amount in descending order
          orderList.sort(Comparator.comparing(HotDimensionOrder::getProvince).thenComparing(HotDimensionOrder::getTotalAmount, Comparator.reverseOrder()));
          orderState.clear();
          // Write the results to Elasticsearch
          HotDimensionRepository hotDimensionRepository = (HotDimensionRepository) ApplicationContextUtil.getBean("hotDimensionRepository");
          StringBuilder strBuf = new StringBuilder();
          for (HotDimensionOrder order : orderList) {
              order.setId(order.getProvince() + order.getGoodsId());
              order.setCreateDate(new Date(order.getTimeWindow()));
              hotDimensionRepository.save(order);
              strBuf.append(order).append("\n");
              System.out.println("result => " + order);
          }
          out.collect(strBuf.toString());
      }

      @Override
      public void open(Configuration parameters) throws Exception {
          super.open(parameters);
          orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotDimensionOrder>("hot-dimension", HotDimensionOrder.class));
      }
  }
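The regional ranking relies on a two-level ordering: province first, then total amount from high to low within each province. Here is a small plain-Java sketch of that comparator on a hypothetical Row class, independent of Flink and of the HotDimensionOrder type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch: order rows by province, then by total amount descending. */
class RegionRankingSketch {

    /** Hypothetical stand-in for a per-window regional product total. */
    static final class Row {
        final String province;
        final long goodsId;
        final long totalAmount;
        Row(String province, long goodsId, long totalAmount) {
            this.province = province;
            this.goodsId = goodsId;
            this.totalAmount = totalAmount;
        }
    }

    static List<Row> rank(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(Comparator.comparing((Row r) -> r.province)
                // Within one province, larger totals come first.
                .thenComparing(Comparator.comparingLong((Row r) -> r.totalAmount).reversed()));
        return sorted;
    }

    public static void main(String[] args) {
        List<Row> rows = Arrays.asList(
                new Row("Hunan", 1, 100),
                new Row("Guangdong", 2, 50),
                new Row("Guangdong", 1, 70));
        for (Row r : rank(rows)) {
            System.out.println(r.province + " " + r.goodsId + " " + r.totalAmount);
        }
    }
}
```

Because the rows of one province end up adjacent, a per-province top N could be taken by walking the sorted list and counting entries per province.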

This article was written and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn
