Flink實戰-惡意登陸行爲檢測-CEP

FlinkCEP是在Flink上層實現的復瑣事件處理庫。 它可讓你在無限事件流中檢測出特定的事件模型,有機會掌握數據中重要的那部分。

官網文檔: https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/libs/cep.html

這裏給個demo,對比下不用cep和用cep的區別,
      實現目標: 從目標csv中讀取模擬登陸的數據,實時檢測,若是5秒鐘以內連續登陸的次數超過2次,則立刻告警

按照以前的正常操做(非CEP實現)

實現步驟:
一、準備環境和數據源加載到內存
二、進行數據切割,轉成須要的格式(樣例類)
三、指定時間窗口watermark及事件時間取哪一個字段
四、按每一個用戶id進行分組,統計每一個用戶id的登陸行爲(畢竟不能放一塊兒統計吧)
五、實現具體的處理邏輯ProcessFunction
六、輸出檢測數據

準備的模擬數據 userLogin.csv:html

1234,10.0.1.1,fail,1611373940
1235,10.0.1.2,fail,1611373941
1234,10.0.1.3,fail,1611373942
1234,10.0.1.3,success,1611373943
1234,10.0.1.3,fail,1611373943
1234,10.0.1.3,fail,1611373944
1236,10.0.1.4,fail,1611373945
1234,10.0.1.4,fail,1611373957
1234,10.0.1.5,fail,1611373958
1234,10.0.11.55,fail,1611373959
1236,2.2.2.2,fail,1611373960
/*
 *
 * @author mafei
 * @date 2021/1/24
*/
package com.mafei

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.mutable.ListBuffer

/**
 * 定義一個輸入數據的樣例類
 *
 * @param userId   用戶id
 * @param ip       客戶端的ip
 * @param loginState    登陸狀態,目前只有success/fail,後期能夠作擴展,因此定義爲string
 * @param ts       事件的時間戳,單位秒
 */
case class userLogin(userId: Long,ip: String,loginState: String,ts: Long)

/**
 * 定義一個輸出的樣例類
 * @param userId   用戶id
 * @param startTs   開始登陸時間
 * @param endTs     觸發事件的最後一次時間
 * @param loginCount   時間段內總共登陸的次數
 */
case class userLoginWarning(userId: Long, startTs: Long, endTs:Long, loginCount: Long)

object maliceLoginDetect {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //指定事件時間爲窗口和watermark的時間
    env.setParallelism(1)

    //從文件中讀取數據
    val resource = getClass.getResource("/userLogin.csv")
    val inputStream = env.readTextFile(resource.getPath)

    // 轉換成樣例類,並提取時間戳watermark
    val loginEventStream = inputStream
      .map(d => {
        val arr = d.split(",")
        // 分別對應  userId        ip      登陸狀態  時間戳
        userLogin(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
      })
      .assignAscendingTimestamps(_.ts * 1000L) //把秒轉爲毫秒

    val loginWarningStream = loginEventStream
      .keyBy(_.userId)
      .process(new loginMaliceDetect(2))

    loginWarningStream.print()
    env.execute()
  }
}

class loginMaliceDetect(warningCount: Long) extends KeyedProcessFunction[Long,userLogin,userLoginWarning]{

  //定義狀態,保存當前全部的登陸事件爲list,方便後邊作數據統計
  lazy val loginFailListState: ListState[userLogin] = getRuntimeContext.getListState(new ListStateDescriptor[userLogin]("loginFail-list", classOf[userLogin]))

  //定義定時器的時間戳狀態,不然無法刪定時器

  lazy val  timerTsState: ValueState[Long] = getRuntimeContext.getState(new ValueStateDescriptor[Long]("timerState", classOf[Long]))

