Flink架構,源碼及debug

        工做中用Flink作批量和流式處理有段時間了,感受只看Flink文檔是對Flink ProgramRuntime的細節描述不是不少, 程序員仍是看代碼最簡單和有效。因此想寫點東西,記錄一下,若是能對別人有所幫助,善莫大焉。html

        說一下個人工做,在一個項目裏咱們在Flink-SQL基礎上構建了一個SQL Engine, 使懂SQL非技術人員可以使用SQL代替程序員直接實現Application, 而後在此基礎上在加上一些拖拽的界面,使不懂SQL非技術人員 利用拖拽實現批量或流式數據處理的Application 。 公司的數據源多樣且龐大,發佈渠道也很豐富, 咱們在SQL Engine 裏實現了各類各樣的Table Source (數據源) , Table Sink (數據發佈)和 UDF (計算器), 公司裏有不少十分懂業務專業分析員,若是他們真的能夠簡簡單單,託託拽拽的操做大數據,創建計算模型,而後快速上線和發佈,這樣的產品應該前景廣闊。java

        但是後臺並不是提及來這麼簡單,SQL使用不善,難以達到業務想要的效果,數據量一上來各類問題會出現,後端須要大量的優化工做, 好比 數據傾斜, 是最常發生的事情。SQL基本上是一個Join Language。用戶常常會將一個大數據源和一個小數據源作Inner Join, 若是大數據源的數據項很大部分都使用極少數的幾個join key, 就很容易出現數據傾斜。現實傾斜的或不均衡的,好比國際資本>80%以用美圓計價,世界人口50%屬於某兩個國家, 財富主要有20%的人擁有, 等等 。 Flink 若是把SQL join 執行成Hash Join, 最後的結果是不管你實現分配了多少個TaskSlots, 若是80%的數據都跑到某一個TaskSlot裏,緩慢運行直至將個這Slot的資源耗盡,整個job失敗。這種狀況最好是將小數據集廣播給全部的下游通道, 大數據集按原始的分片並行,這樣的join因分配均衡而快速。然而標準SQL裏沒有辦法指定joinhint , Flink sql也不支持這個,只能經過debug flink 來看看哪裏能作一些改變解決這個問題。咱們在最後一章,從Flink client , flink optimizer, flink run-time (job manager, task manager) 一步一步的 在源碼裏設置斷點, debug, 將數據流過一遍,看看有哪些方案能夠將這個小數據集合廣播起來。git

       爲了使本文讀起來流暢一些, 我先經過幾個章節大概介紹一下Flink 。本文關心架構, 因此不會涉及不少關於API的東西(好比Flink streaming 的windowing, watermark, Dataset, DataStream, 及SQL的API等, 網上應該有不少關於這些的文章)。只是想大概梳理一個Flink的架構,使架構對應到源碼結構裏, 瞭解一下Flink的 Graph metadata, 高可靠性的設計,不一樣cluster環境裏 depoloyment的實現等, 最後利用IntelliJ IDEA 通過一個小例子帶你們debug一下Flink 。若是對flink的架構有較好的理解(好比主要類及metadata),就比較容易在準確的地方設置斷點,debug Flink代碼將更有效率,從而解決問題就會更有效率, 這就是本文的目的。大概瞭解一下框架,但並不會面面俱到。當若是你須要深刻了解一下Flink某方面的細節, 本文可以告訴你入口在哪裏,或者經過對架構瞭解過程當中獲得的common sense , 再加上一點想象力, 你或許直接可以獲得解決問題的方案, 而後再經過閱讀源碼及調試來加以驗證。 程序員

1.  Flink的架構簡介

1.1  Flink 分佈式運行環境(官方圖)

