基於Broadcast 狀態的Flink Etl Demo

接上文: 【翻譯】The Broadcast State Pattern(廣播狀態) html

最近嘗試了一下Flink 的 Broadcase 功能,在Etl,流表關聯場景很是適用:一個流數據量大,一個流數據量小(配置表)須要更新java

業務邏輯以下:apache

  

注: 正常狀況廣播流只有一個輸出源,更新也在這個源裏,這裏作了個優化:將廣播流的輸入源改成兩部分配置文件和更新topic(緣由:flink 讀取文件,讀完就結束了沒法作更新,而每次從kafka獲取全量配置數據,涉及到kafka topic數據的刪除時間,除非涉及很是長的刪除時間,否則每次讀取全量也不太方便),這裏不使用flink的CacheFile,由於不能更新api

具體業務以下:轉碼三位城市編碼爲對應城市中文dom

  1. 自定義輸入流,輸入三位的城市編碼和五位的隨機字符串ide

  2. 廣播流讀取配置文件和配置文件更新topicpost

  3. connect兩個流,讀取配置文件對應的數據解析數據流輸入的數據優化

自定義輸入流以下:ui

class RadomFunction extends SourceFunction[String]{
  var flag = true
  override def cancel(): Unit = {
    flag = false
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (flag){
      for (i <- 0 to 300) {
        var nu = i.toString
        while (nu.length < 3) {
          nu = "0" + nu
        }
        ctx.collect(nu + "," + StringUtil.getRandomString(5))
        Thread.sleep(2000)
      }
    }
  }
}

Etl 代碼以下:編碼

import java.io.File
import com.venn.flink.util.{StringUtil}
import com.venn.index.conf.Common
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector

/**
  * broadcast
  */
object BroadCastDemo {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    if ("/".equals(File.separator)) {
      val backend = new FsStateBackend(Common.CHECK_POINT_DATA_DIR, true)
      env.setStateBackend(backend)
      env.enableCheckpointing(10 * 1000, CheckpointingMode.EXACTLY_ONCE)
    } else {
      env.setMaxParallelism(1)
      env.setParallelism(1)
    }
    // 配置更新流
    val configSource = new FlinkKafkaConsumer[String]("broad_cast_demo", new SimpleStringSchema, Common.getProp)
    // 配置流的初始化,能夠經過讀取配置文件實現
    var initFilePath = ""
    if ("/".equals(File.separator)){
      initFilePath = "hdfs:///venn/init_file.txt"
    }else{
      initFilePath = "D:\\idea_out\\broad_cast.txt"
    }
    val init = env.readTextFile(initFilePath)
    val descriptor = new MapStateDescriptor[String,  String]("dynamicConfig", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
    val configStream = env.addSource(configSource).union(init).broadcast(descriptor)


    val input = env.addSource(new RadomFunction)
      .connect(configStream)
      .process(new BroadcastProcessFunction[String, String, String] {
        override def processBroadcastElement(value: String, ctx: BroadcastProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = {

          println("new config : " + value)
          val configMap = ctx.getBroadcastState(descriptor)
          // process update configMap,讀取配置數據,寫入廣播狀態中
          val line = value.split(",")
          configMap.put(line(0), line(1))
        }
        override def processElement(value: String, ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, out: Collector[String]): Unit = {
          // use give key, return value
          val configMap = ctx.getBroadcastState(descriptor)
          // 解析三位城市編碼,根據廣播狀態對應的map,轉碼爲城市對應中文
//          println(value)
          val line = value.split(",")
          val code = line(0)
          var va = configMap.get(code)
          // 不能轉碼的數據默認輸出 中國(code=xxx)
          if ( va == null){
            va = "中國(code="+code+")";
          }else{
            va = va + "(code="+code+")"
          }
          out.collect(va + "," + line(1))
        }
      })
    input.print()

    env.execute("BroadCastDemo")
  }
}

 

配置數據以下:

001,邯鄲市
002,石家莊
003,保定市
004,張家口
005,承德市
006,唐山市
007,廊坊市
008,滄州市
009,衡水市
010,邢臺市

數據源數據以下:

001,bGTqQM
002,sCfdSK
003,RWtLNC
004,qkGita
005,fOemDF
006,KRaUmj
007,MNwKdS
008,RgZDlI
009,QbUyeh

轉碼後輸出以下:

邯鄲市(code=001),bGTqQM
石家莊(code=002),sCfdSK
保定市(code=003),RWtLNC
張家口(code=004),qkGita
承德市(code=005),fOemDF
唐山市(code=006),KRaUmj
廊坊市(code=007),MNwKdS
滄州市(code=008),RgZDlI
衡水市(code=009),QbUyeh

執行結果以下:

...
new config : 047,十堰市
new config : 048,隨棗市
new config : 049,荊門市
new config : 050,江漢(仙桃)
邯鄲市(code=001),ovLKQN
石家莊(code=002),QTgxXn
保定市(code=003),bIPefX
張家口(code=004),XcdHUd
...
宜昌市(code=045),sQRonA
恩施市(code=046),gfipAY
十堰市(code=047),ASPulh
隨棗市(code=048),mqurwg
荊門市(code=049),hfTlue
江漢(仙桃)(code=050),EfiXec
中國(code=051),xGuihq  # 不能轉碼數據
中國(code=052),niMlrb
中國(code=053),fHvIpU
中國(code=054),MdqqCb
中國(code=055),CFgNmM
...

廣播流數據更新以下:

new config : 150,xxx   # 獲取當新配置數據
中國(code=148),fLtwye
中國(code=149),bEJfMP
new config : 151,fff
xxx(code=150),TTIPii   # 新配置數據轉碼數據
fff(code=151),iJSAjJ
中國(code=152),yBvlUZ
new config : 152,ggg

搞定

相關文章
相關標籤/搜索