  override def processElement(i: userLogin, context: KeyedProcessFunction[Long, userLogin, userLoginWarning]#Context, collector: Collector[userLoginWarning]): Unit = {
    //判斷,若是當前事件是登陸失敗事件,那再繼續操做
    if(i.loginState == "fail"){
      loginFailListState.add(i)
      //若是沒有註冊定時器,那就註冊一個定時器,5秒以後觸發
      if(timerTsState.value()== 0){
        val timerTs = i.ts * 1000L + 5000L
        context.timerService().registerEventTimeTimer(timerTs)
        timerTsState.update(timerTs)
      }
    }
    else if(i.loginState == "success"){
      context.timerService().deleteEventTimeTimer(timerTsState.value())
      timerTsState.clear()
      loginFailListState.clear()
    }
  }

  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, userLogin, userLoginWarning]#OnTimerContext, out: Collector[userLoginWarning]): Unit = {
    // 判斷下若是登陸失敗次數超過了設置的閾值,則告警
    val loginFailList: ListBuffer[userLogin] = new ListBuffer[userLogin]
    val iterable = loginFailListState.get().iterator()
    while (iterable.hasNext){
      loginFailList += iterable.next()
    }
    if (loginFailList.size > warningCount){
      out.collect(userLoginWarning(userId = ctx.getCurrentKey, startTs = loginFailList.head.ts, endTs = loginFailList.last.ts, loginCount = loginFailList.size))
    }
    loginFailList.clear()
    loginFailListState.clear()
    timerTsState.clear()
  }
}

代碼結構及運行效果

Flink實戰-惡意登陸行爲檢測-CEP

使用flink CEP實現

上面代碼栗子是能夠實現基本的登陸異常檢測了,可是若是碰到數據亂序等狀況,
有3個失敗事件在時間範圍內,可是有個亂序的數據插在中間,這時候按照邏輯中間就會狀況從新計算。。這時候就須要用到flink提供的cep(復瑣事件檢測)的功能了

在pom.xml中增長cep的依賴java

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>

        <flink.version>1.10.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
/*
 *
 * @author mafei
 * @date 2021/1/24
*/
package com.mafei

import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import java.util

object maliceLoginDetectWithCep {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //指定事件時間爲窗口和watermark的時間
    env.setParallelism(1)

    //從文件中讀取數據
    val resource = getClass.getResource("/userLogin.csv")
    val inputStream = env.readTextFile(resource.getPath)

    // 轉換成樣例類,並提取時間戳watermark
    val loginEventStream = inputStream
      .map(d => {
        val arr = d.split(",")
        // 分別對應  userId        ip      登陸狀態  時間戳
        userLogin(arr(0).toLong, arr(1), arr(2), arr(3).toLong)
      })
      .assignAscendingTimestamps(_.ts * 1000L) //把秒轉爲毫秒

    // 一、先定義匹配的模式,需求爲一個登陸失敗事件後,緊接着出現另外一個失敗事件
    val loginFailPattern = Pattern
      .begin[userLogin]("firstFail")
        .where(_.loginState == "fail")
      .next("secondFail")
        .where(_.loginState == "fail")
      .within(Time.seconds(5))

    //二、將匹配的規則應用在數據流中,獲得一個PatternStream
    val patternStream = CEP.pattern(loginEventStream.keyBy(_.userId), loginFailPattern)

    // 三、匹配中符合模式要求的數據流,須要調用select
    val loginFailWarningStream = patternStream.select(new LoginFailEventMatch())
    loginFailWarningStream.print()
    env.execute("login fail detect with cep")

  }
}
class LoginFailEventMatch() extends PatternSelectFunction[userLogin,userLoginWarning]{
  override def select(map: util.Map[String, util.List[userLogin]]): userLoginWarning = {

    //前邊定義的全部pattern,都在Map裏頭,由於map的value裏面只定義了一個事件,因此只會有一條,取第一個就能夠,若是定義了多個,須要按實際狀況來
    val firstFailEvent = map.get("firstFail").get(0)
    val secondFailEvent = map.get("secondFail").iterator().next()
    userLoginWarning(firstFailEvent.userId,firstFailEvent.ts,secondFailEvent.ts,2)
  }
}

代碼結構及運行效果

Flink實戰-惡意登陸行爲檢測-CEP

相關文章
相關標籤/搜索