(圖-1 Flink Runtime 來自:https://ci.apache.org/projects/flink/flink-docs-release-1.6/concepts/runtime.htmlgithub

關於架構,先上一個官方的圖,儘管對於Flink架構,上圖不是很準確(好比client與JobManager的通信已經改成REST 方式, 而非AKKA的actor system),咱們仍是能夠知道一些要點:web

  • FlinkCluster: Flink的分佈式運行環境是由一個(起做用的)JobManager 和多個TaskManager組成。每個JobManager(JM)或TaskManager(TM)都運行在一個獨立的JVM裏,他們之間經過AKKA的Actor system (創建的RPC service)通信。全部的TaskManager 都有JobManager管理,Flink distributed runtime其實是一個沒有硬件資源管理 的軟件集羣 ( FlinkCluster ), JM是這個FlinkCluster 的master, TM是worker。  因此將Flink運行在真正的cluster 環境裏(可以動態分配硬件資源的cluster,好比Yarn, Mesos, kubernetes), 只須要將JM 和 TM運行在這些集羣資源管理器分配的容器裏,配置網絡環境和集羣服務使AKKA能工做起來, Flink cluster 看起來就能夠工做了。具體的關於怎麼將Flink 部署到不一樣的環境, 以後有介紹, 雖然沒有上面說的這麼簡單,還有一些額外的工做, 不過大概就是這樣:由於Flink runtime 自身已經經過AKKA的 sharding cluster創建了FlinkCluster, 部署到外圍的集羣管理只是爲了獲取硬件資源服務。 Flink不是搭建在零基礎上的框架,任何功能都要本身從新孵化,實際上它使用了大量的優秀的開源框架, 好比用AKKA實現軟件集羣及遠程方法調用服務(RPC), 用ZooKeeper提升JM的高可用性, 用HDFS,S3, RocksDB 永久存儲數據, 用 Yarn/Mesos/Kubernetes作容器資管理, 用Netty 作高速數據流傳輸等等。
  • JobManager: JobManager 做爲FlinkCluster的manager,它是由一些Services 組成的,有的service 接受從flink client端 提交的Dataflow Graph(JobGraph),並將JobGraph schedule 到TaskManager裏運行,有的Service 協調作每一個operator 的checkpoint以備job graph運行失敗後及時恢復到失敗前的現場從而繼續運行 , 有的Service負責資源管理, 有的service 負責高可用性,後面在詳細介紹。值得一提的是,集羣裏有且只有一個工做的JM, 它會對每個job實例化一個Job
  • TaskManager : TaskManager是slot  的提供者和sub task的執行者。一般Flink Cluster裏會有多個TM, 每一個TM都擁有可以同時運行多個SubTask的限額,Flink稱之爲TaskSlot。當TM啓動後, TM 將slot限額註冊到Cluster裏的JM的ResourceManager(RM), RM知道從而Cluster中的slot 總量,並要求TM將必定數量的slot 提供給JM,從而JM 能夠將Dataflow Graph的task(sub task)分配給TM 去執行。TM是運行並行子任務(sub Task) 的載體 (一個Job workflow 須要分解成不少task, 每個task 分解成一個過多個並行子任務:sub Task) , TM須要把這些sub task在本身的進程空間裏運行起來, 並且負責傳遞他們之間的輸入輸出數據, 這些數據 包括是本地task的和運行在另外一個TM裏的遠程Task 。關於如何具體excute Tasks, 和交換數據 後面介紹。
  • Client:Client端(Flink Program)經過invoke 用戶jar文件 (flink run 中提供的 jar file)的裏main函數 (註冊data source, apply operators, 註冊data sink, apply data sink),從而在ExecutionEnvironment或StreamingExecutionEnvironment裏創建sink operator 爲根的一個或多個FlinkPlan(以sink爲根, source 爲葉子, 其餘operator爲中間節點的樹狀結構), 以後client用Flink-Optimizer將Plan優化成OptimimizedPlan(根據Cost estimator計算出來的cost 優化operator在樹中的原始順序, 同時加入了Operator與Operator鏈接的邊 , 並根據規則設置每一個邊的shipingStrategy, 實際上OptimizedPlan已經從一個樹結構轉換成一個圖結構), 以後使用GraphGenerator(或StreamingGraphGenerator)將OptimizedPlan轉化成JobGraph提交給JobManager, 這個提交是經過JM的DispatcherRestEndPoint提交的。
  • Communication: JobManager 與Taskanager都是AKKA cluster裏的註冊的actor, 他們之間很容易經過AKKA(實現的RPCService)通信。 client與JobManager在之前(Version 1.4及之前)也是經過AKKA(實現的RPCService)通信的,但Version1.5及之後版本的JobManager裏引入DispatcherRestEndPoint (目的是使Client請求能夠在穿過Firewall ?),今後client端與JobManager提供的REST EndPoint通信。Task與Task之間的數據(data stream records)(好比一個reduce task的input來自與graph上前一個map, output 給graph上的另外一個map), 若是這兩個Task運行在不一樣的TM上,數據是經過由TM上的channel manager 管理的tcp channels傳遞的。

 

1.2  JobManager

 (圖-2,JobManager的內部結構) 算法

 如上一章所述, JobManager 是一個單獨的進程(JVM),  它是一個Flink Cluster的 master 、中心和大腦, 他由一堆services 組成(主要是Dispather, JobMaster 和ResourceManager),鏈接cluster裏其餘分佈式組件 (TaskManager, client及其餘外部組件),指揮、得到協助、或提供服務。sql

  • ClusuterEntryPoint是JobManager的入口,它有一個main method ,用來啓動HearBeatService, HA Sercie, BlobServer,  Dispather RESTEndPoint, Dispather, ResourceManager 。不一樣的FlinkCluster有不一樣的ClusuterEntryPoint 的子類,用於啓動這些Service在不一樣Cluster裏的不一樣實現類或子類。Flink目前(version1.6.1)實現的FlinkCluster 包括:
    • MiniCluster : JM和TM都運行在同一個JVM裏,主要用於在 IDE (IntelliJ或Eclipse)調試 Flink Program (也叫作 application )。
    • Standalone cluster : 不鏈接External Service (上圖中灰色組件,如HA,Distributed storage, hardware Resoruce manager), JM和TM運行在不一樣的JVM裏。 Flink release 中start-cluster.sh啓動的就是StandaloneCluster.
    • YarnCluster : Yarn管理的FlinkCluster, JM的ResourceManager鏈接Yarn的ResourceManager建立容器運行TaskManager。BlobServer, HAService 鏈接外部服務,使JM更可靠。
    • MesosCluster : Mesos管理的FlinkCluster, JM的ResourceManager鏈接Mesos的ResourceManager建立容器運行TaskManager。BlobServer, HAService 鏈接外部服務,使JM更可靠。
  • HighAvailabilityService:重複以前的話:JM是一個Flink Cluster的 master 、中心和大腦, 若是JM崩潰了,整個cluster就沒法運行了。HAService可以使多個JobManager同時運行,並選舉一個JM做爲Leader, 當Leader失敗後在從新選舉,使另個健康的JM取而代之成爲leader, 從HA存儲中讀取MetaData(Graph,snapshot)從而 繼續管理Cluster的運行。HighAvailabilityService 只保護JM裏的DispatcherRestEndpoint, Dispatcher, ResourceManager 和JobMaster 4個核心服務, 從理論上來說, 這些service的各自的leader有可能來自不一樣的JM, 這就要看外部作Coordination的服務的Leader Election策略會不會把他們都從一個JM 選了。目前,Flink支持的和在使用的HighAvailbilityService有ZooKeeperHaService和StandaloneHaService。
    • ZooKeeperHaService:鏈接外部的ZooKeeper cluster作多個JM的Leader Election,從指定的存儲(一般是HDFS)存取JM metadata, 從而當JM takeover 或從新啓動時可以獲取失敗以前的snapshot or savepoint, 從而繼續服務。
    • StandaloneHaService : 不支持多個JM Election。但支持從指定的存儲存取JM metadata, 作失敗後重啓恢復。
  • BlobServer 使用來存儲Client端提交的Flink program jar,  jobGraph file, JM 的全部services , 和全部的TM都鏈接同一個BlobServer (能夠是LocalDisk, HDFS, S3 , 或其餘的 Blob數據庫)讀取這些數據。
  • HeatBeatService , 用來運行JM 與TaskManager之間的心跳服務。 好比 ResourceManager 與JobMaster和全部TaskManager之間的心跳, JobMaster與全部TaskManager之間的心跳。若是心跳消失, 相應的HA 容錯措施就要啓動。 好比一個TM與JM的心跳沒了,那麼相應的容錯措施就會執行了。好比JobMaster的心跳消失,HA就會從新選舉新的JobMaster Leader;TM的心跳消失,ResourceManager就要將task分配到其餘空閒的TM的slot裏,若是沒有空閒的slot ,RM 就會向外部的ResoureManager申請新硬件和啓動新的 TM以提供空閒的 slot。Flink的心跳消息是經過AKKA 傳遞的。
  • DispatcherRESTEndPoint是JM的4大核心服務之一(其餘三個分別爲Dispatcher, JobMaster和ResourceManager),受HAService的保護, 是Flink客戶端與JM交互的REST接口, 也是Flink custer 的WebMonitor。非核心服務實際上都是一些UtilityService, 他們非JM獨有,須要用時可隨時實例化:好比Client端也會使用HAService來獲取DispatcherRESTEndPoint的leader的地址和端口, TM也會使用BlobServer 。DispatcherRESTEndPoint是用Netty搭建的RESTService, 它建立了大概有290個handler 對應不容的資源地址及方法。這些handler大都須要經過RPC方式調用Dispatcher 的遠程方法來知足客戶的請求。
  • Diaptcher是DispatcherRESTEndPoint的後端服務層,它實現了RestDispatcher接口, 從客戶端(包括FlinkClient和Flink Web Dashboard)提交給又有來自於EndPoint的請求,都由這個接口裏的方法服務, 這其中最總要的方法就是submitJob。當Dispather受到submitJob的調用時,他會先在本JVM裏建立一個JobMaster服務,並將 JobGraph和Flink applicaiton 的jar file , 轉交給這個JobMaster去安排job具體的運行。
  • JobMaster的是用於一個Job的Master, 當集羣裏由多個Job同時運行則會有多個JobMaster同時運行,每個JobMaster只會負責一個job。當接收到jobGraph時, JobMaster首先會將jobGraph轉換成ExecutionGraph:一個能夠指導task並行運行的數據流程圖,並向ResouceManager(RM)申請運行這個ExecutionGaph須要的資源(TaskSlot):好比一個並行度爲8的job,必須有8個TaskSlot才能運行起來, 而後按照ExecutionGraph將task schedle到Taskslot中去, 並定時的對task作checkpoint, 以備重啓時恢復到崩潰前的現場。
  • ResourceManager負責管理FlinkCluster裏全部TaskManager的TaskSlot資源(至關於TM裏的一個運行線程)。當一個TM啓動時,它會將本身的TaskSlot註冊到RM。當JobMaster向RM申請slot時,RM會要求TM將它空閒的slot(已註冊到RM,因此TM知道全部slot的狀態)提供給JobMaster使用,以後JobMaster纔會將相應的Task 安排到slot裏運行。若是集羣裏的TaskSlot不夠, RM會向外部的ResourceManager(好比Yarn/Mesos/Hubernetes)申請新的容器(container) 去啓動新的TM從而知足JobMaster的slot資源的需求。

1.2.1  展開JobManager後的Flink架構

從以上所述, JobManager是一組Service的總稱, 其中真正管理Job調度的組件叫JobMaster ,負責資源管理的組件叫ResoruceManager, 負責接收client端請求的組件叫Dispatcher(包括Dispatcher和DispatchRestEndpoint)。其實Flink源碼裏有叫JobManager的包和類,功能上也是負責Job調度管理以及snapshot管理,但它應該在Flink某個版本之後就legacy了(估計是從version1.3開始)。這三個服務統稱爲還叫 JobManager,上真正管理做業的是JobMaster。這一點在讀code時讓人迷惑,好比JobManagerRunner啓動的倒是叫JobMaster的類。可是他不叫JobMasterRunner,這也體現了JobMaster實際是取代了JobManager類,保留legacy類是爲了向後兼容。如下是Client, 展開的JobManger(受HA 保護的Dispather, JobMaster, ResourceManager)和TaskManager處理submitJob的流程圖,這個比較圖-1更能體現當前的Flink runtime架構 (Flink 1.6):docker

 

(圖-3)展開JobManager後的Flink 架構, 來自於《 Stream Processing with Apache Flink》shell

 

以上的架構嚴格來說在Flink裏被稱做 SessionMode ( Cluster的EntryPoint類都是SessionClusterEntryPoint的子類),  若是沒有外部命令 terminate cluster, 在這種模式下的FlinkCluster 是Long running 的, 多個job能夠同時運行在同一個flinkcluster裏。 SessionMode 在Flink的各類部署都是支持的, 包括Standalone, Kubernetes, Yarn, Mesos, 上圖實際上是StandaloneSessionCluster的流程。 還有一種模式叫作JobMode, 區別就是Job(或application) 的main class 和 jar 和在JobManager 啓動時經過的啓動參數裝載的, 不須要submitJob的過程, job運行完畢, cluster自動終結, 全部資源釋放。 在這種模式下, Dispather並不負責處理job的提交, 但其餘 Client發給DispathcherRESTEndPoint的請求(好比Query, CancelJob), 仍是由Dispatcher處理。

Flink的每一種部署模式(deployment mode)都是既支持Session Mode又支持JobMode的 (或partialy support), 區別如上所述, 但在架構上是一致的。 當有由外部的ResourceManager協助硬件資源分配時,流程略有全部不一樣, 以 FlinkCluster in Yarn 爲例, SessionMode下, 區別只限於多了RM經過Yarn自動啓動TM 的過程(4,5)。

(圖-4)FlinkCluster in Yarn Session mode,  來自於《 Stream Processing with Apache Flink》

關於deployment的細節,請參照後面的將Deployment的章節。

1.2.2  JobMaster

如圖一所示,JobMaster的主要工做是:

1.  JobGraph的scheduler : 將Client提交的JobGraph按照邏輯的向後關係(source -> transform -> sink), 以及並行關係(每一個operator的子任務只負責所有數據的中一部分), 將子任務分配到TaskManager的Slot中, 並按期的獲取每個子任務的運行狀態 (status)。

2. 觸發和管理Job的checkpoint snapshot:對於streaming job,按期的將運行中的每一個operator 的狀態(State)數據存入規定的存儲設備, 這些state數據能夠用於在Job恢復運行時,恢復相關子任務的失敗前的現場。

 

 (圖-5)JobMaster內部結構

  •  ExcutionGraph (EG) 是JobMaster 最核心的組件,它承擔了JobMaster 上述的的兩大責任: job scheduling 和 checkpoint snapshot  。EG的細節下節展開。
  • SlotPool 存放由全部TM Offer 過來的slot 。Offer 的過程就是圖-3中的3,4,5 或圖-4中的3,4,5,6,7,8。當EG須要slot去執行給sub Task時, 它就從SlotPool里根據必定的策略poll 一個slot ,而後將SubTask打包 (這個在TM講解中展開) 發送相應的TM 去執行 。 SlotPool實現了一個RPCEndPoint : SlotPoolGateway, 如圖-5中所示,感受這個Gatway是爲TM OfferSlot準備的。 實際上TM調用的是JobMasterGateway (到JobMaster), 而後JobMaster 經過SlotPoolGateway這個RPC 接口與SlotPool通信的。 看代碼時看到SlotPoolGateway時比較奇怪的, 由於它做爲JobMaster的組件,是沒有必要實現爲PCEndPoint的。集羣中運行的每個Job, 都會由一個JobMaster建立出來爲之服務, 每個JobMaster 都有一個SlotPool存放這個Job分配的Slot 。有一種多是Slotpool的實現這打算將slotpool共享給全部的的JobMaster ? 若是那樣的Slotpool  須要由Zookeepr 管理作Leader Selection 和 FailOver, 其實也沒什麼必要。
  • JobMasterGateway 是外界(ResourceManager, TaskManager)用來 同JobMaster通信的RPC接口。
  • RMConnection 和TMConnection(多個)是JobMaster 同TM 和TM 通信的PRC 通道。這些通道里包裹了RM和TM的PRCEndPoint的AKKA地址,以及永遠RPC call 的 XXXXGateway接口。好比ResourceManagerGateway 和TaskManagerGateay。
  • HearbeatManager 會以Interval爲(10,000 ms),timeout 爲(50,000ms) 向TM和RM發送heartBeat, 若是timeout 發生則相應的ErrorHandling 會出發, 好比從新鏈接RM,切斷timeout的TM 。interval 和 timeout都是可配置的, 前面的兩個數值是缺省值。
  • FatalErrorHandler : 一般指向ClusterEntryPoint (回顧一下圖-2)。JobMaster 在沒法鏈接和註冊有效的RM時會觸發FatalErrorHandler的onFatalError方法。onFatalError一般會簡單記下log, 而後推出JVM 。
  • RestartStrategy用於在EG中,但Job失敗時,嘗試重啓Job, RestartStrategy 能夠在Flink Java/Scala API種指定 。
  • BackPressureTracker,  當一個operator的處理速度小於的上游的下發速度, 數據就會在input buffer 裏積壓, 當buffer滿了的狀況, 數據就會無處可放。 Flink將這種狀況稱做爲BackPressure 。Dispatch 會持續的經過JM的BackPressureTracker對每個TM每一個 sub Task作Stack trace(100 stack traces every 50ms , configurable) ,而後用可能有BP的stack trace (好比訪問buffer, 訪問網絡棧等)同total tack trace 的比例決定系統是否有Back Pressure風險 。好比 <10%是OK的, <50%是低危的, >50是高危的。這個比率是能夠在Flink WebMonitor的Metrics裏看到的。若是是高危的怎麼辦, 實際上Flink就是把他經過Metrics發了出來,沒有作任何handling , 目的是讓用戶手工在工做流種作相應調整, 好比加速和降速Datasource 的輸出速率, 在某個operator 上加cache等。
  •  

 1.2.3  ExcutionGraph 

EG是面向Job 並行運行的圖結構,在JobGraph的基礎上它加入了對Operator並行執行的子任務,以及子任務的輸入輸出的描述 。 

                                                 圖-6 Execution Graph

 

  • ExecutionJobVertex : 對於每一個 Operator 或Task(單獨的或chained Opertor) ,EG 都會建立一個ExecutionJobVertex(EJV)對應 。
  • ExecutionVertex: 對於它 的每個並行子任務  (sub task), EVJ都會建立一個ExecutionVertex(EV)對應 。每個EV都知道輸出到哪裏 (IntermediateResult), 到哪裏獲取input (ExecutionEdges : 底層數據也來自IRP ), 和執行的Operator類。
  • IntermediateResultPartition(IRP)  : 表明IntermediateResult(IR)的一個Partition 。 它描述了它是由哪一個EJV提供數據, 並由哪一個EE消費數據的。
  • ExecutionEdge (EE) : 是每一個EV的input的描述, 好比source 來自與哪一個partition, edge 是sub task的第幾個input 。
  • Execution: 但EV被分配執行時,Exception對象會被建立做爲EV的一次嘗試, 分配slot,  將EV打包 成 TDD(TaskDeploymentDescriptor)並同TaskManagerGateWay 發送給TM (submitTask)  執行。Exception若是失敗, 新的Exception會被建立做爲另外一次嘗試。

  • TaskDeploymentDescriptor (TDD): 包括了該Sub Task 全部信息的描述: sub task的執行類 (operator 的類名), 輸入和輸出的描述, job的描述。 TaskManager收到TDD以後建立一系列物理對象執行的對象,把這些些建立在分散TM上對象拼在一張圖上, 實際就造成了EG的物理執行圖。 這個TM的章節在展開。
  • 總起來講EG經過EJV, EV,  IR, IRP, EE構成了一個包含了並行子任務 以及各個子任務間輸入輸出關係的總工做流圖。當scheduling EG的時候, 每一個EV都打包成TDD發給TM。TM會將TDD裏的子任務,輸出Partition和輸入Channel建立在TM的物理機上。 把TM的這些物理對象拼接起來,就造成了該工做流物理執行圖 。 Dispatcher就是經過收集這些物理對象的metrics和狀態信息,從而在WebMonitor上更新EG的。

 1.2.3.1  任務分配執行(Scheduler)

EG的Scheduling模式由兩種,一個叫Lazy, 一個叫Eager 。

Lazy的方式適用於Batch Job, 它先將全部的處理數據輸入的sub task 分配執行, 當TaskManager 返回 (同過JobMasterGateway) 已分配成功的信息, EG在根據EJV的上下游關係, 再給相應的EJV分配slot執行。分配的過程如上一小節所述, EJV全部EV都會經過TDD打包,而後要求SlotPool提供slot, 而後將tdd和slot信息都發送給相應的TM去實例化這個sub task而後運行起來 (再TM細述這個時怎麼實現的)。值得一說的是, EJV全部EV都應該一塊兒scheduling , 但當集羣裏沒有足夠的slot時, 同一個EJV可能只有部分EV被schedule了,若是那些沒有分配的相同EJV的EV再一個timeout(default 5分鐘)以後還沒法獲得slot, task 這時候會失敗, job 也會失敗。因此在計劃Job使用的資源時,計劃的總slot數 (好比當使用Yarn管理resource 時, yn * ys 是job向Yarn申請的總slot數 )必定要大於總的source sub task的總數量(source operator 的數量 * paralelism ), 不然部分soure sub task 得不到資源,再timeout以後就會出發failGlobal 使job 失敗。

Eager 方式使用與Streaming job, 在Eager模式下, 全部EV都必須都能獲得slot, 不然schduling 失敗, job 失敗。

 

1.2.3.2  CheckpointCordinator

Flink的Checkpoint這個概念仍是有必要簡單說明一下, 固然參考Flink文檔會獲得更全面的理解。Flink主要是一個Stream computation的架構,(固然它也能夠作Batch, 但Batch並非Flink的強項), Flink Streaming processing 的一個特性就是Stated Streaming 。 意思就是在它的流式計算的工做流裏, operator的都是能夠有狀態的。什麼是有狀態的?至關於一我的睡醒以後還記的本身是誰,而後還能繼續下來的生活,作完沒有作完的事情的意思 : 由於大腦裏存儲了過去的信息。Flink 的Operator能夠像這樣生活的,建立一下StateFull的變量 (好比ValueState<T> ), 而後週期性的將這些狀態存儲一個地方 ,當job重啓, operator 從新實例化的時候, 經過加載這些Sate信息,就可以重新回到上次重啓前的狀態,而後繼續這個operator的人生。週期性的(Periodically)存儲operator的State 信息, Flink稱做Checkpoint, 每一次存儲叫作一次snapshot 。

CheckpointCordinator的主要功能就是協調促使工做流裏全部的operator都週期性的觸發Checkpoint snapshot 。

換句話說,CheckpointCordinator的主要功能就是向全部的Execution (看前面回顧一下Execution的概念) triggerCheckPoint 。 每個EG都會建立一個CheckpointCordinator (CC), CC用內建的timer (Executor)定時(根據可配置的interval)的經過RPC向全部的TM觸發他們運載的全部Source Exection對應的Task的CheckpointBarrier。 上一句比較長,分解來講, 每個EG的Execution 都會對應一個Task (準確的說時SubTask)運行在某一個TM 裏, triggerCheckPoint 就是CC定時的經過RPC調用全部 TM 上全部的Source Task ( 是工做流裏開始位置的處理Source 數據的Task, 不包括Sink Task 也不包括 普通的Transformation Task ) 的triggerCheckpointBarrier方法。當一個SourceTask收到triggerCheckpointBarrier時,  它會命令內嵌的Invokable對象 (Operator, 或Operator Chain的封裝對象)執行 performCheckPoint , 這個過程大概有以下幾步, 不少多步驟的時異步執行的:

  1. CC 對全部的Soruce Execution triggerCheckPoint 
  2. TM 對 全部的 SourceTask triggerCheckpointBarrier
  3. SourceTask對應的Invokable 對象執行performCheckPoint 
  4. 首先 作Barrier 前的工做: 好比對齊和比較多個input channel的barrier 等。
  5. 其次建立Barrier event (只有source須要建立)並向下遊傳遞 : 下游的Operator 的收到這個Barrier , 也會作這5個步驟 ,只是當有多個input channel的時候(Input), 步驟稍微複雜一些而已。Sink 不須要建立Barrier ,由於沒有下游。
  6. 而後對Invokable對象的全部Sate述,拍Snapshot (克隆一份)
  7. 而後Invokable 將State數據傳遞迴JobMaster,
  8. 最後JobMaster再persist 到指定的存儲中。 

至於CheckPoint怎麼配置,State數據, StateBackend 包含那些, 以及CheckpointBarrier再工做流裏的聯動過程, 我就不贅述了,網上 應該不少介紹, 不過經過代碼閱讀,想強調以下幾點。

  • Checkpoint是對Streaming Job有效的, Batch Operator 不須要有狀態的。
  • Streaming Job 缺省狀態不開啓Checkpoint, 也不能經過Flink configuration 開啓, 只能經過Streaming API 再程序中開啓。 缺省狀態下, CheckPoint 週期被設置爲無窮大,所以永遠不會被執行。
  • AtLeastOnce只要開啓CheckPoint就能達到。
  • 對於ExtractlyOnce, 不少網上不少文章都聲稱這個 Flink的買點。 實際分析一下以上步驟, Checkpoint Snapshot 存儲的是上當barrier 到達operator是它的狀態, 但並非Operator 意外退出的狀態。因此恢復時,只能恢復到觸發barrier 時的現場,這沒法保證source的數據無重複下發。
  • 下面的文檔提供了ExactlyOnce的解決方案,這須要SinkOperator實現TwoPhaseCommitSinkFunction 。https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html。 大概意思時SinkOperator向external 下游發送數據時須要分兩步走(TwoPhase), 不過目前沒目前 Flink1.6.1 的 Sink都沒有實現這個功能 。
  1. 數據先暫存到臨時存儲裏, 好比存儲在臨時文件或buffer裏 ,這個叫PreCommit .
  2. 當全部的非Sink operator 都作完了CheckPoint , 當barrier 到達時, Sink再將臨時存儲中的數據一次性發送給下游。
  3. 固然若是下游支持Trasaction的話 (好比, precommit, commit ), 臨時存儲就不須要了。

 

1.3  TaskManager

在源碼裏, TaskManager 類同 JobManager被JobMaster取代 同樣,TaskExecutor取代了legcy TaskManager 併發揮着它的做用。本文裏TM指的是TaskManager的整個進程, JM表明JobManager整個進程。JM的核心類是JobMaster (固然還有ClusterEntryPoint, Dispatcher, WebMonitor和ResourceManager, 可是都起的是輔助做用), TM裏的核心類是TaskExecutor 。還有一個比較混亂的Term就是Task。Task對應JobGraph的一個節點,是一個Operator。在ExecutionGraph裏Tast被分解爲多個並行執行的subtask 。每一個subtask做爲一個excution分配到TM裏執行。但比較然人抓狂的是在TM裏這個subtask的概念由一個叫Task的類來實現。因此TM 裏談論的Task對象實際上對應的是EG裏的一個subtask ,若是須要表述Task的概念,用Operator。先澄清一下Terminology ,以避免語言混亂。

 

                                           圖-7 TaskExecutor

  • TaskSlotTable 是TaskExeutor最核心的數據結構, 它存放着TM全部的TaskSlot以及再Slot裏運行的Task。 TaskSlot只是一個邏輯單位,它並不綁定或鏈接任何資源, 但它規定了TM裏可以並行執行的SubTask的總數量。當TM 啓動時,總slot數由命令行參數傳入(-ys,default 爲1或flink configuration 裏設置的), TM建立這個指定數量的TaskSlot後供分配給SubTask使用。如前所示,TM裏的Task實際指的是EG裏的SubTask ,後面會詳述,Task的數據結構和執行過程。
  • NetworkBufferPool(NBP)是用來爲InputGate(IG)和ResultPartion(RP) 分配BufferPool的。一個Task要經過InputGate 從遠程另外一個Task的ResultPartition 要input數據,這個Task 同時也要將輸出的數據放到本身的ResultPartition裏。IG和RP都須要Buffer,而這些Buffer都從NetworkBufferPool去申請, NBP的poolsize由flink configuraiton 指定。
  • MemoryManager : 用於大量分配內存。在Bash模式下,輸入數據unbouned 的。一些subtask  須要對全體輸入數據進行 Sort或者Hash, 好比outJoin 。此時MemoryManager用大快速和大量的分配內存。關於Flink的內存管理,後面有一節詳述 。
  • IOManager:用於將內存的數據和硬盤之間交換。一樣在Bash模式下,輸入數據unbouned 的,若是EG很是複雜,Task的數量巨大。此時NetworkBuffer Pool分配的buffer不是夠用的。 IOManager可以用hard disk做爲Buffer還緩存數據,當Localbuffer夠用時,再將數據從硬盤裏換進,供本地或遠程消費。
  • ChannelManager是TaskExeutor很是關鍵的服務,他負責RP與IG之間快速的數據交換,後面專門有一節細述ChannelManager 。
  • BlobCacheService 用於加載將客戶的jar文件 ,Task裏年的Invokable須要調用jar文件裏代碼, 好比 source, sink, tranformation operator, 以及他們的依賴。
  • LocalStateStoreanager用於存取TM本地硬盤上的Sate數據, 但Task作CheckPoint是,除了向JM返回snapshot,它也會在本地存儲。
  • HeartBeatManager和FatalErrorHandler, 和JM裏的相似。
  • TaskExectorGatway, 同其餘的Gateway同樣,是用於cluster裏的其餘組件(JM和RM)遠程調用TM的stub interface .
  • ResourceManagerConnection 和 JobManagerConnections (注意,能夠鏈接多個JM ) 用於遠程RPC call RM 和 JM。

1.3.1  Data Exchange (ChannelManager)

我的以爲Task之間的數據交換,是TM裏的核心和重點,也是 Flink runtime的核心、重點和難點,它的質量是區別於其餘大數據系統的關鍵,它的質量直接影響Streaming/Batch 任務的延遲及吞吐量兩大指標。人們選擇一個大數據流式框架最想先了解的就是這兩個指標, 至因而不是Statefull, 可靠性可用性集成度怎麼樣,編程接口是否簡單不是不重要,但不是最關鍵的,是次要考慮的。因此本節做爲本文的重點,咱們深刻解讀一下TM管理的數據交換。

首先強調一下Task之間的在網絡裏或在本地的數據交換, 不是Task管理的,是由TM (或TaskExecutor)管理的。以下圖所示。

圖-8 JM和TM的數據交換

JM將EG的Execution 提交給 TM,TM將Execution轉化爲Task執行, 並負責Task之間的數據傳輸.

回想一下EG的數據結構:

  • 它包含多個EV對象, 每個EV表明一個並行子任務(sub task ),
  • EV 產生的結果存儲在termediateResultPartition(IRP)裏。
  • 多個屬於相同EJV的EV鏈接的IRP組成了IR(IntermediateResult)表明由一個Task節點產生的結果數據,
  • 這個結果數據Re-Partition後,每一個IRP須要根據新的Key從新分區(sub partition )而後經過EE (Execution Edge)發送給下游的SubTask 。
  • 每一條EE的source 是一個IRP, target 是另外一個EV .
  • 以下圖所示。

(R

圖-9 EG 的數據結構 和 數據流

爲了將JM的EG的邏輯工做流在物理機上執行起來, TM建立了相應的物理數據結構。

ResultPartition(RP)概念上對應着EG中的IRP(IntermediateResultPartition), 它負責存放一個Task生成的全部結果數據。

ResultSubpartition (RS) 是對應着RP的數據Repartion以後shuffle數的中的一個parition ,它存放着RP從新分區後發送下游的某一個sub task的分區數據。

InputGate(IG): 在EE中, IRP是source, EV 是target, 它描述了分區數據的流向, 但對不物理實現這樣的結構仍是不夠的。 IG 由此而引入, RP是EE上的數據發送端(IRP的物理實現),IG是接收端上與RP功能相似的組件, 負責收集來自上游(RP)的數據區 (data buffer)。

InputChannel(IC) 是接受端與RS功能相似的組件,IG使用不一樣的IC鏈接不一樣EE上的RP 中特定分區的數據區。 好比在data shuffle的過程當中多個RP都產生鍵值爲a1的RS ,  這些RSa1中的數據最終都會流向同一個IG中的IC, 這其中每個IC承擔接受上游的RP中每個RS-a1中的數據。

數據在EE上的通信, 由RP到IG ,數據是以binary 形式傳輸的。也就是說數據進入RP前,會由總Serializer 將data record 序列化成binary format, 並存入data buffer 中。 Data data buffer 由IG 接受傳遞給下游的Oparor 前, 由Deseriealizer將數據從DataBuffer反序列化成data record, 供該Task消費使用。 Data buffer至關於高速公路上運輸乘客的大巴車, buffer中的數據至關於乘客 。 每一個大巴車的形狀, 運載量也是固定的,  不裝滿不發車 。它能極大的增長了整體數據傳遞傳輸量和創數率, 增長系統的吞吐量,  但同時也增長的單個數據的延遲 。缺省狀況下2048 個buffer會被建立,   每個32K字節 。對與比較大的record 須要多個buffer 承載 。每個RS和IC有一些大buffer組成, RP和IG就是這些大巴車的裝載者和卸載者 。

關於DataBuffer若是建立和管理, 參考後面的內存管理章節。

 

另外值得一提的是, 不一樣的對於RS的實現決定了實際的數據傳輸的方式。

PipelinedSubpartition支持streaming模式下的數據傳輸 (大部分的RS都是這種實現): 數據壓滿一個buffer (buffer size是configuration 指定)就向下傳遞 。 SpillableSubpartition 只有當RP的類型爲BLOCKING是纔會建立出來(Batch job 中的部分RS 是這種實現)。它支持在事先分配的Buffer不夠用的狀況下, 將Buffer中的數據Spill到硬盤中,從而該RS佔有的Buffer得以釋放。由於他會涉及IOManager (應該只有它會使用到IOManager), 因此我們細說一下。

那什麼是BLOCKING類型的? 從ResultPartitionType是這樣定義:BLOCKING(false, false, false)能夠看到, 3個false 分別以下:

  1. BLOCKING類型的RP的DataExchangeMode不是PIPELINED,   對於Batch Job, 只要下游須要shuffle, DataExchangeMode 就會被設置爲 BATCH mode,而不是PIPELINED 。
  2. Task沒有BackPressure的, 對於Batch Job, 全部的 operator 都沒有Backpressure , Backpressure 在streaming job Backpressure纔會被enabled 。
  3. 數據不是Bounded 。對於Batch Job, 由於BatchJob的數據流是unbounded (沒有window的界限), streaming job纔會有Window operaor, 纔會有界限。

簡單來講只有Batch job 裏的一些RP類型是Blocking的 ,由於BatchJob總一些須要shffule 輸出的operator纔會有纔會啓動Blocking模式, SpillableSubpartition纔會被建立。

注意 上面說到了的三個模式:概念比較混亂。

1. JobMode (Batch or Streaming):任務模式, 因爲決定ExecutionConext

2. ExecutionMode (PIPELINED (default), BATCH, PIPELINED_FORCE, BATCH_FORCE):可配置的數據流總體模式, 目的是經過一個可配置的ExecutionMode(可在ExecutionConfig中配置)來決定全部Operator的DataExchangeMode。它是Optimizer在優化Edge的shipStrategy和DataExchangeMode作策略選擇的依據。 詳細看DataExchgeMode代碼中,DataExchgeMode與ExecutionMode的Mapping關係。

 

3. DataExchangeMode(PIPELINED, BATCH, PIPELINE_WITH_BATCH_FALLBACK ) : 根據下游Operator,和 ExecutionMode, 由Optimizer 決定 數據下發模式 , 

下面是當JobMode 爲Batch, ExecutionMode 爲PIPELINE時, DataExchangeMode應該優化的結果。能夠看出來只要下游須要Shuffle,  DataExchangeMode就會被優化成BATCH模式, 此時Flink會建立SpillableSubpartition 。

DataExchangeMode.PIPELINED,   // to map
DataExchangeMode.PIPELINED, // to combiner connections are pipelined
DataExchangeMode.BATCH, // to reduce
DataExchangeMode.BATCH, // to filter
DataExchangeMode.PIPELINED, // to sink after reduce
DataExchangeMode.PIPELINED, // to join (first input)
DataExchangeMode.BATCH, // to join (second input)
DataExchangeMode.PIPELINED, // combiner connections are pipelined
DataExchangeMode.BATCH, // to other reducer
DataExchangeMode.PIPELINED, // to flatMap
DataExchangeMode.PIPELINED, // to sink after flatMap
DataExchangeMode.PIPELINED, // to coGroup (first input)
DataExchangeMode.PIPELINED, // to coGroup (second input)
DataExchangeMode.PIPELINED // to sink after coGroup

由於batch 工做模式下的 shuffle一般會伴隨的對整個group的排序, aggregation等, 下游須要獲得(該group的)全集纔可作這些操做。全局數據量比較大, 物理內存Buffer極可能不夠用, 這時候SpillableSubpartition(在IOManager的幫助下)可將一部分硬盤當Buffer來用, 者極大的幫助了RP端的數據緩存。但SpillToDisk必須實如今RP端嗎?不能夠在接受端(InputGate)實現嗎? 接受端的計算須要作密集的內存訪問(sort, hash, etc ),  這些算法都是在整個數據集上的操做, 因此數據須要緩存在MemoryManager管理的MemorySegment中 從而提升存取效率, 也能爲使Flink對內存作有效的管理。可現實是Flink的BatchJob須要消耗巨大的內存, 這跟接收端不使用硬盤作Buffer由很大關係。至少Flink 1.6.1的Memory是不使用硬盤作緩存的,雖然有HybridOffHeapMemoryPool, 並且它也使用了DirectMemory 來分配MemorySegment, 但並無實現用磁盤文件來映射內存 , 全部當上遊數據量很大, 但內存不足時, Flink task會很快的out of memory。以後看看Flink的最新版本是否有改善。

 

R

 

 圖-10 使用EG控制物流數據流(數據交換)

 圖-10描述了Task之間數據交換的大概流程。

  • 這是一個最簡單的MapReduce工做圖: 由一個Map和一個Reduce組成。這個Job並行度爲2, 並運行在兩個TM上。
  • M1, M2是同一個Map Operator的兩個並行的子任務。R1, R2是同一個Map Operator的兩個並行的子任務 。 
  • M1產生的數據存入RP1 中 (arrow 1)。RP1 通知JobManager(準確的說是JobMaster)(arrow 2) RP1中有數據產生。
  • RP1中其實產生一些SubParitition(RS) . JM 會通知R1和R2他們分別感興趣的RS已經準備好。(arrow 3a,3b)
  • R1, R2向 RP1發起數據請求 (4a,4b), 這些請求會觸發數據在兩個Task之間的傳輸 (5a,5b) 。之中5a是本地傳輸, 5b是跨TM經過網絡傳輸。

 

 圖-11, 跨TM的兩個Task之間的數據交換

 圖-11給出了跨TM的兩個Task之間的數據交換的更多細節。

  • M1中的MapDriver持續的產生record對象,而後傳遞給RecordWriter 對象。RecordWriter 由ChannelSelector,  一系列RecordSerializer (每個下游的RS都有一個對應的Serializer)和一個BufferWritter(更準確的說ResultPartitionWriter )組成。
  • 不一樣ChannelSelector的實現 (BroadcastPartitioner, ShfflePartitioner, ForwardPartitioner, etc)會由不一樣的選擇RecordSerializer 的策略。好比BroadcastPartitioner會將record發給全部的RecordSerializer, ShfflePartitioner會根據record的key決定發送給相應的RecordSerializer 。
  • RecordSerializer 將record序列化成二進制數據, 並把他們存入一個固定大小的buffer中, 當buffer 寫滿後, 由BufferWriter將該Buffer 寫入相應的RS中 (本例中是RS2)。
  • RP通知JM 本RP中的RS2已經有填滿數據,可供消費。 JM經過EG查找全部的消費這個RS的Task,並經過TaskExecutor通知到這些Task對應的IC, 本例中爲IC1。
  • IC1向RP申請傳送RS2中的數據 。RP將Databuffer 交給基於netty實現的的ChannelManager,(發送後, RS2中的buffer 得以釋放,還給NetworkBufferPool) 並由它發送給對端TM的ChannelManager , 從而數據將存儲到IC1中。
  • ReduceDriver 或者其餘的Driver/Invokable的run的方法是每一個一個Task的Engine 。ReduceDriver 的 Run()方法是一個while循環, 不停的從RecordReader(或更準確的名字是MutableObjectIterator)
    讀取next record, 根據他們的key 來決定他們是否來自同一個group, 而後調用相應的ReduceFunction進行reduce 。
  • MutableObjectIterator的next record的binary實際上來自於IC(本里中爲IC2)中buffer, 並經過Deserializer將binary轉化爲相應的Record。  當buffer 中的數據所有讀完, 該Buffer 得以釋放,還給NetworkBufferPool 。
  • 今後完成一個buffer從M1到R1的數據傳遞。

 

1.3.2  Task 提交與執行 (TDD, Task, AbstractInvokable , Driver , ...)

根據前面的介紹,Task是由JM(經過EG) 通過Scheduler (申請和Offer slot resource和根據schedule 策略,等等) 提交給TM執行的。申請resource 和提交Task都是經過JM, RM 和TM之間的RPC通道完成的。那麼具體提交了什麼呢?Task若是執行的呢?Task如何知道誰是上游從而創建InputChannel的?

實際上這是一個 從EV->TDD->Task->AbstractInvokable->Driver ->具體的Oparator 實現類變形的過程。

 

1. 從EV 到TDD : 

TaskDeploymentDescriptor(TDD) : 是TM在submitTask是提交給TM的數據結構。 他包含了關於Task的全部描述信息:
  • TaskInfo : 包含該Task 執行的java 類 , 該類是某個 AbstractInvokable的實現類 , 固然也是某個operator的實現類 (好比DataSourceTask, DataSinkTask, BatchTask,StreamTask 等), 
  • IG描述 :一般包含一個或兩個InputGateDeploymentDescriptor(IGD),
    •   一般一個Operator 有一個或多個邏輯的輸入, 好比Map/redue 只會有一個輸入, join會有兩個輸入。因此IGD描述是一個數組。
    •   每個IGD都會包含一組InputChannelDeploymentDescriptor(ICD), 每個ICD是該子任務對應的一個inputChannel 。 
    •   每個ICD 包含上游RP的ID和IP 地址。那麼IC怎麼知道應該消費RP的哪一個RS呢?
      •  當IG 和 source RP 的並行度相同時, 每個IC都會去消費各個SourceRP 中同子任務序號相同的RS .
      •  當IG 和 source RP 的並行度相同時,    經過對雙方平行度進行一番取餘, 來決定該IC 要消費的RS: 0個或多個。
  • 目標RP的描述: ParitionId, PartitionType, RS個數等等
  • 其餘的一些描述 : JobId, ExecutionId, SlotNumber,  subTaskIndex (子任務序號) , 等等。

 

不難看出,EV包含上述的全部信息,EV的一個方法createDeploymentDescriptor,完成了上述變形。JM 在向TM submitTask時,傳遞的是TDD不是EV。爲何要作此變形的而不是將EV直接傳過去既然他們很相似? 我想這個一個設計模式問題, EV是做爲ExcutionGraph的中的頂點 ,它最好只存在於JobMaster 的物理圖中, 而不是做爲參數傳遞給其餘組件,從而維護它的獨立性和單一性。參見Descripor設計模式。

當TaskExecutor(TM)接受submitTask 的RPC調用從而獲得TDD時, 他會將TDD實例化爲一個在TM上能夠執行的對象 : Task 。

 

2.  Task : 

Task 是一個Runnable 對象, TM接受到TDD 後會用它實例化成一個Task對象, 並啓動一個線程執行Task的Run方法。

Task實例化時, 他會將TDD中的IGD實例化成InputGate (IG) 和 InputChannel(IC), 將RPD實例化成RP . 在 Task 的Run 方法被調用時, 它根據TDD的 TaskInfo, 使用URLClassLoader將用戶的operator類從HDFS加載, 並實例化TaskInfo所描述的AbstractInvokable對象, 並將IG, IC, RP , 還有其餘全部的AbstractInvokable須要的服務類(MemoryManager, IOManager, CheckoutResponder, TaskConfig etc )都傳遞進去, 而後調用AbstractInvokable 的 invoke 方法。

3.  AbstractInvokable

如以前給出的一些例子 : DataSourceTask, DataSinkTask, BatchTask, StreamTask 。 它們是Task.Run()時, 經過TaskInfo加載並實例化的。 這些Task Operator的源代碼都在

org.apache.flink.runtime.operators 或org.apache.flink.streaming.runtime.tasks下面 。每一個BaskTask的須要的工具都是相似的, 只是計算的流程不一樣 , 因此BaskTask的invoke方法調用時, 它根據TaskConfig(信息繼承與TDD的TaskInfo) 加載和實例化不一樣的Driver 類, 並調用Driver的run方法由Driver指揮流程(input, calcuate, out , etc )。

4. Driver 

Driver類和AbstractInvokable位於同一個包內。 每個Driver的run() 大都是一個循環。 不停向IG 要next record , 寫metrics, 調用function 計算, 而後見結果發給RP。只不過有些drver一次計算須要兩個record, 有些driver 須要兩個record 來自不一樣的IG , 有些須要將全部的input所有收完才計算, 有些在window expired後才計算, 等等。下面是FlatmapDriver 的run() 。

while (this.running && ((record = input.next()) != null)) {
numRecordsIn.inc();
function.flatMap(record, output);
}

 

1.3.3  內存管理(NetworkBufferPool, MemoryManager, IOManager 

在stream模式下數據處理是有界限(bondary)的, 每一個window的處理所使用的內存是相對比較小 的,  因此Flink stream job 一般使用的內存較小。

但在batch 模式下, 數據處理是無界的 (所謂無界就是沒有Window),如前面所示, 不少job 須要將上游全部的input都取乾淨,纔開始計算, 如sort, hash join, cache 等, 此時所須要的內存是巨大的, 好比上游operator 讀取300G文件而後map成record, 下游operator 須要將這些record 同另外一組輸入作outer join  。因此Flink內存管理主要針對的是這些batch job 。

總起來將, Flink的內存管理仍是比較失敗的, 至少在我用的版本里(1.6.1) , 主要緣由仍是 MemoryManager並無聯合IOManager Disk去擴展內存。但我覺的MemoryManager的引入, 就是爲了管理Flink的內存, 以防止OutOfMemoryException的發生 (如Spark同樣 )。 防止溢出最直接的方法就是當系統內存不足或超過throttle時, 使用DiskFile以補充內存 , 從而完成 那些內存消耗巨大的操做。 只是version 1.6.1 還沒作好 ,或許之後版本會作好 。

先看下Flink的內存管理機制吧 。

從概念上將, Flink 將JVM的heap分紅三個區域。

  • Network buffers pool:  是一些 32 KiByte MemorySegment用於TM以前數據的批量傳遞。回憶一下前面圍繞圖-11的介紹。Network buffers Pool 在 TaskManager啓動時分配的。 缺省2048 buffers 被分配,可經過 "taskmanager.network.numberOfBuffers"調整。
  • Memory Manager : 是另外大量的  32 KiBytes MemorySegment, runtime algorithms (sort/hash/cache)使用這些buffer 用於將 records存入緩衝區 以後應用算法 ,record 是以serialized 的形式存儲在 MemorySegment裏的 。 這些MemorySegment Pool 是在TaskManager啓動時分配的 。MemorySegment Pool 的大小,能夠用兩種方式設置。
    •   Relative value (缺省): MemoryManager在全部的service 啓動後(包括network buffer pool ) 會計算heap 剩餘總量, 而後按必定比例 (by default 0.7) 做爲 pool size 。 這個比例可經過 "taskmanager.memory.fraction" 配置。
    •   Absolute value:  科經過 "taskmanager.memory.size" 配置一個絕對值, 好比10GB。
  • Remaining (Free) Heap:  剩下的Heap用於存儲 TaskManager's 數據結構, 好比 network buffer  Deserialized 後的 record 。

              

 

 圖-12 Flink Memory

Network buffers pool 和 memory segment pool 都是TM啓動時就分配的, 它們生存於JVM的老年代, 因此不會被GC回收的。只有Free Heap區域在新生代裏生存。 

IG/IC, 和RP/RS 的record存儲在network buffer 裏,有些RS (SpillableSubpartion)須要Spill to Disk ,所以他們須要IOManager 的幫助。

須要sort/hash/cache的Task瞬時內存消耗很是大,所以從IG接收record後就將他們存儲memory segment pool, 以後在對serialized record 應用 算法 。

其餘的Task大都是pipeline 方式, 來一個消費一個, 瞬時內存消耗不會大。此時這些Task會使用FeekHeap 區域的內存。

若是沒有人爲錯誤, 系統不會有瞬時的大量內存申請, 因此不會有OutOfMemoryException , 因此FreeHeap的區域應該是比較輕鬆的。

但是問題是, memory segment pool 目前沒有使用DiskFile 做爲OffHeapMemory, 它可以裝載上游下來的巨量 blocking 輸入嗎? 這就是Flink 1.6.1 的問題。不過這個MemoryManager實現的好很差的問題, 設計上是有防止OutOfMemoryException 的組件的。這個就期待Flink 新版本 補強這部分功能吧。

前面說「serialized record 應用 算法「,這是怎麼作到的呢?  Flink 實現了大量的 TypeSerializer TypeComparator,  他們懂得record 是如何序列化到字節數組裏的,因此也知道如何將record部分地deserialize 。一般算法只是使用record的某個或某些field。 部分deserialize極大的下降內存使用, 提高了數據存取的速度, 從而極大的提成了內存的使用效率。 serialized 形式的record 數據是Flink 可以將他們在TM之間, 跨進程和跨網絡的傳輸, 它是分佈式系統須要的數據形式, 同時, 它也爲MemoryManager 將它們在內存和硬盤之間交換提供了條件。

IOManager 使用FileChannel將MemorySegment同DiskFile創建映射, 從而實現將數據在MemorySegment和DiskFile之間寫入和讀回。

因此,雖然MemoryManager目前還有些問題,  在這種設計下, Flink的內存使用效率確定會進化的極好 。

 

1.4  ResourceManager

RM在Flink內部是Flink cluster 裏slot資源的管理者 , TM 提供slot,JM (JobMaster)消費slot 。 RM同JM和TM 都要保持心跳,以保持slot市場的活躍 ,以及在TM或JM失敗的時候通知給對方。

總起來講RM的做用主要包括:

  • 啓動和得到TM, 從而獲取Slots 。
  • 提供JM和TM發送對方的失敗通知。 如一個TM的心跳中止了, JM會通知消費該TM slot的JM。
  • 當註冊過來的TM的slot有剩餘時, RM會緩存起來。

第2、三項功能比較好實現。第一項功能須要同外部的集羣管理器合做才能實現。所用RM是一個隨環境不一樣而不一樣的組件。在不一樣的集羣環境裏, RM有不一樣的實現類。

市場上比較流行的集羣資源管理器主要有Yarn, Mesos, Kubernetes, 和 AWS ECS 。其中Yarn, Mesos中, 能夠利用ApplicationMaster/Scheduler是資源調度器,只要將RM在ApplicationMaster中運行起來,理論上 RM就能夠Yarn, Mesos的master通訊,爲TM分配容器和啓動TM。實際上JM的全部組件(Dispatcher, RestEndPoint, JM, ClusterEntryPoint, etc)都在ApplicationMaster啓動的。

Kubernetes和ECS 沒有ApplicationMaster的概念, 但TM能夠做爲一個有多個副本的deployment (K8s) 或 service (ECS) 運行。此時 TM 的多少(規模)是由deployment/service 根據必定策略自動擴容的而不是根據Flink須要的slot數量。Flink並無實現特殊的ResourceManager和K8s/ECS集成,此時的FlinkCluster使StandaloneCluster。若是能一個K8s/ECS 特殊的RM和集羣的master通信,使TM可以能按需擴容而不是自動擴容,那就和在Yarn裏同樣完美融合, 並且可以用上Docker的服務, 畢竟Yarn目前只是使用JVM做爲容器,並無真正達到真正資源的隔離 。

RM在不一樣cluster以不一樣的方式啓動新的TM以補充slot資源,固然當一個job 結束時,它也會同集羣的master通信,釋放container,關閉徹底空閒的TM。

在不一樣的集羣環境裏,RM可以管理TM的生命週期,那麼誰來啓動和結束JM 呢? 請參考後面的 Flink Deployment 。

1.5  HighAvailability 

 Flink的 HighAvialbility 主要有兩個服務組成 : LeaderEleketronService和LeaderRetrievalService 。

1.5.1.  LeaderEleketronService

該Service是用來選舉Leader的 。假設系統裏啓動了兩個Dispatcher 。先回顧一下什麼是Dispatcher , 看圖-2, JobMananager 的進程是ClusterEntryPoint的main 函數啓動的,ClusterEntryPoint啓動了 WebUI, Dispatcher 和ResourceManager。當用戶提交了 Job時, Dispather 實例化一個新的JobMaster 來管理這個job, Dispatcher也是CheckPointBarier 的發起者, 同時也是來自WebUI REST 後臺的請求的處理者。Dispatcher 的做用承上啓下, 做用很是核心,不可缺失,因此Flink啓用HA服務來保護它。 Flink 系統裏, 使用HA 保護的核心服務還包括ResourceManager ,JobMaster 還有WebUI 的REST Endpoint。當系統裏啓動兩個Dispatcher 誰來當Leader呢? 這就要LeaderEleketronService (LES) 的決定了。

目前Flink的起做用的LES使用ZooKeeper 實現的。實現方式就是兩個Dispather都試圖搶佔的特定ZooKeeper Path (dispather latch path)  的LeaderLatch(參見 curator framework),誰先搶到了, 就被選舉成Leader , leader的 AKKA URI 和UUID被被寫道一個特定的ZooKeeper path 中(leader path)  。只有Leader 是工做的,由於其餘組件在使用Dispather時會向獲取Leader Dispath的URI, 而後才RPC的 。 其餘競爭者不會接受到RPC, 他們只有繼續監聽,若是當前的Leader 退出了, LeaderLatch被釋放了, 從而新的leader會被選舉出來。

1.5.2.  LeaderRetrievalService

那麼對於Dispatcher的使用者, 怎麼知道誰是Leader呢? 這就須要LeaderRetrievalService了 (LES ) 。 對於用ZooKeeper 實現的LES, 它只須要監聽一下leader path, 它就知道誰是leader 了 。

好比, 當你用fink run 命令提交job 時, 假如系統裏有多個Fink REST Enpoint, Flink的ClusterClient 對先使用LES獲取Leader REST Endpoint, 而後纔會將job  發送過去。 

 

除了ZooKeeper的實現, HAService 還能夠以來外部的名字服務(如DNS, LoadBalencer , etc ) 實現。在LES和LRS 永遠返回 HA保護的服務的URI(不管是AKKA PRC URI, 或REST URI )。 該URI永遠保持不變, 若是提供服務的server失敗, 名字服務會將給URI映射到另一個工做的server 。 具體請參考 StandaloneHaServices的代碼。

 

1.6 Flink Deployment

目前爲止, Flink 支持大概4種部署方式或4種cluster 類型,MiniCluster(或LocalCluster),  StandaloneCluster,YarnCluster,MesosCluster 。 雖然前面提到過Kubernetes , ECS , Docker , 但他們的本質是將JM和TM docker 化 , 從而是他們運行在真正的 dockers裏面 , cluster 仍是 StandaloneCluster 。

圖-3 是StandaloneCluster , 圖-4是YarnCluster , MesosCluster 與YarnCluster 相似, MiniCluster是運行在IDE(e.g. IntelliJ )的虛擬cluster , 它主要用於 FlinkApplicaiton的調試 。在Session 模式下,表面上能夠看來StandaloneCluster 與YarnCluster基本流程是一致的, 只不過是啓動JM 和TM的啓動機制不一樣而已。YarnCluster/MesosCluster 裏TM是由他們各自的ResourceManager實現根據須要啓動的。 在 StandaloneCluster下, TM是手工啓動的。其實在YarnCluster下, JM 也是Yarn啓動的。其實, 如前所述, StandaloneCluster與YarnCluster / MesosCluster的本質區別是, 前者的用於TM硬件資源是管理員手工分配的, 後置是有RM同集羣管理器協調自動分配的。

從Flink內部看, 是爲了支持這些異構環境, Flink 對於不一樣的Cluster, 實現了不一樣的ClusterEntryPoint用於啓動不一樣ResoureManager ,以及Dispatcher 。不一樣集羣類型的ClusterEntryPoint對JobMode和SessionMode有 不一樣的實現, 好比YearnSessionClusterEntryPoint和YarnJobClusterEntryPoint。Session模式下, Dispatcher使用的時StandaloneDispatcher, Job模式下,使用的是MiniDispather 。 關於什麼是JobMode和SessionMode, 請參考圖-3下面的解釋。

如前所述,JM接受jobGraph的對象或對象文件(序列化後),jobGraph由FlinkClient生成, FlinkClient和JM位於不一樣的JVM (MiniCluster除外)。若是JobCluster沒法由FlinkClient啓動 (經過某種方式),則JobGraph生成之後須要存到指定位置,在手工啓動JobCluster讀入,這樣的過程比較複雜。基於目前的架構,儘管Flink在各類集羣環境裏對job mode , session mode 在代碼上都由支持, 但job mode 一般因爲集成度不夠好,用戶沒法方便使用。

 

我用一個表來描述他們跟具體的不一樣以及Flink是如何支持這些異構環境。爲了使這個表不至於太龐大,先把不一樣的cluster 類型表述下先。

1.  MiniCluser 

用於在IDE(IntelliJ, Eclipse) 運行FlinkApplication 的小型FlinkCluster環境,JM和TM運行在同一個進程裏,主要用於在IDE裏調試Flink Application。

2.   StandaloneCluster

能夠運行在單機或多機上的FlinkCluster環境,  JM和TM運行在不一樣的JVM裏。只要有JRE , 不管在Windows ,Linux均可以搭建StandaloneCluster。

用戶可使用FlinkClient 的command line,或API 直接向JM REST URI 提交job ,具體看Flink CLI 的 「-m" 選項。StandaloneCluster很是有利於搭建單元測試,集成測試以及演示環境。

3.   YarnCluster

在Hadoop集羣裏搭建的FlinkCluster, JM和TM都運行在Yarn管理的容器裏。JM作爲Yarn裏ApplicationMaster ,Flink cluster做爲Yarn的一個Application運行。

Yarn是Flink與之集成度最好的集羣管理器。 用戶能夠直接使用FlinkClient 的command line,或API 直接向Yarn提交job , YARN 會自動的啓動(或鏈接現有的)Flink JM, 自動啓動須要的TM,然後在該Flinkcluster運行任務。

具體看Flink CLI 的「-m yarn-cluster" 和 」applicationId" 命令行用法。


4.   MesosCluster

 在Mesos集羣裏搭建的FlinkCluster, JM作爲Mesos裏Scheduler , 一個 Flink cluster做爲Mesos的一個Framework運行, JM和TM均可以運行在Mesos管理的容器裏。 

TM由JM的resource Manager同Mesosmaster 協調按需自動啓動和銷燬。JM 須要先使用Marathon 建立服務, 由Marathon 啓動。Marathon服務的後端都是運行在Mesos的容器裏,

並且Marathon服務一個高可用性服務。  

用戶能夠直接使用FlinkClient的command line,或API 直接向Marathon 建立服務提交job ,具體看Flink CLI 的 「-m" 選項。

 

我的以爲Flink 與Mesos的集成度能夠在提升一些。目前用戶須要手動的爲JM建立Marathon Service, 可參考下面寫一個簡單的配置文件, 調用flink的shell腳本mesos-appmaster.sh啓動Flink

MesosSessionCluster的JM (該JM會啓動MemsosResourceManager用來啓動須要的TM)。

yhttps://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/mesos.html

  

Flink 徹底能夠改進一下Client端, 添加集成Mesos Marathon須要的 ClusterDescrptor類,CLI類(好比MesosJobClusterDescripor, MesosSessionClusterDescripor,FlinkMesosSessionCLI ),豐富「-m" 選項,

由ClusterDescrptor類自動建立和銷燬JM (Marathon 服務)

 

5.   Kubernetes/ECS/Docker

Flink 對於Kubernetes/ECS/Docker在源碼級別並無任何支持。只是說,能夠將StandaloneCluster的JM和TM運行在這三個以Docker 做爲容器服務的

集羣環境裏。因此是,Flink 與他們的集成度, 比較低 。Kubernetes的具體作法分別對StandaloneCluster的JM和TM作兩個deployment(就是能夠平行擴展的docker group ), 它們分別啓動StandaloneCluster的JM和TM (經過start-console.sh腳本), 而後對JM的

deployment作一個Service使其具備高可用性,固然須要將該Service的URI傳遞給TM,前面說過StandaloneHAService是靠統一的URI來提供HA服務的。更詳細的請參考下面。

https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/deployment/kubernetes.html

ECS和Docer的實現同Kubernetes 相似, 只是在各自使用的技術名稱不一樣而已。ECS的AutoScalingGroup等價於Kubernetes的Deployment。 common sense是同樣的, 只是各家用各家喜歡的名字。

總之,將Flink的部署在上述集羣裏, 目前手工的工做仍是比較多的,並且TM的數量是預先設定的,或按策略自動擴容的, 並非最優的由Flink RM 指導的按需擴容。獲得的好處最大應該是Docker的容器服務。畢竟Yarn或Mesos對Docker的支持並非很好。

我的以爲Flink 與他們的集成度能夠在提升一些。同提升Mesos的集成度的想法同樣, 在目前Fink的框架下,只須要改進FlinkClient,添加集成Kubernetes API-Server須要的 ClusterDescrptor類,CLI類(好比K8sJobClusterDescripor, K8sSessionClusterDescripor,

FlinkK8sSessionCLI ),豐富「-m" 選項,由ClusterDescrptor類自動建立和銷燬JM (JM 的Deployment和 Serice ), 添加K8sResourceManager, 用它來建立及擴容TM的deployment 。

 

Cluster Type  JM 啓動方式  TM啓動方式       ClusterEntryPoint   ResourceManager SuportedMode
 Mini

調試環境啓動的Flink Client是

LocalEnvironen或

LocalStreamEvronment,

它們的execute方法會

先啓動MiniClusterEntryPoint

由MiniClusterEntryPoint啓動  MiniClusterEntryPoint
.java

 StandaloneResource

Manager.java

只支持JobMode
 Standalone 手工,如sart-cluster.sh腳本
手工,
如flink-consol.sh
腳本

StandaloneSession

ClusterEntryPoint.java

 

StandaloneJobCluster

EntryPoint.java

 StandaloneResource

Manager.java

支持SessionMode

job mode 有支持,但用戶沒法方便使用。 

 Yarn

jobmode下 由FlinkClient裏的

YarnClusterDescriptor,

調用Yarn的API經過YARN啓動。

JM作爲ApplicationMaster自動啓動。 

 

SessionMode JM須要手工運行

yarn-session.sh,

JM和TM的內存不是per-job,

只有管理員才能設定。

 JM 裏的YarnResourceManager

利用YARN的API啓動TM。

 YarnSessionCluster

EntryPoint.java

 

 YarnJobCluster

EntryPoint.java

 

 YarnResource

Manager.java

 支持JobMode和SessionMode

 

Mesos

經過Marathon Service啓動,

JM運行在Mesos的容器裏。

但Marathon Service須要手

工建立。

 JM 裏的MesosResource

Manager

利用Mesos的API啓動TM。

 

 MesosSessionCluster

EntryPoint.java

 

 MesosJobCluster

EntryPoint.java

 MesosResource

Manager.java

 支持SessionMode

job mode有支持,但不方便使用。須要

提高集成度。

Kubernetes

/ECS/Docker

 由JM service 啓動  由TM deployment啓動

StandaloneSession

ClusterEntryPoint.java

 

StandaloneJobCluster

EntryPoint.java

 StandaloneResource

Manager.java

 

支持SessionMode

job mode有支持,但不方便使用。

須要提高集成度。

 

 

 表-1 Flink Deployment

 

1.7 Failure Detection and Reaction

如前所述FinkCluster裏 (參考圖-3), 最核心的組件並且交互最頻繁的組件是Dispatcher, ResourceManager, JobMaster, 和TaskManager 。

其中對於Dispatcher, ResourceManager, JobMaster,一般是ClusterEntryPoint啓動Dispatcher和ResourceManager,而後當有job提交時,Dispatcher啓動JobMaster。(MiniCluster, JobCluster略有不一樣,但相似)。儘管他們之間使用RPC通信,他們生存在同一個JVM裏,若是其中一個失敗,同時使整個JVM(因爲異常崩潰了)失敗了,全部組件都失敗了。那麼他們中若是有承擔Leader責任的,也都會經過HA Service 切換到另外一個工做的JM後端的RPCEndPoint。

若是TM失敗了,  JM可以感知到heatbeat 機制感知到, JM 和RM都與主動向TM發送和hearbeat , 因此TM heartbeat 一旦timeout , JM會釋放該TM 提供 過來的slot,與此同時將slot上運行的Execution 的狀態設置爲失敗, 參考 SingleLogicalSlot::signalPayloadRelease .

在stream mode 下,JM 會嘗試將該TM上失敗的Task重現分配到別的TM上(若是slot資源時有的,或能夠分配的)。

在Batch mode 下, JM 會將整個job 失敗,然後嘗試從新啓動整個job 。

從新啓動的streaming Task會根據上一次的checkpoint 回復狀態, 繼續運行。

 

1.8 Flink Security

to be filled .

2.  Flink的源碼結構

上一章介紹Flink了的架構, 它包括了那些主要組件? 組件是怎麼工做的? 組件之間是怎麼工做的?組件使用的資源(包括服務器,容器,內存和CPU和Disk)是若是分配的? 以及組件是如何部署的?一個job是怎樣分割成小的Task並行執行在cluster裏面的?

怎麼保證組件HignAvailablity 以及組件是如何作 失敗處理的?以及Flink關於系統安全的設計等等。 這些關於架構的介紹,重點在於瞭解flink cluster中各個組件的互操做行以及容錯處理, 瞭解框架, 以便於再出現問題的時候我們能夠對它有比較針對性的debug 。關於Flink引以爲傲的statefull operator, window watermak, checkpoint barrier , stream/batch API 以及web monitor本文都沒有介紹,請參考相關的Flink的官方文檔。

Flink的源碼還是比較清晰易懂的, 尤爲是瞭解了她的架構後, 大部分的實現都很是符合common sense 。 不想在這裏貼一堆代碼段然後加註釋解讀了, 本文的篇幅已經太長了。過一下全部的包,解釋一下他們的主要做用,以及須要框架裏使用的主要類吧。用一個大表來列舉比較合適。

 Artifect    功能介紹
Flink-client

FlinkCommandLine 入口類 (CliFrontEnd)

解析Flink 命令行 (DefaultCLI, YarnSessionCLI )

負責將從用戶jar 文件中main函數生成Plan (PackagedProgramUtils),用通過LcoalExecutor或RemoteExecutor調用flink-Optimizer生成 jobGraph .

使用ClusterDescriptor啓動jobManager (好比在yarn), 使用並將jobGraph 用ClusterClient提交給本地或遠程的JM。

Flink-runtime

Flink 源碼的核心。 框架的核心組件都在裏面, 包括 Diskpatcher, JobMaster, TaskExecutor, ResourceManager, ClusterEntryPoint, WebUI以及他們依賴的子組件以及核心數據模型,

JobGroup, ExecutionGraph, Execution, Task, Invokable, Operator, Driver, Function, NetworkBufferPool, ChannelManager, IOMemory, MemoryManager, PRCService,

HAService, HeartbeatService , CheckPointCoordinator ,  BackPressure, etc .

這些類的名字和做用在上一章都或多或少提到過, 最好結合架構的介紹, 理解他們的代碼。

Flink-runtime-web

Flink Web monitor 的界面和handler .  handler會調用Dispacher 的方法處理客戶請求, 好比sumitJob.

Flink-java

Fink-scala

用Java 和 scala實現的Flink Batch API  , Dataset, ExecutionEnrionment, etc .

Flink-stream-java

Flink-stream-scala

用Java 和 scala實現的Flink stream API ,  DataStream, ExecutionEnrionment, , StreamExecutionEnvironment, windowing, etc .

和對streaming提供支持的runtime, Checkpoint, StreamTask, StreamPartitioner , BarrierTracker,  WindowOperator, etc .

Flink-optimizer

主要功能是優化Plan 和生成JobGraph。Flink-optimizer是一個很client端使用重要的庫,它決定了從客戶代碼(application) 到jobGraph造成過過程。 我個人覺得一般也是調查和解決

application問題的根源, 好比application 的寫法是否是合適的,有問題的, 或最優的等等。

在前面架構極少裏面,並沒有談及Flink-optimizer, 因此在這簡單介紹下。

fink application 開發者使用Flink API 編寫application, flink-client 爲了將application 最終能在Flink cluster裏運行起來是,

它首先通過讀取用戶jar 文件中的main函數生成Plan:以DataSink為根的一個或多個樹結構, 樹的節點都是application使用的 API operator, 每一個節點的輸入來自與樹的下一層節點。

不難想象樹的葉子節點都是DataSoruce: 他們沒有輸入節點,但有用於讀取數據源的inputFormat,  根節點是DataSink :他們既有輸入節點,也有用於寫入目標系統的outputFormat,

中間節點都會有一個多個的輸入節點。Plan數據結構描述了同用戶的application徹底相同的數據流的節點, 但它只是一個邏輯樹狀結構, 並無對鏈接節點的邊作描述,

也沒有對application編寫的數據流作任何修改。o

而後使用Flink-optimizer將Plan根據時根據優化策略設置節點的邊上的數據傳輸方式,並同時用OptimizerNode生成多種優化方案, 最後選擇cost 最小的方案產生的方案 做爲OptimizedPlan 。

好比將數據裝載方式(ShipStrategyType) 設置 FORWARD, 而不是 PARTITION_HASH而 應爲 FORWARD 的網絡cost 最低(0) 。OptimizedPlan是一個圖結構, 圖中的頂點(PlanNode)

記錄了自身的cost以及從source 開始到它的累計的cost 。OptimizedPlan主要針對與join和interation操做。

而後Flink-optimizer將OptimizedPlan 編譯成JobGraph。編譯的過程應該基本上是一對一的翻譯(從PlanNode 到 JobVertex), 但若是一串PlanNode 知足Chaining 條件

(好比數據在每一個oparator 都不需從新分區, 流過operator 以後, 直接forward到下一個operator), Flink-optimizer就像這些 operator 鏈接到一塊而後在JobGraph裏面只建立一個

ChainedOperator jobVertex, ChainedOperator同Spark裏面的stage 概念相似, 是優化的一部分。 

最後flink-client將jobGraph提交給FlinkCluster ,jobGraph 變形爲 ExceutionGraph在JM和TM上執行。

能夠從Optimizer 的compile 和JogGraphGenerator的 compileJobGraph展開看, 他們分別complie的是Plan和OptimizedPlan 。

Flink-optimizer優化的是parition 的選擇以及算法的選擇, 而不是DAG 的workflow 。

 

Flink-Table

 

用戶能夠用flink-java/flink-stream-java裏的api編寫flink application, 也能夠用 Flink-Table 的table api和 flink-sql寫 application 。 Flink-Table將數據源(Dataset, DataStream)都

generalize 成Table (row based ),用戶能夠用相似關係型數據庫的操做方式操做Flink的數據源, 這種方式雖然屏蔽了一些flink-api的特性 (好比 broadcast), 但極大的下降了application

的開發難度,減小了客戶程序的代碼量,從而極大的提升系統的重用度。

Flink-table 的底層仍是依賴dataset/datastream api, 因此基於table api 或SQL 的程序 (Program) 最終會翻譯成 Flink-optimizer 的Plan , 通過前面所述一樣的編譯優化過程,

最終一個EG的形式運行在JM和TM上。 固然,在被翻譯成Plan以前, Flink-table 的Program 也會有自身的優化過程, 好比SQL Plan optimization .

代碼須要看, 如何實現一個Table : StreamTableSource, BatchTableSource, BatchTableSink,AppendStreamTableSink

如何擴充FlinkSQL : UserDefinedFunction, ScalarFunction,TableFunction,AggregateFunction
TableEnvironment

Flink-yarn

Flink-mesos

Flink-container

 Flink 怎麼支持異構環境的, 包括不一樣環境裏的, 主要是異構環境裏的不一樣ResourceManager (啓動TM ),以及 ClusterDescriptor (啓動JM)如何實現的。

Flink-library

 flink-cep, flink-gelly, flink-ml

Flink-connector

Flink-format

鏈接外圍數據系統(數據源和輸出)系統的InputFormat, OutputFormat .

Flink-filesystem

 Flink支持的分佈式文件系統, hadoop, s3, mapr

Flink-metrics

 flink 的metrics 系統

Flink-statebackends

Flink-queryable-satate

 flink 的 state的存儲

flink-jepsen
flink-test

Flink的UT和集成測試。

 表-2 Flink packages                     

 

3.  Debug Flink

3.1.  Intellij 準備工做

須要安裝JDK1.8和Scala的plugin, 不然Flink沒法在IntelliJ裏編譯。

下載flink1.6.1的代碼,到下面的地址 download ,而後用maven 編譯 。

 https://github.com/apache/flink/tree/release-1.6.1

 

下載例子程序:  https://github.com/kaixin1976/flink-arch-debug 。 因爲github的限制, 上傳的instrument文件相對較小,讀者能夠經過自我複製擴大文件尺寸。

運行例子程序, 有兩個參數 :

第一個是 joinType,可取值爲"normal" 或"broadcast"。normal 方式徹底依賴FlinkOptimizer產生 優化的方案, broadcast是運用一下方法, 使join的第一路輸入廣播, 第二路輸入自由平均分配。

第二個使api類型, 可取值爲 "api" 或 "sql" 。api方式指的是Flink Java API, sql 使 Flink SQL API 。對於Join Flink Java API 能夠指定JoinHint或Parateters從而影響優化, sql 沒有這些接口, 只能在DataSoource 上作文章。

具體看JoinTest::main()。

3.1.2  Intellij debug configration 

如前所述, Flink分佈式環境的主要包含三部分:

1.  FlinkCient : 生成、優化和提交JobGraph.

FlinkClient的main 函數在CliFrontend中, VM 的classpath 和工做路徑設置爲本地安裝的一個flink路徑 , 用於找到全部須要的jar文件和 flink confugration .

程序參數跟flink run 同樣, 

 圖-13  Flink Client debug configuration

 

2.  JM : 生成ExecutionGraph,併爲其中的全部的的子任務分配slot資源, 並將子任務發送到slot所在TM上運行。

JM 使用StandaloneSessionClusterEntrypoint , 他會啓動Standalone Session Cluster 的entry point .

 圖-14  Flink JobManager debug configuration

 

3.  TM: 運行ExecutionGraph的子任務。

  圖-14  Flink TaskManager debug configuration

  

3.2  例子程序 和問題描述

例子程序是一個利用Flink SQL  把一大一小兩個數據源join再一塊兒的Flink  application 。大數據源叫instruments 包含了一些假造金融工具(好比股票期權等)的基礎數據(其中包括該工具交易貨幣的ID),小數據源是貨幣currencies  包含貨幣ID,和貨幣名稱。join目的是給與金融工具的貨幣ID得到對應的貨幣名稱(好比,人民幣,歐元,美圓等)。

join的過程是比較簡單的,首先 從文件建立兩個數據源(instrument 和 currency 的 TableSource), 而後用調用 SQL 作join, 最後建立TableSink將join的結果輸出到文件裏。程序的主體以下。

 public void joinSmallWithBig(String joinType) throws Exception {

String currencies = "currencies";
String instruments = "instruments";
//0. create the table environment
ExecutionEnvironment env = buildLocalEnvironment() ;

env.setParallelism(4);
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

//1. create and register instrument table source
registerInstrumentTableSource(tableEnv, instruments, System.getProperty("user.dir") +"/data/instruments.csv");

//2. create and register currency table source
if(joinType.equalsIgnoreCase("broadcast")){
this.registerCurrencyBroadcastTableSource(tableEnv,currencies, System.getProperty("user.dir") +
                                              "/data/currency.csv");

}else {
this.registerCurrencyTableSource(tableEnv,currencies, System.getProperty("user.dir") + "/data/currency.csv");
}

//3. join
Table currencyTable = tableEnv.sqlQuery(
"SELECT CurrencyId AS CurrencyId," +
"(CASE " +
"WHEN ISOCode IS NULL OR ISOCode ='' THEN ISOExtended ELSE ISOCode " +
"END ) AS CurrencyCode FROM " + currencies);
tableEnv.registerTable("currencyTable", currencyTable);


Table result = tableEnv.sqlQuery(
"SELECT RIC,Asset,AssetClass,Exchange,Periodicity,ContractType,CallPutOption,ExpiryDate,StrikePrice,
" +        
        "StrikePriceMultiplier,LotSize,CurrencyCode,AssetState " +
  "FROM currencyTable join instruments on currencyTable.CurrencyId=instruments.CurrencyID ");

//4. create sink and output the result
CsvTableSink sink = new CsvTableSink(System.getProperty("user.dir") + "/data/sql_join_result.csv", ",", 1,
           FileSystem.WriteMode.OVERWRITE);

result.writeToSink(sink);

env.execute();

}
 

 表-3  例子程序

 

問題是join的過程發生了嚴重的數據傾斜 (data skew )。全世界的金融工具分佈式極不平均的, 絕大部分使用美圓,歐元計價 (好比例子程序裏假造的instrument的數據源,大概有13/14 的instrument是以歐元計價的)。若是使用經常使用的 hash join 或 sort-merage join, 數據傾斜是必然發生的。 調用joinSmallWithBig,並傳入任意字符串 作爲joinType 時, 程序調用的registerCurrencyTableSource方法, 該方法就是建立了一個普通的CsvTableSourcei並註冊, 以後利用sql join 將兩個數據join起來,此時會發生數據傾斜。以下圖所示:

圖-15 flink-webmonitor 上的currency 數據源

 

  

 圖-16 flink-webmonitor 上的instrument數據源

能夠看出currency 的CsvTableSource一共有492條記錄分4個split讀入,數據量確實小,instrument數據源記錄數大概14million 條記錄有,比較大,一樣分4個split讀入, 分配不可謂不均衡 。但當join後, 均衡完全打破了,以下圖所示:

圖-17 flink-webmonitor 上的hash-join

兩個數據源的輸出策略(或JoinOperator的輸入策略)也就是鏈接數據源與JoinOperator的那條邊的ShipStrategyType被設置爲一個currencyId 爲key的HashPartition。  因爲本例中約13/14的instrument以歐元計價,大概有13 million個instrument 記錄依照hash code 被髮送到一個task的IG裏, 剩下約1 million的其餘三個task 。

借這張圖我們須要重溫一下的前面提到的task, RP,RS,IG,IC的關係, instrument數據源 有4 個task因此4個RP, 每一個RP會產生4個RS以裝載不一樣的hash code,共16個RS 。 下游join operator 有兩個IG (分別鏈接instrument數據源 和currency), 每個IG 的ShipStrategy都是HashParition。每個IG 有4個 IC, 每一個IC 都鏈接上游標號相同的RS (好比JoinOperator task1的 IG1 的IC1,IC2,IC3,IC4)只鏈接上游 instrumentSource task1, 2,3,4 的 RS1 。 JoinOperator是一個DualInputOperator, 一般的operator只有一個IG 。

上圖中13 million的record received (13,836,997)是joinOperator的第三個 task 的IG1 和 IG2中相同標號IC的記錄數量(好比IC2),因爲currency數據量很小,構成13 million 這個數量級的來源是instrument中以某幾個貨幣計價的instrument, 如前述主要是歐元 , 此例中歐元計價佔比大概爲13/14 。

數據傾斜的結果形成joinOperator的第三個task須要多於別的 task的100倍的內存,一般會超過容器預先分配的內存配額。罪魁禍首是 join operator 輸入邊 的 ShipStrategy : hash partition 。 SQL 裏沒法指定 join 使用哪一種策略, 爲何flink 會在翻譯(將sql 翻譯成jobGraph)的過程當中將 join 輸入冊率 優化成hash partition 呢?一個小數據集和大數據集join,最好的方式是將小數據集廣播給下游的全部task, 大數據平均分配,而後再join operator內部作hash join, 這樣的cost是最低的 。Flink爲何不能作到這樣的優化呢?若是使用Flink API (而不是 SQL) 狀況會好嗎?

 

3.3  分析問題設置斷點

初步的感受是Flink-optimizer 並無作到這樣的優化: 當join的兩個數據源大小懸殊時, 小數據集廣播,大數據集均分的輸入給join operator 。Flink這麼經常使用的優化都沒作到嗎?

讓咱們經過debug Flink-optimizer 代碼, 看看他時怎麼將本例的join input的ShipStrategy優化成HashPartition 吧。

3.3.1. 首先,準備調式環境

下載例子代碼,而後編譯,package 成flink-arch-debug-0.0.1.jar, 而後下載flink-1.6.1 的代碼,編譯, 按照圖-13設置flink-client的運行選項。

program arguments 設置爲 run flink-arch-debug-0.0.1.jar normal sql, 使用 "normal ","sql" 方式, 以下:

 C:\projects\flink-arch-debug\target\flink-arch-debug-0.0.1.jar normal sql

3.3.2. 而後,瞭解ShipStrategy

Flink Optimizer 的優化過程(參考Optimizer::compile方法)大概是:

首先將利用GraphCreatingVisitor 將Flink API (API, Table, 或 SQL) 產生的Plan翻譯成dag 包裏的OptimizerNode和DagConnection組成圖,

而後從圖的SinkNode(一個或多個)先前遞歸的調用getAlternativePlans方法從產生最優的Plan , 最優的Plan 是由plan包裏的PlanNode和Channel組成的Graph 。ShipStratigy是channel的一個屬性, 它描述數據從一個operator以那種方式發佈到RP中的RS中的。

最後由JobGraphGenerator將最優圖翻譯成JobGraph

那麼有多少種ShipStrategy呢? 連續按兩下SHIFT, 再鍵入類名(ShipStategyType.java)。參考一下本文最後面的IntelliJ的經常使用鍵吧.

 

public enum ShipStrategyType {   

NONE(false, false), //沒有策略
   FORWARD(false, false), //數據按原分區號傳遞到本地運行的Task相同分區,不跨網絡,不須要比較器(用於排序或Hash)
 PARTITION_RANDOM(true, false),//數據隨機傳遞到本地運行的Task任意分區,不跨網絡,不須要比較器
   PARTITION_HASH(true, true),//數據按照指定鍵值的Hash決定分區,到目標Task須要跨網絡傳遞,須要比較器
   PARTITION_RANGE(true, true),//數據按照指定鍵值的排序順序決定分區,到目標Task須要跨網絡傳遞,須要比較器  
  BROADCAST(true, false) // 將數據複製到任意一個分區中,不須要比較器
   PARTITION_FORCED_REBALANCE(true, false), // 將數據平均到各個分區中,不須要比較器
   PARTITION_CUSTOM(true, true); // 將數據按用戶指定的分區器分配分區,不須要比較器
...
表-4 ShipStrategyType

搜索一下PARTITION_HASH, 看誰將它設置給最優圖的Channel 對象, 過濾一下,應該由兩個地方:

一個是TwoInputNode::setInput 方法中, 它根據join operator 的JoinHint 或 Parameters 來設置 input Channel的ShipStrategy 。 好比, 若是想讓join的第一個input(Currencies)用廣播方式傳遞, 就可以使用」BROADCAST_HASH_FIRST「 hint, 或者將這樣的參數"INPUT_LEFT_SHIP_STRATEGY"="SHIP_BROADCAST"傳遞給join operator 。不過這些都只能在Flink Java API中 使用, (具體參考flink-arch-debug中的ApiJointTest類),沒法在SQL API 中使用。

第二個在TwoInputNode::getAlternativePlans中。如前文所述, getAlternativePlans首先會 OptimizerNode (本例中主要是JoinNode) 節點用於產生多個plan , 每一個 plan的輸入輸出的節點都是相同, 不一樣的是邊上ShipStrategy 。對於每一個Plan , getAlternativePlans都會調用RequestedGlobalProperties::parameterizeChannel從而設置這個Plan的InputChannel的ShipStrategy 。

在SQL application 中, 用戶沒法像在使用Java API 時那樣經過設置JoinHint或Parameter 來選擇想要的ShipStategy , 實際上若是指定了這兩項中的任意一個,也等同於跳過了getAlternativePlans (優化過程)。 由於既然用戶已選擇了想要的ShipStategy,就沒有必要用Optimizer 根據計算每一個方案的cost 來選擇最優方案了。 但是即便是由Optimizer 來選擇, 對於本例(小數據集join大數據集), 根據表-4 ShipStrategyType的 定義,PARTITION_HASH的cost 確定不是最低的 。對於大數據集 (instruments), 若是採用PARTITION_HASH將數據傳遞給下游, 網絡cost 必然是很高的, 由於PARTITION_HASH是一個shuffule 的過程, 下游 必然有位於不一樣TM遠程Task , 所以網絡傳遞的cost 必然很高。 看看Flink 怎麼定義Cost 和怎麼估計cost 的?

public class Costs implements Comparable<Costs>, Cloneable {

public static final double UNKNOWN = -1;

private double networkCost; // 上游數據傳遞到本節點在網絡上傳輸的cost, in transferred bytes

private double diskCost; //本節點緩存數據到磁盤上讀寫的cost , in bytes, 回憶一下前面說的SpillableSubpartition
 private double cpuCost; // 本節點算法在計算中使用CPU 的costs, 好比Hash, sort , merage, etc 
   
private double heuristicNetworkCost; // 如下時假設的cost, 很是高不許確

private double heuristicDiskCost;

private double heuristicCpuCost;
...

表-5 Cost定義

根據表-5 Flink cost的定義, 一個節點(好比JoinNode)的cost ,包括上游數據由上游傳遞過來的network cost, 數據本地緩存的 disk cost, 還有算法的cpu cost 。 圖-17第二條邊(Instrments)使用的HASH_PARTITION策略確定比FORWARD cost高不少, 由於若是使用FORWARD 的network cost 爲0 (參考表-4的定義),diskCostcpuCost 跟算法有關(好比使用HashTable或sort merage 去作join)。 第一條邊(Currencies)使用的HASH_PARTITION相比BROADCAST, cost 確定低一些, 可是Currencies數據源 數量特別小, 它的cost 和 這兩個cost 之間的差異 在Instrments引發的cost面前均可以忽略不記。因此感受上 HH(兩邊都是HASH_PARTITION)和BF(currency 邊Broadcuast, instrument邊Forward)相比, BF方案的cost必定低不少。 但爲何getAlternativePlans沒有選擇 BF呢?JoinNode沒有給出BF的選擇權嗎?看看JoinNode::getDataProperties()的代碼:

private List<OperatorDescriptorDual> getDataProperties(InnerJoinOperatorBase<?, ?, ?, ?> joinOperatorBase, JoinHint joinHint,   
Partitioner<?> customPartitioner )
{
...
  
switch (joinHint) {
case BROADCAST_HASH_FIRST:
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, true, false, false));
break;
case BROADCAST_HASH_SECOND:
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, true, false));
break;
case REPARTITION_HASH_FIRST:
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2, false, false, true));
break;
case REPARTITION_HASH_SECOND:
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2, false, false, true));
break;
case REPARTITION_SORT_MERGE:
list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2, false, false, true));
break;
case OPTIMIZER_CHOOSES:
list.add(new SortMergeInnerJoinDescriptor(this.keys1, this.keys2));
list.add(new HashJoinBuildFirstProperties(this.keys1, this.keys2));
list.add(new HashJoinBuildSecondProperties(this.keys1, this.keys2));
break;
default:
throw new CompilerException("Unrecognized join hint: " + joinHint);
}

