Flink進階之使用ProcessFunction實現訂單超時檢測

思路分析安全

在電商網站中,訂單的支付做爲直接與營銷收入掛鉤的一環,在業務流程中很是重要。對於訂單而言,爲了正確控制業務流程,也爲了增長用戶的支付意願,網站通常會設置一個支付失效時間,超過一段時間不支付的訂單就會被取消。ide

在電商平臺中,最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動做的時候。用戶下單的行爲能夠代表用戶對商品的需求,但在現實中,並非每次下單都會被用戶馬上支付。當拖延一段時間後,用戶支付的意願會下降。因此爲了讓用戶更有緊迫感從而提升支付轉化率,同時也爲了防範訂單支付環節的安全風險,電商網站每每會對訂單狀態進行監控,設置一個失效時間(好比15分鐘),若是下單後一段時間仍未支付,訂單就會被取消。咱們一樣能夠利用Process Function,自定義實現檢測訂單超時的功能。爲了簡化問題,咱們只考慮超時報警的情形,在pay事件超時未發生的狀況下,輸出超時報警信息。一個簡單的思路是,能夠在訂單的create事件到來後註冊定時器,15分鐘後觸發;而後再用一個布爾類型的Value狀態來做爲標識位,代表pay事件是否發生過。若是pay事件已經發生,狀態被置爲true,那麼就再也不須要作什麼操做;而若是pay事件一直沒來,狀態一直爲false,到定時器觸發時,就應該輸出超時報警信息。
2
函數

具體實現網站


咱們準備了訂單數據做爲分析數據源OrderLog.csv,這組數據中有訂單建立的相關信息,也有訂單支付的相關信息。
spa

具體實現的代碼以下:
code

public class OrderTimeoutWithoutCep {    // 定義超時事件的側輸出流標籤    private final static OutputTag<OrderResult> orderTimeoutTag = new OutputTag<OrderResult>("order-timeout"){};
   public static void main(String[] args) throws Exception {        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        env.setParallelism(1);
       // 讀取數據並轉換成POJO類型        URL resource = OrderPayTimeout.class.getResource("/OrderLog.csv");        DataStream<OrderEvent> orderEventStream = env.readTextFile(resource.getPath())                .map(line -> {                    String[] fields = line.split(",");                    return new OrderEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));                })                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {                    @Override                    public long extractAscendingTimestamp(OrderEvent element) {                        return element.getTimestamp() * 1000L;                    }                });
       // 自定義處理函數,主流輸出正常匹配訂單事件,側輸出流輸出超時報警事件        SingleOutputStreamOperator<OrderResult> resultStream = orderEventStream                .keyBy(OrderEvent::getOrderId)                .process(new OrderPayMatchDetect());
       resultStream.print("payed normally");        resultStream.getSideOutput(orderTimeoutTag).print("timeout");
       env.execute("order timeout detect without cep job");    }
   // 實現自定義KeyedProcessFunction    public static class OrderPayMatchDetect extends KeyedProcessFunction<Long, OrderEvent, OrderResult>{        // 定義狀態,保存以前點單是否已經來過create、pay的事件        ValueState<Boolean> isPayedState;        ValueState<Boolean> isCreatedState;        // 定義狀態,保存定時器時間戳        ValueState<Long> timerTsState;
       @Override        public void open(Configuration parameters) throws Exception {            isPayedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-payed", Boolean.class, false));            isCreatedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-created", Boolean.class, false));            timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));        }
       @Override        public void processElement(OrderEvent value, Context ctx, Collector<OrderResult> out) throws Exception {            // 先獲取當前狀態            Boolean isPayed = isPayedState.value();            Boolean isCreated = isCreatedState.value();            Long timerTs = timerTsState.value();
           // 判斷當前事件類型            if( "create".equals(value.getEventType()) ){                // 1. 若是來的是create,要判斷是否支付過                if( isPayed ){                    // 1.1 若是已經正常支付,輸出正常匹配結果                    out.collect(new OrderResult(value.getOrderId(), "payed successfully"));                    // 清空狀態,刪除定時器                    isCreatedState.clear();                    isPayedState.clear();                    timerTsState.clear();                    ctx.timerService().deleteEventTimeTimer(timerTs);                } else {                    // 1.2 若是沒有支付過,註冊15分鐘後的定時器,開始等待支付事件                    Long ts = ( value.getTimestamp() + 15 * 60 ) * 1000L;                    ctx.timerService().registerEventTimeTimer(ts);                    // 更新狀態                    timerTsState.update(ts);                    isCreatedState.update(true);                }            } else if( "pay".equals(value.getEventType()) ){                // 2. 若是來的是pay,要判斷是否有下單事件來過                if( isCreated ){                    // 2.1 已經有過下單事件,要繼續判斷支付的時間戳是否超過15分鐘                    if( value.getTimestamp() * 1000L < timerTs ){                        // 2.1.1 在15分鐘內,沒有超時,正常匹配輸出                        out.collect(new OrderResult(value.getOrderId(), "payed successfully"));                    } else {                        // 2.1.2 已經超時,輸出側輸出流報警                        ctx.output(orderTimeoutTag, new OrderResult(value.getOrderId(), "payed but already timeout"));                    }                    // 統一清空狀態                    isCreatedState.clear();                    isPayedState.clear();                    timerTsState.clear();                    ctx.timerService().deleteEventTimeTimer(timerTs);                } else {                    // 2.2 沒有下單事件,亂序,註冊一個定時器,等待下單事件                    ctx.timerService().registerEventTimeTimer( value.getTimestamp() * 1000L);                    // 更新狀態                    timerTsState.update(value.getTimestamp() * 1000L);                    isPayedState.update(true);                }            }        }
       @Override        public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderResult> out) throws Exception {            // 定時器觸發,說明必定有一個事件沒來            if( isPayedState.value() ){                // 若是pay來了,說明create沒來                ctx.output(orderTimeoutTag, new OrderResult(ctx.getCurrentKey(), "payed but not found created log"));            }else {                // 若是pay沒來,支付超時                ctx.output(orderTimeoutTag, new OrderResult(ctx.getCurrentKey(), "timeout"));            }            // 清空狀態            isCreatedState.clear();            isPayedState.clear();            timerTsState.clear();        }    }}
相關文章
相關標籤/搜索