做者:李勁鬆(之信)git
現在的大數據批計算,隨着 Hive 數倉的成熟,廣泛的模式是 Hive metastore + 計算引擎。常見的計算引擎有 Hive on MapReduce、Hive on Tez、Hive on Spark、Spark integrate Hive、Presto integrate Hive,還有隨着 Flink 1.10 發佈後生產可用的 Flink Batch SQL。github
Flink 做爲一個統一的計算引擎,旨在提供統一的流批體驗以及技術棧。Flink 在 1.9 合併了 Blink 的代碼,並在 1.10 中完善了大量的功能以及性能,能夠運行全部 TPC-DS 的查詢,性能方面也頗有競爭力,Flink 1.10 是一個生產可用的、批流統一的 SQL 引擎版本。sql
在搭建計算平臺的過程當中,性能和成本是選取計算引擎的很關鍵的因素。爲此,Ververica 的 flink-sql-benchmark [1] 項目提供了基於 Hive Metastore 的 TPC-DS Benchmark 測試的工具,爲了測試更靠近真正的生產做業:數據庫
- 測試的輸入表都是標準的 Hive 表,數據全在與生產一致的 Hive 數倉中。其它計算引擎也能方便分析這些表。
- 數據的格式採用 ORC,ORC 是經常使用的生產文件格式,提供較高的壓縮率,和較好的讀取性能。
- 選取 TPC-DS Benchmark 的 10TB 數據集,10TB 的數據集是比較常見的生產規模。若是隻有 1TB,徹底能夠在傳統數據庫中運行起來,不太適合大數據的測試。
咱們在 20 臺機器上測試了三種引擎:Flink 1.十、Hive 3.0 on MapReduce、Hive 3.0 on Tez,從兩個維度測試了引擎的成績:apache
- 總時長:直觀的性能數據,可是可能會受到個別 queries 的較大影響。
- 幾何平均:表示一組數的中心趨勢,它能夠更好的消除個別 queries 的較大影響,呈現較真實的平均數。
-
![eb2e01446559af416a93e7e0664a6526.jpg eb2e01446559af416a93e7e0664a6526.jpg](http://static.javashuo.com/static/loading.gif)
結果摘要:微信
運行總時間的對比成績是:框架
![1.jpg 1.jpg](http://static.javashuo.com/static/loading.gif)
Queries 幾何平均的對比成績是:分佈式
![2.jpg 2.jpg](http://static.javashuo.com/static/loading.gif)
本文只測試了上述引擎和 10TB 的數據集,讀者能夠根據本身的集羣規模,選取特定的數據集,使用 flink-sql-benchmark 工具來運行更多引擎的對比測試。
Benchmark 詳情
Benchmark 環境
具體環境及調優說明:
- 計算環境:20 臺機器,機器參數爲 64 核 intel 處理器、256GB 內存、1 SSD 盤用於計算引擎、多塊 SATA 盤用於 HDFS、萬兆網卡。
- 集羣環境:Yarn + HDFS + Hive。
- Flink參數:flink-conf.yaml [2]。
- Hive參數:主要調優了 MapJoin 的閾值,提升性能的同時避免 OOM。
- 選用較新的 Hadoop 版本(3.X),並選用了較新的 Hive 和 Tez 版本
Benchmark 步驟
■ 環境準備
- 準備 Hadoop (HDFS + YARN) 環境
- 準備 Hive 環境
■ 數據集生成
- 分佈式生成 TPC-DS 數據集,並加載 TEXT 數據集到 Hive,原始數據是 Csv 的格式。建議分佈式生成數據,這也是個比較耗時的步驟。(flink-sql-benmark 工具中集成了 TPC-DS 的工具)
- Hive TEXT 錶轉換爲 ORC 表,ORC 格式是常見的 Hive 數據文件格式,行列混合的存儲有利於後續的快速分析,也有很高的壓縮比。執行 Query:create table ${NAME} stored as ${FILE} as select * from ${SOURCE}.${NAME};
![3.jpg 3.jpg](http://static.javashuo.com/static/loading.gif)
如圖,生成了 TPC-DS 官方說明的 7 張事實表和 17 張維表。
- 分析 Hive 表,統計信息對於分析做業的查詢優化很是重要,對於複雜的 SQL,Plan 的執行效率有很大的差別。Flink 不但支持讀取 Hive 的 Table 統計信息,也支持讀取 Hive 的分區統計信息,根據統計信息進行 CBO 的優化。執行命令:analyze table ${NAME} compute statistics for columns;
![4.jpg 4.jpg](http://static.javashuo.com/static/loading.gif)
■ Flink 運行 Queries
- 準備 Flink 環境,搭建 Flink Yarn Session 環境,推薦使用 Standalone 或者 Session 模式,能夠複用 Flink 的進程,加快分析型做業的速度。
- 編寫代碼運行 Queries,統計執行時間等相關信息,具體代碼能夠直接複用 flink-sql-benchmark 裏的 flink-tpcds 工程。
- FLINK_HOME/flink run 運行程序,執行全部 queries,等待執行完畢,統計執行時間。
![5.jpg 5.jpg](http://static.javashuo.com/static/loading.gif)
■ 其它引擎運行 Queries
- 根據其它引擎的官網提示,搭建環境。
- 得益於標準的 Hive 數據集,能夠方便的使用其它引擎來讀取 Hive 數據。
- 在運行時,值得注意的是須要達到集羣的瓶頸,好比 Cpu、好比 Disk,必定是有瓶頸出現,才能證實運行方式和參數是比較合理的,爲此,須要一些性能調優。
Benchmark 分析
Flink 1.10
Flink 1.9 在合併 Blink 代碼的時候,就已經完成了不少工做:深度 CodeGeneration、Binary 存儲與計算、完善的 CBO 優化、Batch Shuffler,爲後續的性能突破打下了紮實的基礎。
Flink 1.10 繼續完善 Hive 集成,並達到了生產級別的 Hive 集成標準,其它也在性能和開箱即用方面作了不少工做:
Flink 參數分析
Flink 1.10 作了不少參數的優化,提升用戶的開箱即用體驗,可是因爲批流一體的一些限制,目前也是須要進行一些參數設置的,這裏本文粗略分析下。
■ Table 層參數:
- table.optimizer.join-reorder-enabled = true:須要手動打開,目前各大引擎的 JoinReorder 少有默認打開的,在統計信息比較完善時,是能夠打開的,通常來講 reorder 錯誤的狀況是比較少見的。
- table.optimizer.join.broadcast-threshold = 1010241024:從默認值 1MB 調整到 10MB,目前 Flink 的廣播機制還有待提升,因此默認值爲 1MB,可是在併發規模不是那麼大的狀況下,能夠開到 10MB。
- table.exec.resource.default-parallelism = 800:Operator 的併發設置,針對 10T 的輸入,建議開到 800 的併發,不建議太大併發,併發越大,對系統各方面的壓力越大。
■ TaskManager 參數分析:
- taskmanager.numberOfTaskSlots = 10:單個 TM 裏的 slot 個數。
- TaskManager 內存參數:TaskManager 的內存主要分爲三種,管理內存、網絡內存、JVM 相關的其它內存。須要理解下官網的文檔,纔能有效的設置這些參數。
- taskmanager.memory.process.size = 15000m:TaskManager 的總內存,減去其它內存後通常留給堆內 3-5GB 的內存。
- taskmanager.memory.managed.size = 8000m:管理內存,用於 Operator 的計算,留給單個 Slot 300 - 800MB 的內存是比較合理的。
- taskmanager.network.memory.max = 2200mb:Task 點到點的通訊須要 4 個 Buffers,根據併發大概計算得出須要 2GB,能夠經過嘗試得出,Buffers 不夠會拋出異常。
■ 網絡參數分析
- taskmanager.network.blocking-shuffle.type = mmap:Shuffle read 使用 mmap 的方式,直接靠系統來管理內存,是比較方便的形式。
- taskmanager.network.blocking-shuffle.compression.enabled = true:Shuffle 使用壓縮,這個參數是批流複用的,強烈建議給批做業開啓壓縮,否則瓶頸就會在磁盤上。
■ 調度參數分析
- cluster.evenly-spread-out-slots = true:在調度 Task 時均勻調度到每一個 TaskManager 中,這有利於使用全部資源。
- jobmanager.execution.failover-strategy = region:默認全局重試,需打開 region 重試才能 enable 單點的 failover。
- restart-strategy = fixed-delay:重試策略須要手動設置,默認是不重試的。
其它 timeout 相關參數是爲了不調度和運行過程當中,大數據量致使的網絡抖動,進而致使做業失敗的問題。
Flink 1.11 及後續規劃
後續 Flink 社區會在完善功能的同時進一步夯實性能:
- 提供 SQL Gateway 以及 JDBC Driver,目前提供獨立倉庫,面向 Flink 1.10。[6] [7]
- 提供 Hive 語法兼容模式,避免 Hive 用戶的困擾。
- 完善 ORC 和 Parquet 的向量化讀。
- N-Ary stream operator [8]:開發 table 層的 chain 框架,進一步避免 Shuffle 落盤致使的開銷。
參考連接:
[1]https://github.com/ververica/...
[2]https://github.com/ververica/...
[3]http://jira.apache.org/jira/b...
[4]https://issues.apache.org/jir...
[5]https://cwiki.apache.org/conf...
[6]https://github.com/ververica/...
[7]https://github.com/ververica/...
[8]https://cwiki.apache.org/conf...
# 重磅福利 #
《Demo: 基於 Flink SQL 構建離線應用》的 PPT 來啦!關注「Flink 中文社區」微信公衆號,後臺回覆關鍵字「0218SQL」便可獲取本次直播課程 Demo 演示 PPT~