...
}

表-6  JoinNode的輸入節點要求的屬性

從表-6  JoinNode的輸入節點要求的屬性可知, 但由Optimizer 來決定(OPTIMIZER_CHOOSES)輸入節點的ShipStrategy(輸出分區策略)時, JoinNode對上游的要求時很是寬泛的,幾乎就是什麼都行 。

SortMergeInnerJoinDescriptor(this.keys1, this.keys2),HashJoinBuildFirstProperties(this.keys1, this.keys2),HashJoinBuildSecondProperties(this.keys1, this.key),
實際上就是他們爲JoinNode和它的input定義了三類需求 :
第一, Join算法但是時Sort-merge, 或者用一路輸入作HashTable的HashJoin ,或者用二路輸入作HashTable的HashJoin
第二, 第一路第二路輸入的partition 策略: 也就是所謂的RequestGlobalPropertities, 能夠是RANDOM_PARTITIONED (對應FORWARD shipStategyType),或者HASH_PARTITIONED(PARTITION_HASH),或者FULL_REPLICATION(對應BROADCAST),
或者ANY_PARTITIONING(它是一個通配策略,意思是RANDOM or HASH均可以)
第三,輸入的排序策略 :就是所謂的RequestGlobalPropertities, 排序或者不排序。
想想這三類需求的可選項組合起來,一共會有多少組合? 弄個表, 描述一下可能更清楚些。
     序號                Input1 的 
