[源碼解析]Oozie前因後果之提交任務

[源碼解析]Oozie前因後果之提交任務

0x00 摘要

Oozie是由Cloudera公司貢獻給Apache的基於工做流引擎的開源框架,是Hadoop平臺的開源的工做流調度引擎,用來管理Hadoop做業。本文是系列的第一篇,介紹Oozie的任務提交階段。html

0x01 問題

咱們從需求逆推實現,即考慮若是咱們從無到有實現工做流引擎,咱們須要實現哪些部分?從而咱們能夠提出一系列問題從而去Oozie中探尋。java

做爲工做流引擎須要實現哪些部分?大體想了想,以爲須要有:node

  • 任務提交
  • 任務持久化
  • 任務委託給某一個執行器執行
  • 任務調度
  • 任務回調,即任務被執行器完成後通知工做流引擎
  • 支持不一樣任務(同步,異步)
  • 控制任務之間邏輯關係(跳轉,等待...)
  • 狀態監控,監控任務進度
  • ......

由於篇幅和精力所限,咱們沒法研究全部源碼,回答全部問題,因此咱們先整理出部分問題,在後面Oozie源碼分析中一一解答:web

  • Oozie分爲幾個模塊?
  • 每一個模塊功能是什麼?
  • Oozie如何提交任務?
  • 任務提交到什麼地方?如何持久化?
  • Oozie任務有同步異步之分嗎?
  • Oozie如何處理同步任務?
  • Oozie如何處理異步任務?
  • 任務的控制流節點(Control Flow Nodes)和動做節點(Action Nodes)之間如何跳轉?
  • Oozie都支持什麼類型的任務?Shell?Java? Hive?
  • Oozie如何同Yarn交互?
  • Oozie如何知道Yarn任務完成?

0x02 Oozie 基本概念

2.1 組件

Oozie由Oozie client和Oozie Server兩個組件構成,Oozie Server是運行於Java Servlet容器(Tomcat)中的web應用程序。Oozie client用於給Oozie Server說起任務,Oozie client 提交任務的途徑是HTTP請求。sql

實際上Oozie Server就至關於Hadoop的一個客戶端,當用戶須要執行多個關聯的MR任務時,只須要將MR執行順序寫入workflow.xml,而後使用Oozie Server提交本次任務,Oozie Server會託管此任務流。shell

Oozie Server 具體操做的是workflow,即Oozie主要維護workflow的執行 / workflow內部Action的串聯和跳轉。數據庫

具體Action的執行是由Yarn去執行,Yarn會把Action分配給有充足資源的節點執行。Action是異步執行,因此Action結束時候會經過回調方式通知Oozie執行結果,Oozie也會採用輪詢方式去獲取Action結果(爲了提升可靠性)。apache

大體提交流程以下:json

Oozie client ------> Oozie  Server -------> Yarn ------> Hadoop

2.2 特色

Oozie特色以下:服務器

  • Oozie不是僅用來配置多個MR工做流的,它能夠是各類程序夾雜在一塊兒的工做流,好比執行一個MR1後,接着執行一個java腳本,再執行一個shell腳本,接着是Hive腳本,而後又是Pig腳本,最後又執行了一個MR2,使用Oozie能夠輕鬆完成這種多樣的工做流。使用Oozie時,若前一個任務執行失敗,後一個任務將不會被調度。
  • Oozie定義了控制流節點(Control Flow Nodes)和動做節點(Action Nodes),其中控制流節點定義了流程的開始和結束,以及控制流程的執行路徑(Execution Path),如decision,fork,join等;而動做節點包括Haoop map-reduce hadoop文件系統,Pig,SSH,HTTP,eMail和Oozie子流程。
  • Oozie以action爲基本單位,能夠將多個action構成一個DAG圖的模式運行。
  • Oozie工做流必須是一個有向無環圖,實際上Oozie就至關於Hadoop的一個客戶端,當用戶須要執行多個關聯的MR任務時,只須要將MR執行順序寫入workflow.xml,而後使用Oozie提交本次任務,Oozie會託管此任務流。

