功能
實現對熱銷商品的統計：統計週期爲一天（24 小時滑動窗口），每 3 秒刷新一次數據。
核心代碼
主邏輯代碼實現：
/** * 執行Flink任務處理 * @throws Exception */ private void executeFlinkTask() throws Exception { // 1. 建立運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 設置kafka服務鏈接信息 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "10.10.20.132:9092"); properties.setProperty("group.id", "fink_group"); // 3. 建立Kafka消費端 FlinkKafkaConsumer kafkaProducer = new FlinkKafkaConsumer( "orderPayment_binlog", // 目標 topic new SimpleStringSchema(), // 序列化 配置 properties); // 調試,從新從最先記錄消費 kafkaProducer.setStartFromEarliest(); // 儘量從最先的記錄開始 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // 4. 讀取Kafka數據源 DataStreamSource<String> socketStr = env.addSource(kafkaProducer); // 5. 數據過濾轉換處理 DataStream<OrderPayment> orderPaymentDataStream = socketStr.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value); String isDDL = jsonObject.get("isDdl").getAsString(); String type = jsonObject.get("type").getAsString(); // 過濾條件: 非DDL操做, 而且是新增的數據 return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type); } }).flatMap(new FlatMapFunction<String, OrderPayment>() { @Override public void flatMap(String value, Collector<OrderPayment> out) throws Exception { // 獲取JSON中的data數據 JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data"); // 將data數據轉換爲java對象 for(int i =0; i< dataArray.size(); i++) { JsonObject jsonObject = dataArray.get(i).getAsJsonObject(); OrderPayment orderPayment = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, OrderPayment.class); System.out.println("orderPayment => " + orderPayment); out.collect(orderPayment); } } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderPayment>(Time.seconds(0)) { @Override public long extractTimestamp(OrderPayment element) { return 
element.getUpdateTime(); } }) .keyBy(OrderPayment::getOrderId); // 6.經過CEP機制, 判斷支付成功的數據 Pattern<OrderPayment, ?> pattern = Pattern.<OrderPayment>begin("begin") .where(new SimpleCondition<OrderPayment>() { @Override public boolean filter(OrderPayment value) throws Exception { return value.getStatus() == 0; } }).next("follow").where(new SimpleCondition<OrderPayment>() { @Override public boolean filter(OrderPayment value) throws Exception { return value.getStatus() == 1; } }).within(Time.seconds(15)).times(1); PatternStream<OrderPayment> patternStream = CEP.pattern(orderPaymentDataStream, pattern); // 7.定義超時數據的TAG標記 OutputTag orderExpired = new OutputTag<OrderPayment>("orderExpired"){}; DataStream<OrderPaymentResult> selectResult = patternStream.select(orderExpired, new OrderExpiredMatcher(), new OrderPayedMatcher()); selectResult.print("payed"); // 8. 建立Kafka消費端(訂單數據源) FlinkKafkaConsumer orderKafkaProducer = new FlinkKafkaConsumer( "order_binlog", // 目標 topic new SimpleStringSchema(), // 序列化 配置 properties); orderKafkaProducer.setStartFromEarliest(); // 儘量從最先的記錄開始 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); DataStreamSource<String> orderSource = env.addSource(orderKafkaProducer); // 9. 
數據過濾轉換處理(訂單數據源) DataStream<Order> orderDataStream = orderSource.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value); String isDDL = jsonObject.get("isDdl").getAsString(); String type = jsonObject.get("type").getAsString(); // 過濾條件: 非DDL操做, 而且是新增的數據 return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type); } }).flatMap(new FlatMapFunction<String, Order>() { @Override public void flatMap(String value, Collector<Order> out) throws Exception { // 獲取JSON中的data數據 JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data"); // 將data數據轉換爲java對象 for(int i =0; i< dataArray.size(); i++) { JsonObject jsonObject = dataArray.get(i).getAsJsonObject(); Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class); System.out.println("order => " + order); out.collect(order); } } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) { @Override public long extractTimestamp(Order element) { return element.getExecTime(); } }); // 10. 
數據源關聯處理 orderDataStream.keyBy(Order::getId).intervalJoin(selectResult.keyBy(OrderPaymentResult::getOrderId)) .between(Time.seconds(0), Time.seconds(15)) .process(new ProcessJoinFunction<Order, OrderPaymentResult, JoinOrderPayment>() { @Override public void processElement(Order left, OrderPaymentResult right, Context ctx, Collector<JoinOrderPayment> out) throws Exception { JoinOrderPayment joinResult = JoinOrderPayment.build(left, right); out.collect(joinResult); } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JoinOrderPayment>(Time.seconds(0)) { @Override public long extractTimestamp(JoinOrderPayment element) { return element.getUpdateTime(); } }) .keyBy(JoinOrderPayment::getGoodsId) .timeWindow(Time.hours(24), Time.seconds(3)) .aggregate(new TotalAmount(), new AmountWindow()) .keyBy(HotOrder::getTimeWindow) .process(new TopNHotOrder()); // 11. 執行任務 env.execute("job"); }
商品金額累加器：
/** * 商品金額累加器 */ private static class TotalAmount implements AggregateFunction<JoinOrderPayment, JoinOrderPayment, JoinOrderPayment> { @Override public JoinOrderPayment createAccumulator() { JoinOrderPayment order = new JoinOrderPayment(); order.setTotalAmount(0l); return order; } /** * 商品銷售總金額累加處理 * @param value * @param accumulator * @return */ @Override public JoinOrderPayment add(JoinOrderPayment value, JoinOrderPayment accumulator) { accumulator.setGoodsId(value.getGoodsId()); accumulator.setGoodsName((value.getGoodsName())); accumulator.setStatus(value.getStatus()); accumulator.setUpdateTime(value.getUpdateTime()); accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume())); return accumulator; } @Override public JoinOrderPayment getResult(JoinOrderPayment accumulator) { return accumulator; } @Override public JoinOrderPayment merge(JoinOrderPayment a, JoinOrderPayment b) { return null; } }
熱銷商品轉換處理：
/**
 * Converts the aggregated result of one goods item and window into a {@code HotOrder}
 * stamped with the window end time.
 */
