社區中有好幾個同窗問過這樣的場景: html
flink 任務中,source 進來的數據,須要鏈接數據庫裏面的字段,再作後面的處理
這裏假設一個 ETL 的場景,輸入數據包含兩個字段 「type, userid....」 ,須要根據 type,鏈接一張 mysql 的配置表,關聯 type 對應的具體內容。相對於輸入數據的數量,type 的值是不多的(這裏默認只有10種), 因此對應配置表就只有10條數據,配置是會定時修改的(好比跑批補充數據),配置的修改必須在必定時間內生效。mysql
實時 ETL,須要用裏面的一個字段去關聯數據庫,補充其餘數據,進來的數據中關聯字段是很單一的(就10個),對應數據庫的數據也不多,若是用 異步 IO,感受會比較傻(浪費資源、性能還很差)。同時數據庫的數據是會不定時修改的,因此不能在啓動的時候一次性加載。sql
Flink 如今對應這種場景可使用 Boradcase state 作,如:基於Broadcast 狀態的Flink Etl Demo數據庫
這裏想說的是另外一種更簡單的方法: 使用定時器,定時加載數據庫的數據 (就是簡單的Java定時器)apache
先說一下代碼流程:api
一、自定義的 source,輸入逗號分隔的兩個字段服務器
二、使用 RichMapFunction 轉換數據,在 open 中定義定時器,定時觸發查詢 mysql 的任務,並將結果放到一個 map 中異步
三、輸入數據關聯 map 的數據,而後輸出ide
先看下數據庫中的數據:post
mysql> select * from timer;
+------+------+
| id | name |
+------+------+
| 0 | 0zOq |
| 1 | 1hKC |
| 2 | 2ibM |
| 3 | 3fCe |
| 4 | 4TaM |
| 5 | 5URU |
| 6 | 6WhP |
| 7 | 7zjn |
| 8 | 8Szl |
| 9 | 9blS |
+------+------+
10 rows in set (0.01 sec)
總共10條數據,id 就是對應的關聯字段,須要填充的數據是 name
下面是主要的代碼:
// 自定義的source,輸出 x,xxx 格式隨機字符 val input = env.addSource(new TwoStringSource) val stream = input.map(new RichMapFunction[String, String] { val jdbcUrl = "jdbc:mysql://venn:3306?useSSL=false&allowPublicKeyRetrieval=true" val username = "root" val password = "123456" val driverName = "com.mysql.jdbc.Driver" var conn: Connection = null var ps: PreparedStatement = null val map = new util.HashMap[String, String]() override def open(parameters: Configuration): Unit = { logger.info("init....") query() // new Timer val timer = new Timer(true) // schedule is 10 second, 5 second between successive task executions 定義了一個10秒的定時器,定時執行查詢數據庫的方法 timer.schedule(new TimerTask { override def run(): Unit = { query() } }, 10000) } override def map(value: String): String = { // concat input and mysql data,簡單關聯輸出 value + "-" + map.get(value.split(",")(0)) } /** * query mysql for get new config data */ def query() = { logger.info("query mysql") try { Class.forName(driverName) conn = DriverManager.getConnection(jdbcUrl, username, password) ps = conn.prepareStatement("select id,name from venn.timer") val rs = ps.executeQuery while (!rs.isClosed && rs.next) { val id = rs.getString(1) val name = rs.getString(2)
// 將結果放到 map 中 map.put(id, name) } logger.info("get config from db size : {}", map.size()) } catch { case e@(_: ClassNotFoundException | _: SQLException) => e.printStackTrace() } finally { ps.close() conn.close() } } }) // .print() val sink = new FlinkKafkaProducer[String]("timer_out" , new MyKafkaSerializationSchema[String]() , Common.getProp , FlinkKafkaProducer.Semantic.EXACTLY_ONCE) stream.addSink(sink)
簡單的Java定時器:
val timer = new Timer(true) // schedule is 10 second, 5 second between successive task executions timer.schedule(new TimerTask { override def run(): Unit = { query() } }, 10000)
看下輸出的數據:
7,N-7zjn 7,C-7zjn 7,U-7zjn 4,T-4TaM 7,J-7zjn 9,R-9blS 4,C-4TaM 9,T-9blS 4,A-4TaM 6,I-6WhP 9,U-9blS
注:「-」 以前是原始數據,後面是關聯後的數據
部署到服務器上定時器的調度:
2019-09-28 18:28:13,476 INFO com.venn.stream.api.timer.CustomerTimerDemo$ - query mysql 2019-09-28 18:28:13,480 INFO com.venn.stream.api.timer.CustomerTimerDemo$ - get config from db size : 10 2019-09-28 18:28:18,553 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 0/1 - checkpoint 17 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1569666488499} from checkpoint 17 2019-09-28 18:28:23,476 INFO com.venn.stream.api.timer.CustomerTimerDemo$ - query mysql 2019-09-28 18:28:23,481 INFO com.venn.stream.api.timer.CustomerTimerDemo$ - get config from db size : 10 2019-09-28 18:28:28,549 INFO org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction - FlinkKafkaProducer 0/1 - checkpoint 18 complete, committing transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=null, producerId=-1, epoch=-1], transactionStartTime=1569666498505} from checkpoint 18 2019-09-28 18:28:33,477 INFO com.venn.stream.api.timer.CustomerTimerDemo$ - query mysql 2019-09-28 18:28:33,484 INFO com.venn.stream.api.timer.CustomerTimerDemo$ - get config from db size : 10
十秒調度一次
歡迎關注Flink菜鳥公衆號,會不按期更新Flink(開發技術)相關的推文