partition 備選 策略
 Input2 的 
partition 備選 策略
算法  Input1/Input2 的 排序備選策略  兼容性   network Cost   
1
FORWARD
 
HASH
SortMergeInnerJoin
 sort/sort yes  
2
FORWARD
 
HASH
 
HashJoinBuildFirst
 sort/sort  yes  
3
 
FORWARD
 
HASH
 
HashJoinBuildFirst
 not sort/ not sort  yes  
4
 
FORWARD
 
HASH
 
HashJoinBuildFirst
 sort/not sort  yes  
5
 
FORWARD
 
HASH
 
HashJoinBuildFirst
 not sort /sort  yes  
6
 
FORWARD
 
HASH
 
HashJoinBuildSecond
  sort/sort  yes  
7
 
FORWARD
 
HASH
 
HashJoinBuildSecond
 not sort/ not sort  yes  
8
 
FORWARD
 
HASH
 
HashJoinBuildSecond
  sort/not sort yes   
9  
FORWARD
 
HASH
 
HashJoinBuildSecond
 not sort /sort  yes  

表-6  GloabalProperties, LocalProperties 和算法的組合
從表-6 可知, 對於input1和input2的partition策略(表-6實際上用的是shipStategy, 他們是1-1對應的)一個組合FH(Forward, Hash ), 一共有9個能夠接受的分區-算法-排序的方案,每一種方案都有本身的cost, getAlternativePlans會最後選擇cost最低的方案。 除了FH組合, 合法組合還有
HB:Hash-Broadcast
HH:Hash-Hash
BB:Broadcast-Broadcast
BF:Broadcast-Forward
FB:Forward-Broadcast
固然還有HF,FH ,FF,這些都是不合法的,對於join操做會有數據丟失的組合。 6個合法的input1和input2的partition策略組合,根據表-6能夠計算, 一共能有6*9=54可互相兼容的分區-算法-排序的組合方案 。實際上若是在TwoInputNode.java:516行上設計斷點 (getAlternativePlans方法內)並運行例子程序,
你會發現變量 outputPlans裏有90個備選方案,那麼其中36個必定是重複和多徐的。

