Flink實戰-訂單支付和對帳狀況監控(分別使用CEP和ProcessFunction來實現)

在電商網站中,訂單的支付作爲直接與錢掛鉤的一環,在業務流程中非常重要。對於訂單而言,爲了正確控制業務流程,也爲了增加用戶的支付意願,網站一般會設置一個支付失效時間,超過一段時間沒支付的訂單就會被取消。另外,對於訂單的支付,還應該保證最終支付的正確性,可以通過第三方支付平臺的交易數據來做一個實時對帳。

第一個實現的效果:實時獲取訂單數據,分析訂單的支付情況,分別實時統計支付成功的和15分鐘後支付超時的情況。

新建一個maven項目,這是基礎依賴,如果之前已經引入了,就不用再加了:

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <!-- Single source of truth for the Flink / Scala / Kafka versions used below. -->
        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <kafka.version>2.2.0</kafka.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.binary.version}</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.6</version>
        </dependency>
        <!-- Uses the shared properties (resolves to the same _2.12 / 1.10.1 coordinates)
             so this artifact cannot drift from the other Flink dependencies. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

這個場景需要用到CEP,因此再加入CEP依賴:

<dependencies>
        <!-- Flink CEP (Scala API): provides the CEP / Pattern classes imported by OrderTimeoutMonitor. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

準備數據源文件src/main/resources/OrderLog.csv:

1234,create,,1611047605
1235,create,,1611047606
1236,create,,1611047606
1234,pay,akdb3833,1611047616

把java目錄改成scala,新建com.mafei.orderPayMonitor.OrderTimeoutMonitor.scala 的object

/*
 *
 * @author mafei
 * @date 2021/1/31
*/
package com.mafei.orderPayMonitor

import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time

import java.util

/**
 * Input record: one line of the order log.
 *
 * @param orderId   order id
 * @param eventType event category: "create" (order created) or "pay" (order paid)
 * @param txId      payment transaction number (empty for "create" rows in the sample data)
 * @param ts        event time; the jobs in this file multiply it by 1000 to obtain
 *                  the millisecond timestamp handed to Flink
 */
case class OrderEvent(
  orderId: Long,
  eventType: String,
  txId: String,
  ts: Long
)

/**
 * Output record emitted by the order-monitoring jobs.
 *
 * @param orderId   id of the order the message refers to
 * @param resultMsg human-readable outcome (paid, timed out, ...)
 */
case class OrderResult(
  orderId: Long,
  resultMsg: String
)

/**
 * CEP-based order monitor.
 *
 * Reads `/OrderLog.csv` from the classpath, keys the events by order id and
 * applies a "create followed by pay within 15 minutes" pattern. Completed
 * matches go to the main output; partial matches that time out go to the
 * `orderTimeout` side output.
 */
object OrderTimeoutMonitor {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Read the order log and turn every CSV line into an OrderEvent.
    val csvPath = getClass.getResource("/OrderLog.csv").getPath
    val keyedOrders = env
      .readTextFile(csvPath)
      .map { line =>
        val fields = line.split(",")
        OrderEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      .assignAscendingTimestamps(event => event.ts * 1000L) // ts is converted to milliseconds
      .keyBy(event => event.orderId)                        // one pattern instance per order

    // 2. Pattern: a "create" event, later followed by a "pay" event, all within 15 minutes.
    val createThenPay = Pattern
      .begin[OrderEvent]("create")
      .where(event => event.eventType == "create")
      .followedBy("pay")
      .where(event => event.eventType == "pay")
      .within(Time.minutes(15))

    // 3. Apply the pattern to the keyed stream.
    val matches = CEP.pattern(keyedOrders, createThenPay)

    // 4. Side-output tag that receives the timed-out (unpaid) orders.
    val timeoutTag = new OutputTag[OrderResult]("orderTimeout")

    // 5. Extract both the timed-out partial matches and the completed matches.
    val paidOrders = matches.select(timeoutTag, new OrderTimeoutSelect(), new OrderPaySelect())

    paidOrders.print("pay")
    paidOrders.getSideOutput(timeoutTag).print()

    env.execute(" order timeout monitor")
  }
}

