本文全部的實戰演練都將在 Flink SQL CLI 上執行,全程只涉及 SQL 純文本,無需一行 Java/Scala 代碼,無需安裝 IDE。html
上週四在 Flink 中文社區釘釘羣中直播分享了《Demo:基於 Flink SQL 構建流式應用》,直播內容偏向實戰演示。這篇文章是對直播內容的一個總結,而且改善了部份內容,好比除 Flink 外其餘組件所有采用 Docker Compose 安裝,簡化準備流程。讀者也能夠結合視頻和本文一塊兒學習。完整分享能夠觀看視頻回顧:https://www.bilibili.com/vide...java
Flink 1.10.0 於近期剛發佈,釋放了許多使人激動的新特性。尤爲是 Flink SQL 模塊,發展速度很是快,所以本文特地從實踐的角度出發,帶領你們一塊兒探索使用 Flink SQL 如何快速構建流式應用。mysql
本文將基於 Kafka, MySQL, Elasticsearch, Kibana,使用 Flink SQL 構建一個電商用戶行爲的實時分析應用。本文全部的實戰演練都將在 Flink SQL CLI 上執行,全程只涉及 SQL 純文本,無需一行 Java/Scala 代碼,無需安裝 IDE。本實戰演練的最終效果圖:git
一臺裝有 Docker 和 Java8 的 Linux 或 MacOS 計算機。github
本實戰演示所依賴的組件全都編排到了容器中,所以能夠經過 docker-compose
一鍵啓動。你能夠經過 wget
命令自動下載該 docker-compose.yml
文件,也能夠手動下載。sql
mkdir flink-demo; cd flink-demo; wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/master/docker-compose.yml
該 Docker Compose 中包含的容器有:docker
docker-compose.yml
中 datagen 的 speedup
參數來調整生成速率(重啓 docker compose 才能生效)。category
),預先填入了子類目與頂級類目的映射關係,後續做爲維表使用。在啓動容器前,建議修改 Docker 的配置,將資源調整到 4GB 以及 4核。啓動全部的容器,只須要在 docker-compose.yml
所在目錄下運行以下命令。數據庫
docker-compose up -d
該命令會以 detached 模式自動啓動 Docker Compose 配置中定義的全部容器。你能夠經過 docker ps
來觀察上述的五個容器是否正常啓動了。 也能夠訪問 http://localhost:5601/ 來查看 Kibana 是否運行正常。apache
另外能夠經過以下命令中止全部的容器:json
docker-compose down
咱們推薦用戶手動下載安裝 Flink,而不是經過 Docker 自動啓動 Flink。由於這樣能夠更直觀地理解 Flink 的各個組件、依賴、和腳本。
flink-1.10.0
):https://www.apache.org/dist/f... cd flink-1.10.0
經過以下命令下載依賴 jar 包,並拷貝到 lib/
目錄下,也可手動下載和拷貝。由於咱們運行時須要依賴各個 connector 實現。
wget -P ./lib/ https://repo1.maven.org/maven... | \
wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.11/1.10.0/flink-sql-connector-elasticsearch7_2.11-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar | \ wget -P ./lib/ https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.48/mysql-connector-java-5.1.48.jar
4. 將 `conf/flink-conf.yaml` 中的 `taskmanager.numberOfTaskSlots` 修改爲 10,由於咱們會同時運行多個任務。 5. 執行 `./bin/start-cluster.sh`,啓動集羣。 運行成功的話,能夠在 http://localhost:8081 訪問到 Flink Web UI。而且能夠看到可用 Slots 數爲 10 個。 ![2.jpg](https://ucc.alicdn.com/pic/developer-ecology/ff8fa73598584c83aadea3414ecc4b98.jpg) 6. 執行 `bin/sql-client.sh embedded` 啓動 SQL CLI。便會看到以下的松鼠歡迎界面。 ![3.png](https://ucc.alicdn.com/pic/developer-ecology/35a3fab21cbe4fe4bda635c7964b34a6.png) ## 使用 DDL 建立 Kafka 表 Datagen 容器在啓動後會往 Kafka 的 `user_behavior` topic 中持續不斷地寫入數據。數據包含了2017年11月27日一天的用戶行爲(行爲包括點擊、購買、加購、喜歡),每一行表示一條用戶行爲,以 JSON 的格式由用戶ID、商品ID、商品類目ID、行爲類型和時間組成。該原始數據集來自[阿里雲天池公開數據集](https://tianchi.aliyun.com/dataset/dataDetail?dataId=649),特此鳴謝。 咱們能夠在 `docker-compose.yml` 所在目錄下運行以下命令,查看 Kafka 集羣中生成的前10條數據。
docker-compose exec kafka bash -c 'kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10'
{"user_id": "952483", "item_id":"310884", "category_id": "4580532", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
{"user_id": "794777", "item_id":"5119439", "category_id": "982926", "behavior": "pv", "ts": "2017-11-27T00:00:00Z"}
...
有了數據源後,咱們就能夠用 DDL 去建立並鏈接這個 Kafka 中的 topic 了。在 Flink SQL CLI 中執行該 DDL。
CREATE TABLE user_behavior (
user_id BIGINT, item_id BIGINT, category_id BIGINT, behavior STRING, ts TIMESTAMP(3), proctime as PROCTIME(), -- 經過計算列產生一個處理時間列 WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定義watermark,ts成爲事件時間列
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本 'connector.topic' = 'user_behavior', -- kafka topic 'connector.startup-mode' = 'earliest-offset', -- 從起始 offset 開始讀取 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址 'format.type' = 'json' -- 數據源格式爲 json
);
如上咱們按照數據的格式聲明瞭 5 個字段,除此以外,咱們還經過計算列語法和 `PROCTIME()` 內置函數聲明瞭一個產生處理時間的虛擬列。咱們還經過 WATERMARK 語法,在 ts 字段上聲明瞭 watermark 策略(容忍5秒亂序), ts 字段所以也成了事件時間列。關於時間屬性以及 DDL 語法能夠閱讀官方文檔瞭解更多: - 時間屬性:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/time_attributes.html - DDL:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html#create-table 在 SQL CLI 中成功建立 Kafka 表後,能夠經過 `show tables;` 和 `describe user_behavior;` 來查看目前已註冊的表,以及表的詳細信息。咱們也能夠直接在 SQL CLI 中運行 `SELECT * FROM user_behavior;` 預覽下數據(按`q`退出)。 接下來,咱們會經過三個實戰場景來更深刻地瞭解 Flink SQL 。 ## 統計每小時的成交量 ### 使用 DDL 建立 Elasticsearch 表 咱們先在 SQL CLI 中建立一個 ES 結果表,根據場景需求主要須要保存兩個數據:小時、成交量。
CREATE TABLE buy_cnt_per_hour (
hour_of_day BIGINT, buy_cnt BIGINT
) WITH (
'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector 'connector.version' = '6', -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本 'connector.hosts' = 'http://localhost:9200', -- elasticsearch 地址 'connector.index' = 'buy_cnt_per_hour', -- elasticsearch 索引名,至關於數據庫的表名 'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,至關於數據庫的庫名 'connector.bulk-flush.max-actions' = '1', -- 每條數據都刷新 'format.type' = 'json', -- 輸出數據格式 json 'update-mode' = 'append'
);
咱們不須要在 Elasticsearch 中事先建立 `buy_cnt_per_hour` 索引,Flink Job 會自動建立該索引。 ### 提交 Query 統計每小時的成交量就是每小時共有多少 "buy" 的用戶行爲。所以會須要用到 TUMBLE 窗口函數,按照一小時切窗。而後每一個窗口分別統計 "buy" 的個數,這能夠經過先過濾出 "buy" 的數據,而後 `COUNT(*)` 實現。
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
這裏咱們使用 `HOUR` 內置函數,從一個 TIMESTAMP 列中提取出一天中第幾個小時的值。使用了 `INSERT INTO`將 query 的結果持續不斷地插入到上文定義的 es 結果表中(能夠將 es 結果表理解成 query 的物化視圖)。另外能夠閱讀該文檔瞭解更多關於窗口聚合的內容:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#group-windows 在 Flink SQL CLI 中運行上述查詢後,在 Flink Web UI 中就能看到提交的任務,該任務是一個流式任務,所以會一直運行。 ![4.jpg](https://ucc.alicdn.com/pic/developer-ecology/a6c063cce7074622be7e586206a8ef2d.jpg) ### 使用 Kibana 可視化結果 咱們已經經過 Docker Compose 啓動了 Kibana 容器,能夠經過 http://localhost:5601 訪問 Kibana。首先咱們須要先配置一個 index pattern。點擊左側工具欄的 "Management",就能找到 "Index Patterns"。點擊 "Create Index Pattern",而後經過輸入完整的索引名 "buy_cnt_per_hour" 建立 index pattern。建立完成後, Kibana 就知道了咱們的索引,咱們就能夠開始探索數據了。 先點擊左側工具欄的"Discovery"按鈕,Kibana 就會列出剛剛建立的索引中的內容。 ![5.jpg](https://ucc.alicdn.com/pic/developer-ecology/1d072be0fcb34ac7befa50c551806793.jpg) 接下來,咱們先建立一個 Dashboard 用來展現各個可視化的視圖。點擊頁面左側的"Dashboard",建立一個名爲 」用戶行爲日誌分析「 的Dashboard。而後點擊 "Create New" 建立一個新的視圖,選擇 "Area" 面積圖,選擇 "buy_cnt_per_hour" 索引,按照以下截圖中的配置(左側)畫出成交量面積圖,並保存爲」每小時成交量「。 ![6.jpg](https://ucc.alicdn.com/pic/developer-ecology/86f9ff9ddc39443d927f86222e46868a.jpg) 能夠看到凌晨是一天中成交量的低谷。 ## 統計一天每10分鐘累計獨立用戶數 另外一個有意思的可視化是統計一天中每一刻的累計獨立用戶數(uv),也就是每一刻的 uv 數都表明從0點到當前時刻爲止的總計 uv 數,所以該曲線確定是單調遞增的。 咱們仍然先在 SQL CLI 中建立一個 Elasticsearch 表,用於存儲結果彙總數據。主要有兩個字段:時間和累積 uv 數。
CREATE TABLE cumulative_uv (
time_str STRING, uv BIGINT
) WITH (
'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'cumulative_uv', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert'
);
爲了實現該曲線,咱們能夠先經過 OVER WINDOW 計算出每條數據的當前分鐘,以及當前累計 uv(從0點開始到當前行爲止的獨立用戶數)。 uv 的統計咱們經過內置的 `COUNT(DISTINCT user_id)`來完成,Flink SQL 內部對 COUNT DISTINCT 作了很是多的優化,所以能夠放心使用。
CREATE VIEW uv_per_10min AS
SELECT
MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
這裏咱們使用 `SUBSTR` 和 `DATE_FORMAT` 還有 `||` 內置函數,將一個 TIMESTAMP 字段轉換成了 10分鐘單位的時間字符串,如: `12:10`, `12:20`。關於 OVER WINDOW 的更多內容能夠參考文檔:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/queries.html#aggregations 咱們還使用了 CREATE VIEW 語法將 query 註冊成了一個邏輯視圖,能夠方便地在後續查詢中對該 query 進行引用,這有利於拆解複雜 query。注意,建立邏輯視圖不會觸發做業的執行,視圖的結果也不會落地,所以使用起來很是輕量,沒有額外開銷。因爲 `uv_per_10min` 每條輸入數據都產生一條輸出數據,所以對於存儲壓力較大。咱們能夠基於 `uv_per_10min` 再根據分鐘時間進行一次聚合,這樣每10分鐘只有一個點會存儲在 Elasticsearch 中,對於 Elasticsearch 和 Kibana 可視化渲染的壓力會小不少。
INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;
提交上述查詢後,在 Kibana 中建立 `cumulative_uv` 的 index pattern,而後在 Dashboard 中建立一個"Line"折線圖,選擇 `cumulative_uv` 索引,按照以下截圖中的配置(左側)畫出累計獨立用戶數曲線,並保存。 ![7.jpg](https://ucc.alicdn.com/pic/developer-ecology/d3a79eaf1ce44cc6bfa271affa1d4e47.jpg) ![渠道文章宣傳內頁.png](/img/bVbDSHZ) ## 頂級類目排行榜 最後一個有意思的可視化是類目排行榜,從而瞭解哪些類目是支柱類目。不過因爲源數據中的類目分類太細(約5000個類目),對於排行榜意義不大,所以咱們但願能將其歸約到頂級類目。因此筆者在 mysql 容器中預先準備了子類目與頂級類目的映射數據,用做維表。 在 SQL CLI 中建立 MySQL 表,後續用做維表查詢。
CREATE TABLE category_dim (
sub_category_id BIGINT, -- 子類目 parent_category_id BIGINT -- 頂級類目
) WITH (
'connector.type' = 'jdbc', 'connector.url' = 'jdbc:mysql://localhost:3306/flink', 'connector.table' = 'category', 'connector.driver' = 'com.mysql.jdbc.Driver', 'connector.username' = 'root', 'connector.password' = '123456', 'connector.lookup.cache.max-rows' = '5000', 'connector.lookup.cache.ttl' = '10min'
);
同時咱們再建立一個 Elasticsearch 表,用於存儲類目統計結果。
CREATE TABLE top_category (
category_name STRING, -- 類目名稱 buy_cnt BIGINT -- 銷量
) WITH (
'connector.type' = 'elasticsearch', 'connector.version' = '6', 'connector.hosts' = 'http://localhost:9200', 'connector.index' = 'top_category', 'connector.document-type' = 'user_behavior', 'format.type' = 'json', 'update-mode' = 'upsert'
);
第一步咱們經過維表關聯,補全類目名稱。咱們仍然使用 CREATE VIEW 將該查詢註冊成一個視圖,簡化邏輯。維表關聯使用 temporal join 語法,能夠查看文檔瞭解更多:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/streaming/joins.html#join-with-a-temporal-table
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior,
CASE C.parent_category_id
WHEN 1 THEN '服飾鞋包' WHEN 2 THEN '家裝家飾' WHEN 3 THEN '家電' WHEN 4 THEN '美妝' WHEN 5 THEN '母嬰' WHEN 6 THEN '3C數碼' WHEN 7 THEN '運動戶外' WHEN 8 THEN '食品' ELSE '其餘'
END AS category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
最後根據 類目名稱分組,統計出 `buy` 的事件數,並寫入 Elasticsearch 中。
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
提交上述查詢後,在 Kibana 中建立 `top_category` 的 index pattern,而後在 Dashboard 中建立一個"Horizontal Bar"條形圖,選擇 `top_category` 索引,按照以下截圖中的配置(左側)畫出類目排行榜,並保存。 ![8.jpg](https://ucc.alicdn.com/pic/developer-ecology/7d2fc57ec04041239b67c308324997e3.jpg) 能夠看到服飾鞋包的成交量遠遠領先其餘類目。 到目前爲止,咱們已經完成了三個實戰案例及其可視化視圖。如今能夠回到 Dashboard 頁面,對各個視圖進行拖拽編排,讓咱們的 Dashboard 看上去更加正式、直觀(如本文開篇效果圖)。固然,Kibana 還提供了很是豐富的圖形和可視化選項,而用戶行爲數據中也有不少有意思的信息值得挖掘,感興趣的讀者能夠用 Flink SQL 對數據進行更多維度的分析,並使用 Kibana 展現更多可視化圖,並觀測圖形數據的實時變化。 ## 結尾