FLINK SIDDHI ADDON 學習筆記

SIDDHI 是一款功能強大的CEP 引擎,具備本身的DSL,豐富的模式匹配功能和可擴展性, 感謝陳浩同窗提供了SIDDHI和FLINK的整合功能 https://github.com/haoch/flink-siddhi 本文主要介紹了這個ADDON的一些實現思路git

  1. 將FLINK STREAM 轉化爲 SIDDHI STREAM 定義

  用法: SiddhiCEP.registerStream(streamName, FlinkDataStream, fieldNames) github

  經過 FlinkDataStream.getType 得到流對象的類型定義.registerStream方法會構造一個 SiddhiStreamSchema 對象,根據流對象的類型定義,自動獲取每一個field對應的數據類型存在內部的fieldTypes數組中. 數組

  SiddhiStreamSchema 內部會建立一個Siddhi StreamDefinition對象, StreamDefinition的attribute的定義根據fieldNames + fieldTypes 來添加.SiddhiTypeFactory.getAttributeType 負責將Flink 的數據類型映射爲Siddhi的數據類型, 並可自動生成一段Define Stream的定義(見 SiddhiStreamSchema.getStreamDefinitionExpression 方法) define stream [streamName] ([fieldName 1] [fieldType 1], ...[fieldName n] [fieldType n]) 緩存

  SiddhiStreamSchema 包括一個StreamSerializer: 用於將DataStream 中的對象轉化爲 Siddhi Stream 的輸入(Object Array):
    若是流對象是一個簡單類型 Atomic Type 直接將流對象放到 ARRAY中
    若是流對象是Tuple 類型,直接將Tuple 中前N個值放到ARRAY中
    若是流對象是Pojo或者CaseClass類型,直接根據每一個fieldName 獲取Class對應的屬性放到ARRAY中 ide

  1. 串聯FLINK STREAM 和 SIDDHI STREAM

  SiddhiStream: 抽象的Stream基類 orm

  convertDataStream 將原始的FLINK流轉化爲Tuple類型的流,Tuple的第一個元素爲StreamId, 第二個元素爲原來流中的數據,支持普通Stream 和 KeyedStream 對象

  ExecutionSiddhiStream: 構建 SiddhiOperatorContext 並調用SiddhiStreamFactory.createDataStream 建立了集成Siddhi的 DataStream. DataStream的類型爲Tuple的子類.SiddhiTypeFactory.getTupleTypeInformation: 其核心思路是經過Siddhi輸出Stream的StreamDefinition得到其Attribute的定義,再經過 TypeInfoParser.parse構造Flink Tuple 類型的定義 get

  ExecutableStream 根據Siddhi query 建立ExecutionSiddhiStream對象
  SingleSiddhiStream, UnionSiddhiStream: ExecutableStream 的子類,支持Fluent Style的鏈式調用. UnionSiddhiStream 調用了DataStream.union 方法 it

  SiddhiStreamFactory.createDataStream 經過 FLINK DataStream的transform方法使用了自定義的StreamOperator: SiddhiStreamOperator. 在 AbstractSiddhiOperator 的 setup 方法中建立SiddhiManager 和 SiddhiAppRuntime 並註冊了 InputHandler 和 OutputCallback (StreamOutputHandler) io

  SiddhiStreamOperator.processElement 須要處理兩種場景:
    Flink TimeCharacteristic = ProcessingTime: 先調用StreamSerializer將數據轉化爲Object Array, 再直接調用InputHandler.send將數據發送給Siddhi處理
    Flink TimeCharacteristic = EventTime: 緩存接收到的StreamRecord 到內部的priorityQueue中,直到收到Watermark, 將priorityQueue中小於watermark的StreamRecord一次發送給Siddhi處理

  StreamOutputHandler:根據Output的TypeInfo將Siddhi Event 轉化爲 Flink StreamRecord. 再轉發到SiddhiStreamOperator的Output

  1. CHECKPOINT

  SiddhiStreamOperator中保留了兩種State信息,一種是priorityQueue中保存的因爲watermark未發送給Siddhi的消息. 另外一種是Siddhi自己的State, 經過SiddhiAppRuntime.snapshot() 得到

相關文章
相關標籤/搜索