那麼, BF方案是既然在備選方案裏,並且它的cost理論上是最低的, 並且getAlternativePlans的算法是選擇cost最低方案, 爲何cost並不低的HH方案最終被選擇了 ? 答案只能有一個 : cost estimation有問題。

  

3.3 發現問題和給出解決方案 

 不貼Cost, CostEstimiator, DefualtCostEstimator (costs包的三個類)的代碼了。總起來所, CostEstimator 是根據OptimizerNode 的 estimatedOutputSize 成員變量(包括本節點和數據節點的)來計算 network和disk cost 的 , estimatedOutputSize是由節點成員函數computeOperatorSpecificDefaultEstimates()設定, 能夠參考DataSourceNode中該函數的實現:estimatedOutputSize 被設置成來自於InputFormat的數據源實際物理尺寸。 但是FlatMapNode, MapNode 只是簡簡單單的設置了estimatedNumRecords (使之同 source的 同樣),而並無計算estimatedOutputSize 。

重看一下圖-4的ExcutionGraph可知, Join的第一個input是FlatMap , 若是Flatmap沒有estimatedOutputSize 會致使它的下一個節點JoionNode沒法計算cost. 請參考DefualtCostEstimator::addHybridHashCosts()的實現, 當上遊的estimatedOutputSize爲負數(Unknown)時,本節點的cost爲被設置爲Unknown 。Unknown cost在選擇cheapest cost的plan時是不能作比較的, 只能依靠假設的Cost (如heuristicNetworkCost ), 這個東西很不靠譜, 同時也是HH方案被選上的緣由。

