Azkaban做爲開源的調度系統,在大數據中有普遍地使用。它主要有三部分組成:Azkaban Webserver、Azkaban Executor、 DB。node
圖1 Azkaban架構ajax
圖1所示的是Azkaban的基本架構:Webserver主要負責權限驗證、項目管理、做業流下發等工做;Executor主要負責做業流/做業的具體執行以及蒐集執行日誌等工做;MySQL用於存儲做業/做業流的執行狀態信息。圖中所示的是單executor場景,可是實際應用中大部分的項目使用的都是多executor場景。下面主要介紹多executor場景下的azkaban調度過程。算法
圖2 做業流執行過程數據庫
圖2展現的就是Azkaban做業流的執行過程:緩存
1. 首先Webserver根據內存中緩存的各Executor的資源狀態(Webserver有一個線程會遍歷各個active executor,去發送http請求獲取其資源狀態信息緩存到內存中),按照選擇策略(包括executor資源狀態、最近執行流個數等)選擇一個executor下發做業流;session
2. 而後executor判斷是否設置做業粒度分配,若是未設置做業粒度分配,則在當前executor執行全部做業;數據結構
3. 若是設置了做業粒度分配,則當前節點會成爲做業分配的決策者,即分配節點;架構
4. 分配節點從zookeeper獲取各個executor的資源狀態信息,而後根據策略選擇一個executor分配做業;併發
5. 被分配到做業的executor即成爲執行節點,執行做業,而後更新數據庫。ide
1. ExecutorServlet類根據請求的ajax參數判斷,若是ajax=executeFlow,就去調ajaxAttemptExecuteFlow(req, resp, ret, session.getUser())方法
2. ajaxAttemptExecuteFlow方法裏,首先調getProjectAjaxByPermission方法判斷用戶是否有執行權限,若是驗證權限經過,且Project和Flow都存在,就調ajaxExecuteFlow方法
3. ajaxExecuteFlow方法的主要做用就是構造ExecutableFlow對象,設定執行參數(通知機制,併發,失敗策略),而後去調executorManager.submitExecutableFlow方法
4. executorManager.submitExecutableFlow方法:判斷執行策略(流水線、忽略、併發);若是是多執行節點模式,則將做業流提交到執行隊列queue;若是是單執行節點模式,選擇惟一執行節點下發做業流。
5. ExecutorManager.submitExecutableFlow()方法是Webserver端下發做業流的主要實現邏輯,下面重點細述其內容:
5.1 從exflow實例獲取做業流的flowId(就是做業流的名字),打日誌(「開始提交流XXX by 某某某了」)。
5.2 判斷queuedFlows是否滿,若是滿了打日誌(「提交失敗,Azkaban過飽和啦」),return;若是未滿,繼續往下執行代碼
5.3 獲取該做業流全部正在跑的實例的id, List<Integer> running
5.4 獲取執行設置options
5.5 從執行設置options裏獲取流的執行參數(是否enable,是則將參數生效)
5.6 判斷running是否爲空,若是爲空,即沒有併發的實例在跑
5.7 若是running不爲空,獲取併發設置getConcurrentOption()
5.7.1 流水線(pipeline):設置pipelineExcutionId爲running中最後提交的實例id
5.7.2 忽略(skip):拋異常,「流已經在執行了,忽略本次執行」
5.7.3 併發(ignore):僅修改日誌
5.8 根據白名單設置是否memoryCheck
5.9 executorLoader.uploadExecutableFlow(exflow) 寫數據庫表execution_flows,狀態爲preparing
5.10 構造具體的執行實例ExecutionReference
5.11 判斷是否多執行節點模式,若是不是,將該執行流的狀態標記爲active,即寫數據庫表active_executing_flows,將流dispatch到惟一執行節點執行。
5.12 若是是多執行節點模式,則將該執行流的狀態標記爲active,而後將流放入執行隊列queuedFlows。
6. 若是是多執行節點模式,ExecutorManager類在構造函數裏會調setupMultiExecutorMode()方法,該方法會建一個線程經過processQueuedFlows方法去持續地消費隊列裏的首個做業流。processQueuedFlows方法的主要內容就是按照必定規則去refreshExecutors刷新執行節點的資源信息,以及selectExecutorAndDispatchFlow從activeExecutors中根據策略選擇一個executor下發做業流。refreshExecutors()方法其實是經過遍歷每一個active executor,去發請求獲取狀態信息,而不是經過zookeeper。
至此,Webserver端的工做已經完畢。
1. 執行流到達Executor端,此時在數據庫中的狀態已是preparing
2. ExecutorServlet類根據請求的action參數判斷,若是action=execute,就去調handleAjaxExecute(req, respMap, execid)方法
3. handleAjaxExecute方法裏執行flowRunnerManager.submitFlow(execId),去調FlowRunnerManager的submitFlow(execId)方法來提交執行流。
4. FlowRunnerManager的兩個重要的數據結構:
4.1 Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<Future<?>, Integer>();
4.2 Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<Integer, FlowRunner>();
submittedFlows用於跟蹤當前executor全部處於preparing狀態的流的執行;runningFlows用於存數當前executor全部正在執行的流的信息,當須要執行cancling()或killing()的時候就能夠找到這些流。
5. FlowRunnerManager.submitFlow(execId)方法是Executor端執行做業流的主要實現邏輯,下面重點細述其內容:
5.1 先判斷runningFlows是否包含該execId對應的實例,若是已經包含,拋異常
5.2 從executorLoader去獲取execId對應的執行實例(ExecutableFlow)flow
5.3 執行setupFlow(flow),配置flow:建立項目和執行的目錄等
5.4 獲取執行設置ExecutionOptions
5.5 判斷pipelineExecId是否爲null。若是不爲null,就判斷pipelineExecId對應的flowRunner在不在runningFlows中。若是在runningFlows中,起一個LocalFlowWatcher去監控在flow中各個job的執行狀態;
5.6 若是不在runningFlows中,起一個RemoteFlowWatcher去監控,即每隔必定時間(默認爲60秒)經過讀取數據庫的記錄來監控流中各個job的狀態
5.7 判斷執行參數裏是否包含flow.num.job.threads,若是存在且小於默認值10,則修改該值。這個值表明該流能夠同時執行的job線程數。
5.8 構造一個新的FlowRunner實例runner
5.9 configureFlowLevelMetrics(runner)配置runner
5.10 再次判斷runningFlows是否包含該次execId對應的執行實例,若是包含,拋異常
5.11 將runner加入到runningFlows的map
5.12 提交到TrackingThreadPool(工做線程池)
5.13 加入到submittedFlows的map
6. 自此,咱們就有了FlowRunner實例,下面咱們看FlowRunner中都幹了些什麼事。
FlowRunner其實就是一個線程,它的run()方法的內容以下:
6.1 Executors.newFixedThreadPool(numJobThreads) 建立flow內部job線程池flow
6.2 setupFlowExecution()
6.3 updateFlowReference()
6.4 updateFlow() 更新flow的狀態信息,寫數據庫表execution_flows
6.5 loadAllProperties()載入job參數和共享的參數
6.6 判斷輸入參數是否包含job.dispatch(做業粒度分配),若是包含且爲true,起一個新的線程jobEventUpdaterThread,用於跟蹤該做業流下各個做業的執行狀態。
6.7 執行runFlow()
6.8 runFlow()方法:根據DAG圖的算法依次執行job。從流的開始節點,遞歸調用runReadyjob()來執行做業,而後updateFlow();若是流還沒結束,根據重試設置,決定是否重跑失敗的做業。
6.9 在runReadyjob()裏會調runExecutableNode(node)方法,runExecutableNode方法再判斷job.dispatch參數,若是爲false,則經過LocalJobRunner本地執行;若是爲true,則再經過JobRunnerManager提交做業。
6.10 JobRunnerManager經過submitExecutableNode方法構建RemoteJobRunner,RemoteJobRunner會根據各執行節點(包含本節點)的資源狀態去選擇一個節點執行做業。
最後,整個過程能夠總結成一個圖,以下圖所示:
圖3 從源碼看做業流執行過程