Azkaban的Exec Server分析 28:Execute Server的任務真正執行過程

上節,咱們看到了任務放在executor的一個線程池裏,下面開始分析真正的執行過程!node

================================================================================app

jdb azkaban.execapp.AzkabanExecutorServer  -conf  /root/azkb/azkaban_3.0.0_debug/confspa

stop in azkaban.execapp.FlowRunner.run線程

rundebug

================================================================================ci

你們知道,關於一個拓撲圖來講,有一個起點的說法get

可能在拓撲圖裏有多個起點,看下azkaban的起點計算方法it

 public List<String> getStartNodes() {io

    if (startNodes == null) {table

      startNodes = new ArrayList<String>();

      for (ExecutableNode node : executableNodes.values()) {

        if (node.getInNodes().isEmpty()) {

          startNodes.add(node.getId());

        }

      }

    }

 

    return startNodes;

  }

================================================================================

以及執行過程

for (String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {

ExecutableNode startNode = flow.getExecutableNode(startNodeId);

runReadyJob(startNode);

}

================================================================================

通過一番操做後,最終執行的代碼以下所示:

stop in azkaban.execapp.FlowRunner.runExecutableNode





private void runExecutableNode(ExecutableNode node) throws IOException {

// Collect output props from the job's dependencies.

prepareJobProperties(node);

 

node.setStatus(Status.QUEUED);

JobRunner runner = createJobRunner(node);

logger.info("Submitting job '" + node.getNestedId() + "' to run.");

try {

executorService.submit(runner);

activeJobRunners.add(runner);

} catch (RejectedExecutionException e) {

logger.error(e);

}

;

}


本質就是把各個job拋到線程池裏運行,而後執行下面的代碼!

while (!flowFinished) {

synchronized (mainSyncObj) {

if (flowPaused) {

try {

mainSyncObj.wait(CHECK_WAIT_MS);

} catch (InterruptedException e) {

}

 

continue;

} else {

if (retryFailedJobs) {

retryAllFailures();

} else if (!progressGraph()) {

try {

mainSyncObj.wait(CHECK_WAIT_MS);

} catch (InterruptedException e) {

}

}

}

}

}

這裏面就是一些策略性的問題了,懶的看了。

後面重點去看具體的一個JobRunner的運行!

相關文章
相關標籤/搜索