接上文: 【翻譯】The Broadcast State Pattern(廣播狀態) html
最近嘗試了一下Flink 的 Broadcase 功能,在Etl,流表關聯場景很是適用:一個流數據量大,一個流數據量小(配置表)須要更新java
業務邏輯以下:apache
注: 正常狀況廣播流只有一個輸出源,更新也在這個源裏,這裏作了個優化:將廣播流的輸入源改成兩部分配置文件和更新topic(緣由:flink 讀取文件,讀完就結束了沒法作更新,而每次從kafka獲取全量配置數據,涉及到kafka topic數據的刪除時間,除非涉及很是長的刪除時間,否則每次讀取全量也不太方便),這裏不使用flink的CacheFile,由於不能更新api
具體業務以下:轉碼三位城市編碼爲對應城市中文dom
1. 自定義輸入流,輸入三位的城市編碼和五位的隨機字符串ide
2. 廣播流讀取配置文件和配置文件更新topicpost
3. connect兩個流,讀取配置文件對應的數據解析數據流輸入的數據優化
自定義輸入流以下:ui
class RadomFunction extends SourceFunction[String]{ var flag = true override def cancel(): Unit = { flag = false } override def run(ctx: SourceFunction.SourceContext[String]): Unit = { while (flag){ for (i <- 0 to 300) { var nu = i.toString while (nu.length < 3) { nu = "0" + nu } ctx.collect(nu + "," + StringUtil.getRandomString(5)) Thread.sleep(2000) } } } }
Etl 代碼以下:編碼
import java.io.File import com.venn.flink.util.{StringUtil} import com.venn.index.conf.Common import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.common.state.MapStateDescriptor import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.runtime.state.filesystem.FsStateBackend import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.util.Collector /** * broadcast */ object BroadCastDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) if ("/".equals(File.separator)) { val backend = new FsStateBackend(Common.CHECK_POINT_DATA_DIR, true) env.setStateBackend(backend) env.enableCheckpointing(10 * 1000, CheckpointingMode.EXACTLY_ONCE) } else { env.setMaxParallelism(1) env.setParallelism(1) } // 配置更新流 val configSource = new FlinkKafkaConsumer[String]("broad_cast_demo", new SimpleStringSchema, Common.getProp) // 配置流的初始化,能夠經過讀取配置文件實現 var initFilePath = "" if ("/".equals(File.separator)){ initFilePath = "hdfs:///venn/init_file.txt" }else{ initFilePath = "D:\\idea_out\\broad_cast.txt" } val init = env.readTextFile(initFilePath) val descriptor = new MapStateDescriptor[String, String]("dynamicConfig", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO) val configStream = env.addSource(configSource).union(init).broadcast(descriptor) val input = env.addSource(new RadomFunction) .connect(configStream) .process(new BroadcastProcessFunction[String, String, String] { override def processBroadcastElement(value: String, ctx: BroadcastProcessFunction[String, String, String]#Context, out: Collector[String]): Unit = { println("new config : " + value) val configMap = ctx.getBroadcastState(descriptor) // process update configMap,讀取配置數據,寫入廣播狀態中 val line = value.split(",") configMap.put(line(0), line(1)) } override def processElement(value: String, ctx: BroadcastProcessFunction[String, String, String]#ReadOnlyContext, out: Collector[String]): Unit = { // use give key, return value val configMap = ctx.getBroadcastState(descriptor) // 解析三位城市編碼,根據廣播狀態對應的map,轉碼爲城市對應中文 // println(value) val line = value.split(",") val code = line(0) var va = configMap.get(code) // 不能轉碼的數據默認輸出 中國(code=xxx) if ( va == null){ va = "中國(code="+code+")"; }else{ va = va + "(code="+code+")" } out.collect(va + "," + line(1)) } }) input.print() env.execute("BroadCastDemo") } }
配置數據以下:
001,邯鄲市 002,石家莊 003,保定市 004,張家口 005,承德市 006,唐山市 007,廊坊市 008,滄州市 009,衡水市 010,邢臺市
數據源數據以下:
001,bGTqQM 002,sCfdSK 003,RWtLNC 004,qkGita 005,fOemDF 006,KRaUmj 007,MNwKdS 008,RgZDlI 009,QbUyeh
轉碼後輸出以下:
邯鄲市(code=001),bGTqQM 石家莊(code=002),sCfdSK 保定市(code=003),RWtLNC 張家口(code=004),qkGita 承德市(code=005),fOemDF 唐山市(code=006),KRaUmj 廊坊市(code=007),MNwKdS 滄州市(code=008),RgZDlI 衡水市(code=009),QbUyeh
執行結果以下:
... new config : 047,十堰市 new config : 048,隨棗市 new config : 049,荊門市 new config : 050,江漢(仙桃) 邯鄲市(code=001),ovLKQN 石家莊(code=002),QTgxXn 保定市(code=003),bIPefX 張家口(code=004),XcdHUd ... 宜昌市(code=045),sQRonA 恩施市(code=046),gfipAY 十堰市(code=047),ASPulh 隨棗市(code=048),mqurwg 荊門市(code=049),hfTlue 江漢(仙桃)(code=050),EfiXec 中國(code=051),xGuihq # 不能轉碼數據 中國(code=052),niMlrb 中國(code=053),fHvIpU 中國(code=054),MdqqCb 中國(code=055),CFgNmM ...
廣播流數據更新以下:
new config : 150,xxx # 獲取當新配置數據 中國(code=148),fLtwye 中國(code=149),bEJfMP new config : 151,fff xxx(code=150),TTIPii # 新配置數據轉碼數據 fff(code=151),iJSAjJ 中國(code=152),yBvlUZ new config : 152,ggg
搞定