簡介:本文着重從 shuffle、join 方式的選擇、對象重用、UDF 重用等方面介紹了京東在 Flink SQL 任務方面作的優化措施。git
本文做者爲京東算法服務部的張穎和段學浩,並由 Apache Hive PMC,阿里巴巴技術專家李銳幫忙校對。主要內容爲:github
- 背景
- Flink SQL 的優化
- 總結
GitHub 地址
https://github.com/apache/flink
歡迎你們給 Flink 點贊送 star~算法
目前,京東搜索推薦的數據處理流程如上圖所示。能夠看到實時和離線是分開的,離線數據處理大部分用的是 Hive / Spark,實時數據處理則大部分用 Flink / Storm。數據庫
這就形成了如下現象:在一個業務引擎裏,用戶須要維護兩套環境、兩套代碼,許多共性不能複用,數據的質量和一致性很可貴到保障。且由於流批底層數據模型不一致,致使須要作大量的拼湊邏輯;甚至爲了數據一致性,須要作大量的同比、環比、二次加工等數據對比,效率極差,而且很是容易出錯。apache
而支持批流一體的 Flink SQL 能夠很大程度上解決這個痛點,所以咱們決定引入 Flink 來解決這種問題。segmentfault
在大多數做業,特別是 Flink 做業中,執行效率的優化一直是 Flink 任務優化的關鍵,在京東天天數據增量 PB 級狀況下,做業的優化顯得尤其重要。緩存
寫過一些 SQL 做業的同窗確定都知道,對於 Flink SQL 做業,在一些狀況下會形成同一個 UDF 被反覆調用的狀況,這對一些消耗資源的任務很是不友好;此外,影響執行效率大體能夠從 shuffle、join、failover 策略等方面考慮;另外,Flink 任務調試的過程也很是複雜,對於一些線上機器隔離的公司來講尤甚。安全
爲此,咱們實現了內嵌式的 Derby 來做爲 Hive 的元數據存儲數據庫 (allowEmbedded);在任務恢復方面,批式做業沒有 checkpoint 機制來實現failover,可是 Flink 特有的 region 策略可使批式做業快速恢復;此外,本文還介紹了對象重用等相關優化措施。服務器
在 Flink SQL 任務裏會出現如下這種狀況:若是相同的 UDF 既出如今 LogicalProject 中,又出如今 Where 條件中,那麼 UDF 會進行屢次調用 (見https://issues.apache.org/jira/browse/FLINK-20887))。可是若是該 UDF 很是耗 CPU 或者內存,這種多餘的計算會很是影響性能,爲此咱們但願能把 UDF 的結果緩存起來下次直接使用。在設計的時候須要考慮:(很是重要:請必定保證 LogicalProject 和 where 條件的 subtask chain 到一塊兒)網絡
根據以上考慮,咱們用 guava cache 將 UDF 的結果緩存起來,以後調用的時候直接去cache 裏面拿數據,最大可能下降任務的消耗。下面是一個簡單的使用(同時設置了最大使用 size、超時時間,可是沒有寫鎖):
public class RandomFunction extends ScalarFunction { private static Cache<String, Integer> cache = CacheBuilder.newBuilder() .maximumSize(2) .expireAfterWrite(3, TimeUnit.SECONDS) .build(); public int eval(String pvid) { profileLog.error("RandomFunction invoked:" + atomicInteger.incrementAndGet()); Integer result = cache.getIfPresent(pvid); if (null == result) { int tmp = (int)(Math.random() * 1000); cache.put("pvid", tmp); return tmp; } return result; } @Override public void close() throws Exception { super.close(); cache.cleanUp(); } }
你們可能會好奇爲何會把單元測試也放到優化裏面,你們都知道 Flink 任務調試過程很是複雜,對於一些線上機器隔離的公司來講尤甚。京東的本地環境是沒有辦法訪問任務服務器的,所以在初始階段調試任務,咱們耗費了不少時間用來上傳 jar 包、查看日誌等行爲。
爲了下降任務的調試時間、增長代碼開發人員的開發效率,實現了內嵌式的 Derby 來做爲 Hive 的元數據存儲數據庫 (allowEmbedded),這算是一種優化開發時間的方法。具體思路以下:
首先建立 Hive Conf:
public static HiveConf createHiveConf() { ClassLoader classLoader = new HiveOperatorTest().getClass().getClassLoader(); HiveConf.setHiveSiteLocation(classLoader.getResource(HIVE_SITE_XML)); try { TEMPORARY_FOLDER.create(); String warehouseDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/metastore_db"; String warehouseUri = String.format(HIVE_WAREHOUSE_URI_FORMAT, warehouseDir); HiveConf hiveConf = new HiveConf(); hiveConf.setVar( HiveConf.ConfVars.METASTOREWAREHOUSE, TEMPORARY_FOLDER.newFolder("hive_warehouse").getAbsolutePath()); hiveConf.setVar(HiveConf.ConfVars.METASTORECONNECTURLKEY, warehouseUri); hiveConf.set("datanucleus.connectionPoolingType", "None"); hiveConf.set("hive.metastore.schema.verification", "false"); hiveConf.set("datanucleus.schema.autoCreateTables", "true"); return hiveConf; } catch (IOException e) { throw new CatalogException("Failed to create test HiveConf to HiveCatalog.", e); } }
接下來建立 Hive Catalog:(利用反射的方式調用 embedded 的接口)
public static void createCatalog() throws Exception{ Class clazz = HiveCatalog.class; Constructor c1 = clazz.getDeclaredConstructor(new Class[]{String.class, String.class, HiveConf.class, String.class, boolean.class}); c1.setAccessible(true); hiveCatalog = (HiveCatalog)c1.newInstance(new Object[]{"test-catalog", null, createHiveConf(), "2.3.4", true}); hiveCatalog.open(); }
建立 tableEnvironment:(同官網)
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build(); TableEnvironment tableEnv = TableEnvironment.create(settings); TableConfig tableConfig = tableEnv.getConfig(); Configuration configuration = new Configuration(); configuration.setInteger("table.exec.resource.default-parallelism", 1); tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); tableEnv.useCatalog(hiveCatalog.getName());
最後關閉 Hive Catalog:
public static void closeCatalog() { if (hiveCatalog != null) { hiveCatalog.close(); } }
此外,對於單元測試,構建合適的數據集也是一個很是大的功能,咱們實現了 CollectionTableFactory,容許本身構建合適的數據集,使用方法以下:
CollectionTableFactory.reset(); CollectionTableFactory.initData(Arrays.asList(Row.of("this is a test"), Row.of("zhangying480"), Row.of("just for test"), Row.of("a test case"))); StringBuilder sbFilesSource = new StringBuilder(); sbFilesSource.append("CREATE temporary TABLE db1.`search_realtime_table_dump_p13`(" + " `pvid` string) with ('connector.type'='COLLECTION','is-bounded' = 'true')"); tableEnv.executeSql(sbFilesSource.toString());
傳統的離線 Batch SQL (面向有界數據集的 SQL) 有三種基礎的實現方式,分別是 Nested-loop Join、Sort-Merge Join 和 Hash Join。
效率 | 空間 | 備註 | |
---|---|---|---|
Nested-loop Join | 差 | 佔用大 | |
Sort-Merge Join | 有sort merge開銷 | 佔用小 | 有序數據集的一種優化措施 |
Hash Join | 高 | 佔用大 | 適合大小表 |
Nested-loop Join 最爲簡單直接,將兩個數據集加載到內存,並用內嵌遍歷的方式來逐個比較兩個數據集內的元素是否符合 Join 條件。Nested-loop Join 的時間效率以及空間效率都是最低的,可使用:table.exec.disabled-operators:NestedLoopJoin 來禁用。
如下兩張圖片是禁用前和禁用後的效果 (若是你的禁用沒有生效,先看一下是否是 Equi-Join):
Hash Join 一樣分爲兩個階段:首先將一個數據集轉換爲 Hash Table,而後遍歷另一個數據集元素並與 Hash Table 內的元素進行匹配。
Hash Join 效率較高可是對空間要求較大,一般是做爲 Join 其中一個表爲適合放入內存的小表的狀況下的優化方案 (並非不容許溢寫磁盤)。
注意:Sort-Merge Join 和 Hash Join 只適用於 Equi-Join ( Join 條件均使用等於做爲比較算子)。
Flink 在 join 之上又作了一些細分,具體包括:
特色 | 使用 | |
---|---|---|
Repartition-Repartition strategy | 對數據集分別進行分區和shuffle,若是數據集大的時候效率極差 | 兩個數據集相差不大 |
Broadcast-Forward strategy | 將小表的數據所有發送到大表數據的機器上 | 兩個數據集有較大的差距 |
衆所周知,batch 的 shuffle 很是耗時間。
能夠經過:table.optimizer.join.broadcast-threshold 來設置採用 broadcast 的 table 大小,若是設置爲 「-1」,表示禁用 broadcast。
下圖爲禁用先後的效果:
在 Flink SQL 任務裏,下降 shuffle 能夠有效的提升 SQL 任務的吞吐量,在實際的業務場景中常常遇到這樣的狀況:上游產出的數據已經知足了數據分佈要求 (如連續多個 join 算子,其中 key 是相同的),此時 Flink 的 forward shuffle 是冗餘的 shuffle,咱們但願將這些算子 chain 到一塊兒。Flink 1.12 引入了 mutiple input 的特性,能夠消除大部分不必的 forward shuffle,把 source 的算子 chain 到一塊兒。
table.optimizer.multiple-input-enabled:true
下圖爲開了 multiple input 和沒有開的拓撲圖 ( operator chain 功能已經打開):
上下游 operator 之間會通過序列化 / 反序列化 / 複製階段來進行數據傳輸,這種行爲很是影響 Flink SQL 程序的性能,能夠經過啓用對象重用來提升性能。可是這在 DataStream 裏面很是危險,由於可能會發生如下狀況:在下一個算子中修改對象意外影響了上面算子的對象。
可是 Flink 的 Table / SQL API 中是很是安全的,能夠經過以下方式來啓用:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().enableObjectReuse();
或者是經過設置:pipeline-object-reuse:true
爲何啓用了對象重用會有這麼大的性能提高?在 Blink planner 中,同一任務的兩個算子之間的數據交換最終將調用 BinaryString#copy,查看實現代碼,能夠發現 BinaryString#copy 須要複製底層 MemorySegment 的字節,經過啓用對象重用來避免複製,能夠有效提高效率。
下圖爲沒有開啓對象重用時相應的火焰圖:
batch 任務模式下 checkpoint 以及其相關的特性所有都不可用,所以針對實時任務的基於 checkpoint 的 failover 策略是不能應用在批任務上面的,可是 batch 任務容許 Task 之間經過 Blocking Shuffle 進行通訊,當一個 Task 由於任務未知的緣由失敗以後,因爲 Blocking Shuffle 中存儲了這個 Task 所須要的所有數據,因此只須要重啓這個 Task 以及經過 Pipeline Shuffle 與其相連的所有下游任務便可:
jobmanager.execution.failover-strategy:region (已經 finish 的 operator 可直接恢復)
table.exec.shuffle-mode:ALL\_EDGES\_BLOCKING (shuffle 策略)。
Flink 裏的 shuffle 分爲 pipeline shuffle 和 blocking shuffle。
blocking shuffle 就是傳統的 batch shuffle,會將數據落盤,這種 shuffle 的容錯好,可是會產生大量的磁盤、網絡 io (若是爲了省心的話,建議用 blocking suffle)。blocking shuffle 又分爲 hash shuffle 和 sort shuffle,
相應的控制參數:
table.exec.shuffle-mode,該參數有多個參數,默認是 ALL\_EDGES\_BLOCKING,表示全部的邊都會用 blocking shuffle,不過你們能夠試一下 POINTWISE\_EDGES\_PIPELINED,表示 forward 和 rescale edges 會自動開始 pipeline 模式。
taskmanager.network.sort-shuffle.min-parallelism ,將這個參數設置爲小於你的並行度,就能夠開啓 sort-merge shuffle;這個參數的設置須要考慮一些其餘的狀況,具體的能夠按照官網設置。
本文着重從 shuffle、join 方式的選擇、對象重用、UDF 重用等方面介紹了京東在 Flink SQL 任務方面作的優化措施。另外,感謝京東實時計算研發部付海濤等所有同事的支持與幫助。
本文內容由阿里雲實名註冊用戶自發貢獻,版權歸原做者全部,阿里雲開發者社區不擁有其著做權,亦不承擔相應法律責任。具體規則請查看《阿里雲開發者社區用戶服務協議》和《阿里雲開發者社區知識產權保護指引》。若是您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將馬上刪除涉嫌侵權內容。