Flink 1.10 和 Hive 3.0 性能對比(附 Demo 演示 PPT)

做者:李勁鬆(之信)git

現在的大數據批計算,隨着 Hive 數倉的成熟,廣泛的模式是 Hive metastore + 計算引擎。常見的計算引擎有 Hive on MapReduce、Hive on Tez、Hive on Spark、Spark integrate Hive、Presto integrate Hive,還有隨着 Flink 1.10 發佈後生產可用的 Flink Batch SQL。github

Flink 做爲一個統一的計算引擎,旨在提供統一的流批體驗以及技術棧。Flink 在 1.9 合併了 Blink 的代碼,並在 1.10 中完善了大量的功能以及性能,能夠運行全部 TPC-DS 的查詢,性能方面也頗有競爭力,Flink 1.10 是一個生產可用的、批流統一的 SQL 引擎版本。sql

在搭建計算平臺的過程當中,性能和成本是選取計算引擎的很關鍵的因素。爲此,Ververica 的 flink-sql-benchmark [1] 項目提供了基於 Hive Metastore 的 TPC-DS Benchmark 測試的工具,爲了測試更靠近真正的生產做業:數據庫

  • 測試的輸入表都是標準的 Hive 表,數據全在與生產一致的 Hive 數倉中。其它計算引擎也能方便分析這些表。
  • 數據的格式採用 ORC,ORC 是經常使用的生產文件格式,提供較高的壓縮率,和較好的讀取性能。
  • 選取 TPC-DS Benchmark 的 10TB 數據集,10TB 的數據集是比較常見的生產規模。若是隻有 1TB,徹底能夠在傳統數據庫中運行起來,不太適合大數據的測試。

咱們在 20 臺機器上測試了三種引擎:Flink 1.十、Hive 3.0 on MapReduce、Hive 3.0 on Tez,從兩個維度測試了引擎的成績:apache

  • 總時長:直觀的性能數據,可是可能會受到個別 queries 的較大影響。
  • 幾何平均:表示一組數的中心趨勢,它能夠更好的消除個別 queries 的較大影響,呈現較真實的平均數。

eb2e01446559af416a93e7e0664a6526.jpg

結果摘要:微信

  • Flink 1.10 VS Hive 3.0 on MapReduce網絡

    • Flink 總時長的性能是 Hive on MapReduce 的 8.7 倍。
    • Flink Queries 幾何平均的性能是 Hive on MapReduce 的 7.8 倍。
  • Flink 1.10 VS Hive 3.0 on Tez併發

    • Flink 總時長的性能是 Hive on Tez 的 2.1 倍。
    • Flink Queries 幾何平均的性能是 Hive on Tez 的 2.0 倍。

運行總時間的對比成績是:框架

1.jpg

Queries 幾何平均的對比成績是:分佈式

2.jpg

本文只測試了上述引擎和 10TB 的數據集,讀者能夠根據本身的集羣規模,選取特定的數據集,使用 flink-sql-benchmark 工具來運行更多引擎的對比測試。

Benchmark 詳情

Benchmark 環境

具體環境及調優說明:

  • 計算環境:20 臺機器,機器參數爲 64 核 intel 處理器、256GB 內存、1 SSD 盤用於計算引擎、多塊 SATA 盤用於 HDFS、萬兆網卡。
  • 集羣環境:Yarn + HDFS + Hive。
  • Flink參數:flink-conf.yaml [2]。
  • Hive參數:主要調優了 MapJoin 的閾值,提升性能的同時避免 OOM。
  • 選用較新的 Hadoop 版本(3.X),並選用了較新的 Hive 和 Tez 版本

Benchmark 步驟

環境準備

  1. 準備 Hadoop (HDFS + YARN) 環境
  2. 準備 Hive 環境

■ 數據集生成

  1. 分佈式生成 TPC-DS 數據集,並加載 TEXT 數據集到 Hive,原始數據是 Csv 的格式。建議分佈式生成數據,這也是個比較耗時的步驟。(flink-sql-benmark 工具中集成了 TPC-DS 的工具)
  2. Hive TEXT 錶轉換爲 ORC 表,ORC 格式是常見的 Hive 數據文件格式,行列混合的存儲有利於後續的快速分析,也有很高的壓縮比。執行 Query:create table ${NAME} stored as ${FILE} as select * from ${SOURCE}.${NAME};

3.jpg

如圖,生成了 TPC-DS 官方說明的 7 張事實表和 17 張維表。

  1. 分析 Hive 表,統計信息對於分析做業的查詢優化很是重要,對於複雜的 SQL,Plan 的執行效率有很大的差別。Flink 不但支持讀取 Hive 的 Table 統計信息,也支持讀取 Hive 的分區統計信息,根據統計信息進行 CBO 的優化。執行命令:analyze table ${NAME} compute statistics for columns;

4.jpg

Flink 運行 Queries

  1. 準備 Flink 環境,搭建 Flink Yarn Session 環境,推薦使用 Standalone 或者 Session 模式,能夠複用 Flink 的進程,加快分析型做業的速度。
  2. 編寫代碼運行 Queries,統計執行時間等相關信息,具體代碼能夠直接複用 flink-sql-benchmark 裏的 flink-tpcds 工程。
  3. FLINK_HOME/flink run 運行程序,執行全部 queries,等待執行完畢,統計執行時間。

5.jpg

其它引擎運行 Queries

  1. 根據其它引擎的官網提示,搭建環境。
  2. 得益於標準的 Hive 數據集,能夠方便的使用其它引擎來讀取 Hive 數據。
  3. 在運行時,值得注意的是須要達到集羣的瓶頸,好比 Cpu、好比 Disk,必定是有瓶頸出現,才能證實運行方式和參數是比較合理的,爲此,須要一些性能調優。

