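The overall solution ingests data synchronously through Canal + the Kafka connector + Protobuf, and a Flink service performs real-time statistical processing of the various kinds of business data.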
Feature 1
Compute statistics on hot-selling products over a one-day period, refreshing the results every 3 seconds.
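The main logic implements this requirement with the sliding window `timeWindow(Time.hours(24), Time.seconds(3))`. That choice has a cost worth checking: a single order falls into size / slide = 86,400 s / 3 s = 28,800 overlapping windows. The sketch below is plain Java with no Flink dependency. It reproduces the window-assignment arithmetic under an assumed window offset of 0, so the count can be verified. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of sliding event-time window assignment (offset assumed to be 0). */
final class SlidingWindowMath {

    /** A window covering [start, end), like Flink's TimeWindow. */
    record Window(long start, long end) {}

    /** Start of the latest slide-aligned window that contains the timestamp. */
    static long lastWindowStart(long timestamp, long slide) {
        return timestamp - Math.floorMod(timestamp, slide);
    }

    /** Every window of length size, advancing by slide, that contains the timestamp. */
    static List<Window> assign(long timestamp, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        for (long start = lastWindowStart(timestamp, slide); start > timestamp - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        long size = 24L * 60 * 60 * 1000;    // Time.hours(24) in milliseconds
        long slide = 3_000L;                 // Time.seconds(3) in milliseconds
        long eventTime = 1_600_000_001_234L; // arbitrary sample event time (epoch millis)
        List<Window> windows = assign(eventTime, size, slide);
        System.out.println(windows.size()); // 28800
    }
}
```

Each of those windows keeps its own accumulator per product. Both per-event work and state size therefore grow with size / slide. Whether that is acceptable depends on order volume.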
Core code
Main logic:
/**
 * Runs the Flink job.
 * @throws Exception
 */
private void executeFlinkTask() throws Exception {
    // 1. Create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 2. Kafka connection settings
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
    properties.setProperty("group.id", "fink_group");
    // 3. Create the Kafka consumer
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
            "order_binlog",            // source topic
            new SimpleStringSchema(),  // deserialization schema
            properties);
    // For debugging: always re-consume from the earliest available record
    kafkaConsumer.setStartFromEarliest();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);
    // 4. Read the Kafka source
    DataStreamSource<String> kafkaStream = env.addSource(kafkaConsumer);
    // 5. Filter and transform the data
    kafkaStream.filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);
            String isDDL = jsonObject.get("isDdl").getAsString();
            String type = jsonObject.get("type").getAsString();
            // Keep only non-DDL INSERT events
            return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);
        }
    }).flatMap(new FlatMapFunction<String, Order>() {
        @Override
        public void flatMap(String value, Collector<Order> out) throws Exception {
            // Extract the "data" array from the JSON message
            JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");
            // Convert each row into an Order object
            for (int i = 0; i < dataArray.size(); i++) {
                JsonObject jsonObject = dataArray.get(i).getAsJsonObject();
                Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class);
                System.out.println("order => " + order);
                out.collect(order);
            }
        }
    })
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {
        @Override
        public long extractTimestamp(Order element) {
            return element.getExecTime();
        }
    })
    .keyBy(Order::getGoodsId)
    .timeWindow(Time.hours(24), Time.seconds(3))
    .aggregate(new TotalAmount(), new AmountWindow())
    .keyBy(HotOrder::getTimeWindow)
    .process(new TopNHotOrder());
    // 6. Execute the job
    env.execute("job");
}
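Step 5 keeps only non-DDL `INSERT` binlog events. It then expands each message's `data` array, because one Canal message can carry several rows. The sketch below shows the same filter/flatMap logic in plain Java over an in-memory list. It assumes a hypothetical `CanalMessage` record in place of the Gson `JsonObject`. Only the three fields the job reads (`isDdl`, `type`, `data`) are modelled.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Plain-Java model of the filter + flatMap step (CanalMessage is hypothetical). */
final class BinlogFilterSketch {

    /** Stand-in for a parsed Canal message: only the fields the job reads. */
    record CanalMessage(boolean isDdl, String type, List<Map<String, String>> data) {}

    /** Same condition as the FilterFunction: non-DDL INSERT events only. */
    static boolean isInsert(CanalMessage msg) {
        return !msg.isDdl() && "INSERT".equalsIgnoreCase(msg.type());
    }

    /** filter + flatMap: one message may carry several inserted rows in "data". */
    static List<Map<String, String>> insertedRows(List<CanalMessage> messages) {
        return messages.stream()
                .filter(BinlogFilterSketch::isInsert)
                .flatMap(msg -> msg.data().stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CanalMessage> messages = List.of(
                new CanalMessage(false, "INSERT", List.of(Map.of("goodsId", "1"), Map.of("goodsId", "2"))),
                new CanalMessage(false, "UPDATE", List.of(Map.of("goodsId", "1"))),
                new CanalMessage(true, "ALTER", List.of()));
        System.out.println(insertedRows(messages).size()); // 2
    }
}
```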
Accumulating the sales amount of hot-selling products:
/**
 * Product amount accumulator
 */
private static class TotalAmount implements AggregateFunction<Order, Order, Order> {

    @Override
    public Order createAccumulator() {
        Order order = new Order();
        order.setTotalAmount(0L);
        return order;
    }

    /**
     * Adds the amount of one order (price * volume) to the product's running total.
     * @param value       the incoming order
     * @param accumulator the running total for this product and window
     * @return the updated accumulator
     */
    @Override
    public Order add(Order value, Order accumulator) {
        accumulator.setGoodsId(value.getGoodsId());
        accumulator.setGoodsName(value.getGoodsName());
        accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));
        return accumulator;
    }

    @Override
    public Order getResult(Order accumulator) {
        return accumulator;
    }

    @Override
    public Order merge(Order a, Order b) {
        // Combine two partial totals instead of returning null (used when windows are merged)
        a.setTotalAmount(a.getTotalAmount() + b.getTotalAmount());
        return a;
    }
}
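`TotalAmount` follows the `AggregateFunction` contract: `createAccumulator`, then `add` once per element, then `getResult`. `merge` must return a combined accumulator rather than `null`. In the DataStream window API it is invoked when partial accumulators are combined, for example by merging (session) windows.

The sketch below walks through that contract in plain Java. It uses hypothetical `Sale` and `Acc` types in place of `Order`. It checks that merging two partial totals gives the same result as adding every sale in sequence.

```java
import java.util.List;

/** Plain-Java sketch of the AggregateFunction lifecycle (Sale and Acc are hypothetical). */
final class AmountAccumulatorSketch {

    record Sale(long goodsId, long price, long volume) {}

    /** Mutable accumulator: running total for one product. */
    static final class Acc {
        long totalAmount = 0L;
    }

    static Acc createAccumulator() {
        return new Acc();
    }

    static Acc add(Sale value, Acc acc) {
        acc.totalAmount += value.price() * value.volume();
        return acc;
    }

    /** Combines two partial accumulators into one. */
    static Acc merge(Acc a, Acc b) {
        a.totalAmount += b.totalAmount;
        return a;
    }

    static long getResult(Acc acc) {
        return acc.totalAmount;
    }

    /** Folds a list of sales into a fresh accumulator, as the window operator does. */
    static Acc fold(List<Sale> sales) {
        Acc acc = createAccumulator();
        for (Sale s : sales) {
            add(s, acc);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Sale> first = List.of(new Sale(1, 100, 2), new Sale(1, 50, 1));
        List<Sale> second = List.of(new Sale(1, 30, 3));
        System.out.println(getResult(merge(fold(first), fold(second)))); // 340
    }
}
```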
Converting each window's result into a hot-selling product record for ranking:
/**
 * Hot-selling products: converts the aggregated result of each time window into a HotOrder.
 */
private static class AmountWindow implements WindowFunction<Order, HotOrder, Long, TimeWindow> {
    @Override
    public void apply(Long goodsId, TimeWindow window, Iterable<Order> input, Collector<HotOrder> out) throws Exception {
        // The iterable holds the single pre-aggregated result from TotalAmount
        Order order = input.iterator().next();
        out.collect(new HotOrder(goodsId, order.getGoodsName(), order.getTotalAmount(), window.getEnd()));
    }
}
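With `aggregate(new TotalAmount(), new AmountWindow())`, Flink keeps one running accumulator per (product, window) instead of buffering every order. The window function then receives that single value together with the window metadata.

The simulation below shows this flow in plain Java. It uses simplified, hypothetical `Order`/`HotOrder` records. A toy window (size 6, slide 3) keeps the output small. Each emitted record is tagged with its window end, just as `AmountWindow` uses `window.getEnd()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of keyBy + sliding window + incremental aggregation (hypothetical records). */
final class WindowAggregateSketch {

    record Order(long goodsId, long amount, long execTime) {}
    record HotOrder(long goodsId, long totalAmount, long timeWindow) {}

    static List<HotOrder> aggregate(List<Order> orders, long size, long slide) {
        // windowEnd -> (goodsId -> running total); TreeMaps give a deterministic output order
        Map<Long, Map<Long, Long>> totals = new TreeMap<>();
        for (Order o : orders) {
            long lastStart = o.execTime() - Math.floorMod(o.execTime(), slide);
            for (long start = lastStart; start > o.execTime() - size; start -= slide) {
                totals.computeIfAbsent(start + size, k -> new TreeMap<>())
                      .merge(o.goodsId(), o.amount(), Long::sum);
            }
        }
        // One output record per (window, product), tagged with the window end
        List<HotOrder> result = new ArrayList<>();
        totals.forEach((end, perGoods) ->
                perGoods.forEach((goodsId, total) -> result.add(new HotOrder(goodsId, total, end))));
        return result;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(new Order(1, 10, 1), new Order(2, 5, 4), new Order(1, 7, 5));
        aggregate(orders, 6, 3).forEach(System.out::println);
    }
}
```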
Ranking logic for hot-selling products:
/**
 * Ranks the hot-selling products.
 * Declared static so the function does not capture the (possibly non-serializable) outer instance.
 */
private static class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> {

    private ListState<HotOrder> orderState;

    @Override
    public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception {
        // Buffer the element in the keyed list state
        orderState.add(value);
        // Register an event-time timer at the window end
        ctx.timerService().registerEventTimeTimer(value.getTimeWindow());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        List<HotOrder> orderList = new ArrayList<>();
        for (HotOrder order : orderState.get()) {
            orderList.add(order);
        }
        // Sort by total sales amount, descending
        orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed());
        orderState.clear();
        // Write the results to ES
        HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository");
        StringBuilder strBuf = new StringBuilder();
        for (HotOrder order : orderList) {
            order.setId(order.getGoodsId());
            order.setCreateDate(new Date(order.getTimeWindow()));
            hotOrderRepository.save(order);
            strBuf.append(order).append("\n");
            System.out.println("result => " + order);
        }
        out.collect(strBuf.toString());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        orderState = getRuntimeContext().getListState(new ListStateDescriptor<>("hot-order", HotOrder.class));
    }
}
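The stream is keyed by window end, and the timer is registered at that same window end. An event-time timer fires once the watermark reaches its timestamp. By then every product's result for that window has arrived, so `onTimer` can sort the whole buffer and clear it.

The sketch below models that mechanism in plain Java. It uses a `TreeMap` in place of the keyed `ListState` and timer service. `HotOrder` is a simplified, hypothetical record. `topN` is a hypothetical parameter, since the ranking above saves every product.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java sketch of buffering per window end and firing on watermark progress. */
final class TopNTimerSketch {

    record HotOrder(long goodsId, long totalAmount, long timeWindow) {}

    // Stand-in for keyed ListState plus timers: window end -> buffered results, ordered by timer time
    private final TreeMap<Long, List<HotOrder>> state = new TreeMap<>();
    private final int topN; // hypothetical: how many products to keep per window

    TopNTimerSketch(int topN) {
        this.topN = topN;
    }

    /** processElement: buffer the value; its window end acts as the timer timestamp. */
    void processElement(HotOrder value) {
        state.computeIfAbsent(value.timeWindow(), k -> new ArrayList<>()).add(value);
    }

    /** Fires every timer whose timestamp is <= the new watermark, like onTimer. */
    List<List<HotOrder>> advanceWatermark(long watermark) {
        List<List<HotOrder>> fired = new ArrayList<>();
        while (!state.isEmpty() && state.firstKey() <= watermark) {
            // Remove the buffered results (the orderState.clear() step) and rank them
            List<HotOrder> buffered = state.pollFirstEntry().getValue();
            buffered.sort(Comparator.comparingLong(HotOrder::totalAmount).reversed());
            fired.add(new ArrayList<>(buffered.subList(0, Math.min(topN, buffered.size()))));
        }
        return fired;
    }

    public static void main(String[] args) {
        TopNTimerSketch op = new TopNTimerSketch(2);
        op.processElement(new HotOrder(1, 17, 6));
        op.processElement(new HotOrder(2, 5, 6));
        op.processElement(new HotOrder(3, 30, 6));
        System.out.println(op.advanceWatermark(5)); // [] (timer at 6 not yet due)
        System.out.println(op.advanceWatermark(6)); // goods 3, then goods 1
    }
}
```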
Feature 2
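Compute statistics on hot-selling products per region (for example, by province or city) over a one-day period, refreshing the results every 3 seconds. The code below groups by province and product ID.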
Core code
Main logic:
/**
 * Runs the Flink job.
 * @throws Exception
 */
private void executeFlinkTask() throws Exception {
    // 1. Create the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 2. Kafka connection settings
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
    properties.setProperty("group.id", "fink_group");
    // 3. Create the Kafka consumer for orders
    FlinkKafkaConsumer<String> orderKafkaConsumer = new FlinkKafkaConsumer<>(
            "order_binlog",            // source topic
            new SimpleStringSchema(),  // deserialization schema
            properties);
    // For debugging: always re-consume from the earliest available record
    orderKafkaConsumer.setStartFromEarliest();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);
    // 4. Create the Kafka consumer for order addresses
    FlinkKafkaConsumer<String> addressKafkaConsumer = new FlinkKafkaConsumer<>(
            "orderAddress_binlog",     // source topic
            new SimpleStringSchema(),  // deserialization schema
            properties);
    // For debugging: always re-consume from the earliest available record
    addressKafkaConsumer.setStartFromEarliest();
    // 5. Read the Kafka sources (orders and addresses)
    DataStreamSource<String> orderStream = env.addSource(orderKafkaConsumer);
    DataStreamSource<String> addressStream = env.addSource(addressKafkaConsumer);
    // 6. Filter and transform the order data
    DataStream<Order> orderDataStream = orderStream.filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);
            String isDDL = jsonObject.get("isDdl").getAsString();
            String type = jsonObject.get("type").getAsString();
            // Keep only non-DDL INSERT events
            return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);
        }
    }).flatMap(new FlatMapFunction<String, Order>() {
        @Override
        public void flatMap(String value, Collector<Order> out) throws Exception {
            // Extract the "data" array from the JSON message
            JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");
            // Convert each row into an Order object
            for (int i = 0; i < dataArray.size(); i++) {
                JsonObject jsonObject = dataArray.get(i).getAsJsonObject();
                Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class);
                System.out.println("order => " + order);
                out.collect(order);
            }
        }
    })
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {
        @Override
        public long extractTimestamp(Order element) {
            return element.getExecTime();
        }
    });
    // 7. Filter and transform the address data
    DataStream<OrderAddress> orderAddressDataStream = addressStream.filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);
            String isDDL = jsonObject.get("isDdl").getAsString();
            String type = jsonObject.get("type").getAsString();
            // Keep only non-DDL INSERT events
            return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);
        }
    }).flatMap(new FlatMapFunction<String, OrderAddress>() {
        @Override
        public void flatMap(String value, Collector<OrderAddress> out) throws Exception {
            // Extract the "data" array from the JSON message
            JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");
            // Convert each row into an OrderAddress object
            for (int i = 0; i < dataArray.size(); i++) {
                JsonObject jsonObject = dataArray.get(i).getAsJsonObject();
                OrderAddress orderAddress = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, OrderAddress.class);
                System.out.println("orderAddress => " + orderAddress);
                out.collect(orderAddress);
            }
        }
    })
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderAddress>(Time.seconds(0)) {
        @Override
        public long extractTimestamp(OrderAddress element) {
            return element.getExecTime();
        }
    });
    // 8. Join the order stream with the address stream
    orderDataStream.join(orderAddressDataStream)
        .where(new KeySelector<Order, Object>() {
            @Override
            public Object getKey(Order value) throws Exception {
                return value.getId();
            }
        })
        .equalTo(new KeySelector<OrderAddress, Object>() {
            @Override
            public Object getKey(OrderAddress value) throws Exception {
                return value.getOrderId();
            }
        })
        // The join window (2 s) is kept shorter than the slide (3 s) of the sliding window below
        .window(TumblingEventTimeWindows.of(Time.seconds(2)))
        .apply(new JoinFunction<Order, OrderAddress, JoinOrderAddress>() {
            @Override
            public JoinOrderAddress join(Order first, OrderAddress second) throws Exception {
                return JoinOrderAddress.build(first, second);
            }
        })
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JoinOrderAddress>(Time.seconds(0)) {
            @Override
            public long extractTimestamp(JoinOrderAddress element) {
                return element.getExecTime();
            }
        })
        // 9. Group by province and product ID
        .keyBy(new KeySelector<JoinOrderAddress, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> getKey(JoinOrderAddress value) throws Exception {
                return Tuple2.of(value.getProvince(), value.getGoodsId());
            }
        })
        .timeWindow(Time.hours(24), Time.seconds(3))
        .aggregate(new TotalAmount(), new AmountWindow())
        .keyBy(HotDimensionOrder::getTimeWindow)
        .process(new TopNDimensionOrder());
    // 10. Execute the job
    env.execute("job");
}
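A window join pairs two elements only when their keys match and their event times fall into the same 2 s tumbling window. It is an inner join, so an order whose address row lands in the next window produces no output. The sketch below shows those semantics as a nested loop in plain Java. `Order`, `OrderAddress` and `Joined` are simplified, hypothetical records, and the province names are made-up sample data.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of a tumbling event-time window join (hypothetical records). */
final class WindowJoinSketch {

    record Order(long id, long execTime) {}
    record OrderAddress(long orderId, String province, long execTime) {}
    record Joined(long orderId, String province) {}

    /** Start of the tumbling window [start, start + size) containing the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    static List<Joined> join(List<Order> orders, List<OrderAddress> addresses, long size) {
        List<Joined> out = new ArrayList<>();
        for (Order o : orders) {
            for (OrderAddress a : addresses) {
                boolean sameKey = o.id() == a.orderId();
                boolean sameWindow = windowStart(o.execTime(), size) == windowStart(a.execTime(), size);
                if (sameKey && sameWindow) {
                    out.add(new Joined(o.id(), a.province()));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long size = 2_000L; // TumblingEventTimeWindows.of(Time.seconds(2))
        List<Order> orders = List.of(new Order(1, 1_000), new Order(2, 1_900));
        List<OrderAddress> addresses = List.of(
                new OrderAddress(1, "Guangdong", 1_500),  // same window [0, 2000): joined
                new OrderAddress(2, "Zhejiang", 2_100));  // window [2000, 4000): not joined
        System.out.println(join(orders, addresses, size)); // [Joined[orderId=1, province=Guangdong]]
    }
}
```

If the two binlog rows can straddle a window boundary, consider Flink's interval join (`intervalJoin(...).between(...)` on two keyed streams). It matches elements within a relative time bound instead of a fixed window. Whether it fits depends on how far apart the order and address rows are written.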
Product amount accumulator:
/**
 * Product amount accumulator
 */
private static class TotalAmount implements AggregateFunction<JoinOrderAddress, JoinOrderAddress, JoinOrderAddress> {

    @Override
    public JoinOrderAddress createAccumulator() {
        JoinOrderAddress order = new JoinOrderAddress();
        order.setTotalAmount(0L);
        return order;
    }

    /**
     * Adds the amount of one order (price * volume) to the running total.
     * @param value       the incoming joined order
     * @param accumulator the running total for this (province, product) key and window
     * @return the updated accumulator
     */
    @Override
    public JoinOrderAddress add(JoinOrderAddress value, JoinOrderAddress accumulator) {
        accumulator.setGoodsId(value.getGoodsId());
        accumulator.setGoodsName(value.getGoodsName());
        accumulator.setProvince(value.getProvince());
        accumulator.setCity(value.getCity());
        accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));
        return accumulator;
    }

    @Override
    public JoinOrderAddress getResult(JoinOrderAddress accumulator) {
        return accumulator;
    }

    @Override
    public JoinOrderAddress merge(JoinOrderAddress a, JoinOrderAddress b) {
        // Combine two partial totals instead of returning null (used when windows are merged)
        a.setTotalAmount(a.getTotalAmount() + b.getTotalAmount());
        return a;
    }
}
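Keying by `Tuple2.of(province, goodsId)` gives each (province, product) pair its own running total. The key holds only the province and product ID, so the `city` copied in `add` ends up being that of the most recently added order in the group.

The sketch below shows the composite-key grouping in plain Java. `RegionKey` stands in for `Tuple2` and `JoinedSale` for `JoinOrderAddress`; both are hypothetical records. The province names are sample data.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of per-(province, goodsId) totals (hypothetical records). */
final class RegionKeySketch {

    record JoinedSale(String province, long goodsId, long price, long volume) {}

    /** Composite key; records provide equals/hashCode, much like a Tuple2 key. */
    record RegionKey(String province, long goodsId) implements Comparable<RegionKey> {
        @Override
        public int compareTo(RegionKey other) {
            int byProvince = province.compareTo(other.province);
            return byProvince != 0 ? byProvince : Long.compare(goodsId, other.goodsId);
        }
    }

    /** Sums price * volume per composite key; TreeMap keeps the printout ordered. */
    static Map<RegionKey, Long> totals(List<JoinedSale> sales) {
        Map<RegionKey, Long> totals = new TreeMap<>();
        for (JoinedSale s : sales) {
            totals.merge(new RegionKey(s.province(), s.goodsId()), s.price() * s.volume(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        List<JoinedSale> sales = List.of(
                new JoinedSale("Zhejiang", 1, 100, 1),
                new JoinedSale("Guangdong", 1, 50, 2),
                new JoinedSale("Zhejiang", 1, 20, 3));
        System.out.println(totals(sales));
        // {RegionKey[province=Guangdong, goodsId=1]=100, RegionKey[province=Zhejiang, goodsId=1]=160}
    }
}
```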
Converting each window's result into a regional hot-selling product record:
/**
 * Converts each window's aggregated result into a HotDimensionOrder tagged with the window end.
 */
private static class AmountWindow implements WindowFunction<JoinOrderAddress, HotDimensionOrder, Tuple2<String, Long>, TimeWindow> {
    @Override
    public void apply(Tuple2<String, Long> key, TimeWindow window, Iterable<JoinOrderAddress> input, Collector<HotDimensionOrder> out) throws Exception {
        // The iterable holds the single pre-aggregated result from TotalAmount
        JoinOrderAddress order = input.iterator().next();
        out.collect(new HotDimensionOrder(order, window.getEnd()));
    }
}
Ranking hot-selling products by region:
/**
 * Ranks hot-selling products per region.
 * Declared static so the function does not capture the (possibly non-serializable) outer instance.
 */
private static class TopNDimensionOrder extends KeyedProcessFunction<Long, HotDimensionOrder, String> {

    private ListState<HotDimensionOrder> orderState;

    @Override
    public void processElement(HotDimensionOrder value, Context ctx, Collector<String> out) throws Exception {
        // Buffer the element in the keyed list state
        orderState.add(value);
        // Register an event-time timer at the window end
        ctx.timerService().registerEventTimeTimer(value.getTimeWindow());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        List<HotDimensionOrder> orderList = new ArrayList<>();
        for (HotDimensionOrder order : orderState.get()) {
            orderList.add(order);
        }
        // Sort by province (ascending), then by total sales amount (descending)
        orderList.sort(Comparator.comparing(HotDimensionOrder::getProvince)
                .thenComparing(HotDimensionOrder::getTotalAmount, Comparator.reverseOrder()));
        orderState.clear();
        // Write the results to ES
        HotDimensionRepository hotDimensionRepository = (HotDimensionRepository) ApplicationContextUtil.getBean("hotDimensionRepository");
        StringBuilder strBuf = new StringBuilder();
        for (HotDimensionOrder order : orderList) {
            order.setId(order.getProvince() + order.getGoodsId());
            order.setCreateDate(new Date(order.getTimeWindow()));
            hotDimensionRepository.save(order);
            strBuf.append(order).append("\n");
            System.out.println("result => " + order);
        }
        out.collect(strBuf.toString());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        orderState = getRuntimeContext().getListState(new ListStateDescriptor<>("hot-dimension", HotDimensionOrder.class));
    }
}
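The regional ranking sorts by province ascending, then by total amount descending within each province, and saves every row. It does not cut the list down to a top N per province. The sketch below reproduces the same comparator in plain Java. It adds one possible way to take the first N products per province from the sorted list. `Row` is a hypothetical stand-in for `HotDimensionOrder`, and `topN` is a hypothetical parameter.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the province/amount ordering plus a per-province top N (hypothetical types). */
final class RegionRankingSketch {

    record Row(String province, long goodsId, long totalAmount) {}

    /** Province ascending, then total amount descending, as in TopNDimensionOrder. */
    static final Comparator<Row> ORDER = Comparator.comparing(Row::province)
            .thenComparing(Row::totalAmount, Comparator.reverseOrder());

    /** Returns province -> goods IDs of its topN best sellers, provinces in sorted order. */
    static Map<String, List<Long>> topNPerProvince(List<Row> rows, int topN) {
        List<Row> sorted = new ArrayList<>(rows);
        sorted.sort(ORDER);
        Map<String, List<Long>> result = new LinkedHashMap<>();
        for (Row r : sorted) {
            List<Long> ids = result.computeIfAbsent(r.province(), k -> new ArrayList<>());
            if (ids.size() < topN) {
                ids.add(r.goodsId());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("Zhejiang", 1, 160),
                new Row("Guangdong", 2, 300),
                new Row("Zhejiang", 3, 500),
                new Row("Guangdong", 1, 100),
                new Row("Zhejiang", 2, 20));
        System.out.println(topNPerProvince(rows, 2)); // {Guangdong=[2, 1], Zhejiang=[3, 1]}
    }
}
```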
This article was written and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn