《從0到1學習Flink》—— Flink parallelism 和 Slot 介紹

前言

之因此寫這個是由於前段時間本身的項目出現過這樣的一個問題:apache

1
2
3
Caused by: akka.pattern.AskTimeoutException: 
Ask timed out on [Actor[akka://flink/user/taskmanager_0#15608456]] after [10000 ms]. 
Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".

跟着這問題在 Flink 的 Issue 列表裏看到了一個相似的問題:https://issues.apache.org/jira/browse/FLINK-9056
,看下面的評論差很少就是 TaskManager 的 slot 數量不足的緣由,致使 job 提交失敗。在 Flink 1.63 中已經修復了變成拋出異常了。併發

居然知道了是由於 slot 不足的緣由了,那麼咱們就要先了解下 slot 是什麼東東呢?不過文章這裏先介紹下 parallelism。app

什麼是 parallelism?

如翻譯這樣,parallelism 是並行的意思,在 Flink 裏面表明每一個任務的並行度,適當的提升並行度能夠大大提升 job 的執行效率,好比你的 job 消費 kafka 數據過慢,適當調大可能就消費正常了。jsp

那麼在 Flink 中怎麼設置並行度呢?學習

如何設置 parallelism?

如上圖,在 flink 配置文件中能夠查看到默認並行度是 1,spa

1
2
3
4
cat flink-conf.yaml | grep parallelism

# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1

因此你如何在你的 flink job 裏面不設置任何的 parallelism 的話,那麼他也會有一個默認的 parallelism = 1。那也意味着你能夠修改這個配置文件的默認並行度。命令行

若是你是用命令行啓動你的 Flink job,那麼你也能夠這樣設置並行度(使用 -p 並行度):線程

1
./bin/flink run -p 10 ../word-count.jar

你也能夠經過這樣來設置你整個程序的並行度:翻譯

1
2
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(10);

注意:這樣設置的並行度是你整個程序的並行度,那麼後面若是你的每一個算子不單獨設置並行度覆蓋的話,那麼後面每一個算子的並行度就都是這裏設置的並行度的值了。3d

如何給每一個算子單獨設置並行度呢?

1
2
3
4
data.keyBy(new xxxKey())
    .flatMap(new XxxFlatMapFunction()).setParallelism(5)
    .map(new XxxMapFunction).setParallelism(5)
    .addSink(new XxxSink()).setParallelism(1)

如上,就是在每一個算子後面單獨的設置並行度,這樣的話,就算你前面設置了 env.setParallelism(10) 也是會被覆蓋的。

這也說明優先級是:算子設置並行度 > env 設置並行度 > 配置文件默認並行度

並行度講到這裏應該都懂了,下面 zhisheng 就繼續跟你講講 什麼是 slot?

什麼是 slot?

其實什麼是 slot 這個問題以前在第一篇文章 《從0到1學習Flink》—— Apache Flink 介紹 中就介紹過了,這裏再講細一點。

圖中 Task Manager 是從 Job Manager 處接收須要部署的 Task,任務的並行性由每一個 Task Manager 上可用的 slot 決定。每一個任務表明分配給任務槽的一組資源,slot 在 Flink 裏面能夠認爲是資源組,Flink 將每一個任務分紅子任務而且將這些子任務分配到 slot 來並行執行程序。

例如,若是 Task Manager 有四個 slot,那麼它將爲每一個 slot 分配 25% 的內存。 能夠在一個 slot 中運行一個或多個線程。 同一 slot 中的線程共享相同的 JVM。 同一 JVM 中的任務共享 TCP 鏈接和心跳消息。Task Manager 的一個 Slot 表明一個可用線程,該線程具備固定的內存,注意 Slot 只對內存隔離,沒有對 CPU 隔離。默認狀況下,Flink 容許子任務共享 Slot,即便它們是不一樣 task 的 subtask,只要它們來自相同的 job。這種共享能夠有更好的資源利用率。

文字說的比較幹,zhisheng 這裏我就拿下面的圖片來說解:

上面圖片中有兩個 Task Manager,每一個 Task Manager 有三個 slot,這樣咱們的算子最大並行度那麼就能夠達到 6 個,在同一個 slot 裏面能夠執行 1 至多個子任務。

那麼再看上面的圖片,source/map/keyby/window/apply 最大能夠有 6 個並行度,sink 只用了 1 個並行。

每一個 Flink TaskManager 在集羣中提供 slot。 slot 的數量一般與每一個 TaskManager 的可用 CPU 內核數成比例。通常狀況下你的 slot 數是你每一個 TaskManager 的 cpu 的核數。

可是 flink 配置文件中設置的 task manager 默認的 slot 是 1。

slot 和 parallelism

下面給出官方的圖片來更加深入的理解下 slot:

一、slot 是指 taskmanager 的併發執行能力

taskmanager.numberOfTaskSlots:3

每個 taskmanager 中的分配 3 個 TaskSlot, 3 個 taskmanager 一共有 9 個 TaskSlot。

二、parallelism 是指 taskmanager 實際使用的併發能力

parallelism.default:1

運行程序默認的並行度爲 1,9 個 TaskSlot 只用了 1 個,有 8 個空閒。設置合適的並行度才能提升效率。

三、parallelism 是可配置、可指定的

上圖中 example2 每一個算子設置的並行度是 2, example3 每一個算子設置的並行度是 9。

example4 除了 sink 是設置的並行度爲 1,其餘算子設置的並行度都是 9。

好了,既然並行度和 slot zhisheng 都帶你們過了一遍了,那麼再來看文章開頭的問題:slot 資源不夠。

問題緣由

如今這個問題的答案其實就已經很明顯了,就是咱們設置的並行度 parallelism 超過了 Task Manager 能提供的最大 slot 數量,因此纔會報這個錯誤。

再來拿個人代碼來看吧,當時我就是隻設置了整個項目的並行度:

1
env.setParallelism(15);

爲何要設置 15 呢,由於我項目消費的 Kafka topic 有 15 個 parttion,就想着讓一個並行去消費一個 parttion,沒曾想到 Flink 資源的不夠,稍微下降下 並行度爲 10 後就沒出現這個錯誤了。

總結

本文由本身項目生產環境的一個問題來說解了本身對 Flink parallelism 和 slot 的理解,並告訴你們如何去設置這兩個參數,最後也指出了問題的緣由所在。

相關文章
相關標籤/搜索