private static class AmountWindow implements WindowFunction<JoinOrderPayment, HotOrder, Long, TimeWindow> {
    @Override
    public void apply(Long goodsId, TimeWindow window, Iterable<JoinOrderPayment> input,
                      Collector<HotOrder> out) throws Exception {
        // Used as the window function of aggregate(...), so the iterable carries the
        // pre-aggregated accumulator for this key and window.
        JoinOrderPayment order = input.iterator().next();
        // The goods id comes from the window key, the same way the Order-based variant does it.
        out.collect(new HotOrder(goodsId, order.getGoodsName(), order.getTotalAmount(), window.getEnd()));
    }
}
熱銷商品的統計排行實現：
/** * 熱銷商品的統計排行實現 */ private class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> { private ListState<HotOrder> orderState; @Override public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception { // 將數據加入到狀態列表裏面 orderState.add(value); // 註冊定時器 ctx.timerService().registerEventTimeTimer(value.getTimeWindow()); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { List<HotOrder> orderList = new ArrayList<>(); for(HotOrder order : orderState.get()){ orderList.add(order); } // 按照成交總金額, 倒序排列 orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed()); orderState.clear(); // 將數據寫入至ES HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository"); StringBuffer strBuf = new StringBuffer(); for(HotOrder order: orderList) { order.setId(order.getGoodsId()); order.setCreateDate(new Date(order.getTimeWindow())); hotOrderRepository.save(order); strBuf.append(order).append("\n"); System.out.println("result => " + order); } out.collect(strBuf.toString()); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotOrder>("hot-order", HotOrder.class)); } }
超時數據的匹配處理：
private class OrderExpiredMatcher implements PatternTimeoutFunction<OrderPayment, OrderPaymentResult> { @Override public OrderPaymentResult timeout(Map<String, List<OrderPayment>> map, long l) throws Exception { OrderPaymentResult result = new OrderPaymentResult(); OrderPayment payment = map.get("begin").iterator().next(); result.setOrderId(payment.getOrderId()); result.setStatus(payment.getStatus()); result.setUpdateTime(payment.getUpdateTime()); result.setMessage("支付超時"); return result; } }
支付成功的匹配處理：
private class OrderPayedMatcher implements PatternSelectFunction<OrderPayment, OrderPaymentResult> { @Override public OrderPaymentResult select(Map<String, List<OrderPayment>> map) throws Exception { OrderPaymentResult result = new OrderPaymentResult(); OrderPayment payment = map.get("follow").iterator().next(); result.setOrderId(payment.getOrderId()); result.setStatus(payment.getStatus()); result.setUpdateTime(payment.getUpdateTime()); result.setMessage("支付成功"); return result; } }
功能
統計商品在一段時間內的 UV（Unique Visitor，獨立訪客），即根據 IP 去重後的點擊量。
核心代碼
主邏輯實現:
public class ScreenUniqueVisitorProcessor { /** * 執行flink任務處理 * @throws Exception */ public void executeFlinkTask() throws Exception { // 1. 建立運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(1); // 2. 讀取Socket數據源 // DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log"); // 3. 數據解析轉換處理 socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() { @Override public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception { // 獲取Json中的data數據 // 根據分隔符解析數據 String[] arrValue = value.split("\t"); System.out.println("receive msg => " + value); // 將數據組裝爲對象 GoodsAccessLog log = new GoodsAccessLog(); for(int i=0; i<arrValue.length; i++) { if(i == 0) { log.setIp(arrValue[i]); }else if( i== 1) { log.setAccessTime(Long.valueOf(arrValue[i])); }else if( i== 2) { log.setEventType(arrValue[i]); }else if( i== 3) { log.setGoodsId(arrValue[i]); } } out.collect(log); } }) .filter(new FilterFunction<GoodsAccessLog>() { @Override public boolean filter(GoodsAccessLog value) throws Exception { return value.getEventType().equals("view"); } }) .keyBy(GoodsAccessLog::getGoodsId) .timeWindow(Time.seconds(10)) .process( new ProcessWindowFunction<GoodsAccessLog, Map<String, String> , String, TimeWindow>(){ @Override public void process(String key, Context context, Iterable<GoodsAccessLog> elements, Collector<Map<String, String>> out) throws Exception { Set<String> ipSet = new HashSet<>(); Map<String, String> goodsUV = new LinkedHashMap<>(); elements.forEach( log -> { ipSet.add(log.getIp()); }); goodsUV.put(key , context.window().getEnd() + ":" + ipSet.size()); out.collect(goodsUV); } }) .print("uv result").setParallelism(1); // 5. 執行任務 env.execute("job"); } }
熱銷商品的金額累加處理:
/** * 商品金額累加器 */ private static class TotalAmount implements AggregateFunction<Order, Order, Order> { @Override public Order createAccumulator() { Order order = new Order(); order.setTotalAmount(0l); return order; } /** * 累加統計商品銷售總金額 * @param value * @param accumulator * @return */ @Override public Order add(Order value, Order accumulator) { accumulator.setGoodsId(value.getGoodsId()); accumulator.setGoodsName((value.getGoodsName())); accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume())); return accumulator; } @Override public Order getResult(Order accumulator) { return accumulator; } @Override public Order merge(Order a, Order b) { return null; } }
熱銷商品的數據轉換處理, 用於統計:
/**
 * Wraps the aggregated Order of one goods item and window into a {@code HotOrder}.
 * The HotOrder carries the window key as goods id, plus the goods name, the total amount
 * and the window end timestamp.
 */
private static class AmountWindow implements WindowFunction<Order, HotOrder, Long, TimeWindow> {
    @Override
    public void apply(Long goodsId, TimeWindow window, Iterable<Order> input, Collector<HotOrder> out) throws Exception {
        Order aggregated = input.iterator().next();
        long windowEnd = window.getEnd();
        HotOrder hot = new HotOrder(goodsId, aggregated.getGoodsName(), aggregated.getTotalAmount(), windowEnd);
        out.collect(hot);
    }
}
熱銷商品的統計排行處理邏輯:
/** * 熱銷商品的統計排行實現 */ private class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> { private ListState<HotOrder> orderState; @Override public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception { // 將數據加入到狀態列表裏面 orderState.add(value); // 註冊定時器 ctx.timerService().registerEventTimeTimer(value.getTimeWindow()); } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { List<HotOrder> orderList = new ArrayList<>(); for(HotOrder order : orderState.get()){ orderList.add(order); } // 按照成交總金額, 倒序排列 orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed()); orderState.clear(); // 將數據寫入至ES HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository"); StringBuffer strBuf = new StringBuffer(); for(HotOrder order: orderList) { order.setId(order.getGoodsId()); order.setCreateDate(new Date(order.getTimeWindow())); hotOrderRepository.save(order); strBuf.append(order).append("\n"); System.out.println("result => " + order); } out.collect(strBuf.toString()); } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotOrder>("hot-order", HotOrder.class)); } }
功能
功能：統計商品在一段時間內的 UV（採用布隆過濾器），即根據 IP 去重後的點擊量。
核心代碼
主邏輯代碼:
/** * 執行flink任務處理 * @throws Exception */ public void executeFlinkTask() throws Exception { // 1. 建立運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 2. 讀取Socket數據源 // DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log"); // 3. 數據解析轉換處理 socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() { @Override public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception { // 獲取Json中的data數據 // 根據分隔符解析數據 String[] arrValue = value.split("\t"); // 將數據組裝爲對象 GoodsAccessLog log = new GoodsAccessLog(); for(int i=0; i<arrValue.length; i++) { if(i == 0) { log.setIp(arrValue[i]); }else if( i== 1) { log.setAccessTime(Long.valueOf(arrValue[i])); }else if( i== 2) { log.setEventType(arrValue[i]); }else if( i== 3) { log.setGoodsId(arrValue[i]); } } out.collect(log); } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<GoodsAccessLog>(Time.seconds(0)) { @Override public long extractTimestamp(GoodsAccessLog element) { return element.getAccessTime(); } }) .filter(new FilterFunction<GoodsAccessLog>() { @Override public boolean filter(GoodsAccessLog value) throws Exception { return value.getEventType().equals("view"); } }) .keyBy(GoodsAccessLog::getGoodsId) .timeWindow(Time.minutes(30)) .trigger(new CustomWindowTrigger()) .process(new CustomUVBloom()) .keyBy(0) .timeWindow(Time.seconds(3)) .max(1) .print("uv result => ").setParallelism(1); // 5. 執行任務 env.execute("job"); }
本文由mirson創做分享,如需進一步交流,請加QQ羣:19310171或訪問www.softart.cn