/**
 * Handles partial matches that timed out: a "create" event was seen but the
 * rest of the pattern did not complete in time, i.e. the order was not paid.
 */
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult]{
  /**
   * @param map events matched so far, keyed by pattern step name
   * @param l   value supplied by Flink for the timeout (its timeout timestamp)
   * @return a result carrying the id of the order whose "create" event was matched
   */
  override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
    val createEvent = map.get("create").iterator().next()
    OrderResult(createEvent.orderId, s"超時了。。。。超時時間:$l")
  }
}

/**
 * Handles completed matches: both the "create" and the "pay" event were seen
 * within the pattern's time window.
 */
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult]{
  /**
   * @param map matched events, keyed by pattern step name ("create" / "pay")
   * @return a result for the paid order, reporting the create and pay times
   */
  override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
    val createEvent = map.get("create").iterator().next()
    val payEvent = map.get("pay").iterator().next()
    OrderResult(
      payEvent.orderId,
      s"訂單支付成功,下單時間:${createEvent.ts} 支付時間:${payEvent.ts}"
    )
  }
}

代碼結構及運行效果:

Flink實戰-訂單支付和對帳狀況監控(分別使用CEP和ProcessFunction來實現)


用ProcessFunction來實現上面的場景

csv還可以用上面的數據,新建一個scala的object:src/main/scala/com/mafei/orderPayMonitor/OrderTimeoutMonitorWithProcessFunction.scala

/*
 *
 * @author mafei
 * @date 2021/1/31
*/
package com.mafei.orderPayMonitor

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

/**
 * Same order monitoring as the CEP job, implemented with a keyed process
 * function: matched orders are printed from the main output, everything
 * else from the `timeout` side output.
 */
object OrderTimeoutMonitorWithProcessFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Read the order log and turn every CSV line into an OrderEvent.
    val csvPath = getClass.getResource("/OrderLog.csv").getPath
    val keyedOrders = env
      .readTextFile(csvPath)
      .map { line =>
        val fields = line.split(",")
        OrderEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      .assignAscendingTimestamps(event => event.ts * 1000L) // ts is converted to milliseconds
      .keyBy(event => event.orderId)                        // state and timers are per order

    // 2. Match create/pay events per order.
    val matched = keyedOrders.process(new OrderPayMatchProcess())
    matched.print("支付成功的: ")

    // The tag id must equal the one used inside OrderPayMatchProcess ("timeout").
    val timeoutTag = new OutputTag[OrderResult]("timeout")
    matched.getSideOutput(timeoutTag).print("訂單超時事件")

    env.execute("訂單支付監控with ProcessFunction")
  }
}

/**
 * Matches the "create" and "pay" events of one order (the stream is keyed by order id).
 *
 * Matched orders are emitted on the main output. Orders that are never paid,
 * are paid too late, or have a payment without a creation event are emitted
 * on the `timeout` side output.
 *
 * Fix compared with the previous version: timers are now registered at the
 * same millisecond timestamp that is stored in `timerTsState`. Before, they
 * were registered at the raw `ts` (seconds, no timeout offset), so they were
 * already in the past when registered and could never be deleted through the
 * stored value.
 */
class OrderPayMatchProcess() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]{
  // Flags recording whether a create / pay event has been seen for the current key,
  // plus the timestamp (ms) of the timer currently registered for it.
  lazy  val isCreateOrderState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isCreateOrderState", classOf[Boolean]))
  lazy  val isPayedOrderState: ValueState[Boolean] = getRuntimeContext.getState(new ValueStateDescriptor[Boolean]("isPayedOrderState", classOf[Boolean]))
  lazy  val timerTsState : ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerTsState", classOf[Long]))

  // Side output that receives the timed-out / unmatched orders.
  val orderTimeoutOutputTag = new OutputTag[OrderResult]("timeout")

