下面正式提交node
String message = executorManager.submitExecutableFlow(exflow, user.getUserId());數據庫
那麼,到底executorManager是如何處理的呢?json
=================================================================================緩存
stop in azkaban.executor.ExecutorManager.submitExecutableFlowide
=================================================================================spa
其實主要就是在數據庫中插入一行記錄,具體看ip
runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(), flow.getFlowId(), flow.getVersion(),get
Status.PREPARING.getNumVal(), submitTime, flow.getSubmitUser(), submitTime);it
connection.commit();pip
這個很好理解吧
===
可是後面5個字段爲空對嗎?別急,而後緊接着就來處理這5個字段。
final String UPDATE_EXECUTABLE_FLOW_DATA = "UPDATE execution_flows "
+ "SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? " + "WHERE exec_id=?";
QueryRunner runner = new QueryRunner();
String json = JSONUtils.toJSON(flow.toObject());
這個是數據,好比個人數據能夠是以下所示:
json = "{
"lastModfiedTime":1460217889979,
"executionOptions":{
"failureAction":"FINISH_CURRENTLY_RUNNING",
"memoryCheck":true,
"queueLevel":0,
"pipelineExecId":null,
"concurrentOption":"skip",
"notifyOnFirstFailure":false,
"notifyOnLastFailure":false,
"pipelineLevel":null,
"successEmailsOverride":false,
"failureEmails":[],
"disabled":[],
"flowParameters":{},
"successEmails":[],
"mailCreator":"default",
"failureEmailsOverride":false
},
"updateTime":-1,
"type":null,
"attempt":0,
"version":15,
"executionPath":null,
"executionId":4,
"nodes":[
{"jobSource":"Hello.job","startTime":-1,"updateTime":-1,"id":"Hello","endTime":-1,"type":"command","attempt":0,"status":"READY","outNodes":["World"]},
{"jobSource":"World.job","startTime":-1,"updateTime":-1,"id":"World","endTime":-1,"type":"command","attempt":0,"inNodes":["Hello"],"status":"READY"}
],
"submitTime":1460267483832,
"submitUser":"azkaban",
"startTime":-1,
"lastModifiedUser":"azkaban",
"id":null,
"endTime":-1,
"projectName":"myfirstProject",
"flowId":"World",
"projectId":9,
"proxyUsers":[],
"properties":[],
"status":"PREPARING"
}"
====================
這些數據須要被GZIP壓縮下。
try {
runner.update(connection, UPDATE_EXECUTABLE_FLOW_DATA, flow.getStatus().getNumVal(), flow.getUpdateTime(),
flow.getStartTime(), flow.getEndTime(), encType.getNumVal(), data, flow.getExecutionId());
connection.commit();
} catch (SQLException e) {
throw new ExecutorManagerException("Error updating flow.", e);
}
結果就是:
搞定!
================================================================================
在多核模式下,
if (isMultiExecutorMode()) {
// Take MultiExecutor route
executorLoader.addActiveExecutableReference(reference);
queuedFlows.enqueue(exflow, reference);
} else {
看來又須要操做數據庫了
@Override
public void addActiveExecutableReference(ExecutionReference reference) throws ExecutorManagerException {
// 看到這裏了
final String INSERT = "INSERT INTO active_executing_flows " + "(exec_id, update_time) values (?,?)";
QueryRunner runner = createQueryRunner();
try {
runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());
} catch (SQLException e) {
throw new ExecutorManagerException("Error updating active flow reference " + reference.getExecId(), e);
}
}
在本地還要緩存起來
queuedFlows.enqueue(exflow, reference);
===
message += "Execution submitted successfully with exec id " + exflow.getExecutionId();
}
return message;
最後就是返回了,也就是你在界面上看到的!
===
總結起來就是涉及到了2個表:
1)execution_flows
2) active_executing_flows