思路分析安全
在電商網站中,訂單的支付做爲直接與營銷收入掛鉤的一環,在業務流程中很是重要。對於訂單而言,爲了正確控制業務流程,也爲了增長用戶的支付意願,網站通常會設置一個支付失效時間,超過一段時間不支付的訂單就會被取消。ide
在電商平臺中,最終創造收入和利潤的是用戶下單購買的環節;更具體一點,是用戶真正完成支付動做的時候。用戶下單的行爲能夠代表用戶對商品的需求,但在現實中,並非每次下單都會被用戶馬上支付。當拖延一段時間後,用戶支付的意願會下降。因此爲了讓用戶更有緊迫感從而提升支付轉化率,同時也爲了防範訂單支付環節的安全風險,電商網站每每會對訂單狀態進行監控,設置一個失效時間(好比15分鐘),若是下單後一段時間仍未支付,訂單就會被取消。咱們一樣能夠利用Process Function,自定義實現檢測訂單超時的功能。爲了簡化問題,咱們只考慮超時報警的情形,在pay事件超時未發生的狀況下,輸出超時報警信息。一個簡單的思路是,能夠在訂單的create事件到來後註冊定時器,15分鐘後觸發;而後再用一個布爾類型的Value狀態來做爲標識位,代表pay事件是否發生過。若是pay事件已經發生,狀態被置爲true,那麼就再也不須要作什麼操做;而若是pay事件一直沒來,狀態一直爲false,到定時器觸發時,就應該輸出超時報警信息。
2
函數
具體實現網站
咱們準備了訂單數據做爲分析數據源OrderLog.csv,這組數據中有訂單建立的相關信息,也有訂單支付的相關信息。
spa
具體實現的代碼以下:
code
public class OrderTimeoutWithoutCep { // 定義超時事件的側輸出流標籤 private final static OutputTag<OrderResult> orderTimeoutTag = new OutputTag<OrderResult>("order-timeout"){};
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1);
// 讀取數據並轉換成POJO類型 URL resource = OrderPayTimeout.class.getResource("/OrderLog.csv"); DataStream<OrderEvent> orderEventStream = env.readTextFile(resource.getPath()) .map(line -> { String[] fields = line.split(","); return new OrderEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3])); }) .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() { @Override public long extractAscendingTimestamp(OrderEvent element) { return element.getTimestamp() * 1000L; } });
// 自定義處理函數,主流輸出正常匹配訂單事件,側輸出流輸出超時報警事件 SingleOutputStreamOperator<OrderResult> resultStream = orderEventStream .keyBy(OrderEvent::getOrderId) .process(new OrderPayMatchDetect());
resultStream.print("payed normally"); resultStream.getSideOutput(orderTimeoutTag).print("timeout");
env.execute("order timeout detect without cep job"); }
// 實現自定義KeyedProcessFunction public static class OrderPayMatchDetect extends KeyedProcessFunction<Long, OrderEvent, OrderResult>{ // 定義狀態,保存以前點單是否已經來過create、pay的事件 ValueState<Boolean> isPayedState; ValueState<Boolean> isCreatedState; // 定義狀態,保存定時器時間戳 ValueState<Long> timerTsState;
@Override public void open(Configuration parameters) throws Exception { isPayedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-payed", Boolean.class, false)); isCreatedState = getRuntimeContext().getState(new ValueStateDescriptor<Boolean>("is-created", Boolean.class, false)); timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class)); }
@Override public void processElement(OrderEvent value, Context ctx, Collector<OrderResult> out) throws Exception { // 先獲取當前狀態 Boolean isPayed = isPayedState.value(); Boolean isCreated = isCreatedState.value(); Long timerTs = timerTsState.value();
// 判斷當前事件類型 if( "create".equals(value.getEventType()) ){ // 1. 若是來的是create,要判斷是否支付過 if( isPayed ){ // 1.1 若是已經正常支付,輸出正常匹配結果 out.collect(new OrderResult(value.getOrderId(), "payed successfully")); // 清空狀態,刪除定時器 isCreatedState.clear(); isPayedState.clear(); timerTsState.clear(); ctx.timerService().deleteEventTimeTimer(timerTs); } else { // 1.2 若是沒有支付過,註冊15分鐘後的定時器,開始等待支付事件 Long ts = ( value.getTimestamp() + 15 * 60 ) * 1000L; ctx.timerService().registerEventTimeTimer(ts); // 更新狀態 timerTsState.update(ts); isCreatedState.update(true); } } else if( "pay".equals(value.getEventType()) ){ // 2. 若是來的是pay,要判斷是否有下單事件來過 if( isCreated ){ // 2.1 已經有過下單事件,要繼續判斷支付的時間戳是否超過15分鐘 if( value.getTimestamp() * 1000L < timerTs ){ // 2.1.1 在15分鐘內,沒有超時,正常匹配輸出 out.collect(new OrderResult(value.getOrderId(), "payed successfully")); } else { // 2.1.2 已經超時,輸出側輸出流報警 ctx.output(orderTimeoutTag, new OrderResult(value.getOrderId(), "payed but already timeout")); } // 統一清空狀態 isCreatedState.clear(); isPayedState.clear(); timerTsState.clear(); ctx.timerService().deleteEventTimeTimer(timerTs); } else { // 2.2 沒有下單事件,亂序,註冊一個定時器,等待下單事件 ctx.timerService().registerEventTimeTimer( value.getTimestamp() * 1000L); // 更新狀態 timerTsState.update(value.getTimestamp() * 1000L); isPayedState.update(true); } } }
@Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<OrderResult> out) throws Exception { // 定時器觸發,說明必定有一個事件沒來 if( isPayedState.value() ){ // 若是pay來了,說明create沒來 ctx.output(orderTimeoutTag, new OrderResult(ctx.getCurrentKey(), "payed but not found created log")); }else { // 若是pay沒來,支付超時 ctx.output(orderTimeoutTag, new OrderResult(ctx.getCurrentKey(), "timeout")); } // 清空狀態 isCreatedState.clear(); isPayedState.clear(); timerTsState.clear(); } }}