  // Payment timeout: 15 minutes, in milliseconds.
  private val payTimeoutMs: Long = 900 * 1000L

  /** Clears all per-key state once an order has been fully handled. */
  private def clearState(): Unit = {
    isCreateOrderState.clear()
    isPayedOrderState.clear()
    timerTsState.clear()
  }

  /**
   * Fires when the wait for the missing counterpart event is over.
   * Create and pay cannot both be present here, because that case is
   * resolved (and the timer deleted) in `processElement`.
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
    if (isCreateOrderState.value()){
      // Only the creation was seen: the order was not paid in time.
      ctx.output(orderTimeoutOutputTag,OrderResult(ctx.getCurrentKey,"訂單沒支付或超時"))
    }else if(isPayedOrderState.value()){
      // Only the payment was seen: the creation event never showed up.
      ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey,"只有支付,沒看到訂單提交"))
    }
    clearState()
  }

  /**
   * Handles one event of the current order.
   *
   *  - If both create and pay have now been seen, the order is emitted
   *    (main output if paid before the deadline, side output otherwise),
   *    the pending timer is deleted and the state is cleared.
   *  - If only one of them has been seen, a timer is registered and the
   *    decision is deferred to `onTimer`.
   */
  override def processElement(i: OrderEvent, context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, collector: Collector[OrderResult]): Unit = {
    val isCreate = isCreateOrderState.value()
    val isPayed = isPayedOrderState.value()
    val timerTs = timerTsState.value()
    val eventTimeMs = i.ts * 1000L

    if (i.eventType == "create"){
      if (isPayed){
        // The payment arrived first: the pair is complete.
        context.timerService().deleteEventTimeTimer(timerTs)
        clearState()
        collector.collect(OrderResult(context.getCurrentKey,"支付成功"))
      }else{
        // Not paid yet: wait up to 15 minutes of event time for the payment.
        val deadlineMs = eventTimeMs + payTimeoutMs
        context.timerService().registerEventTimeTimer(deadlineMs)
        timerTsState.update(deadlineMs)
        isCreateOrderState.update(true)
      }
    }
    else if(i.eventType == "pay"){
      if(isCreate){
        if(eventTimeMs < timerTs){
          // Paid before the deadline stored when the order was created.
          collector.collect(OrderResult(context.getCurrentKey,"支付成功"))
        }else{
          // Paid, but only after the deadline had passed.
          context.output(orderTimeoutOutputTag, OrderResult(context.getCurrentKey,"已經支付,可是沒有找到訂單超時了"))
        }
        context.timerService().deleteEventTimeTimer(timerTs)
        clearState()
      }else{
        // Creation not seen yet: wait until the watermark reaches the payment time.
        context.timerService().registerEventTimeTimer(eventTimeMs)
        isPayedOrderState.update(true)
        timerTsState.update(eventTimeMs)
      }
    }
  }
}

代碼結構及運行效果

Flink實戰-訂單支付和對帳狀況監控(分別使用CEP和ProcessFunction來實現)



上面實現了監測用戶支付的情況,實際中還需要對支付後的帳單跟第三方支付平臺做一個實時對帳功能。

會涉及到2條數據流(支付和帳單)的合流計算

這裏模擬帳單,所以需要準備一個數據文件ReceiptLog.csv:

akdb3833,alipay,1611047619
akdb3832,wechat,1611049617

上代碼: src/main/scala/com/mafei/orderPayMonitor/TxMatch.scala

/*
 *
 * @author mafei
 * @date 2021/1/31
*/
package com.mafei.orderPayMonitor

import com.mafei.orderPayMonitor.OrderTimeoutMonitor.getClass
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector

/**
 * One line of the third-party receipt log.
 *
 * @param orderId    first CSV column; despite its name, the jobs in this file key on it
 *                   and match it against `OrderEvent.txId` (the payment transaction id)
 * @param payChannel payment channel, e.g. "alipay" or "wechat" in the sample data
 * @param ts         event time; multiplied by 1000 by the jobs to obtain milliseconds
 */
