之因此寫這個是由於前段時間本身的項目出現過這樣的一個問題:apache
1 2 3 |
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/taskmanager_0#15608456]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation". |
跟着這問題在 Flink 的 Issue 列表裏看到了一個相似的問題:https://issues.apache.org/jira/browse/FLINK-9056
,看下面的評論差很少就是 TaskManager 的 slot 數量不足的緣由,致使 job 提交失敗。在 Flink 1.63 中已經修復了變成拋出異常了。併發
居然知道了是由於 slot 不足的緣由了,那麼咱們就要先了解下 slot 是什麼東東呢?不過文章這裏先介紹下 parallelism。app
如翻譯這樣,parallelism 是並行的意思,在 Flink 裏面表明每一個任務的並行度,適當的提升並行度能夠大大提升 job 的執行效率,好比你的 job 消費 kafka 數據過慢,適當調大可能就消費正常了。jsp
那麼在 Flink 中怎麼設置並行度呢?學習
如上圖,在 flink 配置文件中能夠查看到默認並行度是 1,spa
1 2 3 4 |
cat flink-conf.yaml | grep parallelism # The parallelism used for programs that did not specify and other parallelism. parallelism.default: 1 |
因此你如何在你的 flink job 裏面不設置任何的 parallelism 的話,那麼他也會有一個默認的 parallelism = 1。那也意味着你能夠修改這個配置文件的默認並行度。命令行
若是你是用命令行啓動你的 Flink job,那麼你也能夠這樣設置並行度(使用 -p 並行度):線程
1 |
./bin/flink run -p 10 ../word-count.jar |
你也能夠經過這樣來設置你整個程序的並行度:翻譯
1 2 |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(10); |
注意:這樣設置的並行度是你整個程序的並行度,那麼後面若是你的每一個算子不單獨設置並行度覆蓋的話,那麼後面每一個算子的並行度就都是這裏設置的並行度的值了。3d
如何給每一個算子單獨設置並行度呢?
1 2 3 4 |
data.keyBy(new xxxKey()) .flatMap(new XxxFlatMapFunction()).setParallelism(5) .map(new XxxMapFunction).setParallelism(5) .addSink(new XxxSink()).setParallelism(1) |
如上,就是在每一個算子後面單獨的設置並行度,這樣的話,就算你前面設置了 env.setParallelism(10) 也是會被覆蓋的。
這也說明優先級是:算子設置並行度 > env 設置並行度 > 配置文件默認並行度
並行度講到這裏應該都懂了,下面 zhisheng 就繼續跟你講講 什麼是 slot?
其實什麼是 slot 這個問題以前在第一篇文章 《從0到1學習Flink》—— Apache Flink 介紹 中就介紹過了,這裏再講細一點。
圖中 Task Manager 是從 Job Manager 處接收須要部署的 Task,任務的並行性由每一個 Task Manager 上可用的 slot 決定。每一個任務表明分配給任務槽的一組資源,slot 在 Flink 裏面能夠認爲是資源組,Flink 將每一個任務分紅子任務而且將這些子任務分配到 slot 來並行執行程序。
例如,若是 Task Manager 有四個 slot,那麼它將爲每一個 slot 分配 25% 的內存。 能夠在一個 slot 中運行一個或多個線程。 同一 slot 中的線程共享相同的 JVM。 同一 JVM 中的任務共享 TCP 鏈接和心跳消息。Task Manager 的一個 Slot 表明一個可用線程,該線程具備固定的內存,注意 Slot 只對內存隔離,沒有對 CPU 隔離。默認狀況下,Flink 容許子任務共享 Slot,即便它們是不一樣 task 的 subtask,只要它們來自相同的 job。這種共享能夠有更好的資源利用率。
文字說的比較幹,zhisheng 這裏我就拿下面的圖片來說解:
上面圖片中有兩個 Task Manager,每一個 Task Manager 有三個 slot,這樣咱們的算子最大並行度那麼就能夠達到 6 個,在同一個 slot 裏面能夠執行 1 至多個子任務。
那麼再看上面的圖片,source/map/keyby/window/apply 最大能夠有 6 個並行度,sink 只用了 1 個並行。
每一個 Flink TaskManager 在集羣中提供 slot。 slot 的數量一般與每一個 TaskManager 的可用 CPU 內核數成比例。通常狀況下你的 slot 數是你每一個 TaskManager 的 cpu 的核數。
可是 flink 配置文件中設置的 task manager 默認的 slot 是 1。
下面給出官方的圖片來更加深入的理解下 slot:
一、slot 是指 taskmanager 的併發執行能力
taskmanager.numberOfTaskSlots:3
每個 taskmanager 中的分配 3 個 TaskSlot, 3 個 taskmanager 一共有 9 個 TaskSlot。
二、parallelism 是指 taskmanager 實際使用的併發能力
parallelism.default:1
運行程序默認的並行度爲 1,9 個 TaskSlot 只用了 1 個,有 8 個空閒。設置合適的並行度才能提升效率。
三、parallelism 是可配置、可指定的
上圖中 example2 每一個算子設置的並行度是 2, example3 每一個算子設置的並行度是 9。
example4 除了 sink 是設置的並行度爲 1,其餘算子設置的並行度都是 9。
好了,既然並行度和 slot zhisheng 都帶你們過了一遍了,那麼再來看文章開頭的問題:slot 資源不夠。
如今這個問題的答案其實就已經很明顯了,就是咱們設置的並行度 parallelism 超過了 Task Manager 能提供的最大 slot 數量,因此纔會報這個錯誤。
再來拿個人代碼來看吧,當時我就是隻設置了整個項目的並行度:
1 |
env.setParallelism(15); |
爲何要設置 15 呢,由於我項目消費的 Kafka topic 有 15 個 parttion,就想着讓一個並行去消費一個 parttion,沒曾想到 Flink 資源的不夠,稍微下降下 並行度爲 10 後就沒出現這個錯誤了。
本文由本身項目生產環境的一個問題來說解了本身對 Flink parallelism 和 slot 的理解,並告訴你們如何去設置這兩個參數,最後也指出了問題的緣由所在。