In the previous section, we saw that a task is placed into a thread pool on the executor. Now let's analyze the actual execution process.
================================================================================
jdb azkaban.execapp.AzkabanExecutorServer -conf /root/azkb/azkaban_3.0.0_debug/conf
stop in azkaban.execapp.FlowRunner.run
run
================================================================================
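As you know, a topology graph has the notion of a start node: a node with no incoming edges.
A graph may contain several start nodes. Here is how Azkaban computes them: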
public List<String> getStartNodes() {
  if (startNodes == null) {
    startNodes = new ArrayList<String>();
    for (ExecutableNode node : executableNodes.values()) {
      if (node.getInNodes().isEmpty()) {
        startNodes.add(node.getId());
      }
    }
  }
  return startNodes;
}
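
To make the rule concrete, here is a minimal standalone sketch of the same idea: a node is a start node when its list of in-nodes is empty. The class `StartNodeSketch`, its methods, and the sample graph are hypothetical names made up for illustration; they are not part of Azkaban, and the graph is a plain map instead of `ExecutableNode` objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration only; not Azkaban code.
final class StartNodeSketch {

    // The map goes from a node id to the ids of the nodes it depends on (its in-nodes).
    static List<String> findStartNodes(Map<String, List<String>> inNodes) {
        List<String> starts = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : inNodes.entrySet()) {
            // A node with no incoming edges has nothing to wait for.
            if (entry.getValue().isEmpty()) {
                starts.add(entry.getKey());
            }
        }
        return starts;
    }

    // Sample graph with edges a -> c, b -> c, c -> d.
    static Map<String, List<String>> sampleGraph() {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("a", new ArrayList<String>());
        graph.put("b", new ArrayList<String>());
        graph.put("c", Arrays.asList("a", "b"));
        graph.put("d", Arrays.asList("c"));
        return graph;
    }

    public static void main(String[] args) {
        // prints [a, b]
        System.out.println(findStartNodes(sampleGraph()));
    }
}
```

Both `a` and `b` come back, which matches the remark that a graph can have more than one start node.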
================================================================================
And here is where execution starts from those nodes:
for (String startNodeId : ((ExecutableFlowBase) node).getStartNodes()) {
  ExecutableNode startNode = flow.getExecutableNode(startNodeId);
  runReadyJob(startNode);
}
================================================================================
After several intermediate steps, the code that finally runs is shown below:
stop in azkaban.execapp.FlowRunner.runExecutableNode
private void runExecutableNode(ExecutableNode node) throws IOException {
  // Collect output props from the job's dependencies.
  prepareJobProperties(node);
  node.setStatus(Status.QUEUED);
  JobRunner runner = createJobRunner(node);
  logger.info("Submitting job '" + node.getNestedId() + "' to run.");
  try {
    executorService.submit(runner);
    activeJobRunners.add(runner);
  } catch (RejectedExecutionException e) {
    logger.error(e);
  }
}
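
The submit-and-track pattern can be tried on its own with the standard `java.util.concurrent` classes. This is only a rough sketch under my own assumptions: `SubmitSketch`, `submit`, and `activeCount` are hypothetical names, and a plain `Runnable` stands in for `JobRunner`. It keeps the same order as the snippet above (submit first, then record the job as active) and shows that a pool that has been shut down rejects new work with `RejectedExecutionException`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for illustration only; not Azkaban code.
final class SubmitSketch {
    private final ExecutorService pool;
    // Jobs that the pool has accepted, similar in spirit to activeJobRunners.
    private final Set<Runnable> active = ConcurrentHashMap.newKeySet();

    SubmitSketch(ExecutorService pool) {
        this.pool = pool;
    }

    // Returns true if the pool accepted the job, false if it was rejected.
    boolean submit(Runnable job) {
        try {
            pool.submit(job);
            // Only reached when submit() did not throw.
            active.add(job);
            return true;
        } catch (RejectedExecutionException e) {
            System.err.println("Job rejected: " + e);
            return false;
        }
    }

    int activeCount() {
        return active.size();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        SubmitSketch sketch = new SubmitSketch(pool);
        System.out.println(sketch.submit(() -> { }));  // accepted by the running pool
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(sketch.submit(() -> { }));  // rejected after shutdown
    }
}
```

Because the job is added to the tracking set only after `submit` returns, a rejected job never shows up as active.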
In essence, each job is simply thrown into the thread pool to run, and then the following code executes:
while (!flowFinished) {
  synchronized (mainSyncObj) {
    if (flowPaused) {
      try {
        mainSyncObj.wait(CHECK_WAIT_MS);
      } catch (InterruptedException e) {
      }
      continue;
    } else {
      if (retryFailedJobs) {
        retryAllFailures();
      } else if (!progressGraph()) {
        try {
          mainSyncObj.wait(CHECK_WAIT_MS);
        } catch (InterruptedException e) {
        }
      }
    }
  }
}
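
The shape of this loop is a monitor wait with a timeout: the main thread sleeps on `mainSyncObj` until either a worker wakes it up or `CHECK_WAIT_MS` elapses, then checks again. The sketch below reproduces only that shape. Everything in it is an assumption made for illustration: the class `WaitLoopSketch`, the `jobDone` method, the remaining-jobs counter that replaces `progressGraph()`, and the 100 ms value for `CHECK_WAIT_MS`. Unlike the code above, it restores the interrupt flag and returns instead of swallowing `InterruptedException`.

```java
// Hypothetical stand-in for illustration only; not Azkaban code.
final class WaitLoopSketch {
    private static final long CHECK_WAIT_MS = 100; // assumed value, for the demo only
    private final Object mainSyncObj = new Object();
    private int remaining;
    private volatile boolean flowFinished = false;

    WaitLoopSketch(int jobs) {
        this.remaining = jobs;
    }

    // Called by a worker thread when its job completes.
    void jobDone() {
        synchronized (mainSyncObj) {
            remaining--;
            // Wake the main loop so it does not have to wait for the timeout.
            mainSyncObj.notifyAll();
        }
    }

    // Blocks until every job has reported completion or the thread is interrupted.
    void runLoop() {
        while (!flowFinished) {
            synchronized (mainSyncObj) {
                if (remaining == 0) {
                    flowFinished = true;
                } else {
                    try {
                        // wait() releases the monitor, so workers can enter jobDone().
                        mainSyncObj.wait(CHECK_WAIT_MS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }

    boolean isFinished() {
        return flowFinished;
    }

    public static void main(String[] args) {
        WaitLoopSketch loop = new WaitLoopSketch(3);
        for (int i = 0; i < 3; i++) {
            new Thread(loop::jobDone).start();
        }
        loop.runLoop();
        System.out.println(loop.isFinished());
    }
}
```

The timeout acts as a safety net: even if a notification is missed, the loop rechecks its condition after at most `CHECK_WAIT_MS`.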
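The rest of this loop is mostly policy-level detail (pausing, retrying failures, advancing the graph), so I will not go through it here.
Next, the focus will be on how one specific JobRunner runs.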