case class ReceiptEvent(
  orderId: String,
  payChannel: String,
  ts: Long
)

/**
 * Real-time reconciliation job: connects the "pay" events of the order log
 * with the receipt log (both keyed by transaction id) and prints matched
 * pairs as well as the unmatched events of either side.
 */
object TxMatch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Order log: keep only the payment events, keyed by transaction id.
    val orderLogPath = getClass.getResource("/OrderLog.csv").getPath
    val payEvents = env
      .readTextFile(orderLogPath)
      .map { line =>
        val fields = line.split(",")
        OrderEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      .assignAscendingTimestamps(event => event.ts * 1000L) // ts is converted to milliseconds
      .filter(event => event.eventType == "pay")
      .keyBy(event => event.txId)

    // 2. Receipt log, keyed by its first column (which holds the transaction id).
    val receiptLogPath = getClass.getResource("/ReceiptLog.csv").getPath
    val receipts = env
      .readTextFile(receiptLogPath)
      .map { line =>
        val fields = line.split(",")
        ReceiptEvent(fields(0), fields(1), fields(2).toLong)
      }
      .assignAscendingTimestamps(receipt => receipt.ts * 1000L)
      .keyBy(receipt => receipt.orderId)

    // 3. Connect both keyed streams and match them per key.
    val matched = payEvents
      .connect(receipts)
      .process(new TxPayMatchResult())
    matched.print("match: ")

    // Tag ids must equal the ones declared inside TxPayMatchResult.
    val unmatchedPayTag = new OutputTag[OrderEvent]("unmatched-pay")
    val unmatchedReceiptTag = new OutputTag[ReceiptEvent]("receipt")
    matched.getSideOutput(unmatchedPayTag).print("unmatched-pay")
    matched.getSideOutput(unmatchedReceiptTag).print("unmatched-receipt")

    env.execute()
  }
}

/**
 * Matches a payment event with the receipt that carries the same key.
 *
 * Whichever side arrives first is stored in keyed state and a timer is
 * registered; when the other side arrives the pair is emitted. If the timer
 * fires first, the stored event is emitted on the matching "unmatched" side
 * output.
 *
 * Change compared with the previous version: the pending timer of the stored
 * event is deleted when the pair is matched, so a stale timer can no longer
 * fire against a later event stored under the same key.
 */
class TxPayMatchResult() extends CoProcessFunction[OrderEvent,ReceiptEvent,(OrderEvent,ReceiptEvent)]{
  lazy val orderEventState: ValueState[OrderEvent] = getRuntimeContext.getState(new ValueStateDescriptor[OrderEvent]("orderEvent", classOf[OrderEvent]))
  lazy val receiptEventState: ValueState[ReceiptEvent] = getRuntimeContext.getState(new ValueStateDescriptor[ReceiptEvent]("payEvent", classOf[ReceiptEvent]))

  // Side outputs for events whose counterpart never arrived.
  val unmatchedOrderEventTag = new OutputTag[OrderEvent]("unmatched-pay")
  val unmatchedReceiptEventTag = new OutputTag[ReceiptEvent]("receipt")

  // How long (event time, ms) a stored payment waits for its receipt, and vice versa.
  private val orderWaitMs: Long = 10000L
  private val receiptWaitMs: Long = 2000L

  /** Timer timestamp (ms) registered for a stored payment event. */
  private def orderTimerTs(order: OrderEvent): Long = order.ts * 1000L + orderWaitMs

  /** Timer timestamp (ms) registered for a stored receipt event. */
  private def receiptTimerTs(receipt: ReceiptEvent): Long = receipt.ts * 1000L + receiptWaitMs

