Apache Flink 零基礎入門(八): SQL 編程實踐

做者:伍翀(雲邪)git

本文是 Apache Flink 零基礎入門系列文章第八篇,將經過五個實例講解 Flink SQL 的編程實踐。github

注: 本教程實踐基於 Ververica 開源的 sql-training 項目。基於 Flink 1.7.2 。sql

經過本課你能學到什麼?

本文將經過五個實例來貫穿 Flink SQL 的編程實踐,主要會涵蓋如下幾個方面的內容。docker

  1. 如何使用 SQL CLI 客戶端
  2. 如何在流上運行 SQL 查詢
  3. 運行 window aggregate 與 non-window aggregate,理解其區別
  4. 如何用 SQL 消費 Kafka 數據
  5. 如何用 SQL 將結果寫入 Kafka 和 ElasticSearch

本文假定您已具有基礎的 SQL 知識。數據庫

環境準備

本文教程是基於 Docker 進行的,所以你只須要安裝了 Docker 便可。不須要依賴 Java、Scala 環境、或是IDE。編程

注意:Docker 默認配置的資源可能不太夠,會致使運行 Flink Job 時卡死。所以推薦配置 Docker 資源到 3-4 GB,3-4 CPUs。bootstrap

本次教程的環境使用 Docker Compose 來安裝,包含了所需的各類服務的容器,包括:性能優化

  • Flink SQL Client:用來提交query,以及可視化結果
  • Flink JobManager 和 TaskManager:用來運行 Flink SQL 任務。
  • Apache Kafka:用來生成輸入流和寫入結果流。
  • Apache Zookeeper:Kafka 的依賴項
  • ElasticSearch:用來寫入結果

咱們已經提供好了Docker Compose 配置文件,能夠直接下載 docker-compose.yml 文件。bash

而後打開命令行窗口,進入存放 docker-compose.yml 文件的目錄,而後運行如下命令:微信

  • Linux & MacOS
docker-compose up -d
複製代碼
  • Windows
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d
複製代碼

docker-compose 命令會啓動全部所需的容器。第一次運行的時候,Docker 會自動地從 Docker Hub 下載鏡像,這可能會須要一段時間(將近 2.3GB)。以後運行的話,幾秒鐘就能啓動起來了。運行成功的話,會在命令行中看到如下輸出,而且也能夠在 http://localhost:8081 訪問到 Flink Web UI。

運行 Flink SQL CLI 客戶端

運行下面命令進入 Flink SQL CLI 。

docker-compose exec sql-client ./sql-client.sh
複製代碼

該命令會在容器中啓動 Flink SQL CLI 客戶端。而後你會看到以下的歡迎界面。

數據介紹

Docker Compose 中已經預先註冊了一些表和數據,能夠運行 SHOW TABLES; 來查看。本文會用到的數據是 Rides 表,這是一張出租車的行車記錄數據流,包含了時間和位置信息,運行 DESCRIBE Rides; 能夠查看錶結構。

Flink SQL> DESCRIBE Rides;
root
 |-- rideId: Long           // 行爲ID (包含兩條記錄,一條入一條出)
 |-- taxiId: Long           // 出租車ID 
 |-- isStart: Boolean       // 開始 or 結束
 |-- lon: Float             // 經度
 |-- lat: Float             // 緯度
 |-- rideTime: TimeIndicatorTypeInfo(rowtime)     // 時間
 |-- psgCnt: Integer        // 乘客數
複製代碼

Rides 表的詳細定義見 training-config.yaml

實例1:過濾

例如咱們如今只想查看發生在紐約的行車記錄

注:Docker 環境中已經預約義了一些內置函數,如 isInNYC(lon, lat) 能夠肯定一個經緯度是否在紐約,toAreaId(lon, lat) 能夠將經緯度轉換成區塊。

所以,此處咱們可使用 isInNYC 來快速過濾出紐約的行車記錄。在 SQL CLI 中運行以下 Query:

SELECT * FROM Rides WHERE isInNYC(lon, lat);
複製代碼

SQL CLI 便會提交一個 SQL 任務到 Docker 集羣中,從數據源(Rides 流存儲在Kafka中)不斷拉取數據,並經過 isInNYC 過濾出所需的數據。SQL CLI 也會進入可視化模式,並不斷刷新展現過濾後的結果:

也能夠到 http://localhost:8081 查看 Flink 做業的運行狀況。

實例2:Group Aggregate

咱們的另外一個需求是計算搭載每種乘客數量的行車事件數。也就是搭載1個乘客的行車數、搭載2個乘客的行車... 固然,咱們仍然只關心紐約的行車事件。

所以,咱們能夠按照乘客數psgCnt作分組,使用 COUNT(*) 計算出每一個分組的事件數,注意在分組前須要先過濾出isInNYC的數據。在 SQL CLI 中運行以下 Query:

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;
複製代碼

SQL CLI 的可視化結果以下所示,結果每秒都在發生變化。不過最大的乘客數不會超過 6 人。

實例3:Window Aggregate

爲了持續地監測紐約的交通流量,須要計算出每一個區塊每5分鐘的進入的車輛數。咱們只關心至少有5輛車子進入的區塊。

此處須要涉及到窗口計算(每5分鐘),因此須要用到 Tumbling Window 的語法。「每一個區塊」 因此還要按照 toAreaId 進行分組計算。「進入的車輛數」 因此在分組前須要根據 isStart 字段過濾出進入的行車記錄,並使用 COUNT(*) 統計車輛數。最後還有一個 「至少有5輛車子的區塊」 的條件,這是一個基於統計值的過濾條件,因此能夠用 SQL HAVING 子句來完成。