解決方案應該修改Flink代碼在FlatmapNode::computeOperatorSpecificDefaultEstimates 加上對estimatedOutputSize的估計(好比保持和上游相同, 或加一個discount), 或者修改DefualtCostEstimator 使之 在計算Cost時能經過estimatedNumRecords估計estimatedOutputSize (下策, 沒有 前者好)。 總之,Flink Optimizter 在 Cost Estimation 作的不夠好, 改善是應該的, 我剛剛看了Flink 1.9 (最新版)的 Optimizer , 這一塊仍是沒有修改。忽然還有加入Flink community 的衝動。

 若是不想修改 Flink 源碼, 或改了也沒法發佈, 那就看看有沒有替換方案了。

 

...
if (child1.getGlobalProperties().isFullyReplicated()) {
// fully replicated input is always locally forwarded if parallelism is not changed
if (dopChange1) {
// can not continue with this child
childrenSkippedDueToReplicatedInput = true;
continue;
} else {
this.input1.setShipStrategy(ShipStrategyType.FORWARD);
}
}
...

表-7 FullyReplicated DataSource 。

這段代碼表示若是上游節點是一個FullyReplicated DataSource , 那麼就不須要備選方案的選擇過程, 直接將這個輸入邊的shipStagy 設置爲Forward 。 FullyReplicated DataSource意思是, 這個Data Source 的每一個Parition輸出的都是數據源的全集 而不是不部分。那麼只須要將Currencies作成這種數據源,問題不就解決了嗎?!

