上週六在深圳分享了《Flink SQL 1.9.0 技術內幕和最佳實踐》,會後許多小夥伴對最後演示環節的 Demo 代碼很是感興趣,火燒眉毛地想嘗試下,因此寫了這篇文章分享下這份代碼。但願對於 Flink SQL 的初學者能有所幫助。完整分享能夠觀看 Meetup 視頻回顧 :https://developer.aliyun.com/live/1416html
演示代碼已經開源到了 GitHub 上:https://github.com/wuchong/flink-sql-submitjava
這份代碼主要由兩部分組成:1) 能用來提交 SQL 文件的 SqlSubmit 實現。2) 用於演示的 SQL 示例、Kafka 啓動中止腳本、 一份測試數據集、Kafka 數據源生成器。mysql
經過本實戰,你將學到:git
筆者一開始是想用 SQL Client 來貫穿整個演示環節,但惋惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語句。因此筆者就只好本身寫了個簡單的提交腳本。後來想一想,也挺好的,可讓聽衆同時瞭解如何經過 SQL 的方式,和編程的方式使用 Flink SQL。github
SqlSubmit 的主要任務是執行和提交一個 SQL 文件,實現很是簡單,就是經過正則表達式匹配每一個語句塊。若是是 CREATE TABLE 或 INSERT INTO 開頭,則會調用 tEnv.sqlUpdate(...)。若是是 SET 開頭,則會將配置設置到 TableConfig 上。其核心代碼主要以下所示:正則表達式
EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); // 建立一個使用 Blink Planner 的 TableEnvironment, 並工做在流模式 TableEnvironment tEnv = TableEnvironment.create(settings); // 讀取 SQL 文件 List<String> sql = Files.readAllLines(path); // 經過正則表達式匹配前綴,來區分不一樣的 SQL 語句 List<SqlCommandCall> calls = SqlCommandParser.parse(sql); // 根據不一樣的 SQL 語句,調用 TableEnvironment 執行 for (SqlCommandCall call : calls) { switch (call.command) { case SET: String key = call.operands[0]; String value = call.operands[1]; // 設置參數 tEnv.getConfig().getConfiguration().setString(key, value); break; case CREATE_TABLE: String ddl = call.operands[0]; tEnv.sqlUpdate(ddl); break; case INSERT_INTO: String dml = call.operands[0]; tEnv.sqlUpdate(dml); break; default: throw new RuntimeException("Unsupported command: " + call.command); } } // 提交做業 tEnv.execute("SQL Job");
在 flink-sql-submit 項目中,咱們準備了一份測試數據集(來自阿里雲天池公開數據集,特別鳴謝),位於 src/main/resources/user_behavior.log。數據以 JSON 格式編碼,大概長這個樣子:sql
{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"} {"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
爲了模擬真實的 Kafka 數據源,筆者還特意寫了一個 source-generator.sh 腳本(感興趣的能夠看下源碼),會自動讀取 user_behavior.log 的數據並以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。docker
有了數據源後,咱們就能夠用 DDL 去建立並鏈接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql)。數據庫
CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP ) WITH ( 'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取 'connector.properties.0.key' = 'zookeeper.connect', -- 鏈接信息 'connector.properties.0.value' = 'localhost:2181', 'connector.properties.1.key' = 'bootstrap.servers', 'connector.properties.1.value' = 'localhost:9092', 'update-mode' = 'append', 'format.type' = 'json', -- 數據源格式爲 json 'format.derive-schema' = 'true' -- 從 DDL schema 肯定 json 解析規則 )
注:可能有用戶會以爲其中的 connector.properties.0.key 等參數比較奇怪,社區計劃將在下一個版本中改進並簡化 connector 的參數配置。apache
鏈接 MySQL 可使用 Flink 提供的 JDBC connector。例如
CREATE TABLE pvuv_sink ( dt VARCHAR, pv BIGINT, uv BIGINT ) WITH ( 'connector.type' = 'jdbc', -- 使用 jdbc connector 'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url 'connector.table' = 'pvuv_sink', -- 表名 'connector.username' = 'root', -- 用戶名 'connector.password' = '123456', -- 密碼 'connector.write.flush.max-rows' = '1' -- 默認5000條,爲了演示改成1條 )
假設咱們的需求是計算每小時全網的用戶訪問量,和獨立用戶數。不少用戶可能會想到使用滾動窗口來計算。但這裏咱們介紹另外一種方式。即 Group Aggregation 的方式。
INSERT INTO pvuv_sink SELECT DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt, COUNT(*) AS pv, COUNT(DISTINCT user_id) AS uv FROM user_log GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')
它使用 DATE_FORMAT 這個內置函數,將日誌時間歸一化成「年月日小時」的字符串格式,並根據這個字符串進行分組,即根據每小時分組,而後經過 COUNT(*) 計算用戶訪問量(PV),經過 COUNT(DISTINCT user_id) 計算獨立用戶數(UV)。這種方式的執行模式是每收到一條數據,便會進行基於以前計算的值作增量計算(如+1),而後將最新結果輸出。因此實時性很高,但輸出量也大。
咱們將這個查詢的結果,經過 INSERT INTO 語句,寫到了以前定義的 pvuv_sink MySQL 表中。
注:在深圳 Meetup 中,咱們有對這種查詢的性能調優作了深度的介紹。
本實戰演示環節須要安裝一些必須的服務,包括:
1.下載 Flink 1.9.0 安裝包並解壓:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
2.下載如下依賴 jar 包,並拷貝到 flink-1.9.0/lib/ 目錄下。由於咱們運行時須要依賴各個 connector 實現。
3.將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改爲 10,由於咱們的演示任務可能會消耗多於1個的 slot。
4.在 flink-1.9.0 目錄下執行 ./bin/start-cluster.sh,啓動集羣。
運行成功的話,能夠在 http://localhost:8081 訪問到 Flink Web UI。
另外,還須要將 Flink 的安裝路徑填到 flink-sql-submit 項目的 env.sh 中,用於後面提交 SQL 任務,如個人路徑是
FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0
下載 Kafka 2.2.0 安裝包並解壓:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
將安裝路徑填到 flink-sql-submit 項目的 env.sh 中,如個人路徑是
KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0
在 flink-sql-submit 目錄下運行 ./start-kafka.sh 啓動 Kafka 集羣。
在命令行執行 jps,若是看到 Kafka 進程和 QuorumPeerMain 進程即代表啓動成功。
能夠在官方頁面下載 MySQL 並安裝:
https://dev.mysql.com/downloads/mysql/
若是有 Docker 環境的話,也能夠直接經過 Docker 安裝
https://hub.docker.com/_/mysql
$ docker pull mysql $ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql
而後在 MySQL 中建立一個 flink-test
的數據庫,並按照上文的 schema 建立 pvuv_sink
表。
1.在 flink-sql-submit
目錄下運行 ./source-generator.sh
,會自動建立 user_behavior topic
,並實時往裏灌入數據。
2.在 flink-sql-submit
目錄下運行 ./run.sh q1
, 提交成功後,能夠在 Web UI 中看到拓撲。
在 MySQL 客戶端,咱們也能夠實時地看到每一個小時的 pv uv 值在不斷地變化
本文帶你們搭建基礎集羣環境,並使用 SqlSubmit 提交純 SQL 任務來學習瞭解如何鏈接外部系統。flink-sql-submit/src/main/resources/q1.sql
中還有一些註釋掉的調優參數,感興趣的同窗能夠將參數打開,觀察對做業的影響。關於這些調優參數的原理,能夠看下我在 深圳 Meetup 上的分享《Flink SQL 1.9.0 技術內幕和最佳實踐》。
本文做者:巴蜀真人
本文爲雲棲社區原創內容,未經容許不得轉載。