Benchmark 分析

Flink 1.10

Flink 1.9 在合併 Blink 代碼的時候,就已經完成了不少工做:深度 CodeGeneration、Binary 存儲與計算、完善的 CBO 優化、Batch Shuffler,爲後續的性能突破打下了紮實的基礎。

Flink 1.10 繼續完善 Hive 集成,並達到了生產級別的 Hive 集成標準,其它也在性能和開箱即用方面作了不少工做:

  • Hive 多版本的支持,支持了 Hive 1.0 之後的主要版本。
  • 向量化的 ORC 讀,目前只在 Hive 2.0 以上版本纔會默認開啓。

    • Hive 1.X 版本的支持已經在進行中:FLINK-14802 [3]
    • Parquet 的向量化讀支持也已經在開發中:FLINK-11899 [4]
  • 基於比例的彈性內存分配,這不只利於 Operator 能夠更多的使用內存,並且大大方便了用戶的配置,用戶再也不須要配置 Operator 內存,Operator 根據 Slot 彈性的拿到內存,提升了 Flink 開箱即用的易用性。詳見 FLIP-53 [5]
  • Shuffle 的壓縮:Flink 默認給 Batch 做業開啓中間數據落盤的方式,這有利於避免調度死鎖的可能,也提供了良好的容錯機制,可是大量的落盤可能致使做業瓶頸在磁盤的吞吐上,因此 Flink 1.10 開發了 Shuffle 的壓縮,用 Cpu 換 IO。
  • 新調度框架:Flink 1.10 也引入新了的調度框架,這有利於 JobMaster 的調度性能,避免併發太大時,JobMaster 成爲性能瓶頸。

Flink 參數分析

Flink 1.10 作了不少參數的優化,提升用戶的開箱即用體驗,可是因爲批流一體的一些限制,目前也是須要進行一些參數設置的,這裏本文粗略分析下。

Table 層參數:

  • table.optimizer.join-reorder-enabled = true:須要手動打開,目前各大引擎的 JoinReorder 少有默認打開的,在統計信息比較完善時,是能夠打開的,通常來講 reorder 錯誤的狀況是比較少見的。
  • table.optimizer.join.broadcast-threshold = 1010241024:從默認值 1MB 調整到 10MB,目前 Flink 的廣播機制還有待提升,因此默認值爲 1MB,可是在併發規模不是那麼大的狀況下,能夠開到 10MB。
  • table.exec.resource.default-parallelism = 800:Operator 的併發設置,針對 10T 的輸入,建議開到 800 的併發,不建議太大併發,併發越大,對系統各方面的壓力越大。

TaskManager 參數分析:

  • taskmanager.numberOfTaskSlots = 10:單個 TM 裏的 slot 個數。
  • TaskManager 內存參數:TaskManager 的內存主要分爲三種,管理內存、網絡內存、JVM 相關的其它內存。須要理解下官網的文檔,纔能有效的設置這些參數。
  • taskmanager.memory.process.size = 15000m:TaskManager 的總內存,減去其它內存後通常留給堆內 3-5GB 的內存。
  • taskmanager.memory.managed.size = 8000m:管理內存,用於 Operator 的計算,留給單個 Slot 300 - 800MB 的內存是比較合理的。
  • taskmanager.network.memory.max = 2200mb:Task 點到點的通訊須要 4 個 Buffers,根據併發大概計算得出須要 2GB,能夠經過嘗試得出,Buffers 不夠會拋出異常。

網絡參數分析

  • taskmanager.network.blocking-shuffle.type = mmap:Shuffle read 使用 mmap 的方式,直接靠系統來管理內存,是比較方便的形式。
  • taskmanager.network.blocking-shuffle.compression.enabled = true:Shuffle 使用壓縮,這個參數是批流複用的,強烈建議給批做業開啓壓縮,否則瓶頸就會在磁盤上。

調度參數分析

  • cluster.evenly-spread-out-slots = true:在調度 Task 時均勻調度到每一個 TaskManager 中,這有利於使用全部資源。
  • jobmanager.execution.failover-strategy = region:默認全局重試,需打開 region 重試才能 enable 單點的 failover。
  • restart-strategy = fixed-delay:重試策略須要手動設置,默認是不重試的。

其它 timeout 相關參數是爲了不調度和運行過程當中,大數據量致使的網絡抖動,進而致使做業失敗的問題。

Flink 1.11 及後續規劃

後續 Flink 社區會在完善功能的同時進一步夯實性能:

  • 提供 SQL Gateway 以及 JDBC Driver,目前提供獨立倉庫,面向 Flink 1.10。[6] [7]
  • 提供 Hive 語法兼容模式,避免 Hive 用戶的困擾。
  • 完善 ORC 和 Parquet 的向量化讀。
  • N-Ary stream operator [8]:開發 table 層的 chain 框架,進一步避免 Shuffle 落盤致使的開銷。

參考連接:

[1]https://github.com/ververica/...
[2]https://github.com/ververica/...
[3]http://jira.apache.org/jira/b...
[4]https://issues.apache.org/jir...
[5]https://cwiki.apache.org/conf...
[6]https://github.com/ververica/...
[7]https://github.com/ververica/...
[8]https://cwiki.apache.org/conf...

# 重磅福利 #

《Demo: 基於 Flink SQL 構建離線應用》的 PPT 來啦!關注「Flink 中文社區」微信公衆號,後臺回覆關鍵字「0218SQL」便可獲取本次直播課程 Demo 演示 PPT~

相關文章
相關標籤/搜索