最後的 Query 以下所示:

SELECT 
  toAreaId(lon, lat) AS area, 
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end, 
  COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat) and isStart
GROUP BY 
  toAreaId(lon, lat), 
  TUMBLE(rideTime, INTERVAL '5' MINUTE) 
HAVING COUNT(*) >= 5;
複製代碼

在 SQL CLI 中運行後,其可視化結果以下所示,每一個 area + window_end 的結果輸出後就不會再發生變化,可是會每隔 5 分鐘會輸出一批新窗口的結果。由於 Docker 環境中的source咱們作了10倍的加速讀取(相對於原始速度),因此演示的時候,大概每隔30秒就會輸出一批新窗口。

Window Aggregate 與 Group Aggregate 的區別

從實例2和實例3的結果顯示上,能夠體驗出來 Window Aggregate 與 Group Aggregate 是有一些明顯的區別的。其主要的區別是,Window Aggregate 是當window結束時才輸出,其輸出的結果是最終值,不會再進行修改,其輸出流是一個 Append 流。而 Group Aggregate 是每處理一條數據,就輸出最新的結果,其結果是在不斷更新的,就好像數據庫中的數據同樣,其輸出流是一個 Update 流

另一個區別是,window 因爲有 watermark ,能夠精確知道哪些窗口已通過期了,因此能夠及時清理過時狀態,保證狀態維持在穩定的大小。而 Group Aggregate 由於不知道哪些數據是過時的,因此狀態會無限增加,這對於生產做業來講不是很穩定,因此建議對 Group Aggregate 的做業配上 State TTL 的配置。

例如統計每一個店鋪天天的實時PV,那麼就能夠將 TTL 配置成 24+ 小時,由於一天前的狀態通常來講就用不到了。

SELECT  DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) as pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id
複製代碼

固然,若是 TTL 配置地過小,可能會清除掉一些有用的狀態和數據,從而致使數據精確性地問題。這也是用戶須要權衡地一個參數。

實例4:將 Append 流寫入 Kafka

上一小節介紹了 Window Aggregate 和 Group Aggregate 的區別,以及 Append 流和 Update 流的區別。在 Flink 中,目前 Update 流只能寫入支持更新的外部存儲,如 MySQL, HBase, ElasticSearch。Append 流能夠寫入任意地存儲,不過通常寫入日誌類型的系統,如 Kafka。

這裏咱們但願將**「每10分鐘的搭乘的乘客數」**寫入Kafka。

咱們已經預約義了一張 Kafka 的結果表 Sink_TenMinPsgCntstraining-config.yaml 中有完整的表定義)。

在執行 Query 前,咱們先運行以下命令,來監控寫入到 TenMinPsgCnts topic 中的數據:

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
複製代碼

每10分鐘的搭乘的乘客數可使用 Tumbling Window 來描述,咱們使用 INSERT INTO Sink_TenMinPsgCnts 來直接將 Query 結果寫入到結果表。

INSERT INTO Sink_TenMinPsgCnts 
SELECT 
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,  
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt 
FROM Rides 
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
複製代碼

咱們能夠監控到 TenMinPsgCnts topic 的數據以 JSON 的形式寫入到了 Kafka 中:

實例5:將 Update 流寫入 ElasticSearch

最後咱們實踐一下將一個持續更新的 Update 流寫入 ElasticSearch 中。咱們但願將**「每一個區域出發的行車數」**,寫入到 ES 中。

咱們也已經預約義好了一張 Sink_AreaCnts 的 ElasticSearch 結果表(training-config.yaml 中有完整的表定義)。該表中只有兩個字段 areaIdcnt

一樣的,咱們也使用 INSERT INTO 將 Query 結果直接寫入到 Sink_AreaCnts 表中。

INSERT INTO Sink_AreaCnts 
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt 
FROM Rides 
WHERE isStart
GROUP BY toAreaId(lon, lat);
複製代碼

在 SQL CLI 中執行上述 Query 後,Elasticsearch 會自動地建立 area-cnts 索引。Elasticsearch 提供了一個 REST API 。咱們能夠訪問

隨着 Query 的一直運行,你也能夠觀察到一些統計值(_all.primaries.docs.count, _all.primaries.docs.deleted)在不斷的增加:http://localhost:9200/area-cnts/_stats

總結

本文帶你們使用 Docker Compose 快速上手 Flink SQL 的編程,並對比 Window Aggregate 和 Group Aggregate 的區別,以及這兩種類型的做業如何寫入到 外部系統中。感興趣的同窗,能夠基於這個 Docker 環境更加深刻地去實踐,例如運行本身寫的 UDF , UDTF, UDAF。查詢內置地其餘源表等等。


▼ Apache Flink 社區推薦 ▼

Apache Flink 及大數據領域頂級盛會 Flink Forward Asia 2019 重磅開啓,目前正在徵集議題,限量早鳥票優惠ing。瞭解 Flink Forward Asia 2019 的更多信息,請查看:

developer.aliyun.com/special/ffa…

首屆 Apache Flink 極客挑戰賽重磅開啓,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點擊:

tianchi.aliyun.com/markets/tia…

關注 Flink 官方社區微信公衆號,瞭解更多 Flink 資訊!

相關文章
相關標籤/搜索