2.3 功能模塊

Oozie主要由如下功能模塊構成:

  • workflow(工做流):該組件用於定義和執行一個特定順序的mapreduce,hive和pig做業。由咱們須要處理的每一個工做組成,進行需求的流式處理。
  • Coordinator(協調器):可將多個工做流協調成一個工做流來進行處理。多個workflow能夠組成一個coordinator,能夠把前幾個workflow的輸出做爲後 一個workflow的輸入,也能夠定義workflow的觸發條件,來作定時觸發。
  • Bundle Job:綁定多個coordinator,一塊兒提交或觸發全部coordinator,是對一堆coordinator的抽象。
  • Oozie SLA(服務器等級協定):該組件支持workflow應用程序執行過程的記錄跟蹤。

咱們就從無到有,看看一個Workflow從提交到最後是如何運行的,假設這個workflow開始後,進入一個hive action,這個hive自己配置的是由tez引擎執行 。下面是代碼簡化版。

<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive-wf">
  
    <start to="hive-node"/>

    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.5">
            <script>hive.sql</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
       <message>Hive failed, error message</message>
    </kill>
    
    <end name="end"/>
</workflow-app>

0x03 Oozie client

Oozie Client是用戶用來提交任務給Oozie Server的途徑,其能夠啓動任務,中止任務,提交任務,開始任務,查看任務執行狀況。好比啓動任務以下:

oozie job -oozie oozie_url -config job.properties_address -run

3.1 程序入口

既然有啓動腳本,咱們就直接去裏面探尋程序入口。

${JAVA_BIN} ${OOZIE_CLIENT_OPTS} -cp ${OOZIECPPATH} org.apache.oozie.cli.OozieCLI "${@}"

這就看到了Client 的入口類,咱們去看看。

public class OozieCLI {
      public static void main(String[] args) {
        if (!System.getProperties().containsKey(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP)) {
            System.setProperty(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP, "true");
        }
        System.exit(new OozieCLI().run(args));
    }
}

咱們能夠看到,通過驗證以後,程序直接從main函數進入到了run函數。

public class OozieCLI {
     public synchronized int run(String[] args) {
        final CLIParser parser = getCLIParser();
        try {
            final CLIParser.Command command = parser.parse(args);
            String doAsUser = command.getCommandLine().getOptionValue(DO_AS_OPTION);

            if (doAsUser != null) {
                OozieClient.doAs(doAsUser, new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        processCommand(parser, command);
                        return null;
                    }
                });
            }
            else {
                processCommand(parser, command);
            }
            return 0;
        }
    }
}

看來主要的內容是在這個processCommand裏面,其會根據命令調用相應的命令方法。經過command.getName()咱們能夠清楚的知道Oozie目前支持什麼種類的任務,好比 JOB_CMD,JOBS_CMD,PIG_CMD,SQOOP_CMD,MR_CMD。

public void processCommand(CLIParser parser, CLIParser.Command command) throws Exception {
        switch (command.getName()) {
            case JOB_CMD:
                jobCommand(command.getCommandLine());
                break;
            case JOBS_CMD:
                jobsCommand(command.getCommandLine());
                break;
            case HIVE_CMD:
                scriptLanguageCommand(command.getCommandLine(), HIVE_CMD);
                break;
            ......
            default:
                parser.showHelp(command.getCommandLine());
        }
}

3.2 Hive爲例

咱們以Hive爲例看看如何處理。Hive就是調用 scriptLanguageCommand。

private void scriptLanguageCommand(CommandLine commandLine, String jobType){
    List<String> args = commandLine.getArgList();
    try {
        XOozieClient wc = createXOozieClient(commandLine);
        Properties conf = getConfiguration(wc, commandLine);
        String script = commandLine.getOptionValue(SCRIPTFILE_OPTION);
        List<String> paramsList = new ArrayList<>();
        ......
        System.out.println(JOB_ID_PREFIX + wc.submitScriptLanguage(conf, script,
                    args.toArray(new String[args.size()]),
                    paramsList.toArray(new String[paramsList.size()]), jobType));      
    }
}

