Azkaban簡介

  1. 1、Azkaban簡介  

  2. Azkaban做爲開源的調度系統,在大數據中有普遍地使用。它主要有三部分組成:Azkaban Webserver、Azkaban Executor、 DB。node

  3.               圖1 Azkaban架構ajax

  4. 圖1所示的是Azkaban的基本架構:Webserver主要負責權限驗證、項目管理、做業流下發等工做;Executor主要負責做業流/做業的具體執行以及蒐集執行日誌等工做;MySQL用於存儲做業/做業流的執行狀態信息。圖中所示的是單executor場景,可是實際應用中大部分的項目使用的都是多executor場景。下面主要介紹多executor場景下的azkaban調度過程。算法

  5. 2、做業流執行過程

  6.                      圖2 做業流執行過程數據庫

  7. 圖2展現的就是Azkaban做業流的執行過程:緩存

  8. 1. 首先Webserver根據內存中緩存的各Executor的資源狀態(Webserver有一個線程會遍歷各個active executor,去發送http請求獲取其資源狀態信息緩存到內存中),按照選擇策略(包括executor資源狀態、最近執行流個數等)選擇一個executor下發做業流;session

  9. 2. 而後executor判斷是否設置做業粒度分配,若是未設置做業粒度分配,則在當前executor執行全部做業;數據結構

  10. 3. 若是設置了做業粒度分配,則當前節點會成爲做業分配的決策者,即分配節點;架構

  11. 4. 分配節點從zookeeper獲取各個executor的資源狀態信息,而後根據策略選擇一個executor分配做業;併發

  12. 5. 被分配到做業的executor即成爲執行節點,執行做業,而後更新數據庫。ide

  13. 3、從源碼看做業流執行過程

  14. 首先是Webserver端:

  15. 1. ExecutorServlet類根據請求的ajax參數判斷,若是ajax=executeFlow,就去調ajaxAttemptExecuteFlow(req, resp, ret, session.getUser())方法

  16. 2. ajaxAttemptExecuteFlow方法裏,首先調getProjectAjaxByPermission方法判斷用戶是否有執行權限,若是驗證權限經過,且Project和Flow都存在,就調ajaxExecuteFlow方法

  17. 3. ajaxExecuteFlow方法的主要做用就是構造ExecutableFlow對象,設定執行參數(通知機制,併發,失敗策略),而後去調executorManager.submitExecutableFlow方法

  18. 4. executorManager.submitExecutableFlow方法:判斷執行策略(流水線、忽略、併發);若是是多執行節點模式,則將做業流提交到執行隊列queue;若是是單執行節點模式,選擇惟一執行節點下發做業流。

  19. 5. ExecutorManager.submitExecutableFlow()方法是Webserver端下發做業流的主要實現邏輯,下面重點細述其內容:

  20.     5.1 從exflow實例獲取做業流的flowId(就是做業流的名字),打日誌(「開始提交流XXX by 某某某了」)。

  21.     5.2 判斷queuedFlows是否滿,若是滿了打日誌(「提交失敗,Azkaban過飽和啦」),return;若是未滿,繼續往下執行代碼

  22.     5.3 獲取該做業流全部正在跑的實例的id, List<Integer> running
        5.4 獲取執行設置options
        5.5 從執行設置options裏獲取流的執行參數(是否enable,是則將參數生效)
        5.6 判斷running是否爲空,若是爲空,即沒有併發的實例在跑
        5.7 若是running不爲空,獲取併發設置getConcurrentOption()
             5.7.1 流水線(pipeline):設置pipelineExcutionId爲running中最後提交的實例id
             5.7.2 忽略(skip):拋異常,「流已經在執行了,忽略本次執行」
             5.7.3 併發(ignore):僅修改日誌
        5.8 根據白名單設置是否memoryCheck
        5.9 executorLoader.uploadExecutableFlow(exflow) 寫數據庫表execution_flows,狀態爲preparing

  23.     5.10 構造具體的執行實例ExecutionReference
        5.11 判斷是否多執行節點模式,若是不是,將該執行流的狀態標記爲active,即寫數據庫表active_executing_flows,將流dispatch到惟一執行節點執行。
        5.12 若是是多執行節點模式,則將該執行流的狀態標記爲active,而後將流放入執行隊列queuedFlows。

  24. 6. 若是是多執行節點模式,ExecutorManager類在構造函數裏會調setupMultiExecutorMode()方法,該方法會建一個線程經過processQueuedFlows方法去持續地消費隊列裏的首個做業流。processQueuedFlows方法的主要內容就是按照必定規則去refreshExecutors刷新執行節點的資源信息,以及selectExecutorAndDispatchFlow從activeExecutors中根據策略選擇一個executor下發做業流。refreshExecutors()方法其實是經過遍歷每一個active executor,去發請求獲取狀態信息,而不是經過zookeeper。

  25. 至此,Webserver端的工做已經完畢。

  26. 而後是Executor端:

  27. 1. 執行流到達Executor端,此時在數據庫中的狀態已是preparing

  28. 2. ExecutorServlet類根據請求的action參數判斷,若是action=execute,就去調handleAjaxExecute(req, respMap, execid)方法

  29. 3. handleAjaxExecute方法裏執行flowRunnerManager.submitFlow(execId),去調FlowRunnerManager的submitFlow(execId)方法來提交執行流。

  30. 4. FlowRunnerManager的兩個重要的數據結構:

  31.     4.1 Map<Future<?>, Integer> submittedFlows = new ConcurrentHashMap<Future<?>, Integer>();

  32.     4.2 Map<Integer, FlowRunner> runningFlows = new ConcurrentHashMap<Integer, FlowRunner>();

  33.     submittedFlows用於跟蹤當前executor全部處於preparing狀態的流的執行;runningFlows用於存數當前executor全部正在執行的流的信息,當須要執行cancling()或killing()的時候就能夠找到這些流。

  34. 5. FlowRunnerManager.submitFlow(execId)方法是Executor端執行做業流的主要實現邏輯,下面重點細述其內容:

  35.     5.1 先判斷runningFlows是否包含該execId對應的實例,若是已經包含,拋異常

  36.     5.2 從executorLoader去獲取execId對應的執行實例(ExecutableFlow)flow

  37.     5.3 執行setupFlow(flow),配置flow:建立項目和執行的目錄等
        5.4 獲取執行設置ExecutionOptions
        5.5 判斷pipelineExecId是否爲null。若是不爲null,就判斷pipelineExecId對應的flowRunner在不在runningFlows中。若是在runningFlows中,起一個LocalFlowWatcher去監控在flow中各個job的執行狀態;

  38.     5.6 若是不在runningFlows中,起一個RemoteFlowWatcher去監控,即每隔必定時間(默認爲60秒)經過讀取數據庫的記錄來監控流中各個job的狀態

  39.     5.7 判斷執行參數裏是否包含flow.num.job.threads,若是存在且小於默認值10,則修改該值。這個值表明該流能夠同時執行的job線程數。
        5.8 構造一個新的FlowRunner實例runner
        5.9 configureFlowLevelMetrics(runner)配置runner 
        5.10 再次判斷runningFlows是否包含該次execId對應的執行實例,若是包含,拋異常
        5.11 將runner加入到runningFlows的map
        5.12 提交到TrackingThreadPool(工做線程池)
        5.13 加入到submittedFlows的map

  40. 6. 自此,咱們就有了FlowRunner實例,下面咱們看FlowRunner中都幹了些什麼事。

  41. FlowRunner其實就是一個線程,它的run()方法的內容以下:

  42.     6.1 Executors.newFixedThreadPool(numJobThreads) 建立flow內部job線程池flow
        6.2 setupFlowExecution()
        6.3 updateFlowReference()
        6.4 updateFlow() 更新flow的狀態信息,寫數據庫表execution_flows
        6.5 loadAllProperties()載入job參數和共享的參數
        6.6 判斷輸入參數是否包含job.dispatch(做業粒度分配),若是包含且爲true,起一個新的線程jobEventUpdaterThread,用於跟蹤該做業流下各個做業的執行狀態。
        6.7 執行runFlow()
        6.8 runFlow()方法:根據DAG圖的算法依次執行job。從流的開始節點,遞歸調用runReadyjob()來執行做業,而後updateFlow();若是流還沒結束,根據重試設置,決定是否重跑失敗的做業。
        6.9 在runReadyjob()裏會調runExecutableNode(node)方法,runExecutableNode方法再判斷job.dispatch參數,若是爲false,則經過LocalJobRunner本地執行;若是爲true,則再經過JobRunnerManager提交做業。
        6.10 JobRunnerManager經過submitExecutableNode方法構建RemoteJobRunner,RemoteJobRunner會根據各執行節點(包含本節點)的資源狀態去選擇一個節點執行做業。

  43.  最後,整個過程能夠總結成一個圖,以下圖所示:

                                                                       圖3 從源碼看做業流執行過程

相關文章
相關標籤/搜索