  /** Handles a payment event: emit the pair if the receipt is already stored, otherwise wait for it. */
  override def processElement1(in1: OrderEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(receiptEventState.value()) match {
      case Some(receiptEvent) =>
        // The receipt arrived earlier: drop its pending timer and emit the pair.
        context.timerService().deleteEventTimeTimer(receiptTimerTs(receiptEvent))
        collector.collect((in1, receiptEvent))
        orderEventState.clear()
        receiptEventState.clear()
      case None =>
        // No receipt yet: remember the payment and wait 10 seconds for it.
        context.timerService().registerEventTimeTimer(orderTimerTs(in1))
        orderEventState.update(in1)
    }
  }

  /** Handles a receipt event: emit the pair if the payment is already stored, otherwise wait for it. */
  override def processElement2(in2: ReceiptEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    Option(orderEventState.value()) match {
      case Some(orderEvent) =>
        // The payment arrived earlier: drop its pending timer and emit the pair.
        context.timerService().deleteEventTimeTimer(orderTimerTs(orderEvent))
        collector.collect((orderEvent, in2))
        orderEventState.clear()
        receiptEventState.clear()
      case None =>
        // No payment yet: remember the receipt and wait 2 seconds for it.
        context.timerService().registerEventTimeTimer(receiptTimerTs(in2))
        receiptEventState.update(in2)
    }
  }

  /** Fires when the wait is over: whatever is still stored has no counterpart. */
  override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {

    if(orderEventState.value() != null){
      ctx.output(unmatchedOrderEventTag, orderEventState.value())
    }
    else if(receiptEventState.value() != null){
      ctx.output(unmatchedReceiptEventTag, receiptEventState.value())
    }

    orderEventState.clear()
    receiptEventState.clear()
  }
}

Flink實戰-訂單支付和對帳狀況監控(分別使用CEP和ProcessFunction來實現)

第二種, 使用join來實現這個效果

關於join的文檔:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/joining.html
這種方式優點是更方便了,做了一層封裝;缺點也很明顯,如果要實現一些複雜情況(如沒匹配中的也輸出之類)就不行了,具體看實際場景需要。

/*
 *
 * @author mafei
 * @date 2021/1/31
*/
package com.mafei.orderPayMonitor

import com.mafei.orderPayMonitor.TxMatch.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * Reconciliation job implemented with an interval join: payment events and
 * receipts with the same key are joined over the relative time bounds
 * -3 seconds to +5 seconds. Only matched pairs are produced.
 */
object TxMatchWithJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Order log: keep only the payment events, keyed by transaction id.
    val orderLogPath = getClass.getResource("/OrderLog.csv").getPath
    val payEvents = env
      .readTextFile(orderLogPath)
      .map { line =>
        val fields = line.split(",")
        OrderEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      .assignAscendingTimestamps(event => event.ts * 1000L) // ts is converted to milliseconds
      .filter(event => event.eventType == "pay")
      .keyBy(event => event.txId)

    // 2. Receipt log, keyed by its first column (which holds the transaction id).
    val receiptLogPath = getClass.getResource("/ReceiptLog.csv").getPath
    val receipts = env
      .readTextFile(receiptLogPath)
      .map { line =>
        val fields = line.split(",")
        ReceiptEvent(fields(0), fields(1), fields(2).toLong)
      }
      .assignAscendingTimestamps(receipt => receipt.ts * 1000L)
      .keyBy(receipt => receipt.orderId)

    // 3. Interval join of both keyed streams.
    val joined = payEvents
      .intervalJoin(receipts)
      .between(Time.seconds(-3), Time.seconds(5))
      .process(new TxMatchWithJoinResult())
    joined.print()

    env.execute()
  }
}
/** Emits every joined (payment, receipt) pair unchanged. */
class TxMatchWithJoinResult() extends ProcessJoinFunction[OrderEvent, ReceiptEvent,(OrderEvent,ReceiptEvent)]{
  override def processElement(in1: OrderEvent, in2: ReceiptEvent, context: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
    val pair: (OrderEvent, ReceiptEvent) = in1 -> in2
    collector.collect(pair)
  }
}

代碼結構及運行效果

Flink實戰-訂單支付和對帳狀況監控(分別使用CEP和ProcessFunction來實現)

相關文章
相關標籤/搜索