這裏關鍵代碼是:wc.submitScriptLanguage,因此咱們須要看看XOozieClient.submitScriptLanguage。其註釋代表做用是經過HTTP來提交 Pig 或者 Hive。

public String submitScriptLanguage(Properties conf, String scriptFile, String[] args, String[] params, String jobType) throws IOException, OozieClientException {

    switch (jobType) {
        case OozieCLI.HIVE_CMD:
            script = XOozieClient.HIVE_SCRIPT;
            options = XOozieClient.HIVE_OPTIONS;
            scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
            break;
        case OozieCLI.PIG_CMD:
            ......
    }

    conf.setProperty(script, readScript(scriptFile));
    setStrings(conf, options, args);
    setStrings(conf, scriptParams, params);

    return (new HttpJobSubmit(conf, jobType)).call();
}

HttpJobSubmit就是向Oozie Server提交job,因此咱們最終是須要去Oozie Server探究。

private class HttpJobSubmit extends ClientCallable<String> {
    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        writeToXml(conf, conn.getOutputStream());
        if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
            JSONObject json = (JSONObject) JSONValue.parse(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
            return (String) json.get(JsonTags.JOB_ID);
        }
        return null;
    }
}

0x04 Oozie Server

4.1 我是個web程序

前面咱們提到,Oozie Server是運行於Java Servlet容器(Tomcat)中的web應用程序。因此具體啓動等配置信息是在web.xml中。好久沒有看到web.xml了,忽然以爲好陌生,嘿嘿。

<!-- Servlets -->
<servlet>
    <servlet-name>callback</servlet-name>
    <display-name>Callback Notification</display-name>
    <servlet-class>org.apache.oozie.servlet.CallbackServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>

<servlet>
    <servlet-name>v1jobs</servlet-name>
    <display-name>WS API for Workflow Jobs</display-name>
    <servlet-class>org.apache.oozie.servlet.V1JobsServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>

......

4.2 初始化服務

Ooize的不少基礎工做是由Services來完成的,每個service都是一個單例。這些服務的配置信息在ooze-default.xml中

<property>
    <name>oozie.services</name>
    <value>
        org.apache.oozie.service.HadoopAccessorService,
        org.apache.oozie.service.LiteWorkflowAppService,
        org.apache.oozie.service.JPAService,
        org.apache.oozie.service.DBLiteWorkflowStoreService,
        org.apache.oozie.service.CallbackService,
        org.apache.oozie.service.ActionService,
        org.apache.oozie.service.CallableQueueService,
        org.apache.oozie.service.CoordinatorEngineService,
        org.apache.oozie.service.BundleEngineService,
        org.apache.oozie.service.DagEngineService,
        ......
    </value>
</property>

ServicesLoader這個類用來啓動,加載配置的全部service。

public class ServicesLoader implements ServletContextListener {
    private static Services services;
    /**
     * Initialize Oozie services.
     */
    public void contextInitialized(ServletContextEvent event) {
        services = new Services();
        services.init();
    }
}

init函數是用來初始化全部配置好的Services,若是有同類型服務,則後來者會被存儲。

public class Services {
      public void init() throws ServiceException {
 			 loadServices();	
      }  
  
    private void loadServices() throws ServiceException {
        try {
            Map<Class<?>, Service> map = new LinkedHashMap<Class<?>, Service>();
            Class<?>[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES);
            Class<?>[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES);

            List<Service> list = new ArrayList<Service>();
            loadServices(classes, list);
            loadServices(classesExt, list);

            //removing duplicate services, strategy: last one wins
            for (Service service : list) {
                if (map.containsKey(service.getInterface())) {
                      service.getClass());
                }
                map.put(service.getInterface(), service);
            }
            for (Map.Entry<Class<?>, Service> entry : map.entrySet()) {
                setService(entry.getValue().getClass());
            }
        } 
    }  
}

4.3 從Job到DAG

