工做中用Flink作批量和流式處理有段時間了,感受只看Flink文檔是對Flink ProgramRuntime的細節描述不是不少, 程序員仍是看代碼最簡單和有效。因此想寫點東西,記錄一下,若是能對別人有所幫助,善莫大焉。html
說一下個人工做,在一個項目裏咱們在Flink-SQL基礎上構建了一個SQL Engine, 使懂SQL非技術人員可以使用SQL代替程序員直接實現Application, 而後在此基礎上在加上一些拖拽的界面,使不懂SQL非技術人員 利用拖拽實現批量或流式數據處理的Application 。 公司的數據源多樣且龐大,發佈渠道也很豐富, 咱們在SQL Engine 裏實現了各類各樣的Table Source (數據源) , Table Sink (數據發佈)和 UDF (計算器), 公司裏有不少十分懂業務專業分析員,若是他們真的能夠簡簡單單,託託拽拽的操做大數據,創建計算模型,而後快速上線和發佈,這樣的產品應該前景廣闊。java
但是後臺並不是提及來這麼簡單,SQL使用不善,難以達到業務想要的效果,數據量一上來各類問題會出現,後端須要大量的優化工做, 好比 數據傾斜, 是最常發生的事情。SQL基本上是一個Join Language。用戶常常會將一個大數據源和一個小數據源作Inner Join, 若是大數據源的數據項很大部分都使用極少數的幾個join key, 就很容易出現數據傾斜。現實傾斜的或不均衡的,好比國際資本>80%以用美圓計價,世界人口50%屬於某兩個國家, 財富主要有20%的人擁有, 等等 。 Flink 若是把SQL join 執行成Hash Join, 最後的結果是不管你實現分配了多少個TaskSlots, 若是80%的數據都跑到某一個TaskSlot裏,緩慢運行直至將個這Slot的資源耗盡,整個job失敗。這種狀況最好是將小數據集廣播給全部的下游通道, 大數據集按原始的分片並行,這樣的join因分配均衡而快速。然而標準SQL裏沒有辦法指定joinhint , Flink sql也不支持這個,只能經過debug flink 來看看哪裏能作一些改變解決這個問題。咱們在最後一章,從Flink client , flink optimizer, flink run-time (job manager, task manager) 一步一步的 在源碼裏設置斷點, debug, 將數據流過一遍,看看有哪些方案能夠將這個小數據集合廣播起來。git
爲了使本文讀起來流暢一些, 我先經過幾個章節大概介紹一下Flink 。本文關心架構, 因此不會涉及不少關於API的東西(好比Flink streaming 的windowing, watermark, Dataset, DataStream, 及SQL的API等, 網上應該有不少關於這些的文章)。只是想大概梳理一個Flink的架構,使架構對應到源碼結構裏, 瞭解一下Flink的 Graph metadata, 高可靠性的設計,不一樣cluster環境裏 depoloyment的實現等, 最後利用IntelliJ IDEA 通過一個小例子帶你們debug一下Flink 。若是對flink的架構有較好的理解(好比主要類及metadata),就比較容易在準確的地方設置斷點,debug Flink代碼將更有效率,從而解決問題就會更有效率, 這就是本文的目的。大概瞭解一下框架,但並不會面面俱到。當若是你須要深刻了解一下Flink某方面的細節, 本文可以告訴你入口在哪裏,或者經過對架構瞭解過程當中獲得的common sense , 再加上一點想象力, 你或許直接可以獲得解決問題的方案, 而後再經過閱讀源碼及調試來加以驗證。 程序員
(圖-1 Flink Runtime 來自:https://ci.apache.org/projects/flink/flink-docs-release-1.6/concepts/runtime.html)github
關於架構,先上一個官方的圖,儘管對於Flink架構,上圖不是很準確(好比client與JobManager的通信已經改成REST 方式, 而非AKKA的actor system),咱們仍是能夠知道一些要點:web
(圖-2,JobManager的內部結構) 算法
如上一章所述, JobManager 是一個單獨的進程(JVM), 它是一個Flink Cluster的 master 、中心和大腦, 他由一堆services 組成(主要是Dispather, JobMaster 和ResourceManager),鏈接cluster裏其餘分佈式組件 (TaskManager, client及其餘外部組件),指揮、得到協助、或提供服務。sql
從以上所述, JobManager是一組Service的總稱, 其中真正管理Job調度的組件叫JobMaster ,負責資源管理的組件叫ResoruceManager, 負責接收client端請求的組件叫Dispatcher(包括Dispatcher和DispatchRestEndpoint)。其實Flink源碼裏有叫JobManager的包和類,功能上也是負責Job調度管理以及snapshot管理,但它應該在Flink某個版本之後就legacy了(估計是從version1.3開始)。這三個服務統稱爲還叫 JobManager,上真正管理做業的是JobMaster。這一點在讀code時讓人迷惑,好比JobManagerRunner啓動的倒是叫JobMaster的類。可是他不叫JobMasterRunner,這也體現了JobMaster實際是取代了JobManager類,保留legacy類是爲了向後兼容。如下是Client, 展開的JobManger(受HA 保護的Dispather, JobMaster, ResourceManager)和TaskManager處理submitJob的流程圖,這個比較圖-1更能體現當前的Flink runtime架構 (Flink 1.6):docker
(圖-3)展開JobManager後的Flink 架構, 來自於《 Stream Processing with Apache Flink》shell
以上的架構嚴格來說在Flink裏被稱做 SessionMode ( Cluster的EntryPoint類都是SessionClusterEntryPoint的子類), 若是沒有外部命令 terminate cluster, 在這種模式下的FlinkCluster 是Long running 的, 多個job能夠同時運行在同一個flinkcluster裏。 SessionMode 在Flink的各類部署都是支持的, 包括Standalone, Kubernetes, Yarn, Mesos, 上圖實際上是StandaloneSessionCluster的流程。 還有一種模式叫作JobMode, 區別就是Job(或application) 的main class 和 jar 和在JobManager 啓動時經過的啓動參數裝載的, 不須要submitJob的過程, job運行完畢, cluster自動終結, 全部資源釋放。 在這種模式下, Dispather並不負責處理job的提交, 但其餘 Client發給DispathcherRESTEndPoint的請求(好比Query, CancelJob), 仍是由Dispatcher處理。
Flink的每一種部署模式(deployment mode)都是既支持Session Mode又支持JobMode的 (或partialy support), 區別如上所述, 但在架構上是一致的。 當有由外部的ResourceManager協助硬件資源分配時,流程略有全部不一樣, 以 FlinkCluster in Yarn 爲例, SessionMode下, 區別只限於多了RM經過Yarn自動啓動TM 的過程(4,5)。
(圖-4)FlinkCluster in Yarn Session mode, 來自於《 Stream Processing with Apache Flink》
關於deployment的細節,請參照後面的將Deployment的章節。
如圖一所示,JobMaster的主要工做是:
1. JobGraph的scheduler : 將Client提交的JobGraph按照邏輯的向後關係(source -> transform -> sink), 以及並行關係(每一個operator的子任務只負責所有數據的中一部分), 將子任務分配到TaskManager的Slot中, 並按期的獲取每個子任務的運行狀態 (status)。
2. 觸發和管理Job的checkpoint snapshot:對於streaming job,按期的將運行中的每一個operator 的狀態(State)數據存入規定的存儲設備, 這些state數據能夠用於在Job恢復運行時,恢復相關子任務的失敗前的現場。
(圖-5)JobMaster內部結構
EG是面向Job 並行運行的圖結構,在JobGraph的基礎上它加入了對Operator並行執行的子任務,以及子任務的輸入輸出的描述 。
圖-6 Execution Graph
Execution: 但EV被分配執行時,Exception對象會被建立做爲EV的一次嘗試, 分配slot, 將EV打包 成 TDD(TaskDeploymentDescriptor)並同TaskManagerGateWay 發送給TM (submitTask) 執行。Exception若是失敗, 新的Exception會被建立做爲另外一次嘗試。
EG的Scheduling模式由兩種,一個叫Lazy, 一個叫Eager 。
Lazy的方式適用於Batch Job, 它先將全部的處理數據輸入的sub task 分配執行, 當TaskManager 返回 (同過JobMasterGateway) 已分配成功的信息, EG在根據EJV的上下游關係, 再給相應的EJV分配slot執行。分配的過程如上一小節所述, EJV全部EV都會經過TDD打包,而後要求SlotPool提供slot, 而後將tdd和slot信息都發送給相應的TM去實例化這個sub task而後運行起來 (再TM細述這個時怎麼實現的)。值得一說的是, EJV全部EV都應該一塊兒scheduling , 但當集羣裏沒有足夠的slot時, 同一個EJV可能只有部分EV被schedule了,若是那些沒有分配的相同EJV的EV再一個timeout(default 5分鐘)以後還沒法獲得slot, task 這時候會失敗, job 也會失敗。因此在計劃Job使用的資源時,計劃的總slot數 (好比當使用Yarn管理resource 時, yn * ys 是job向Yarn申請的總slot數 )必定要大於總的source sub task的總數量(source operator 的數量 * paralelism ), 不然部分soure sub task 得不到資源,再timeout以後就會出發failGlobal 使job 失敗。
Eager 方式使用與Streaming job, 在Eager模式下, 全部EV都必須都能獲得slot, 不然schduling 失敗, job 失敗。
Flink的Checkpoint這個概念仍是有必要簡單說明一下, 固然參考Flink文檔會獲得更全面的理解。Flink主要是一個Stream computation的架構,(固然它也能夠作Batch, 但Batch並非Flink的強項), Flink Streaming processing 的一個特性就是Stated Streaming 。 意思就是在它的流式計算的工做流裏, operator的都是能夠有狀態的。什麼是有狀態的?至關於一我的睡醒以後還記的本身是誰,而後還能繼續下來的生活,作完沒有作完的事情的意思 : 由於大腦裏存儲了過去的信息。Flink 的Operator能夠像這樣生活的,建立一下StateFull的變量 (好比ValueState<T> ), 而後週期性的將這些狀態存儲一個地方 ,當job重啓, operator 從新實例化的時候, 經過加載這些Sate信息,就可以重新回到上次重啓前的狀態,而後繼續這個operator的人生。週期性的(Periodically)存儲operator的State 信息, Flink稱做Checkpoint, 每一次存儲叫作一次snapshot 。
CheckpointCordinator的主要功能就是協調促使工做流裏全部的operator都週期性的觸發Checkpoint snapshot 。
換句話說,CheckpointCordinator的主要功能就是向全部的Execution (看前面回顧一下Execution的概念) triggerCheckPoint 。 每個EG都會建立一個CheckpointCordinator (CC), CC用內建的timer (Executor)定時(根據可配置的interval)的經過RPC向全部的TM觸發他們運載的全部Source Exection對應的Task的CheckpointBarrier。 上一句比較長,分解來講, 每個EG的Execution 都會對應一個Task (準確的說時SubTask)運行在某一個TM 裏, triggerCheckPoint 就是CC定時的經過RPC調用全部 TM 上全部的Source Task ( 是工做流裏開始位置的處理Source 數據的Task, 不包括Sink Task 也不包括 普通的Transformation Task ) 的triggerCheckpointBarrier方法。當一個SourceTask收到triggerCheckpointBarrier時, 它會命令內嵌的Invokable對象 (Operator, 或Operator Chain的封裝對象)執行 performCheckPoint , 這個過程大概有以下幾步, 不少多步驟的時異步執行的:
至於CheckPoint怎麼配置,State數據, StateBackend 包含那些, 以及CheckpointBarrier再工做流裏的聯動過程, 我就不贅述了,網上 應該不少介紹, 不過經過代碼閱讀,想強調以下幾點。
TwoPhaseCommitSinkFunction
。https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html。 大概意思時SinkOperator向external 下游發送數據時須要分兩步走(TwoPhase), 不過目前沒目前 Flink1.6.1 的 Sink都沒有實現這個功能 。
在源碼裏, TaskManager 類同 JobManager被JobMaster取代 同樣,TaskExecutor取代了legcy TaskManager 併發揮着它的做用。本文裏TM指的是TaskManager的整個進程, JM表明JobManager整個進程。JM的核心類是JobMaster (固然還有ClusterEntryPoint, Dispatcher, WebMonitor和ResourceManager, 可是都起的是輔助做用), TM裏的核心類是TaskExecutor 。還有一個比較混亂的Term就是Task。Task對應JobGraph的一個節點,是一個Operator。在ExecutionGraph裏Tast被分解爲多個並行執行的subtask 。每一個subtask做爲一個excution分配到TM裏執行。但比較然人抓狂的是在TM裏這個subtask的概念由一個叫Task的類來實現。因此TM 裏談論的Task對象實際上對應的是EG裏的一個subtask ,若是須要表述Task的概念,用Operator。先澄清一下Terminology ,以避免語言混亂。
圖-7 TaskExecutor
我的以爲Task之間的數據交換,是TM裏的核心和重點,也是 Flink runtime的核心、重點和難點,它的質量是區別於其餘大數據系統的關鍵,它的質量直接影響Streaming/Batch 任務的延遲及吞吐量兩大指標。人們選擇一個大數據流式框架最想先了解的就是這兩個指標, 至因而不是Statefull, 可靠性可用性集成度怎麼樣,編程接口是否簡單不是不重要,但不是最關鍵的,是次要考慮的。因此本節做爲本文的重點,咱們深刻解讀一下TM管理的數據交換。
首先強調一下Task之間的在網絡裏或在本地的數據交換, 不是Task管理的,是由TM (或TaskExecutor)管理的。以下圖所示。
圖-8 JM和TM的數據交換
JM將EG的Execution 提交給 TM,TM將Execution轉化爲Task執行, 並負責Task之間的數據傳輸.
回想一下EG的數據結構:
(R
圖-9 EG 的數據結構 和 數據流
爲了將JM的EG的邏輯工做流在物理機上執行起來, TM建立了相應的物理數據結構。
ResultPartition(RP)概念上對應着EG中的IRP(IntermediateResultPartition), 它負責存放一個Task生成的全部結果數據。
ResultSubpartition (RS) 是對應着RP的數據Repartion以後shuffle數的中的一個parition ,它存放着RP從新分區後發送下游的某一個sub task的分區數據。
InputGate(IG): 在EE中, IRP是source, EV 是target, 它描述了分區數據的流向, 但對不物理實現這樣的結構仍是不夠的。 IG 由此而引入, RP是EE上的數據發送端(IRP的物理實現),IG是接收端上與RP功能相似的組件, 負責收集來自上游(RP)的數據區 (data buffer)。
InputChannel(IC) 是接受端與RS功能相似的組件,IG使用不一樣的IC鏈接不一樣EE上的RP 中特定分區的數據區。 好比在data shuffle的過程當中多個RP都產生鍵值爲a1的RS , 這些RSa1中的數據最終都會流向同一個IG中的IC, 這其中每個IC承擔接受上游的RP中每個RS-a1中的數據。
數據在EE上的通信, 由RP到IG ,數據是以binary 形式傳輸的。也就是說數據進入RP前,會由總Serializer 將data record 序列化成binary format, 並存入data buffer 中。 Data data buffer 由IG 接受傳遞給下游的Oparor 前, 由Deseriealizer將數據從DataBuffer反序列化成data record, 供該Task消費使用。 Data buffer至關於高速公路上運輸乘客的大巴車, buffer中的數據至關於乘客 。 每一個大巴車的形狀, 運載量也是固定的, 不裝滿不發車 。它能極大的增長了整體數據傳遞傳輸量和創數率, 增長系統的吞吐量, 但同時也增長的單個數據的延遲 。缺省狀況下2048 個buffer會被建立, 每個32K字節 。對與比較大的record 須要多個buffer 承載 。每個RS和IC有一些大buffer組成, RP和IG就是這些大巴車的裝載者和卸載者 。
關於DataBuffer若是建立和管理, 參考後面的內存管理章節。
另外值得一提的是, 不一樣的對於RS的實現決定了實際的數據傳輸的方式。
PipelinedSubpartition支持streaming模式下的數據傳輸 (大部分的RS都是這種實現): 數據壓滿一個buffer (buffer size是configuration 指定)就向下傳遞 。 SpillableSubpartition 只有當RP的類型爲BLOCKING是纔會建立出來(Batch job 中的部分RS 是這種實現)。它支持在事先分配的Buffer不夠用的狀況下, 將Buffer中的數據Spill到硬盤中,從而該RS佔有的Buffer得以釋放。由於他會涉及IOManager (應該只有它會使用到IOManager), 因此我們細說一下。
那什麼是BLOCKING類型的? 從ResultPartitionType是這樣定義:BLOCKING(false, false, false)能夠看到, 3個false 分別以下:
簡單來講只有Batch job 裏的一些RP類型是Blocking的 ,由於BatchJob總一些須要shffule 輸出的operator纔會有纔會啓動Blocking模式, SpillableSubpartition纔會被建立。
注意 上面說到了的三個模式:概念比較混亂。
1. JobMode (Batch or Streaming):任務模式, 因爲決定ExecutionConext
2. ExecutionMode (PIPELINED (default), BATCH, PIPELINED_FORCE, BATCH_FORCE):可配置的數據流總體模式, 目的是經過一個可配置的ExecutionMode(可在ExecutionConfig中配置)來決定全部Operator的DataExchangeMode。它是Optimizer在優化Edge的shipStrategy和DataExchangeMode作策略選擇的依據。 詳細看DataExchgeMode代碼中,DataExchgeMode與ExecutionMode的Mapping關係。
3. DataExchangeMode(PIPELINED, BATCH, PIPELINE_WITH_BATCH_FALLBACK ) : 根據下游Operator,和 ExecutionMode, 由Optimizer 決定 數據下發模式 ,
下面是當JobMode 爲Batch, ExecutionMode 爲PIPELINE時, DataExchangeMode應該優化的結果。能夠看出來只要下游須要Shuffle, DataExchangeMode就會被優化成BATCH模式, 此時Flink會建立SpillableSubpartition 。
DataExchangeMode.PIPELINED, // to map
DataExchangeMode.PIPELINED, // to combiner connections are pipelined
DataExchangeMode.BATCH, // to reduce
DataExchangeMode.BATCH, // to filter
DataExchangeMode.PIPELINED, // to sink after reduce
DataExchangeMode.PIPELINED, // to join (first input)
DataExchangeMode.BATCH, // to join (second input)
DataExchangeMode.PIPELINED, // combiner connections are pipelined
DataExchangeMode.BATCH, // to other reducer
DataExchangeMode.PIPELINED, // to flatMap
DataExchangeMode.PIPELINED, // to sink after flatMap
DataExchangeMode.PIPELINED, // to coGroup (first input)
DataExchangeMode.PIPELINED, // to coGroup (second input)
DataExchangeMode.PIPELINED // to sink after coGroup
由於batch 工做模式下的 shuffle一般會伴隨的對整個group的排序, aggregation等, 下游須要獲得(該group的)全集纔可作這些操做。全局數據量比較大, 物理內存Buffer極可能不夠用, 這時候SpillableSubpartition(在IOManager的幫助下)可將一部分硬盤當Buffer來用, 者極大的幫助了RP端的數據緩存。但SpillToDisk必須實如今RP端嗎?不能夠在接受端(InputGate)實現嗎? 接受端的計算須要作密集的內存訪問(sort, hash, etc ), 這些算法都是在整個數據集上的操做, 因此數據須要緩存在MemoryManager管理的MemorySegment中 從而提升存取效率, 也能爲使Flink對內存作有效的管理。可現實是Flink的BatchJob須要消耗巨大的內存, 這跟接收端不使用硬盤作Buffer由很大關係。至少Flink 1.6.1的Memory是不使用硬盤作緩存的,雖然有HybridOffHeapMemoryPool, 並且它也使用了DirectMemory 來分配MemorySegment, 但並無實現用磁盤文件來映射內存 , 全部當上遊數據量很大, 但內存不足時, Flink task會很快的out of memory。以後看看Flink的最新版本是否有改善。
R
圖-10 使用EG控制物流數據流(數據交換)
圖-10描述了Task之間數據交換的大概流程。
圖-11給出了跨TM的兩個Task之間的數據交換的更多細節。
1.3.2 Task 提交與執行 (TDD, Task, AbstractInvokable , Driver , ...)
根據前面的介紹,Task是由JM(經過EG) 通過Scheduler (申請和Offer slot resource和根據schedule 策略,等等) 提交給TM執行的。申請resource 和提交Task都是經過JM, RM 和TM之間的RPC通道完成的。那麼具體提交了什麼呢?Task若是執行的呢?Task如何知道誰是上游從而創建InputChannel的?
實際上這是一個 從EV->TDD->Task->AbstractInvokable->Driver ->具體的Oparator 實現類變形的過程。
1. 從EV 到TDD :
TaskDeploymentDescriptor(TDD) : 是TM在submitTask是提交給TM的數據結構。 他包含了關於Task的全部描述信息:
不難看出,EV包含上述的全部信息,EV的一個方法createDeploymentDescriptor,完成了上述變形。JM 在向TM submitTask時,傳遞的是TDD不是EV。爲何要作此變形的而不是將EV直接傳過去既然他們很相似? 我想這個一個設計模式問題, EV是做爲ExcutionGraph的中的頂點 ,它最好只存在於JobMaster 的物理圖中, 而不是做爲參數傳遞給其餘組件,從而維護它的獨立性和單一性。參見Descripor設計模式。
當TaskExecutor(TM)接受submitTask 的RPC調用從而獲得TDD時, 他會將TDD實例化爲一個在TM上能夠執行的對象 : Task 。
2. Task :
Task 是一個Runnable 對象, TM接受到TDD 後會用它實例化成一個Task對象, 並啓動一個線程執行Task的Run方法。
Task實例化時, 他會將TDD中的IGD實例化成InputGate (IG) 和 InputChannel(IC), 將RPD實例化成RP . 在 Task 的Run 方法被調用時, 它根據TDD的 TaskInfo, 使用URLClassLoader將用戶的operator類從HDFS
加載, 並實例化TaskInfo所描述的AbstractInvokable對象, 並將IG, IC, RP , 還有其餘全部的AbstractInvokable須要的服務類(MemoryManager, IOManager, CheckoutResponder, TaskConfig etc )都傳遞進去, 而後調用AbstractInvokable 的 invoke 方法。
3. AbstractInvokable
如以前給出的一些例子 : DataSourceTask, DataSinkTask, BatchTask, StreamTask 。 它們是Task.Run()時, 經過TaskInfo加載並實例化的。 這些Task Operator的源代碼都在
org.apache.flink.runtime.operators 或org.apache.flink.streaming.runtime.tasks下面 。每一個BaskTask的須要的工具都是相似的, 只是計算的流程不一樣 , 因此BaskTask的invoke方法調用時, 它根據TaskConfig(信息繼承與TDD的TaskInfo) 加載和實例化不一樣的Driver 類, 並調用Driver的run方法由Driver指揮流程(input, calcuate, out , etc )。
4. Driver
Driver類和AbstractInvokable位於同一個包內。 每個Driver的run() 大都是一個循環。 不停向IG 要next record , 寫metrics, 調用function 計算, 而後見結果發給RP。只不過有些drver一次計算須要兩個record, 有些driver 須要兩個record 來自不一樣的IG , 有些須要將全部的input所有收完才計算, 有些在window expired後才計算, 等等。下面是FlatmapDriver 的run() 。
while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}
在stream模式下數據處理是有界限(bondary)的, 每一個window的處理所使用的內存是相對比較小 的, 因此Flink stream job 一般使用的內存較小。
但在batch 模式下, 數據處理是無界的 (所謂無界就是沒有Window),如前面所示, 不少job 須要將上游全部的input都取乾淨,纔開始計算, 如sort, hash join, cache 等, 此時所須要的內存是巨大的, 好比上游operator 讀取300G文件而後map成record, 下游operator 須要將這些record 同另外一組輸入作outer join 。因此Flink內存管理主要針對的是這些batch job 。
總起來將, Flink的內存管理仍是比較失敗的, 至少在我用的版本里(1.6.1) , 主要緣由仍是 MemoryManager並無聯合IOManager Disk去擴展內存。但我覺的MemoryManager的引入, 就是爲了管理Flink的內存, 以防止OutOfMemoryException的發生 (如Spark同樣 )。 防止溢出最直接的方法就是當系統內存不足或超過throttle時, 使用DiskFile以補充內存 , 從而完成 那些內存消耗巨大的操做。 只是version 1.6.1 還沒作好 ,或許之後版本會作好 。
先看下Flink的內存管理機制吧 。
從概念上將, Flink 將JVM的heap分紅三個區域。
"taskmanager.network.numberOfBuffers"調整。
"taskmanager.memory.fraction" 配置。
"taskmanager.memory.size"
配置一個絕對值, 好比10GB。
Network buffers pool 和 memory segment pool 都是TM啓動時就分配的, 它們生存於JVM的老年代, 因此不會被GC回收的。只有Free Heap區域在新生代裏生存。
IG/IC, 和RP/RS 的record存儲在network buffer 裏,有些RS (SpillableSubpartion)須要Spill to Disk ,所以他們須要IOManager 的幫助。
須要sort/hash/cache的Task瞬時內存消耗很是大,所以從IG接收record後就將他們存儲memory segment pool, 以後在對serialized record 應用 算法 。
其餘的Task大都是pipeline 方式, 來一個消費一個, 瞬時內存消耗不會大。此時這些Task會使用FeekHeap 區域的內存。
若是沒有人爲錯誤, 系統不會有瞬時的大量內存申請, 因此不會有OutOfMemoryException , 因此FreeHeap的區域應該是比較輕鬆的。
但是問題是, memory segment pool 目前沒有使用DiskFile 做爲OffHeapMemory, 它可以裝載上游下來的巨量 blocking 輸入嗎? 這就是Flink 1.6.1 的問題。不過這個MemoryManager實現的好很差的問題, 設計上是有防止OutOfMemoryException 的組件的。這個就期待Flink 新版本 補強這部分功能吧。
前面說「serialized record 應用 算法「,這是怎麼作到的呢? Flink 實現了大量的 TypeSerializer 和 TypeComparator, 他們懂得record 是如何序列化到字節數組裏的,因此也知道如何將record部分地deserialize 。一般算法只是使用record的某個或某些field。 部分deserialize極大的下降內存使用, 提高了數據存取的速度, 從而極大的提成了內存的使用效率。 serialized 形式的record 數據是Flink 可以將他們在TM之間, 跨進程和跨網絡的傳輸, 它是分佈式系統須要的數據形式, 同時, 它也爲MemoryManager 將它們在內存和硬盤之間交換提供了條件。
IOManager 使用FileChannel將MemorySegment同DiskFile創建映射, 從而實現將數據在MemorySegment和DiskFile之間寫入和讀回。
因此,雖然MemoryManager目前還有些問題, 在這種設計下, Flink的內存使用效率確定會進化的極好 。
RM在Flink內部是Flink cluster 裏slot資源的管理者 , TM 提供slot,JM (JobMaster)消費slot 。 RM同JM和TM 都要保持心跳,以保持slot市場的活躍 ,以及在TM或JM失敗的時候通知給對方。
總起來講RM的做用主要包括:
第2、三項功能比較好實現。第一項功能須要同外部的集羣管理器合做才能實現。所用RM是一個隨環境不一樣而不一樣的組件。在不一樣的集羣環境裏, RM有不一樣的實現類。
市場上比較流行的集羣資源管理器主要有Yarn, Mesos, Kubernetes, 和 AWS ECS 。其中Yarn, Mesos中, 能夠利用ApplicationMaster/Scheduler是資源調度器,只要將RM在ApplicationMaster中運行起來,理論上 RM就能夠Yarn, Mesos的master通訊,爲TM分配容器和啓動TM。實際上JM的全部組件(Dispatcher, RestEndPoint, JM, ClusterEntryPoint, etc)都在ApplicationMaster啓動的。
Kubernetes和ECS 沒有ApplicationMaster的概念, 但TM能夠做爲一個有多個副本的deployment (K8s) 或 service (ECS) 運行。此時 TM 的多少(規模)是由deployment/service 根據必定策略自動擴容的而不是根據Flink須要的slot數量。Flink並無實現特殊的ResourceManager和K8s/ECS集成,此時的FlinkCluster使StandaloneCluster。若是能一個K8s/ECS 特殊的RM和集羣的master通信,使TM可以能按需擴容而不是自動擴容,那就和在Yarn裏同樣完美融合, 並且可以用上Docker的服務, 畢竟Yarn目前只是使用JVM做爲容器,並無真正達到真正資源的隔離 。
RM在不一樣cluster以不一樣的方式啓動新的TM以補充slot資源,固然當一個job 結束時,它也會同集羣的master通信,釋放container,關閉徹底空閒的TM。
在不一樣的集羣環境裏,RM可以管理TM的生命週期,那麼誰來啓動和結束JM 呢? 請參考後面的 Flink Deployment 。
Flink的 HighAvialbility 主要有兩個服務組成 : LeaderEleketronService和LeaderRetrievalService 。
該Service是用來選舉Leader的 。假設系統裏啓動了兩個Dispatcher 。先回顧一下什麼是Dispatcher , 看圖-2, JobMananager 的進程是ClusterEntryPoint的main 函數啓動的,ClusterEntryPoint啓動了 WebUI, Dispatcher 和ResourceManager。當用戶提交了 Job時, Dispather 實例化一個新的JobMaster 來管理這個job, Dispatcher也是CheckPointBarier 的發起者, 同時也是來自WebUI REST 後臺的請求的處理者。Dispatcher 的做用承上啓下, 做用很是核心,不可缺失,因此Flink啓用HA服務來保護它。 Flink 系統裏, 使用HA 保護的核心服務還包括ResourceManager ,JobMaster 還有WebUI 的REST Endpoint。當系統裏啓動兩個Dispatcher 誰來當Leader呢? 這就要LeaderEleketronService (LES) 的決定了。
目前Flink的起做用的LES使用ZooKeeper 實現的。實現方式就是兩個Dispather都試圖搶佔的特定ZooKeeper Path (dispather latch path) 的LeaderLatch(參見 curator framework),誰先搶到了, 就被選舉成Leader , leader的 AKKA URI 和UUID被被寫道一個特定的ZooKeeper path 中(leader path) 。只有Leader 是工做的,由於其餘組件在使用Dispather時會向獲取Leader Dispath的URI, 而後才RPC的 。 其餘競爭者不會接受到RPC, 他們只有繼續監聽,若是當前的Leader 退出了, LeaderLatch被釋放了, 從而新的leader會被選舉出來。
那麼對於Dispatcher的使用者, 怎麼知道誰是Leader呢? 這就須要LeaderRetrievalService了 (LES ) 。 對於用ZooKeeper 實現的LES, 它只須要監聽一下leader path, 它就知道誰是leader 了 。
好比, 當你用fink run 命令提交job 時, 假如系統裏有多個Fink REST Enpoint, Flink的ClusterClient 對先使用LES獲取Leader REST Endpoint, 而後纔會將job 發送過去。
除了ZooKeeper的實現, HAService 還能夠以來外部的名字服務(如DNS, LoadBalencer , etc ) 實現。在LES和LRS 永遠返回 HA保護的服務的URI(不管是AKKA PRC URI, 或REST URI )。 該URI永遠保持不變, 若是提供服務的server失敗, 名字服務會將給URI映射到另一個工做的server 。 具體請參考 StandaloneHaServices的代碼。
目前爲止, Flink 支持大概4種部署方式或4種cluster 類型,MiniCluster(或LocalCluster), StandaloneCluster,YarnCluster,MesosCluster 。 雖然前面提到過Kubernetes , ECS , Docker , 但他們的本質是將JM和TM docker 化 , 從而是他們運行在真正的 dockers裏面 , cluster 仍是 StandaloneCluster 。
圖-3 是StandaloneCluster , 圖-4是YarnCluster , MesosCluster 與YarnCluster 相似, MiniCluster是運行在IDE(e.g. IntelliJ )的虛擬cluster , 它主要用於 FlinkApplicaiton的調試 。在Session 模式下,表面上能夠看來StandaloneCluster 與YarnCluster基本流程是一致的, 只不過是啓動JM 和TM的啓動機制不一樣而已。YarnCluster/MesosCluster 裏TM是由他們各自的ResourceManager實現根據須要啓動的。 在 StandaloneCluster下, TM是手工啓動的。其實在YarnCluster下, JM 也是Yarn啓動的。其實, 如前所述, StandaloneCluster與YarnCluster / MesosCluster的本質區別是, 前者的用於TM硬件資源是管理員手工分配的, 後置是有RM同集羣管理器協調自動分配的。
從Flink內部看, 是爲了支持這些異構環境, Flink 對於不一樣的Cluster, 實現了不一樣的ClusterEntryPoint用於啓動不一樣ResoureManager ,以及Dispatcher 。不一樣集羣類型的ClusterEntryPoint對JobMode和SessionMode有 不一樣的實現, 好比YearnSessionClusterEntryPoint和YarnJobClusterEntryPoint。Session模式下, Dispatcher使用的時StandaloneDispatcher, Job模式下,使用的是MiniDispather 。 關於什麼是JobMode和SessionMode, 請參考圖-3下面的解釋。
如前所述,JM接受jobGraph的對象或對象文件(序列化後),jobGraph由FlinkClient生成, FlinkClient和JM位於不一樣的JVM (MiniCluster除外)。若是JobCluster沒法由FlinkClient啓動 (經過某種方式),則JobGraph生成之後須要存到指定位置,在手工啓動JobCluster讀入,這樣的過程比較複雜。基於目前的架構,儘管Flink在各類集羣環境裏對job mode , session mode 在代碼上都由支持, 但job mode 一般因爲集成度不夠好,用戶沒法方便使用。
我用一個表來描述他們跟具體的不一樣以及Flink是如何支持這些異構環境。爲了使這個表不至於太龐大,先把不一樣的cluster 類型表述下先。
1. MiniCluser
用於在IDE(IntelliJ, Eclipse) 運行FlinkApplication 的小型FlinkCluster環境,JM和TM運行在同一個進程裏,主要用於在IDE裏調試Flink Application。
2. StandaloneCluster
能夠運行在單機或多機上的FlinkCluster環境, JM和TM運行在不一樣的JVM裏。只要有JRE , 不管在Windows ,Linux均可以搭建StandaloneCluster。
用戶可使用FlinkClient 的command line,或API 直接向JM REST URI 提交job ,具體看Flink CLI 的 「-m" 選項。StandaloneCluster很是有利於搭建單元測試,集成測試以及演示環境。
3. YarnCluster
在Hadoop集羣裏搭建的FlinkCluster, JM和TM都運行在Yarn管理的容器裏。JM作爲Yarn裏ApplicationMaster ,Flink cluster做爲Yarn的一個Application運行。
Yarn是Flink與之集成度最好的集羣管理器。 用戶能夠直接使用FlinkClient 的command line,或API 直接向Yarn提交job , YARN 會自動的啓動(或鏈接現有的)Flink JM, 自動啓動須要的TM,然後在該Flinkcluster運行任務。
具體看Flink CLI 的「-m yarn-cluster" 和 」applicationId" 命令行用法。
4. MesosCluster
在Mesos集羣裏搭建的FlinkCluster, JM作爲Mesos裏Scheduler , 一個 Flink cluster做爲Mesos的一個Framework運行, JM和TM均可以運行在Mesos管理的容器裏。
TM由JM的resource Manager同Mesosmaster 協調按需自動啓動和銷燬。JM 須要先使用Marathon 建立服務, 由Marathon 啓動。Marathon服務的後端都是運行在Mesos的容器裏,
並且Marathon服務一個高可用性服務。
用戶能夠直接使用FlinkClient的command line,或API 直接向Marathon 建立服務提交job ,具體看Flink CLI 的 「-m" 選項。
我的以爲Flink 與Mesos的集成度能夠在提升一些。目前用戶須要手動的爲JM建立Marathon Service, 可參考下面寫一個簡單的配置文件, 調用flink的shell腳本mesos-appmaster.sh啓動Flink
MesosSessionCluster的JM (該JM會啓動MemsosResourceManager用來啓動須要的TM)。
yhttps://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/mesos.html
Flink 徹底能夠改進一下Client端, 添加集成Mesos Marathon須要的 ClusterDescrptor類,CLI類(好比MesosJobClusterDescripor, MesosSessionClusterDescripor,FlinkMesosSessionCLI ),豐富「-m" 選項,
由ClusterDescrptor類自動建立和銷燬JM (Marathon 服務)
5. Kubernetes/ECS/Docker
Flink 對於Kubernetes/ECS/Docker在源碼級別並無任何支持。只是說,能夠將StandaloneCluster的JM和TM運行在這三個以Docker 做爲容器服務的
集羣環境裏。因此是,Flink 與他們的集成度, 比較低 。Kubernetes的具體作法分別對StandaloneCluster的JM和TM作兩個deployment(就是能夠平行擴展的docker group ), 它們分別啓動StandaloneCluster的JM和TM (經過start-console.sh腳本), 而後對JM的
deployment作一個Service使其具備高可用性,固然須要將該Service的URI傳遞給TM,前面說過StandaloneHAService是靠統一的URI來提供HA服務的。更詳細的請參考下面。
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/kubernetes.html
ECS和Docer的實現同Kubernetes 相似, 只是在各自使用的技術名稱不一樣而已。ECS的AutoScalingGroup等價於Kubernetes的Deployment。 common sense是同樣的, 只是各家用各家喜歡的名字。
總之,將Flink的部署在上述集羣裏, 目前手工的工做仍是比較多的,並且TM的數量是預先設定的,或按策略自動擴容的, 並非最優的由Flink RM 指導的按需擴容。獲得的好處最大應該是Docker的容器服務。畢竟Yarn或Mesos對Docker的支持並非很好。
我的以爲Flink 與他們的集成度能夠在提升一些。同提升Mesos的集成度的想法同樣, 在目前Fink的框架下,只須要改進FlinkClient,添加集成Kubernetes API-Server須要的 ClusterDescrptor類,CLI類(好比K8sJobClusterDescripor, K8sSessionClusterDescripor,
FlinkK8sSessionCLI ),豐富「-m" 選項,由ClusterDescrptor類自動建立和銷燬JM (JM 的Deployment和 Serice ), 添加K8sResourceManager, 用它來建立及擴容TM的deployment 。
Cluster Type | JM 啓動方式 | TM啓動方式 | ClusterEntryPoint | ResourceManager | SuportedMode |
Mini | 調試環境啓動的Flink Client是 LocalEnvironen或 LocalStreamEvronment, 它們的execute方法會 先啓動MiniClusterEntryPoint |
由MiniClusterEntryPoint啓動 | MiniClusterEntryPoint .java |
StandaloneResource Manager.java |
只支持JobMode |
Standalone | 手工,如sart-cluster.sh腳本 | 手工, |
StandaloneSession ClusterEntryPoint.java
StandaloneJobCluster EntryPoint.java |
StandaloneResource Manager.java |
支持SessionMode job mode 有支持,但用戶沒法方便使用。 |
Yarn | jobmode下 由FlinkClient裏的 YarnClusterDescriptor, 調用Yarn的API經過YARN啓動。 JM作爲ApplicationMaster自動啓動。
SessionMode JM須要手工運行 yarn-session.sh, JM和TM的內存不是per-job, 只有管理員才能設定。 |
JM 裏的YarnResourceManager 利用YARN的API啓動TM。 |
YarnSessionCluster EntryPoint.java
YarnJobCluster EntryPoint.java
|
YarnResource Manager.java |
支持JobMode和SessionMode
|
Mesos |
經過Marathon Service啓動, JM運行在Mesos的容器裏。 但Marathon Service須要手 工建立。 |
JM 裏的MesosResource Manager 利用Mesos的API啓動TM。 |
MesosSessionCluster EntryPoint.java
MesosJobCluster EntryPoint.java |
MesosResource Manager.java |
支持SessionMode job mode有支持,但不方便使用。須要 提高集成度。 |
Kubernetes /ECS/Docker |
由JM service 啓動 | 由TM deployment啓動 | StandaloneSession ClusterEntryPoint.java
StandaloneJobCluster EntryPoint.java |
StandaloneResource Manager.java |
支持SessionMode job mode有支持,但不方便使用。 須要提高集成度。 |
表-1 Flink Deployment
如前所述FinkCluster裏 (參考圖-3), 最核心的組件並且交互最頻繁的組件是Dispatcher, ResourceManager, JobMaster, 和TaskManager 。
其中對於Dispatcher, ResourceManager, JobMaster,一般是ClusterEntryPoint啓動Dispatcher和ResourceManager,而後當有job提交時,Dispatcher啓動JobMaster。(MiniCluster, JobCluster略有不一樣,但相似)。儘管他們之間使用RPC通信,他們生存在同一個JVM裏,若是其中一個失敗,同時使整個JVM(因爲異常崩潰了)失敗了,全部組件都失敗了。那麼他們中若是有承擔Leader責任的,也都會經過HA Service 切換到另外一個工做的JM後端的RPCEndPoint。
若是TM失敗了, JM可以感知到heatbeat 機制感知到, JM 和RM都與主動向TM發送和hearbeat , 因此TM heartbeat 一旦timeout , JM會釋放該TM 提供 過來的slot,與此同時將slot上運行的Execution 的狀態設置爲失敗, 參考 SingleLogicalSlot::signalPayloadRelease .
在stream mode 下,JM 會嘗試將該TM上失敗的Task重現分配到別的TM上(若是slot資源時有的,或能夠分配的)。
在Batch mode 下, JM 會將整個job 失敗,然後嘗試從新啓動整個job 。
從新啓動的streaming Task會根據上一次的checkpoint 回復狀態, 繼續運行。
to be filled .
上一章介紹Flink了的架構, 它包括了那些主要組件? 組件是怎麼工做的? 組件之間是怎麼工做的?組件使用的資源(包括服務器,容器,內存和CPU和Disk)是若是分配的? 以及組件是如何部署的?一個job是怎樣分割成小的Task並行執行在cluster裏面的?
怎麼保證組件HignAvailablity 以及組件是如何作 失敗處理的?以及Flink關於系統安全的設計等等。 這些關於架構的介紹,重點在於瞭解flink cluster中各個組件的互操做行以及容錯處理, 瞭解框架, 以便於再出現問題的時候我們能夠對它有比較針對性的debug 。關於Flink引以爲傲的statefull operator, window watermak, checkpoint barrier , stream/batch API 以及web monitor本文都沒有介紹,請參考相關的Flink的官方文檔。
Flink的源碼還是比較清晰易懂的, 尤爲是瞭解了她的架構後, 大部分的實現都很是符合common sense 。 不想在這裏貼一堆代碼段然後加註釋解讀了, 本文的篇幅已經太長了。過一下全部的包,解釋一下他們的主要做用,以及須要框架裏使用的主要類吧。用一個大表來列舉比較合適。
Artifect | 功能介紹 |
Flink-client | FlinkCommandLine 入口類 (CliFrontEnd) 解析Flink 命令行 (DefaultCLI, YarnSessionCLI ) 負責將從用戶jar 文件中main函數生成Plan (PackagedProgramUtils),用通過LcoalExecutor或RemoteExecutor調用flink-Optimizer生成 jobGraph . 使用ClusterDescriptor啓動jobManager (好比在yarn), 使用並將jobGraph 用ClusterClient提交給本地或遠程的JM。 |
Flink-runtime | Flink 源碼的核心。 框架的核心組件都在裏面, 包括 Diskpatcher, JobMaster, TaskExecutor, ResourceManager, ClusterEntryPoint, WebUI以及他們依賴的子組件以及核心數據模型, JobGroup, ExecutionGraph, Execution, Task, Invokable, Operator, Driver, Function, NetworkBufferPool, ChannelManager, IOMemory, MemoryManager, PRCService, HAService, HeartbeatService , CheckPointCoordinator , BackPressure, etc . 這些類的名字和做用在上一章都或多或少提到過, 最好結合架構的介紹, 理解他們的代碼。 |
Flink-runtime-web | Flink Web monitor 的界面和handler . handler會調用Dispacher 的方法處理客戶請求, 好比sumitJob. |
Flink-java Fink-scala |
用Java 和 scala實現的Flink Batch API , Dataset, ExecutionEnrionment, etc . |
Flink-stream-java Flink-stream-scala |
用Java 和 scala實現的Flink stream API , DataStream, ExecutionEnrionment, , StreamExecutionEnvironment, windowing, etc . 和對streaming提供支持的runtime, Checkpoint, StreamTask, StreamPartitioner , BarrierTracker, WindowOperator, etc . |
Flink-optimizer |
主要功能是優化Plan 和生成JobGraph。Flink-optimizer是一個很client端使用重要的庫,它決定了從客戶代碼(application) 到jobGraph造成過過程。 我個人覺得一般也是調查和解決 application問題的根源, 好比application 的寫法是否是合適的,有問題的, 或最優的等等。 在前面架構極少裏面,並沒有談及Flink-optimizer, 因此在這簡單介紹下。 fink application 開發者使用Flink API 編寫application, flink-client 爲了將application 最終能在Flink cluster裏運行起來是, 它首先通過讀取用戶jar 文件中的main函數生成Plan:以DataSink為根的一個或多個樹結構, 樹的節點都是application使用的 API operator, 每一個節點的輸入來自與樹的下一層節點。 不難想象樹的葉子節點都是DataSoruce: 他們沒有輸入節點,但有用於讀取數據源的inputFormat, 根節點是DataSink :他們既有輸入節點,也有用於寫入目標系統的outputFormat, 中間節點都會有一個多個的輸入節點。Plan數據結構描述了同用戶的application徹底相同的數據流的節點, 但它只是一個邏輯樹狀結構, 並無對鏈接節點的邊作描述, 也沒有對application編寫的數據流作任何修改。o 而後使用Flink-optimizer將Plan根據時根據優化策略設置節點的邊上的數據傳輸方式,並同時用OptimizerNode生成多種優化方案, 最後選擇cost 最小的方案產生的方案 做爲OptimizedPlan 。 好比將數據裝載方式(ShipStrategyType) 設置 FORWARD, 而不是 PARTITION_HASH而 應爲 FORWARD 的網絡cost 最低(0) 。OptimizedPlan是一個圖結構, 圖中的頂點(PlanNode) 記錄了自身的cost以及從source 開始到它的累計的cost 。OptimizedPlan主要針對與join和interation操做。 而後Flink-optimizer將OptimizedPlan 編譯成JobGraph。編譯的過程應該基本上是一對一的翻譯(從PlanNode 到 JobVertex), 但若是一串PlanNode 知足Chaining 條件 (好比數據在每一個oparator 都不需從新分區, 流過operator 以後, 直接forward到下一個operator), Flink-optimizer就像這些 operator 鏈接到一塊而後在JobGraph裏面只建立一個 ChainedOperator jobVertex, ChainedOperator同Spark裏面的stage 概念相似, 是優化的一部分。 最後flink-client將jobGraph提交給FlinkCluster ,jobGraph 變形爲 ExceutionGraph在JM和TM上執行。 能夠從Optimizer 的compile 和JogGraphGenerator的 compileJobGraph展開看, 他們分別complie的是Plan和OptimizedPlan 。 Flink-optimizer優化的是parition 的選擇以及算法的選擇, 而不是DAG 的workflow 。
|
Flink-Table
|
用戶能夠用flink-java/flink-stream-java裏的api編寫flink application, 也能夠用 Flink-Table 的table api和 flink-sql寫 application 。 Flink-Table將數據源(Dataset, DataStream)都 generalize 成Table (row based ),用戶能夠用相似關係型數據庫的操做方式操做Flink的數據源, 這種方式雖然屏蔽了一些flink-api的特性 (好比 broadcast), 但極大的下降了application 的開發難度,減小了客戶程序的代碼量,從而極大的提升系統的重用度。 Flink-table 的底層仍是依賴dataset/datastream api, 因此基於table api 或SQL 的程序 (Program) 最終會翻譯成 Flink-optimizer 的Plan , 通過前面所述一樣的編譯優化過程, 最終一個EG的形式運行在JM和TM上。 固然,在被翻譯成Plan以前, Flink-table 的Program 也會有自身的優化過程, 好比SQL Plan optimization . 代碼須要看, 如何實現一個Table : StreamTableSource, BatchTableSource, BatchTableSink,AppendStreamTableSink 如何擴充FlinkSQL : UserDefinedFunction, ScalarFunction,TableFunction,AggregateFunction |
Flink-yarn Flink-mesos Flink-container |
Flink 怎麼支持異構環境的, 包括不一樣環境裏的, 主要是異構環境裏的不一樣ResourceManager (啓動TM ),以及 ClusterDescriptor (啓動JM)如何實現的。 |
Flink-library |
flink-cep, flink-gelly, flink-ml |
Flink-connector Flink-format |
鏈接外圍數據系統(數據源和輸出)系統的InputFormat, OutputFormat . |
Flink-filesystem |
Flink支持的分佈式文件系統, hadoop, s3, mapr |
Flink-metrics |
flink 的metrics 系統 |
Flink-statebackends Flink-queryable-satate |
flink 的 state的存儲 |
flink-jepsen |
Flink的UT和集成測試。 |
表-2 Flink packages
須要安裝JDK1.8和Scala的plugin, 不然Flink沒法在IntelliJ裏編譯。
下載flink1.6.1的代碼,到下面的地址 download ,而後用maven 編譯 。
https://github.com/apache/flink/tree/release-1.6.1
下載例子程序: https://github.com/kaixin1976/flink-arch-debug 。 因爲github的限制, 上傳的instrument文件相對較小,讀者能夠經過自我複製擴大文件尺寸。
運行例子程序, 有兩個參數 :
第一個是 joinType,可取值爲"normal" 或"broadcast"。normal 方式徹底依賴FlinkOptimizer產生 優化的方案, broadcast是運用一下方法, 使join的第一路輸入廣播, 第二路輸入自由平均分配。
第二個使api類型, 可取值爲 "api" 或 "sql" 。api方式指的是Flink Java API, sql 使 Flink SQL API 。對於Join Flink Java API 能夠指定JoinHint或Parateters從而影響優化, sql 沒有這些接口, 只能在DataSoource 上作文章。
具體看JoinTest::main()。
如前所述, Flink分佈式環境的主要包含三部分:
1. FlinkCient : 生成、優化和提交JobGraph.
FlinkClient的main 函數在CliFrontend中, VM 的classpath 和工做路徑設置爲本地安裝的一個flink路徑 , 用於找到全部須要的jar文件和 flink confugration .
程序參數跟flink run 同樣,
圖-13 Flink Client debug configuration
2. JM : 生成ExecutionGraph,併爲其中的全部的的子任務分配slot資源, 並將子任務發送到slot所在TM上運行。
JM 使用StandaloneSessionClusterEntrypoint , 他會啓動Standalone Session Cluster 的entry point .
圖-14 Flink JobManager debug configuration
3. TM: 運行ExecutionGraph的子任務。
圖-14 Flink TaskManager debug configuration
例子程序是一個利用Flink SQL 把一大一小兩個數據源join再一塊兒的Flink application 。大數據源叫instruments 包含了一些假造金融工具(好比股票期權等)的基礎數據(其中包括該工具交易貨幣的ID),小數據源是貨幣currencies 包含貨幣ID,和貨幣名稱。join目的是給與金融工具的貨幣ID得到對應的貨幣名稱(好比,人民幣,歐元,美圓等)。
join的過程是比較簡單的,首先 從文件建立兩個數據源(instrument 和 currency 的 TableSource), 而後用調用 SQL 作join, 最後建立TableSink將join的結果輸出到文件裏。程序的主體以下。
public void joinSmallWithBig(String joinType) throws Exception { "StrikePriceMultiplier,LotSize,CurrencyCode,AssetState " + "FROM currencyTable join instruments on currencyTable.CurrencyId=instruments.CurrencyID "); |
表-3 例子程序
問題是join的過程發生了嚴重的數據傾斜 (data skew )。全世界的金融工具分佈式極不平均的, 絕大部分使用美圓,歐元計價 (好比例子程序裏假造的instrument的數據源,大概有13/14 的instrument是以歐元計價的)。若是使用經常使用的 hash join 或 sort-merage join, 數據傾斜是必然發生的。 調用joinSmallWithBig,並傳入任意字符串 作爲joinType 時, 程序調用的registerCurrencyTableSource方法, 該方法就是建立了一個普通的CsvTableSourcei並註冊, 以後利用sql join 將兩個數據join起來,此時會發生數據傾斜。以下圖所示:
圖-15 flink-webmonitor 上的currency 數據源
圖-16 flink-webmonitor 上的instrument數據源
能夠看出currency 的CsvTableSource一共有492條記錄分4個split讀入,數據量確實小,instrument數據源記錄數大概14million 條記錄有,比較大,一樣分4個split讀入, 分配不可謂不均衡 。但當join後, 均衡完全打破了,以下圖所示:
圖-17 flink-webmonitor 上的hash-join
兩個數據源的輸出策略(或JoinOperator的輸入策略)也就是鏈接數據源與JoinOperator的那條邊的ShipStrategyType被設置爲一個currencyId 爲key的HashPartition。 因爲本例中約13/14的instrument以歐元計價,大概有13 million個instrument 記錄依照hash code 被髮送到一個task的IG裏, 剩下約1 million的其餘三個task 。
借這張圖我們須要重溫一下的前面提到的task, RP,RS,IG,IC的關係, instrument數據源 有4 個task因此4個RP, 每一個RP會產生4個RS以裝載不一樣的hash code,共16個RS 。 下游join operator 有兩個IG (分別鏈接instrument數據源 和currency), 每個IG 的ShipStrategy都是HashParition。每個IG 有4個 IC, 每一個IC 都鏈接上游標號相同的RS (好比JoinOperator task1的 IG1 的IC1,IC2,IC3,IC4)只鏈接上游 instrumentSource task1, 2,3,4 的 RS1) 。 JoinOperator是一個DualInputOperator, 一般的operator只有一個IG 。
上圖中13 million的record received (13,836,997)是joinOperator的第三個 task 的IG1 和 IG2中相同標號IC的記錄數量(好比IC2),因爲currency數據量很小,構成13 million 這個數量級的來源是instrument中以某幾個貨幣計價的instrument, 如前述主要是歐元 , 此例中歐元計價佔比大概爲13/14 。
數據傾斜的結果形成joinOperator的第三個task須要多於別的 task的100倍的內存,一般會超過容器預先分配的內存配額。罪魁禍首是 join operator 輸入邊 的 ShipStrategy : hash partition 。 SQL 裏沒法指定 join 使用哪一種策略, 爲何flink 會在翻譯(將sql 翻譯成jobGraph)的過程當中將 join 輸入冊率 優化成hash partition 呢?一個小數據集和大數據集join,最好的方式是將小數據集廣播給下游的全部task, 大數據平均分配,而後再join operator內部作hash join, 這樣的cost是最低的 。Flink爲何不能作到這樣的優化呢?若是使用Flink API (而不是 SQL) 狀況會好嗎?
初步的感受是Flink-optimizer 並無作到這樣的優化: 當join的兩個數據源大小懸殊時, 小數據集廣播,大數據集均分的輸入給join operator 。Flink這麼經常使用的優化都沒作到嗎?
讓咱們經過debug Flink-optimizer 代碼, 看看他時怎麼將本例的join input的ShipStrategy優化成HashPartition 吧。
下載例子代碼,而後編譯,package 成flink-arch-debug-0.0.1.jar, 而後下載flink-1.6.1 的代碼,編譯, 按照圖-13設置flink-client的運行選項。
program arguments 設置爲 run flink-arch-debug-0.0.1.jar normal sql, 使用 "normal ","sql" 方式, 以下:
C:\projects\flink-arch-debug\target\flink-arch-debug-0.0.1.jar normal sql
Flink Optimizer 的優化過程(參考Optimizer::compile方法)大概是:
首先將利用GraphCreatingVisitor 將Flink API (API, Table, 或 SQL) 產生的Plan翻譯成dag 包裏的OptimizerNode和DagConnection組成圖,
而後從圖的SinkNode(一個或多個)先前遞歸的調用getAlternativePlans方法從產生最優的Plan , 最優的Plan 是由plan包裏的PlanNode和Channel組成的Graph 。ShipStratigy是channel的一個屬性, 它描述數據從一個operator以那種方式發佈到RP中的RS中的。
最後由JobGraphGenerator將最優圖翻譯成JobGraph
那麼有多少種ShipStrategy呢? 連續按兩下SHIFT, 再鍵入類名(ShipStategyType.java)。參考一下本文最後面的IntelliJ的經常使用鍵吧.
public enum ShipStrategyType { FORWARD(false, false), //數據按原分區號傳遞到本地運行的Task相同分區,不跨網絡,不須要比較器(用於排序或Hash) PARTITION_RANDOM(true, false),//數據隨機傳遞到本地運行的Task任意分區,不跨網絡,不須要比較器 PARTITION_HASH(true, true),//數據按照指定鍵值的Hash決定分區,到目標Task須要跨網絡傳遞,須要比較器 PARTITION_RANGE(true, true),//數據按照指定鍵值的排序順序決定分區,到目標Task須要跨網絡傳遞,須要比較器 BROADCAST(true, false) // 將數據複製到任意一個分區中,不須要比較器 PARTITION_FORCED_REBALANCE(true, false), // 將數據平均到各個分區中,不須要比較器 PARTITION_CUSTOM(true, true); // 將數據按用戶指定的分區器分配分區,不須要比較器 ... |
表-4 ShipStrategyType
搜索一下PARTITION_HASH, 看誰將它設置給最優圖的Channel 對象, 過濾一下,應該由兩個地方:
一個是TwoInputNode::setInput 方法中, 它根據join operator 的JoinHint 或 Parameters 來設置 input Channel的ShipStrategy 。 好比, 若是想讓join的第一個input(Currencies)用廣播方式傳遞, 就可以使用」BROADCAST_HASH_FIRST「 hint, 或者將這樣的參數"INPUT_LEFT_SHIP_STRATEGY"="SHIP_BROADCAST"傳遞給join operator 。不過這些都只能在Flink Java API中 使用, (具體參考flink-arch-debug中的ApiJointTest類),沒法在SQL API 中使用。
第二個在TwoInputNode::getAlternativePlans中。如前文所述, getAlternativePlans首先會 OptimizerNode (本例中主要是JoinNode) 節點用於產生多個plan , 每一個 plan的輸入輸出的節點都是相同, 不一樣的是邊上ShipStrategy 。對於每一個Plan , getAlternativePlans都會調用RequestedGlobalProperties::parameterizeChannel從而設置這個Plan的InputChannel的ShipStrategy 。
在SQL application 中, 用戶沒法像在使用Java API 時那樣經過設置JoinHint或Parameter 來選擇想要的ShipStategy , 實際上若是指定了這兩項中的任意一個,也等同於跳過了getAlternativePlans (優化過程)。 由於既然用戶已選擇了想要的ShipStategy,就沒有必要用Optimizer 根據計算每一個方案的cost 來選擇最優方案了。 但是即便是由Optimizer 來選擇, 對於本例(小數據集join大數據集), 根據表-4 ShipStrategyType的 定義,PARTITION_HASH的cost 確定不是最低的 。對於大數據集 (instruments), 若是採用PARTITION_HASH將數據傳遞給下游, 網絡cost 必然是很高的, 由於PARTITION_HASH是一個shuffule 的過程, 下游 必然有位於不一樣TM遠程Task , 所以網絡傳遞的cost 必然很高。 看看Flink 怎麼定義Cost 和怎麼估計cost 的?
public class Costs implements Comparable<Costs>, Cloneable { private double cpuCost; // 本節點算法在計算中使用CPU 的costs, 好比Hash, sort , merage, etc
|
表-5 Cost定義
根據表-5 Flink cost的定義, 一個節點(好比JoinNode)的cost ,包括上游數據由上游傳遞過來的network cost, 數據本地緩存的 disk cost, 還有算法的cpu cost 。 圖-17第二條邊(Instrments)使用的HASH_PARTITION策略確定比FORWARD cost高不少, 由於若是使用FORWARD 的network cost 爲0 (參考表-4的定義),diskCost和cpuCost 跟算法有關(好比使用HashTable或sort merage 去作join)。 第一條邊(Currencies)使用的HASH_PARTITION相比BROADCAST, cost 確定低一些, 可是Currencies數據源 數量特別小, 它的cost 和 這兩個cost 之間的差異 在Instrments引發的cost面前均可以忽略不記。因此感受上 HH(兩邊都是HASH_PARTITION)和BF(currency 邊Broadcuast, instrument邊Forward)相比, BF方案的cost必定低不少。 但爲何getAlternativePlans沒有選擇 BF呢?JoinNode沒有給出BF的選擇權嗎?看看JoinNode::getDataProperties()的代碼:
private List<OperatorDescriptorDual> getDataProperties(InnerJoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint, switch (joinHint) { } |
表-6 JoinNode的輸入節點要求的屬性
從表-6 JoinNode的輸入節點要求的屬性可知, 但由Optimizer 來決定(OPTIMIZER_CHOOSES)輸入節點的ShipStrategy(輸出分區策略)時, JoinNode對上游的要求時很是寬泛的,幾乎就是什麼都行 。
SortMergeInnerJoinDescriptor(this.keys1, this.keys2),HashJoinBuildFirstProperties(this.keys1, this.keys2),HashJoinBuildSecondProperties(this.keys1, this.key),
實際上就是他們爲JoinNode和它的input定義了三類需求 :
第一, Join算法但是時Sort-merge, 或者用一路輸入作HashTable的HashJoin ,或者用二路輸入作HashTable的HashJoin
第二, 第一路第二路輸入的partition 策略: 也就是所謂的RequestGlobalPropertities, 能夠是RANDOM_PARTITIONED (對應FORWARD shipStategyType),或者HASH_PARTITIONED(PARTITION_HASH),或者FULL_REPLICATION(對應BROADCAST),
或者ANY_PARTITIONING(它是一個通配策略,意思是RANDOM or HASH均可以)
第三,輸入的排序策略 :就是所謂的RequestGlobalPropertities, 排序或者不排序。
想想這三類需求的可選項組合起來,一共會有多少組合? 弄個表, 描述一下可能更清楚些。
序號 | Input1 的 partition 備選 策略 |
Input2 的 partition 備選 策略 |
算法 | Input1/Input2 的 排序備選策略 | 兼容性 | network Cost |
1 |
FORWARD |
HASH |
SortMergeInnerJoin |
sort/sort | yes | |
2 |
FORWARD |
HASH |
HashJoinBuildFirst |
sort/sort | yes | |
3 |
FORWARD |
HASH |
HashJoinBuildFirst |
not sort/ not sort | yes | |
4 |
FORWARD |
HASH |
HashJoinBuildFirst |
sort/not sort | yes | |
5 |
FORWARD |
HASH |
HashJoinBuildFirst |
not sort /sort | yes | |
6 |
FORWARD |
HASH |
HashJoinBuildSecond |
sort/sort | yes | |
7 |
FORWARD |
HASH |
HashJoinBuildSecond |
not sort/ not sort | yes | |
8 |
FORWARD |
HASH |
HashJoinBuildSecond |
sort/not sort | yes | |
9 | FORWARD |
HASH |
HashJoinBuildSecond |
not sort /sort | yes |
表-6 GloabalProperties, LocalProperties 和算法的組合
從表-6 可知, 對於input1和input2的partition策略(表-6實際上用的是shipStategy, 他們是1-1對應的)一個組合FH(Forward, Hash ), 一共有9個能夠接受的分區-算法-排序的方案,每一種方案都有本身的cost, getAlternativePlans會最後選擇cost最低的方案。 除了FH組合, 合法組合還有
HB:Hash-Broadcast
HH:Hash-Hash
BB:Broadcast-Broadcast
BF:Broadcast-Forward
FB:Forward-Broadcast
固然還有HF,FH ,FF,這些都是不合法的,對於join操做會有數據丟失的組合。 6個合法的input1和input2的partition策略組合,根據表-6能夠計算, 一共能有6*9=54可互相兼容的分區-算法-排序的組合方案 。實際上若是在TwoInputNode.java:516行上設計斷點 (getAlternativePlans方法內)並運行例子程序,
你會發現變量 outputPlans裏有90個備選方案,那麼其中36個必定是重複和多徐的。
那麼, BF方案是既然在備選方案裏,並且它的cost理論上是最低的, 並且getAlternativePlans的算法是選擇cost最低方案, 爲何cost並不低的HH方案最終被選擇了 ? 答案只能有一個 : cost estimation有問題。
不貼Cost, CostEstimiator, DefualtCostEstimator (costs包的三個類)的代碼了。總起來所, CostEstimator 是根據OptimizerNode 的 estimatedOutputSize 成員變量(包括本節點和數據節點的)來計算 network和disk cost 的 , estimatedOutputSize是由節點成員函數computeOperatorSpecificDefaultEstimates()設定, 能夠參考DataSourceNode中該函數的實現:estimatedOutputSize 被設置成來自於InputFormat的數據源實際物理尺寸。 但是FlatMapNode, MapNode 只是簡簡單單的設置了estimatedNumRecords (使之同 source的 同樣),而並無計算estimatedOutputSize 。
重看一下圖-4的ExcutionGraph可知, Join的第一個input是FlatMap , 若是Flatmap沒有estimatedOutputSize 會致使它的下一個節點JoionNode沒法計算cost. 請參考DefualtCostEstimator::addHybridHashCosts()的實現, 當上遊的estimatedOutputSize爲負數(Unknown)時,本節點的cost爲被設置爲Unknown 。Unknown cost在選擇cheapest cost的plan時是不能作比較的, 只能依靠假設的Cost (如heuristicNetworkCost ), 這個東西很不靠譜, 同時也是HH方案被選上的緣由。
解決方案應該修改Flink代碼在FlatmapNode::computeOperatorSpecificDefaultEstimates 加上對estimatedOutputSize的估計(好比保持和上游相同, 或加一個discount), 或者修改DefualtCostEstimator 使之 在計算Cost時能經過estimatedNumRecords估計estimatedOutputSize (下策, 沒有 前者好)。 總之,Flink Optimizter 在 Cost Estimation 作的不夠好, 改善是應該的, 我剛剛看了Flink 1.9 (最新版)的 Optimizer , 這一塊仍是沒有修改。忽然還有加入Flink community 的衝動。
若是不想修改 Flink 源碼, 或改了也沒法發佈, 那就看看有沒有替換方案了。
... |
表-7 FullyReplicated DataSource 。
這段代碼表示若是上游節點是一個FullyReplicated DataSource , 那麼就不須要備選方案的選擇過程, 直接將這個輸入邊的shipStagy 設置爲Forward 。 FullyReplicated DataSource意思是, 這個Data Source 的每一個Parition輸出的都是數據源的全集 而不是不部分。那麼只須要將Currencies作成這種數據源,問題不就解決了嗎?!
雖然ShipStategy是Forward, 但實際下游(Join Node)的每個Task都獲得了Currencies的這個數據集的全集, 這個同Broadcast的效果是同樣的。並且第一路爲F的組合有FF, FB , FH , 選擇的範圍小了。 這不對阿,前面不是說FF, FH, HF是不合法的嗎? 要注意此時的F是因爲FullyReplicated DataSource而既決定的F,不是普通的Forward , 是合法的。FF, FB , FH產生了27種可選方案(參考表-6), 根據heuristicNetworkCost , FF是cost最低的。 那確定是最低的阿, Forward的數據都是本地傳輸的, network cost 是 0 。
下面就是如何將Currencies作成FullyReplicated DataSource了, 根據DataSourceNode.java:92, DataSource 的InputFormat只要是ReplicatingInputFormat, 則就是FullyReplicated 。在看看ReplicatingInputFormat的代碼, 只須要將原有的InputFormat (好比CsvInputFormat)外面包一層ReplicatingInputFormat就能夠了, 具體的看例子的代碼吧 : SqlJoinTest.java:95。運行結果以下。
圖-18 flink-webmonitor 上FullyReplicated Currencies, 每個partition都輸出數據集的所有, 492條記錄。
圖-19 flink-webmonitor 上instrument , 跟以前沒什麼區別。
圖-20 flink-webmonitor 上的join , 數據在各個並行Task上很是均衡。
雖然這個例子只用到了Flink-client上的知識, 並無與JM和TM聯合調試, 可是問題的發現,分析和解決的方法是同樣的。 都是須要根據對架構的認識,縮小懷疑的範圍,然會反覆閱讀和調式相關代碼,找到問題,以及找到解決方案, 試想一下,若是沒有架構的知識,你怎麼可以懷疑這必定是Flink-optimizer的問題呢? 並且有時解決問題的方法並不必定是直接修改的出問題的代碼(固然這樣是最好的方案),根據相關代碼找到一個合適的替代方案也是解決問題的方法。
沒有想到描述一個這樣的小問題,這一章運用這麼長的篇幅, 對於複雜的問題,簡直能寫本書了。但願拋磚引玉,可以運用這樣的方法學來解決使用Flink遇到的問題: 架構->縮小範圍->閱讀和調式代碼->發現和分析問題->找到解決方案。
CTRL+ALT+Enter to complete line
SHIFT+SHIT search class in source code
CTRL+SHIFT+F serarch string in soruce code
Alt + F1 show current java in project view
CTRL+ALT+B navigate to implementation classes
CTRL+U navigate to super method
ALT + F7 Show reference
ALT + 4 Show run window
ALT + F12 Show terminal window
ALT+Enter Create A testClass
ALT+Insert TestMethod
CTRL+home move to file start
CTRL+end move to file end
參考
Flink conference Page: https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals
很不錯的Blog: http://www.javashuo.com/article/p-khdfegoi-mh.html
Flink1.6 documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.6/
Flink Github: https://github.com/apache/flink