雖然ShipStategy是Forward, 但實際下游(Join Node)的每個Task都獲得了Currencies的這個數據集的全集, 這個同Broadcast的效果是同樣的。並且第一路爲F的組合有FF, FB , FH , 選擇的範圍小了。 這不對阿,前面不是說FF, FH, HF是不合法的嗎? 要注意此時的F是因爲FullyReplicated DataSource而既決定的F,不是普通的Forward , 是合法的。FF, FB , FH產生了27種可選方案(參考表-6), 根據heuristicNetworkCost , FF是cost最低的。 那確定是最低的阿, Forward的數據都是本地傳輸的, network cost 是 0 。

下面就是如何將Currencies作成FullyReplicated DataSource了, 根據DataSourceNode.java:92, DataSource 的InputFormat只要是ReplicatingInputFormat, 則就是FullyReplicated  。在看看ReplicatingInputFormat的代碼, 只須要將原有的InputFormat (好比CsvInputFormat)外面包一層ReplicatingInputFormat就能夠了, 具體的看例子的代碼吧 : SqlJoinTest.java:95。運行結果以下。

 

  圖-18 flink-webmonitor 上FullyReplicated Currencies, 每個partition都輸出數據集的所有, 492條記錄。

 

 圖-19 flink-webmonitor 上instrument , 跟以前沒什麼區別。

 

 圖-20 flink-webmonitor 上的join , 數據在各個並行Task上很是均衡。

 