客戶經過oozie腳本提交job以後,進入org.apache.oozie.cli.OozieCLI。會生成一個OozieClient,而後使用JobCommand,提交運行的信息到V1JosServlet的doPost接口,Oozier在doPos接口中會調用submitJob()方法。此時會生成一個DAG對象,而後DAG.submitJon(JobConf,startJob)。

咱們從V1JosServlet.doPost入手。這裏是基類。

public abstract class BaseJobsServlet extends JsonRestServlet {
      protected void doPost(HttpServletRequest request,
            HttpServletResponse response) throws ServletException, IOException {
         JSONObject json = submitJob(request, conf);
      }
}

而後回到 V1JosServlet.submitJob

@Override
protected JSONObject submitJob(HttpServletRequest request, Configuration conf) throws XServletException,IOException {
    String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);

    if (jobType == null) {
            String wfPath = conf.get(OozieClient.APP_PATH);
 
            if (wfPath != null) {
                json = submitWorkflowJob(request, conf); // 咱們的目標在這裏
            }
            else if (coordPath != null) {
                json = submitCoordinatorJob(request, conf);
            }
            else {
                json = submitBundleJob(request, conf);
            }
    }
    else { // This is a http submission job
       ......
    }
    return json;
}

而後調用到了 DagEngine.submitJob。從其註釋能夠看出 The DagEngine provides all the DAG engine functionality for WS calls. 這樣咱們就正式來到了DAG的世界

private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf) throws XServletException {
    try {
        String action = request.getParameter(RestConstants.ACTION_PARAM);
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
        if (action != null) {
            dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
        }
        if (dryrun) {
            id = dagEngine.dryRunSubmit(conf);
        }
        else {
            id = dagEngine.submitJob(conf, startJob); // 咱們在這裏
        }
        json.put(JsonTags.JOB_ID, id);
    }
    return json;
}

0x06 核心引擎

Oozie有三種核心引擎,其都是繼承抽象類BaseEngine。

這三種引擎是:

  • DAGEngine,負責workflow執行,咱們上面代碼就會來到這裏.....
  • CoordinatorEngine,負責coordinator執行
  • BundleEngine,負責bundle執行

分別對應

  • org.apache.oozie.service.CoordinatorEngineService
  • org.apache.oozie.service.BundleEngineService
  • org.apache.oozie.service.DagEngineService

咱們以前提到,這些屬於系統Services,都是Singletgon,在Oozie啓動時候會加入到Services中。當須要時候經過get來獲取。

public class Services {
		private Map<Class<? extends Service>, Service> services = new LinkedHashMap<Class<? extends Service>, Service>();
  
		public <T extends Service> T get(Class<T> serviceKlass) {
    		return (T) services.get(serviceKlass);
		}
}

具體在V1JosServlet中調用舉例:

String user = conf.get(OozieClient.USER_NAME);
DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);

0x07 Command推進執行

Oozie把全部命令抽象成Command,這樣其內部把程序執行總結成用Command來推進,相似於消息驅動

Command分爲同步和異步。其基類都是XCommand。XCommand提供以下模式:

  • single execution: a command instance can be executed only once
  • eager data loading: loads data for eager precondition check
  • eager precondition check: verify precondition before obtaining lock
  • data loading: loads data for precondition check and execution
  • precondition check: verifies precondition for execution is still met
  • locking: obtains exclusive lock on key before executing the command
  • execution: command logic
public abstract class XCommand<T> implements XCallable<T> {
    ......
    private String key;
    private String name;
    private String type;
    private AtomicBoolean used = new AtomicBoolean(false);
    private Map<Long, List<XCommand<?>>> commandQueue;
    protected Instrumentation instrumentation;
    ......
}

XCommand的父接口XCallable繼承了java.util.concurrent.Callable。最終目的是當異步執行時候,基於優先級來排列命令的執行計劃。

因此XCommand的幾個關鍵函數就是:queue,call,execute:

  • queue :向commandQueue加入一個command,這個command是在當前command執行以後,作delayed execution。在當前command執行過程當中加入的具備一樣的delay的commands,會後續順序(single serial)執行。
  • call就是繼承的Callable實現函數,會調用到execute。
  • execute則是具體Command實現本身的具體業務。

