做者:嶽猛
整理:毛鶴數組
本文根據 Apache Flink 系列直播課程整理而成,由 Apache Flink Contributor、網易雲音樂實時計算平臺研發工程師嶽猛分享。主要分享內容爲 Flink Job 執行做業的流程,文章將從兩個方面進行分享:一是如何從 Program 到物理執行計劃,二是生成物理執行計劃後該如何調度和執行。緩存
Flink 有四層轉換流程,第一層爲 Program 到 StreamGraph;第二層爲 StreamGraph 到 JobGraph;第三層爲 JobGraph 到 ExecutionGraph;第四層爲 ExecutionGraph 到物理執行計劃。經過對 Program 的執行,可以生成一個 DAG 執行圖,即邏輯執行圖。以下:session
第一部分將先講解四層轉化的流程,而後將以詳細案例講解四層的具體轉化。架構
Program 轉換成 StreamGraph 具體分爲三步:併發
經過 WindowWordCount 來看代碼到 StreamGraph 的轉化,在 flatMap transform 設置 slot 共享組爲 flatMap_sg,併發設置爲 4,在聚合的操做中設置 slot 共享組爲 sum_sg, sum() 和 counts() 併發設置爲 3,這樣設置主要是爲了演示後面如何嵌到一塊兒的,跟上下游節點的併發以及上游的共享組有關。app
WindowWordCount 代碼中能夠看到,在 readTextFile() 中會生成一個 transform,且 transform 的 ID 是 1;而後到 flatMap() 會生成一個 transform, transform 的 ID 是 2;接着到 keyBy() 生成一個 transform 的 ID 是 3;再到 sum() 生成一個 transform 的 ID 是 4;最後到 counts()生成 transform 的 ID 是 5。框架
transform 的結構如圖所示,第一個是 flatMap 的 transform,第二個是 window 的 transform,第三個是 SinkTransform 的 transform。除此以外,還能在 transform 的結構中看到每一個 transform 的 input 是什麼。frontend
接下來介紹一下 StreamNode 和 StreamEdge。分佈式
WindowWordCount transform 到 StreamGraph 轉化如圖所示,StreamExecutionEnvironment 的 transformations 存在 3 個 transform,分別是 Flat Map(Id 2)、Window(Id 4)、Sink(Id 5)。ide
transform 的時候首先遞歸處理 transform 的 input,生成 StreamNode,而後經過 StreamEdge 連接上下游 StreamNode。須要注意的是,有些 transform 操做並不會生成StreamNode 如 PartitionTransformtion,而是生成個虛擬節點。
在轉換完成後能夠看到,streamNodes 有四種 transform 形式,分別爲 Source、Flat Map、Window、Sink。
每一個 streamNode 對象都攜帶併發個數、slotSharingGroup、執行類等運行信息。
StreamGraph 到 JobGraph 的轉化步驟:
從 source 節點遞歸尋找嵌到一塊兒的 operator 中,嵌到一塊兒須要知足必定的條件,具體條件介紹以下:
JobGraph 對象結構如上圖所示,taskVertices 中只存在 Window、Flat Map、Source 三個 TaskVertex,Sink operator 被嵌到 window operator 中去了。
Flink 任務失敗的時候,各個 operator 是可以從 checkpoint 中恢復到失敗以前的狀態的,恢復的時候是依據 JobVertexID(hash 值)進行狀態恢復的。相同的任務在恢復的時候要求 operator 的 hash 值不變,所以可以獲取對應的狀態。
若是用戶對節點指定了一個散列值,則基於用戶指定的值可以產生一個長度爲 16 的字節數組。若是用戶沒有指定,則根據當前節點所處的位置,產生一個散列值。
考慮的因素主要有三點:
JobGraph 到 ExexcutionGraph 以及物理執行計劃的流程:
基於 Yarn 層面的架構相似 Spark on Yarn 模式,都是由 Client 提交 App 到 RM 上面去運行,而後 RM 分配第一個 container 去運行 AM,而後由 AM 去負責資源的監督和管理。須要說明的是,Flink 的 Yarn 模式更加相似 Spark on Yarn 的 cluster 模式,在 cluster 模式中,dirver 將做爲 AM 中的一個線程去運行。Flink on Yarn 模式也是會將 JobManager 啓動在 container 裏面,去作個 driver 相似的任務調度和分配,Yarn AM 與 Flink JobManager 在同一個 Container 中,這樣 AM 能夠知道 Flink JobManager 的地址,從而 AM 能夠申請 Container 去啓動 Flink TaskManager。待 Flink 成功運行在 Yarn 集羣上,Flink Yarn Client 就能夠提交 Flink Job 到 Flink JobManager,並進行後續的映射、調度和計算處理。
per job clusters
的部署方式,可是又支持能夠在一個集羣上運行多個做業的 session 模式,使人疑惑。在 Flink 版本 1.5 中引入了 Dispatcher,Dispatcher 是在新設計裏引入的一個新概念。Dispatcher 會從 Client 端接受做業提交請求並表明它在集羣管理器上啓動做業。
客戶端提交 JobGraph 以及依賴 jar 包到 YarnResourceManager,接着 Yarn ResourceManager 分配第一個 container 以此來啓動 AppMaster,Application Master 中會啓動一個 FlinkResourceManager 以及 JobManager,JobManager 會根據 JobGraph 生成的 ExecutionGraph 以及物理執行計劃向 FlinkResourceManager 申請 slot,FlinkResoourceManager 會管理這些 slot 以及請求,若是沒有可用 slot 就向 Yarn 的 ResourceManager 申請 container,container 啓動之後會註冊到 FlinkResourceManager,最後 JobManager 會將 subTask deploy 到對應 container 的 slot 中去。
會增長一個過程,就是 Client 會直接經過 HTTP Server 的方式,而後用 Dispatcher 將這個任務提交到 Yarn ResourceManager 中。
新框架具備四大優點,詳情以下:
single cluster job on Yarn 模式涉及三個實例對象:
clifrontend
YarnJobClusterEntrypoint(Master)
YarnTaskExecutorRunner (slave)
整個任務運行代碼調用流程以下圖:
調用 StreamTask 的 invoke 方法,執行步驟以下: * initializeState()即operator的initializeState() * openAllOperators() 即operator的open()方法 * 最後調用 run 方法來進行真正的任務處理
咱們來看下 flatMap 對應的 OneInputStreamTask 的 run 方法具體是怎麼處理的。
@Override protected void run() throws Exception { // cache processor reference on the stack, to make the code more JIT friendly final StreamInputProcessor<IN> inputProcessor = this.inputProcessor; while (running && inputProcessor.processInput()) { // all the work happens in the "processInput" method } }
最終是調用 StreamInputProcessor 的 processInput() 作數據的處理,這裏麪包含用戶的處理邏輯。
public boolean processInput() throws Exception { if (isFinished) { return false; } if (numRecordsIn == null) { try { numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter(); } catch (Exception e) { LOG.warn("An exception occurred during the metrics setup.", e); numRecordsIn = new SimpleCounter(); } } while (true) { if (currentRecordDeserializer != null) { DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate); if (result.isBufferConsumed()) { currentRecordDeserializer.getCurrentBuffer().recycleBuffer(); currentRecordDeserializer = null; } if (result.isFullRecord()) { StreamElement recordOrMark = deserializationDelegate.getInstance(); //處理watermark if (recordOrMark.isWatermark()) { // handle watermark //watermark處理邏輯,這裏可能引發timer的trigger statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), currentChannel); continue; } else if (recordOrMark.isStreamStatus()) { // handle stream status statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), currentChannel); continue; //處理latency watermark } else if (recordOrMark.isLatencyMarker()) { // handle latency marker synchronized (lock) { streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker()); } continue; } else { //用戶的真正的代碼邏輯 // now we can do the actual processing StreamRecord<IN> record = recordOrMark.asRecord(); synchronized (lock) { numRecordsIn.inc(); streamOperator.setKeyContextElement1(record); //處理數據 streamOperator.processElement(record); } return true; } } } //這裏會進行checkpoint barrier的判斷和對齊,以及不一樣partition 裏面checkpoint barrier不一致時候的,數據buffer, final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked(); if (bufferOrEvent != null) { if (bufferOrEvent.isBuffer()) { currentChannel = bufferOrEvent.getChannelIndex(); currentRecordDeserializer = recordDeserializers[currentChannel]; currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer()); } else { // Event received final AbstractEvent event = bufferOrEvent.getEvent(); if (event.getClass() != EndOfPartitionEvent.class) { throw new IOException("Unexpected event: " + event); } } } else { isFinished = true; if (!barrierHandler.isEmpty()) { throw new IllegalStateException("Trailing data in checkpoint barrier handler."); } return false; } } }
streamOperator.processElement(record) 最終會調用用戶的代碼處理邏輯,假如 operator 是 StreamFlatMap 的話,
@Override public void processElement(StreamRecord<IN> element) throws Exception { collector.setTimestamp(element); userFunction.flatMap(element.getValue(), collector);//用戶代碼 }
本文爲雲棲社區原創內容,未經容許不得轉載。