雖然這個例子只用到了Flink-client上的知識, 並無與JM和TM聯合調試, 可是問題的發現,分析和解決的方法是同樣的。 都是須要根據對架構的認識,縮小懷疑的範圍,然會反覆閱讀和調式相關代碼,找到問題,以及找到解決方案, 試想一下,若是沒有架構的知識,你怎麼可以懷疑這必定是Flink-optimizer的問題呢? 並且有時解決問題的方法並不必定是直接修改的出問題的代碼(固然這樣是最好的方案),根據相關代碼找到一個合適的替代方案也是解決問題的方法。

沒有想到描述一個這樣的小問題,這一章運用這麼長的篇幅, 對於複雜的問題,簡直能寫本書了。但願拋磚引玉,可以運用這樣的方法學來解決使用Flink遇到的問題: 架構->縮小範圍->閱讀和調式代碼->發現和分析問題->找到解決方案。

 

 3.4. IntelliJ的經常使用鍵

CTRL+ALT+Enter to complete line

SHIFT+SHIT search class in source code

CTRL+SHIFT+F serarch string in soruce code

Alt + F1 show current java in project view
CTRL+ALT+B navigate to implementation classes


CTRL+U navigate to super method
ALT + F7 Show reference
ALT + 4 Show run window
ALT + F12 Show terminal window

ALT+Enter Create A testClass
ALT+Insert TestMethod

CTRL+home move to file start
CTRL+end move to file end

 

參考

Flink conference Page: https://cwiki.apache.org/confluence/display/FLINK/Flink+Internals

很不錯的Blog: http://www.javashuo.com/article/p-khdfegoi-mh.html

Flink1.6 documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.6/

Flink Github: https://github.com/apache/flink

相關文章
相關標籤/搜索