從咱們常見的SubmitXCommand來看,繼承關係以下:

public class SubmitXCommand extends WorkflowXCommand<String> 
public abstract class WorkflowXCommand<T> extends XCommand<T> 
public abstract class XCommand<T> implements XCallable<T> 
public interface XCallable<T> extends Callable<T>

再好比TransitionXCommand的繼承關係:

abstract class TransitionXCommand<T> extends XCommand<T> 
public abstract class SubmitTransitionXCommand extends TransitionXCommand<String>

從以前的組件能夠看到,任務是有狀態機的概念的,準備,開始,運行中,失敗結束 等等,因此對任務進行操做的命令同時須要處理狀態機的變化,oozie處理任務的命令都須要繼承TransitionXCommand這個抽象類,而TransitionXCommand的父類是XCommand。

0x08 引擎處理提交

前面提到,doPost 會調用到 id = dagEngine.submitJob(conf, startJob);

咱們看看DAGEngine是如何處理提交的任務。

首先經過SubmitXCommand直接運行其call()來提交job。

public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
    validateSubmitConfiguration(conf);
    try {
        String jobId;
        SubmitXCommand submit = new SubmitXCommand(conf);
        jobId = submit.call();
        if (startJob) {
            start(jobId);
        }
        return jobId;
    }
}

而後經過StartXCommand來啓動Job。從註釋中咱們能夠看到,此時依然是同步執行 (經過主動執行call()函數)。

public void start(String jobId) throws DagEngineException {
    // Changing to synchronous call from asynchronous queuing to prevent the
    // loss of command if the queue is full or the queue is lost in case of
    // failure.
    new StartXCommand(jobId).call();
}

8.1 SubmitXCommand

SubmitXCommand處理的是提交工做,將用戶提交的任務解析後更新到數據庫。

主要業務是在execute中實現。

  1. 解析配置,獲取WorkflowApp
  2. 建立WorkflowInstance
  3. 生成 WorkflowJobBean
  4. 經過JPA保存WorkflowJobBean 到wf_jobs

代碼摘要以下:

protected String execute() throws CommandException {

    WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
    try {
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        FileSystem fs = has.createFileSystem(user, uri, fsConf);

        // 解析配置,獲取WorkflowApp
        WorkflowApp app = wps.parseDef(conf, defaultConf);

        // 建立WorkflowInstance
        WorkflowInstance wfInstance;
        wfInstance = workflowLib.createInstance(app, conf);

        // 生成 WorkflowJobBean
        WorkflowJobBean workflow = new WorkflowJobBean();
        workflow.setId(wfInstance.getId());
        workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
        workflow.setAppPath(conf.get(OozieClient.APP_PATH));
        workflow.setConf(XmlUtils.prettyPrint(conf).toString());
        ......
        workflow.setWorkflowInstance(wfInstance);
        workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));

        if (!dryrun) {
            workflow.setSlaXml(jobSlaXml);
            // 添加到臨時list
            insertList.add(workflow); 
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                // 保存WorkflowJobBean 到wf_jobs
    BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
                }
            }
            return workflow.getId();
        }
}

其中insertList是用來臨時存儲 WorkflowJobBean

private List<JsonBean> insertList = new ArrayList<JsonBean>();

WorkflowJobBean 對應數據庫中表 WF_JOBS。

public class WorkflowJobBean implements Writable, WorkflowJob, JsonBean {
    ......//省略其餘變量
    @Transient
    private List<WorkflowActionBean> actions;
}

在Oozie爲了方便將用戶定義的Action以及Workflow進行管理,底層使用Jpa將這些數據存儲於數據庫中。具體是調用executeBatchInsertUpdateDelete來經過JPA插入到數據庫。

BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);

具體BatchQueryExecutor代碼以下。

