做者:伍翀(雲邪)git
本文是 Apache Flink 零基礎入門系列文章第八篇,將經過五個實例講解 Flink SQL 的編程實踐。github
注: 本教程實踐基於 Ververica 開源的 sql-training 項目。基於 Flink 1.7.2 。sql
本文將經過五個實例來貫穿 Flink SQL 的編程實踐,主要會涵蓋如下幾個方面的內容。docker
本文假定您已具有基礎的 SQL 知識。數據庫
本文教程是基於 Docker 進行的,所以你只須要安裝了 Docker 便可。不須要依賴 Java、Scala 環境、或是IDE。編程
注意:Docker 默認配置的資源可能不太夠,會致使運行 Flink Job 時卡死。所以推薦配置 Docker 資源到 3-4 GB,3-4 CPUs。bootstrap
本次教程的環境使用 Docker Compose 來安裝,包含了所需的各類服務的容器,包括:性能優化
咱們已經提供好了Docker Compose 配置文件,能夠直接下載 docker-compose.yml 文件。bash
而後打開命令行窗口,進入存放 docker-compose.yml
文件的目錄,而後運行如下命令:微信
docker-compose up -d
複製代碼
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d
複製代碼
docker-compose
命令會啓動全部所需的容器。第一次運行的時候,Docker 會自動地從 Docker Hub 下載鏡像,這可能會須要一段時間(將近 2.3GB)。以後運行的話,幾秒鐘就能啓動起來了。運行成功的話,會在命令行中看到如下輸出,而且也能夠在 http://localhost:8081 訪問到 Flink Web UI。
運行下面命令進入 Flink SQL CLI 。
docker-compose exec sql-client ./sql-client.sh
複製代碼
該命令會在容器中啓動 Flink SQL CLI 客戶端。而後你會看到以下的歡迎界面。
Docker Compose 中已經預先註冊了一些表和數據,能夠運行 SHOW TABLES;
來查看。本文會用到的數據是 Rides
表,這是一張出租車的行車記錄數據流,包含了時間和位置信息,運行 DESCRIBE Rides;
能夠查看錶結構。
Flink SQL> DESCRIBE Rides;
root
|-- rideId: Long // 行爲ID (包含兩條記錄,一條入一條出)
|-- taxiId: Long // 出租車ID
|-- isStart: Boolean // 開始 or 結束
|-- lon: Float // 經度
|-- lat: Float // 緯度
|-- rideTime: TimeIndicatorTypeInfo(rowtime) // 時間
|-- psgCnt: Integer // 乘客數
複製代碼
Rides 表的詳細定義見 training-config.yaml。
例如咱們如今只想查看發生在紐約的行車記錄。
注:Docker 環境中已經預約義了一些內置函數,如 isInNYC(lon, lat)
能夠肯定一個經緯度是否在紐約,toAreaId(lon, lat)
能夠將經緯度轉換成區塊。
所以,此處咱們可使用 isInNYC
來快速過濾出紐約的行車記錄。在 SQL CLI 中運行以下 Query:
SELECT * FROM Rides WHERE isInNYC(lon, lat);
複製代碼
SQL CLI 便會提交一個 SQL 任務到 Docker 集羣中,從數據源(Rides 流存儲在Kafka中)不斷拉取數據,並經過 isInNYC
過濾出所需的數據。SQL CLI 也會進入可視化模式,並不斷刷新展現過濾後的結果:
也能夠到 http://localhost:8081 查看 Flink 做業的運行狀況。
咱們的另外一個需求是計算搭載每種乘客數量的行車事件數。也就是搭載1個乘客的行車數、搭載2個乘客的行車... 固然,咱們仍然只關心紐約的行車事件。
所以,咱們能夠按照乘客數psgCnt
作分組,使用 COUNT(*)
計算出每一個分組的事件數,注意在分組前須要先過濾出isInNYC
的數據。在 SQL CLI 中運行以下 Query:
SELECT psgCnt, COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;
複製代碼
SQL CLI 的可視化結果以下所示,結果每秒都在發生變化。不過最大的乘客數不會超過 6 人。
爲了持續地監測紐約的交通流量,須要計算出每一個區塊每5分鐘的進入的車輛數。咱們只關心至少有5輛車子進入的區塊。
此處須要涉及到窗口計算(每5分鐘),因此須要用到 Tumbling Window 的語法。「每一個區塊」 因此還要按照 toAreaId
進行分組計算。「進入的車輛數」 因此在分組前須要根據 isStart
字段過濾出進入的行車記錄,並使用 COUNT(*)
統計車輛數。最後還有一個 「至少有5輛車子的區塊」 的條件,這是一個基於統計值的過濾條件,因此能夠用 SQL HAVING 子句來完成。
最後的 Query 以下所示:
SELECT
toAreaId(lon, lat) AS area,
TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end,
COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat) and isStart
GROUP BY
toAreaId(lon, lat),
TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
複製代碼
在 SQL CLI 中運行後,其可視化結果以下所示,每一個 area + window_end 的結果輸出後就不會再發生變化,可是會每隔 5 分鐘會輸出一批新窗口的結果。由於 Docker 環境中的source咱們作了10倍的加速讀取(相對於原始速度),因此演示的時候,大概每隔30秒就會輸出一批新窗口。
從實例2和實例3的結果顯示上,能夠體驗出來 Window Aggregate 與 Group Aggregate 是有一些明顯的區別的。其主要的區別是,Window Aggregate 是當window結束時才輸出,其輸出的結果是最終值,不會再進行修改,其輸出流是一個 Append 流。而 Group Aggregate 是每處理一條數據,就輸出最新的結果,其結果是在不斷更新的,就好像數據庫中的數據同樣,其輸出流是一個 Update 流。
另一個區別是,window 因爲有 watermark ,能夠精確知道哪些窗口已通過期了,因此能夠及時清理過時狀態,保證狀態維持在穩定的大小。而 Group Aggregate 由於不知道哪些數據是過時的,因此狀態會無限增加,這對於生產做業來講不是很穩定,因此建議對 Group Aggregate 的做業配上 State TTL 的配置。
例如統計每一個店鋪天天的實時PV,那麼就能夠將 TTL 配置成 24+ 小時,由於一天前的狀態通常來講就用不到了。
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) as pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id
複製代碼
固然,若是 TTL 配置地過小,可能會清除掉一些有用的狀態和數據,從而致使數據精確性地問題。這也是用戶須要權衡地一個參數。
上一小節介紹了 Window Aggregate 和 Group Aggregate 的區別,以及 Append 流和 Update 流的區別。在 Flink 中,目前 Update 流只能寫入支持更新的外部存儲,如 MySQL, HBase, ElasticSearch。Append 流能夠寫入任意地存儲,不過通常寫入日誌類型的系統,如 Kafka。
這裏咱們但願將**「每10分鐘的搭乘的乘客數」**寫入Kafka。
咱們已經預約義了一張 Kafka 的結果表 Sink_TenMinPsgCnts
(training-config.yaml 中有完整的表定義)。
在執行 Query 前,咱們先運行以下命令,來監控寫入到 TenMinPsgCnts
topic 中的數據:
docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
複製代碼
每10分鐘的搭乘的乘客數可使用 Tumbling Window 來描述,咱們使用 INSERT INTO Sink_TenMinPsgCnts
來直接將 Query 結果寫入到結果表。
INSERT INTO Sink_TenMinPsgCnts
SELECT
TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
CAST(SUM(psgCnt) AS BIGINT) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
複製代碼
咱們能夠監控到 TenMinPsgCnts
topic 的數據以 JSON 的形式寫入到了 Kafka 中:
最後咱們實踐一下將一個持續更新的 Update 流寫入 ElasticSearch 中。咱們但願將**「每一個區域出發的行車數」**,寫入到 ES 中。
咱們也已經預約義好了一張 Sink_AreaCnts
的 ElasticSearch 結果表(training-config.yaml 中有完整的表定義)。該表中只有兩個字段 areaId
和 cnt
。
一樣的,咱們也使用 INSERT INTO
將 Query 結果直接寫入到 Sink_AreaCnts
表中。
INSERT INTO Sink_AreaCnts
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);
複製代碼
在 SQL CLI 中執行上述 Query 後,Elasticsearch 會自動地建立 area-cnts
索引。Elasticsearch 提供了一個 REST API 。咱們能夠訪問
area-cnts
索引的詳細信息: http://localhost:9200/area-cntsarea-cnts
索引的統計信息: http://localhost:9200/area-cnts/_statsarea-cnts
索引的內容:http://localhost:9200/area-cnts/_search隨着 Query 的一直運行,你也能夠觀察到一些統計值(_all.primaries.docs.count
, _all.primaries.docs.deleted
)在不斷的增加:http://localhost:9200/area-cnts/_stats
本文帶你們使用 Docker Compose 快速上手 Flink SQL 的編程,並對比 Window Aggregate 和 Group Aggregate 的區別,以及這兩種類型的做業如何寫入到 外部系統中。感興趣的同窗,能夠基於這個 Docker 環境更加深刻地去實踐,例如運行本身寫的 UDF , UDTF, UDAF。查詢內置地其餘源表等等。
▼ Apache Flink 社區推薦 ▼
Apache Flink 及大數據領域頂級盛會 Flink Forward Asia 2019 重磅開啓,目前正在徵集議題,限量早鳥票優惠ing。瞭解 Flink Forward Asia 2019 的更多信息,請查看:
developer.aliyun.com/special/ffa…
首屆 Apache Flink 極客挑戰賽重磅開啓,聚焦機器學習與性能優化兩大熱門領域,40萬獎金等你拿,加入挑戰請點擊:
tianchi.aliyun.com/markets/tia…
關注 Flink 官方社區微信公衆號,瞭解更多 Flink 資訊!