Flink 1.9 實戰:使用 SQL 讀取 Kafka 並寫入 MySQL

上週六在深圳分享了《Flink SQL 1.9.0 技術內幕和最佳實踐》,會後許多小夥伴對最後演示環節的 Demo 代碼很是感興趣,火燒眉毛地想嘗試下,因此寫了這篇文章分享下這份代碼。但願對於 Flink SQL 的初學者能有所幫助。完整分享能夠觀看 Meetup 視頻回顧 :https://developer.aliyun.com/live/1416html

演示代碼已經開源到了 GitHub 上:https://github.com/wuchong/flink-sql-submitjava

這份代碼主要由兩部分組成:1) 能用來提交 SQL 文件的 SqlSubmit 實現。2) 用於演示的 SQL 示例、Kafka 啓動中止腳本、 一份測試數據集、Kafka 數據源生成器。mysql

經過本實戰,你將學到:git

  1. 如何使用 Blink Planner
  2. 一個簡單的 SqlSubmit 是如何實現的
  3. 如何用 DDL 建立一個 Kafka 源表和 MySQL 結果表
  4. 運行一個從 Kafka 讀取數據,計算 PVUV,並寫入 MySQL 的做業
  5. 設置調優參數,觀察對做業的影響

SqlSubmit 的實現

筆者一開始是想用 SQL Client 來貫穿整個演示環節,但惋惜 1.9 版本 SQL CLI 還不支持處理 CREATE TABLE 語句。因此筆者就只好本身寫了個簡單的提交腳本。後來想一想,也挺好的,可讓聽衆同時瞭解如何經過 SQL 的方式,和編程的方式使用 Flink SQL。github

SqlSubmit 的主要任務是執行和提交一個 SQL 文件,實現很是簡單,就是經過正則表達式匹配每一個語句塊。若是是 CREATE TABLE 或 INSERT INTO 開頭,則會調用 tEnv.sqlUpdate(...)。若是是 SET 開頭,則會將配置設置到 TableConfig 上。其核心代碼主要以下所示:正則表達式

EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
// 建立一個使用 Blink Planner 的 TableEnvironment, 並工做在流模式
TableEnvironment tEnv = TableEnvironment.create(settings);
// 讀取 SQL 文件
List<String> sql = Files.readAllLines(path);
// 經過正則表達式匹配前綴,來區分不一樣的 SQL 語句
List<SqlCommandCall> calls = SqlCommandParser.parse(sql);
// 根據不一樣的 SQL 語句,調用 TableEnvironment 執行
for (SqlCommandCall call : calls) {
  switch (call.command) {
    case SET:
      String key = call.operands[0];
      String value = call.operands[1];
      // 設置參數
      tEnv.getConfig().getConfiguration().setString(key, value);
      break;
    case CREATE_TABLE:
      String ddl = call.operands[0];
      tEnv.sqlUpdate(ddl);
      break;
    case INSERT_INTO:
      String dml = call.operands[0];
      tEnv.sqlUpdate(dml);
      break;
    default:
      throw new RuntimeException("Unsupported command: " + call.command);
  }
}
// 提交做業
tEnv.execute("SQL Job");

使用 DDL 鏈接 Kafka 源表

在 flink-sql-submit 項目中,咱們準備了一份測試數據集(來自阿里雲天池公開數據集,特別鳴謝),位於 src/main/resources/user_behavior.log。數據以 JSON 格式編碼,大概長這個樣子:sql

{"user_id": "543462", "item_id":"1715", "category_id": "1464116", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}
{"user_id": "662867", "item_id":"2244074", "category_id": "1575622", "behavior": "pv", "ts": "2017-11-26T01:00:00Z"}

爲了模擬真實的 Kafka 數據源,筆者還特意寫了一個 source-generator.sh 腳本(感興趣的能夠看下源碼),會自動讀取 user_behavior.log 的數據並以默認每毫秒1條的速率灌到 Kafka 的 user_behavior topic 中。docker

有了數據源後,咱們就能夠用 DDL 去建立並鏈接這個 Kafka 中的 topic(詳見 src/main/resources/q1.sql)。數據庫

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka', -- 使用 kafka connector
    'connector.version' = 'universal',  -- kafka 版本,universal 支持 0.11 以上的版本
    'connector.topic' = 'user_behavior',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取
    'connector.properties.0.key' = 'zookeeper.connect',  -- 鏈接信息
    'connector.properties.0.value' = 'localhost:2181', 
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092', 
    'update-mode' = 'append',
    'format.type' = 'json',  -- 數據源格式爲 json
    'format.derive-schema' = 'true' -- 從 DDL schema 肯定 json 解析規則
)

