Azkaban的Web Server源碼探究系列23: 一次性執行execute的正式提交

下面正式提交node

String message =  executorManager.submitExecutableFlow(exflow, user.getUserId());數據庫

那麼,到底executorManager是如何處理的呢?json

=================================================================================緩存

stop in azkaban.executor.ExecutorManager.submitExecutableFlowide

=================================================================================spa

其實主要就是在數據庫中插入一行記錄,具體看ip

runner.update(connection, INSERT_EXECUTABLE_FLOW, flow.getProjectId(), flow.getFlowId(), flow.getVersion(),get

Status.PREPARING.getNumVal(), submitTime, flow.getSubmitUser(), submitTime);it

connection.commit();pip

這個很好理解吧

===

可是後面5個字段爲空對嗎?別急,而後緊接着就來處理這5個字段。

final String UPDATE_EXECUTABLE_FLOW_DATA = "UPDATE execution_flows "

+ "SET status=?,update_time=?,start_time=?,end_time=?,enc_type=?,flow_data=? " + "WHERE exec_id=?";

QueryRunner runner = new QueryRunner();


String json = JSONUtils.toJSON(flow.toObject());

這個是數據,好比個人數據能夠是以下所示:



json = "{


"lastModfiedTime":1460217889979,

"executionOptions":{

             "failureAction":"FINISH_CURRENTLY_RUNNING",

"memoryCheck":true,

"queueLevel":0,

"pipelineExecId":null,

"concurrentOption":"skip",

"notifyOnFirstFailure":false,

"notifyOnLastFailure":false,

"pipelineLevel":null,

"successEmailsOverride":false,

"failureEmails":[],

"disabled":[],

"flowParameters":{},

"successEmails":[],

"mailCreator":"default",

"failureEmailsOverride":false

},

 

"updateTime":-1,

"type":null,

"attempt":0,

"version":15,

"executionPath":null,

"executionId":4,

"nodes":[

                 {"jobSource":"Hello.job","startTime":-1,"updateTime":-1,"id":"Hello","endTime":-1,"type":"command","attempt":0,"status":"READY","outNodes":["World"]},

{"jobSource":"World.job","startTime":-1,"updateTime":-1,"id":"World","endTime":-1,"type":"command","attempt":0,"inNodes":["Hello"],"status":"READY"}

],


"submitTime":1460267483832,

"submitUser":"azkaban",

"startTime":-1,

"lastModifiedUser":"azkaban",

"id":null,

"endTime":-1,

"projectName":"myfirstProject",

"flowId":"World",

"projectId":9,

"proxyUsers":[],

"properties":[],

"status":"PREPARING"

}"

====================

這些數據須要被GZIP壓縮下。

try {

runner.update(connection, UPDATE_EXECUTABLE_FLOW_DATA, flow.getStatus().getNumVal(), flow.getUpdateTime(),

flow.getStartTime(), flow.getEndTime(), encType.getNumVal(), data, flow.getExecutionId());

connection.commit();

} catch (SQLException e) {

throw new ExecutorManagerException("Error updating flow.", e);

}

結果就是:

搞定!

================================================================================

在多核模式下,

if (isMultiExecutorMode()) {

// Take MultiExecutor route

executorLoader.addActiveExecutableReference(reference);

queuedFlows.enqueue(exflow, reference);

} else {

看來又須要操做數據庫了

@Override

public void addActiveExecutableReference(ExecutionReference reference) throws ExecutorManagerException {

// 看到這裏了

final String INSERT = "INSERT INTO active_executing_flows " + "(exec_id, update_time) values (?,?)";

QueryRunner runner = createQueryRunner();

 

try {

runner.update(INSERT, reference.getExecId(), reference.getUpdateTime());

} catch (SQLException e) {

throw new ExecutorManagerException("Error updating active flow reference " + reference.getExecId(), e);

}

}

在本地還要緩存起來

queuedFlows.enqueue(exflow, reference);




===

message += "Execution submitted successfully with exec id " + exflow.getExecutionId();

}

return message;

最後就是返回了,也就是你在界面上看到的!

===

總結起來就是涉及到了2個表:

1)execution_flows

2)  active_executing_flows

相關文章
相關標籤/搜索