public class BatchQueryExecutor {
    public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList, Collection<UpdateEntry> updateList,Collection<JsonBean> deleteList) {
        List<QueryEntry> queryList = new ArrayList<QueryEntry>();
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();

        if (updateList != null) {
            for (UpdateEntry entry : updateList) {
                Query query = null;
                JsonBean bean = entry.getBean();
                if (bean instanceof WorkflowJobBean) {
                    // 咱們程序在這裏
                    query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery(
                            (WorkflowJobQuery) entry.getQueryName(), (WorkflowJobBean) entry.getBean(), em);
                }
                else if (bean instanceof WorkflowActionBean) {
                    query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery(
                            (WorkflowActionQuery) entry.getQueryName(), (WorkflowActionBean) entry.getBean(), em);
                }
                else if {
                  //此處省略衆多其餘類型
                }
                queryList.add(new QueryEntry(entry.getQueryName(), query));
            }
        }
        // 這裏插入數據庫
        jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em);
    }  
}

JPA摘要代碼以下:

public class JPAService implements Service, Instrumentable {
  
    private OperationRetryHandler retryHandler;
  
    public void executeBatchInsertUpdateDelete(final Collection<JsonBean> insertBeans, final List<QueryEntry> updateQueryList, final Collection<JsonBean> deleteBeans, final EntityManager em) {
      
        try {
            retryHandler.executeWithRetry(new Callable<Void>() {

                public Void call() throws Exception {
                   ......
                    if (CollectionUtils.isNotEmpty(insertBeans)) {
                        for (final JsonBean bean : insertBeans) {
                            em.persist(bean);
                        }
                    }
                   ......
                }
            });
        }
    }
}

這樣,一個Workflow Job就存儲到了數據庫中。

8.2 workflow生命週期

首先介紹下workflow生命週期,咱們代碼立刻會用到PREP狀態。

  • prep:一個工做流第一次建立就處於prep狀態,表示工做流以及建立可是尚未運行。

  • running:當一個已經被建立的工做流job開始執行的時候,就處於running狀態。它不會達到結束狀態,只能由於出錯而結束,或者被掛起。

  • suspended:一個running狀態的工做流job會變成suspended狀態,並且它會一直處於該狀態,除非這個工做流job被從新開始執行或者被殺死。

  • killed:當一個工做流job處於被建立後的狀態,或者處於running,suspended狀態時,被殺死,則工做流job的狀態變爲killed狀態。

  • failed:當一個工做流job不可預期的錯誤失敗而終止,就會變爲failed狀態。

8.3 StartXCommand

處理完SubmitXCommand以後,Oozie Server 立刻處理StartXCommand

StartXCommand 的做用是啓動Command,其繼承了SignalXCommand ,因此 StartXCommand(jobId).call();調用到了SignalXCommand的call。

public class StartXCommand extends SignalXCommand

相關代碼以下:

首先,StartXCommand調用基類構造函數

public StartXCommand(String id) {
        super("start", 1, id);
        InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
}

而後,SignalXCommand獲得了jobId,這個就是以前SubmitXCommand生成而且傳回來的。

public SignalXCommand(String name, int priority, String jobId) {
    super(name, name, priority);
    this.jobId = ParamChecker.notEmpty(jobId, "jobId");
}

call()首先調用到 SignalXCommand.loadState。其會根據jobId從數據庫中讀取Workflow job信息。

protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
            if (actionId != null) {
                this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL, actionId);
            }
        }
}

SQL語句以下:

@NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),