注:可能有用戶會以爲其中的 connector.properties.0.key 等參數比較奇怪,社區計劃將在下一個版本中改進並簡化 connector 的參數配置。apache

使用 DDL 鏈接 MySQL 結果表

鏈接 MySQL 可使用 Flink 提供的 JDBC connector。例如

CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc', -- 使用 jdbc connector
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- jdbc url
    'connector.table' = 'pvuv_sink', -- 表名
    'connector.username' = 'root', -- 用戶名
    'connector.password' = '123456', -- 密碼
    'connector.write.flush.max-rows' = '1' -- 默認5000條,爲了演示改成1條
)

PV UV 計算

假設咱們的需求是計算每小時全網的用戶訪問量,和獨立用戶數。不少用戶可能會想到使用滾動窗口來計算。但這裏咱們介紹另外一種方式。即 Group Aggregation 的方式。

INSERT INTO pvuv_sink
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00')

它使用 DATE_FORMAT 這個內置函數,將日誌時間歸一化成「年月日小時」的字符串格式,並根據這個字符串進行分組,即根據每小時分組,而後經過 COUNT(*) 計算用戶訪問量(PV),經過 COUNT(DISTINCT user_id) 計算獨立用戶數(UV)。這種方式的執行模式是每收到一條數據,便會進行基於以前計算的值作增量計算(如+1),而後將最新結果輸出。因此實時性很高,但輸出量也大。

咱們將這個查詢的結果,經過 INSERT INTO 語句,寫到了以前定義的 pvuv_sink MySQL 表中。

注:在深圳 Meetup 中,咱們有對這種查詢的性能調優作了深度的介紹。

實戰演示

環境準備

本實戰演示環節須要安裝一些必須的服務,包括:

  • Flink 本地集羣:用來運行 Flink SQL 任務。
  • Kafka 本地集羣:用來做爲數據源。
  • MySQL 數據庫:用來做爲結果表。
  • Flink 本地集羣安裝

1.下載 Flink 1.9.0 安裝包並解壓:https://www.apache.org/dist/flink/flink-1.9.0/flink-1.9.0-bin-scala_2.11.tgz
2.下載如下依賴 jar 包,並拷貝到 flink-1.9.0/lib/ 目錄下。由於咱們運行時須要依賴各個 connector 實現。

3.將 flink-1.9.0/conf/flink-conf.yaml 中的 taskmanager.numberOfTaskSlots 修改爲 10,由於咱們的演示任務可能會消耗多於1個的 slot。
4.在 flink-1.9.0 目錄下執行 ./bin/start-cluster.sh,啓動集羣。

運行成功的話,能夠在 http://localhost:8081 訪問到 Flink Web UI。

另外,還須要將 Flink 的安裝路徑填到 flink-sql-submit 項目的 env.sh 中,用於後面提交 SQL 任務,如個人路徑是

FLINK_DIR=/Users/wuchong/dev/install/flink-1.9.0

Kafka 本地集羣安裝

下載 Kafka 2.2.0 安裝包並解壓:https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz

將安裝路徑填到 flink-sql-submit 項目的 env.sh 中,如個人路徑是

KAFKA_DIR=/Users/wuchong/dev/install/kafka_2.11-2.2.0

在 flink-sql-submit 目錄下運行 ./start-kafka.sh 啓動 Kafka 集羣。

在命令行執行 jps,若是看到 Kafka 進程和 QuorumPeerMain 進程即代表啓動成功。

MySQL 安裝

能夠在官方頁面下載 MySQL 並安裝:
https://dev.mysql.com/downloads/mysql/
若是有 Docker 環境的話,也能夠直接經過 Docker 安裝
https://hub.docker.com/_/mysql

$ docker pull mysql
$ docker run --name mysqldb -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql

而後在 MySQL 中建立一個 flink-test 的數據庫,並按照上文的 schema 建立 pvuv_sink 表。

提交 SQL 任務

1.在 flink-sql-submit 目錄下運行 ./source-generator.sh,會自動建立 user_behavior topic,並實時往裏灌入數據。

2.在 flink-sql-submit 目錄下運行 ./run.sh q1, 提交成功後,能夠在 Web UI 中看到拓撲。

在 MySQL 客戶端,咱們也能夠實時地看到每一個小時的 pv uv 值在不斷地變化

結尾

本文帶你們搭建基礎集羣環境,並使用 SqlSubmit 提交純 SQL 任務來學習瞭解如何鏈接外部系統。flink-sql-submit/src/main/resources/q1.sql 中還有一些註釋掉的調優參數,感興趣的同窗能夠將參數打開,觀察對做業的影響。關於這些調優參數的原理,能夠看下我在 深圳 Meetup 上的分享《Flink SQL 1.9.0 技術內幕和最佳實踐》。

 

本文做者:巴蜀真人 

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索