Flink Batch SQL 1.10 實踐

Flink做爲流批統一的計算框架,在1.10中完成了大量batch相關的加強與改進。1.10能夠說是第一個成熟的生產可用的Flink Batch SQL版本,它一掃以前Dataset的羸弱,從功能和性能上都有大幅改進,如下我從架構、外部系統集成、實踐三個方面進行闡述。sql

架構

Stack

首先來看下stack,在新的Blink planner中,batch也是架設在Transformation上的,這就意味着咱們和Dataset徹底沒有關係了:編程

  1. 咱們能夠儘量的和streaming複用組件,複用代碼,有同一套行爲。
  2. 若是想要Table/SQL的toDataset或者fromDataset,那就徹底沒戲了。儘量的在Table的層面來處理吧。
  3. 後續咱們正在考慮在DataStream上構建BoundedStream,給DataStream帶來批處理的功能。

網絡模型

Batch模式就是在中間結果落盤,這個模式和典型的Batch處理是一致的,好比MapReduce/Spark/Tez。網絡

Flink之前的網絡模型也分爲Batch和Pipeline兩種,可是Batch模式只是支持上下游隔斷執行,也就是說資源用量能夠不用同時知足上下游共同的併發。可是另一個關鍵點是Failover沒有對接好,1.9和1.10在這方面進行了改進,支持了單點的Failover。架構

建議在Batch時打開:併發

jobmanager.execution.failover-strategy = region框架

爲了不重啓過於頻繁致使JobMaster太忙了,能夠把重啓間隔提升:函數

restart-strategy.fixed-delay.delay = 30 s工具

Batch模式的好處有:性能

  • 容錯好,能夠單點恢復
  • 調度好,無論多少資源均可以運行
  • 性能差,中間數據須要落盤,強烈建議開啓壓縮
    taskmanager.network.blocking-shuffle.compression.enabled = true

Batch模式比較穩,適合傳統Batch做業,大做業。優化

Pipeline模式是Flink的傳統模式,它徹底和Streaming做業用的是同一套代碼,其實社區裏Impala和Presto也是相似的模式,純走網絡,須要處理反壓,不落盤,它主要的優缺點是:

  • 容錯差,只能全局重來
  • 調度差,你得保證有足夠的資源
  • 性能好,Pipeline執行,徹底複用Stream,複用流控反壓等功能。

有條件能夠考慮開啓Pipeline模式。

調度模型

Flink on Yarn支持兩種模式,Session模式和Per job模式,如今已經在調度層次高度統一了。

  1. Session模式沒有最大進程限制,當有Job須要資源時,它就會去Yarn申請新資源,當Session有空閒資源時,它就會給Job複用,因此它的模型和PerJob是基本同樣的。
  2. 惟一的不一樣只是:Session模式能夠跨做業複用進程。

另外,若是想要更好的複用進程,能夠考慮加大TaskManager的超時釋放:
resourcemanager.taskmanager-timeout = 900000

資源模型

先說說併發:

  1. 對Source來講:目前Hive的table是根據InputSplit來定須要多少併發的,它以後能Chain起來的Operators天然都是和source相同的併發。
  2. 對下游網絡傳輸事後的Operators(Tasks)來講:除了必定須要單併發的Task來講,其它Task所有統一併發,由table.exec.resource.default-parallelism統一控制。

咱們在Blink內部實現了基於統計信息來推斷併發的功能,可是其實以上的策略在大部分場景就夠用了。

Manage內存

目前一個TaskManager裏面含有多個Slot,在Batch做業中,一個Slot裏只能運行一個Task (關閉SlotShare)。

對內存來講,單個TM會把Manage內存切分紅Slot粒度,若是1個TM中有n個Slot,也就是Task能拿到1/n的manage內存。

咱們在1.10作了重大的一個改進就是:Task中chain起來的各個operators按照比例來瓜份內存,因此如今配置的算子內存都是一個比例值,實際拿到的還要根據Slot的內存來瓜分。

這樣作的一個重要好處是:

  1. 無論當前Slot有多少內存,做業能都run起來,這大大提升了開箱即用。
  2. 無論當前Slot有多少內存,Operators都會把內存瓜分乾淨,不會存在浪費的可能。

固然,爲了運行的效率,咱們通常建議單個Slot的manage內存應該大於500MB。

另外一個事情,在1.10後,咱們去除了OnHeap的manage內存,因此只有off-heap的manage內存。

外部系統集成

Hive

強烈推薦Hive Catalog + Hive,這也是目前批處理最成熟的架構。在1.10中,除了對之前功能的完善之外,其它作了幾件事:

  1. 多版本支持,支持Hive 1.X 2.X 3.X
  2. 完善了分區的支持,包括分區讀,動態/靜態分區寫,分區統計信息的支持。
  3. 集成Hive內置函數,能夠經過如下方式來load:
    a)TableEnvironment.loadModule("hiveModule",new HiveModule("hiveVersion"))
  4. 優化了ORC的性能讀,使用向量化的讀取方式,可是目前只支持Hive 2+版本,且要求列沒有複雜類型。有沒有進行過優化差距在5倍量級。

兼容Streaming Connectors

得益於流批統一的架構,目前的流Connectors也能在batch上使用,好比HBase的Lookup和Sink、JDBC的Lookup和Sink、Elasticsearch的Sink,均可以在Batch無縫對接使用起來。

實踐

SQL-CLI

在1.10中,SQL-CLI也作了大量的改動,好比把SQL-CLI作了stateful,裏面也支持了DDL,還支持了大量的DDL命令,給SQL-CLI暴露了不少TableEnvironment的能力,這讓用戶能夠方便得多。後續,咱們也須要對接JDBC的客戶端,讓用戶能夠更好的對接外部工具。可是SQL-CLI仍然待繼續改進,好比目前仍然只支持Session模式,不支持Per Job模式。

編程方式

老的BatchTableEnv由於綁定了Dataset,並且區分Java和Scala,是不乾淨的設計方式,因此Blink planner只支持新的TableEnv。

TableEnv註冊的source, sink, connector, functions,都是temporary的,重啓以後即失效了。若是須要持久化的object,考慮使用HiveCatalog。

能夠經過tEnv.sqlQuery來執行DML,這樣能夠得到一個Table,咱們也經過collect來得到小量的數據:

能夠經過tEnv.sqlUpdate來執行DDL,可是目前並不支持建立hive的table,只能建立Flink類型的table:

能夠經過tEnv.sqlUpdate來執行insert語句,Insert到臨時表或者Catalog表中,好比insert到上面建立的臨時JDBC表中:

當結果表是Hive表時,可使用Overwrite語法,也可使用靜態Partition的語法,這須要打開Hive的方言:

結語

目前Flink batch SQL仍然在高速發展中,可是1.10已是一個可用的版本了,它在功能上、性能上都有很大的提高,後續還有不少有意思的features,等待着你們一塊兒去挖掘。


本文做者:李勁鬆(之信)

原文連接

本文爲阿里雲內容,未經容許不得轉載。

相關文章
相關標籤/搜索