Flink之體系--Task Execution--Tasks--Parallelism

Flink 並行度:

優先級:算子層面>環境層面>客戶端層面>系統層面java

Operator Level(操做算子層面)

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
複製代碼
  • operators、data sources、data sinks均可以調用setParallelism()方法來設置parallelism

Execution Environment Level(執行環境層面)

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
複製代碼
  • 在ExecutionEnvironment裏頭能夠經過setParallelism來給operators、data sources、data sinks設置默認的parallelism;若是operators、data sources、data sinks本身有設置parallelism則會覆蓋ExecutionEnvironment設置的parallelism

Client Level(客戶端層面)

./bin/flink run -p 10 ../examples/*WordCount-java*.jar
複製代碼

或者程序員

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
複製代碼
  • 使用CLI client,能夠在命令行調用是用-p來指定,或者Java/Scala調用時在Client.run的參數中指定parallelism

System Level(系統層面)

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1
複製代碼
  • 能夠在flink-conf.yaml中經過parallelism.default配置項給全部execution environments指定系統級的默認parallelism

Flink數據流圖簡介

1.1 Flink做業的邏輯視圖

在大數據領域,詞頻統計(WordCount)程序就像是一個編程語言的HelloWorld程序,它展現了一個大數據引擎的基本規範。麻雀雖小,五臟俱全,從這個樣例中,咱們能夠一窺Flink設計和運行原理。數據庫

image-20201019114310250.png

如圖1所示,程序分爲三大部分,第一部分讀取數據源(Source),第二部分對數據作轉換操做(Transformation),最後將轉換結果輸出到一個目的地(Sink)。代碼中的方法被稱爲算子(Operator),是Flink提供給程序員的接口,程序員須要經過這些算子對數據進行操做。Source算子讀取數據源中的數據,數據源能夠是數據流、也能夠存儲在文件系統中的文件。Transformation算子對數據進行必要的計算處理。Sink算子將處理結果輸出,數據通常被輸出到數據庫、文件系統或下一個數據流程序。apache

咱們能夠把算子理解爲1 + 2 運算中的加號,加號(+)是這個算子的一個符號表示,它表示對數字1和數字2作加法運算。一樣,在Flink或Spark這樣的大數據引擎中,算子對數據進行某種操做,程序員能夠根據本身的需求調用合適的算子,完成所需計算任務。經常使用的算子有mapflatMapkeyBytimeWindow等,它們分別對數據流執行不一樣類型的操做。編程

image-20201019114432008.png

在程序實際運行前,Flink會將用戶編寫的代碼作一個簡單處理,生成一個如圖2所示的邏輯視圖。圖 2展現了WordCount程序中,數據從不一樣算子間流動的狀況。圖中,圓圈表明算子,圓圈間的箭頭表明數據流,數據流在Flink程序中通過不一樣算子的計算,最終生成爲目標數據。其中,keyBytimeWindowsum共同組成了一個時間窗口上的聚合操做,被歸結爲一個算子。咱們能夠在Flink的Web UI中,點擊一個做業,查看這個做業的邏輯視圖。api

對於詞頻統計這個案例,邏輯上來說無非是對數據流中的單詞作提取,而後使用一個Key-Value結構對單詞作詞頻計數,最後輸出結果便可,這樣的邏輯本能夠用幾行代碼完成,改爲使用算子形式,反而讓新人看着一頭霧水,爲何必定要用算子的形式來寫程序呢?實際上,算子進化成當前這個形態,就像人類從石塊計數,到手指計數,到算盤計數,再到計算機計數這樣的進化過程同樣,儘管更低級的方式能夠完成必定的計算任務,可是隨着計算規模的增加,古老的計數方式存在着低效的弊端,沒法完成更高級別和更大規模的計算需求。試想,若是咱們不使用大數據引擎提供的算子,而是本身實現一套上述的計算邏輯,儘管咱們能夠快速完成當前的詞頻統計的任務,可是當面臨一個新計算任務時,咱們須要從新編寫程序,完成一整套計算任務。咱們本身編寫代碼的橫向擴展性可能很低,當輸入數據暴增時,咱們須要作很大改動,以部署在更多機器上。數據結構

大數據引擎的算子對計算作了一些抽象,對於新人來講有必定學習成本,而一旦掌握這門技術,人們所能處理的數據規模將成倍增長。大數據引擎的算子出現,正是針對數據分佈在多個節點的大數據場景下,須要一種統一的計算描述語言來對數據作計算而進化出的新計算形態。基於Flink的算子,咱們能夠定義一個數據流的邏輯視圖,以此完成對大數據的計算。剩下那些數據交換、橫向擴展、故障恢復等問題全交由大數據引擎來解決。多線程

1.2 從邏輯視圖到物理執行

在絕大多數的大數據處理場景下,一臺機器節點沒法處理全部數據,數據被切分到多臺節點上。在大數據領域,當數據量大到超過單臺機器處理能力時,須要將一份數據切分到多個分區(Partition)上,每一個分區分佈在一臺虛擬機或物理機上。架構

前一小節已經提到,大數據引擎的算子提供了編程接口,咱們可使用算子構建數據流的邏輯視圖。考慮到數據分佈在多個節點的狀況,邏輯視圖只是一種抽象,須要將邏輯視圖轉化爲物理執行圖,才能在分佈式環境下執行。
image-20201019114604556.png併發

圖 3爲WordCount程序的物理執行圖,這裏數據流分佈在2個分區上。箭頭部分表示數據流分區,圓圈部分表示算子在分區上的算子子任務(Operator Subtask)。從邏輯視圖變爲物理執行圖後,FlatMap算子在每一個分區都有一個算子子任務,以處理該分區上的數據:FlatMap[1/2]算子子任務處理第一個數據流分區上的數據,以此類推。

算子子任務又被稱爲算子實例,一個算子在並行執行時,會有多個算子實例。即便輸入數據增多,咱們也能夠經過部署更多的算子實例來進行橫向擴展。從圖 3中能夠看到,除去Sink外的算子都被分紅了2個算子實例,他們的並行度(Parallelism)爲2,Sink算子的並行度爲1。並行度是能夠被設置的,當設置某個算子的並行度爲2時,也就意味着有這個算子有2個算子子任務(或者說2個算子實例)並行執行。實際應用中通常根據輸入數據量的大小,計算資源的多少等多方面的因素來設置並行度。

注意,在本例中,爲了演示,咱們把全部算子的並行度設置爲了2:env.setParallelism(2);,把最後輸出的並行度設置成了1:wordCount.print().setParallelism(1);。若是不單獨設置print的並行度的話,它的並行度也是2。

算子子任務是Flink物理執行的基本單元,算子子任務之間是相互獨立的,某個算子子任務有本身的線程,不一樣算子子任務可能分佈在不一樣的節點上。後文在Flink的資源分配部分咱們還會重點介紹算子子任務。

再談邏輯視圖到物理執行圖

瞭解了Flink的分佈式架構和核心組件,這裏咱們從更細粒度上來介紹從邏輯視圖轉化爲物理執行圖過程,該過程能夠分紅四層:StreamGraph -> JobGraph -> ExecutionGraph -> 物理執行圖。

image-20201019114833862.png

  • StreamGraph:是根據用戶編寫的代碼生成的最初的圖,用來表示一個Flink做業的拓撲結構。在StreamGraph中,節點StreamNode就是算子。
  • JobGraphJobGraph是提交給 JobManager 的數據結構。StreamGraph通過優化後生成了JobGraph,主要的優化爲,將多個符合條件的節點連接在一塊兒做爲一個JobVertex節點,這樣能夠減小數據交換所須要的傳輸開銷。這個連接的過程叫作算子鏈(Operator Chain),會在下一小節繼續介紹。JobVertex通過算子鏈後,會包含一到多個算子,它輸出是IntermediateDataSet,是通過算子處理產生的數據集。
  • ExecutionGraph:JobManager將 JobGraph轉化爲ExecutionGraphExecutionGraphJobGraph的並行化版本:假如某個JobVertex的並行度是2,那麼它將被劃分爲2個ExecutionVertexExecutionVertex表示一個算子子任務,它監控着單個子任務的執行狀況。每一個ExecutionVertex會輸出一個IntermediateResultPartition,這是單個子任務的輸出,再通過ExecutionEdge輸出到下游節點。ExecutionJobVertex是這些並行子任務的合集,它監控着整個算子的運行狀況。ExecutionGraph是調度層很是核心的數據結構。
  • 物理執行圖:JobManager根據ExecutionGraph對做業進行調度後,在各個TaskManager上部署具體的任務,物理執行圖並非一個具體的數據結構。

能夠看到,Flink在數據流圖上可謂煞費苦心,僅各種圖就有四種之多。對於新人來講,能夠不用太關心這些很是細節的底層實現,只須要了解如下幾個核心概念:

  • Flink採用主從架構,Master起着管理協調做用,TaskManager負責物理執行,在執行過程當中會發生一些數據交換、生命週期管理等事情。
  • 用戶調用Flink API,構造邏輯視圖,Flink會對邏輯視圖優化,並轉化爲並行化的物理執行圖,最後被執行的是物理執行圖。

任務、算子子任務與算子鏈

在構造物理執行圖的過程當中,Flink會將一些算子子任務連接在一塊兒,組成算子鏈。連接後以任務(Task)的形式被TaskManager調度執行。使用算子鏈是一個很是有效的優化,它能夠有效下降算子子任務之間的傳輸開銷。連接以後造成的Task是TaskManager中的一個線程。

image-20201019114927309.png

例如,數據從Source前向傳播到FlatMap,這中間沒有發生跨分區的數據交換,所以,咱們徹底能夠將Source、FlatMap這兩個子任務組合在一塊兒,造成一個Task。數據通過keyBy發生了數據交換,數據會跨越分區,所以沒法將keyBy以及其後面的窗口聚合連接到一塊兒。因爲WindowAggregation的並行度是2,Sink的並行度爲1,數據再次發生了交換,咱們不能把WindowAggregation和Sink兩部分連接到一塊兒。1.2節中提到,Sink的並行度是人爲設置爲1,若是咱們把Sink的並行度也設置爲2,那麼是可讓這兩個算子連接到一塊兒的。

默認狀況下,Flink會盡可能將更多的子任務連接在一塊兒,這樣能減小一些沒必要要的數據傳輸開銷。但一個子任務有超過一個輸入或發生數據交換時,連接就沒法創建。兩個算子可以連接到一塊兒是有一些規則的,感興趣的讀者能夠閱讀Flink源碼中org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator中的isChainable方法。StreamingJobGraphGenerator類的做用是將StreamGraph轉換爲JobGraph

儘管將算子連接到一塊兒會下降一些傳輸開銷,可是也有一些狀況並不須要太多連接。好比,有時候咱們須要將一個很是長的算子鏈拆開,這樣咱們就能夠將原來集中在一個線程中的計算拆分到多個線程中來並行計算。Flink容許開發者手動配置是否啓用算子鏈,或者對哪些算子使用算子鏈。

任務槽位與計算資源

任務槽位

根據前文的介紹,咱們已經瞭解到TaskManager負責具體的任務執行。TaskManager是一個JVM進程,在TaskManager中能夠並行運行多個Task。在程序執行以前,通過優化,部分子任務被連接在一塊兒,組成一個Task。每一個Task是一個線程,須要TaskManager爲其分配相應的資源,TaskManager使用任務槽位給Task分配資源。

在解釋Flink任務槽位的概念前,咱們先回顧一下進程與線程的概念。在操做系統層面,進程(Process)是進行資源分配和調度的一個獨立單位,線程(Thread)是CPU調度的基本單位。好比,咱們經常使用的Office Word軟件,在啓動後就佔用操做系統的一個進程。Windows上可使用任務管理器來查看當前活躍的進程,Linux上可使用top命令來查看。線程是進程的一個子集,一個線程通常專一於處理一些特定任務,不獨立擁有系統資源,只擁有一些運行中必要的資源,如程序計數器。一個進程至少有一個線程,也能夠有多個線程。多線程場景下,每一個線程都處理一小個任務,多個線程以高併發的方式同時處理多個小任務,能夠提升處理能力。

回到Flink的槽位分配機制上,一個TaskManager是一個進程,TaskManager能夠管理一至多個Task,每一個Task是一個線程,佔用一個槽位。每一個槽位的資源是整個TaskManager資源的子集,好比這裏的TaskManager下有3個槽位,每一個槽位佔用TaskManager所管理的1/3的內存,第一個槽位中的Task不會與第二個槽位中的Task互相爭搶內存資源。注意,在分配資源時,Flink並無將CPU資源明確分配給各個槽位。

image-20201019115013303.png

假設咱們給WordCount程序分配兩個TaskManager,每一個TaskManager又分配3個槽位,因此總共是6個槽位。結合圖 7中對這個做業的並行度設置,整個做業被劃分爲5個Task,使用5個線程,這5個線程能夠按照圖 8所示的方式分配到6個槽位中。

Flink容許用戶設置TaskManager中槽位的數目,這樣用戶就能夠肯定以怎樣的粒度將任務作相互隔離。若是每一個TaskManager只包含一個槽位,那麼運行在該槽位內的任務將獨享JVM。若是TaskManager包含多個槽位,那麼多個槽位內的任務能夠共享JVM資源,好比共享TCP鏈接、心跳信息、部分數據結構等。官方建議將槽位數目設置爲TaskManager下可用的CPU核心數,那麼平均下來,每一個槽位都能平均得到1個CPU核心。

相關文章
相關標籤/搜索