call()接着調用SignalXCommand.execute(),這裏具體操做以下:

  • 1)execute中,由於狀態是PREP,因此調用workflowInstance.start,這裏對應的實例是LiteWorkflowInstance
    • 1.1) LiteWorkflowInstance.start 調用 signal()
      • 1.1.1) signal() 調用 exiting = nodeHandler.enter(context),實際調用的是 LiteActionHandler.enter
        • 1.1.1.1) 調用 LiteWorkflowStoreService.liteExecute,這裏是生成WorkflowActionBean,而後添加到臨時變量ACTIONS_TO_START
          • 1.1.1.1.1) WorkflowActionBean action = new WorkflowActionBean();
          • 1.1.1.1.2) action.setJobId(jobId); 作其餘各類設置
          • 1.1.1.1.3) List list = (List) context.getTransientVar(ACTIONS_TO_START);
          • 1.1.1.1.4) list.add(action); 添加到臨時列表
    • 1.2) 回到 signal(), 由於start 是 同步操做,因此exiting 爲 true
      • 1.2.1) signal all new synch transitions。遍歷 pathsToStar,若是有同步跳轉,則開始進行後一步Action的跳轉,即 signal(pathToStart, "::synch:😊;
  • 2)回到execute(),遍歷WorkflowStoreService.getActionsToStart(workflowInstance),即從ACTIONS_TO_START取Action(由於以前剛剛放入一個,因此此次獲取到)
    • 2.1) for 循環中遍歷,由於是submit,因此 syncAction = newAction;
  • 3)execute() 會調用BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete把WorkflowActionBean寫入到數據庫
  • 4)execute() 直接啓動start command:new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();

代碼以下:

protected Void execute() throws CommandException {
    WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
    workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
    WorkflowJob.Status prevStatus = wfJob.getStatus();
    WorkflowActionBean syncAction = null;
    List<WorkflowActionBean> workflowActionBeanListForForked = new ArrayList<WorkflowActionBean>();

    if (wfAction == null) {
           if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
                // 對於上面的 1)
                completed = workflowInstance.start();
                wfJob.setStatus(WorkflowJob.Status.RUNNING);
                wfJob.setStartTime(new Date());
                wfJob.setWorkflowInstance(workflowInstance);
                generateEvent = true;
                queue(new WorkflowNotificationXCommand(wfJob));
            }
    }
    else { 
      ...... 
    }
    if (completed) {
      ......
    }
    else {
       // 對於上面最外層的 2)
       for (WorkflowActionBean newAction : 
            WorkflowStoreService.getActionsToStart(workflowInstance)) {
         insertList.add(newAction);
         if (wfAction != null) { // null during wf job submit
            // 註釋指出,wf job 提交時候不走這裏
            .....
         }
         else {
            syncAction = newAction; // first action after wf submit should always be sync
         }
       }
    }
    // 寫入 WorkflowActionBean,對於上面的 3)
    BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
    if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
            ......
    }
    else if (syncAction != null) {
        // 直接調用 call(),對應上面的 4)
        new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
    }
}

8.4 數據庫信息

Workflow 數據庫信息從WorkflowActionBean中能夠看出,咱們這裏要重點說明的就是transition字段,Oozie用transition來記錄本Action下一步要跳轉到哪裏。WorkflowActionBean摘要以下:

public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
    @Id
    private String id;

    @Basic
    @Index
    @Column(name = "wf_id")
    private String wfId = null;

    @Basic
    @Index
    @Column(name = "status")
    private String statusStr = WorkflowAction.Status.PREP.toString();

    @Basic
    @Column(name = "execution_path", length = 1024)
    private String executionPath = null;

    @Basic
    @Column(name = "transition")
    private String transition = null;

    @Basic
    @Column(name = "data")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob data;
}

8.5 同步執行

同步執行的跳轉主要是在LiteWorkflowInstance.signal這裏體現,若是命令結束後發現後續還有同步跳轉,則就繼續執行。

public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
                    // signal all new synch transitions
                    for (String pathToStart : pathsToStart) {
                        signal(pathToStart, "::synch::");
                    }  
}

至此,程序提交已經完成,後續就是程序在Oozie內部的執行階段,這就是從 ActionStartXCommand 開始了。

0xFF 參考

大數據之Oozie——源碼分析(一)程序入口

什麼是Oozie——大數據任務調度框架

Oozie基礎小結

【原創】大數據基礎之Oozie(1)簡介、源代碼解析

【原創】大叔經驗分享(6)Oozie如何查看提交到Yarn上的任務日誌

Oozie和Azkaban的技術選型和對比

Oozie-TransitionXCommand

Oozie-Service-CallableQueueService

YARN基本框架分析

Oozie任務調度阻塞及內存優化方法

相關文章
相關標籤/搜索