Oozie is an open-source framework built around a workflow engine, contributed to Apache by Cloudera. It is the open-source workflow scheduling engine for the Hadoop platform, used to manage Hadoop jobs. This article is the first in the series and covers Oozie's job-submission phase.
We will reason backwards from the requirements: if we had to build a workflow engine from scratch, which parts would we need to implement? From that we can derive a series of questions to chase down in the Oozie codebase.
Which parts does a workflow engine need to implement? After a rough think, several components come to mind.
Limited space and energy keep us from studying all of the source code and answering every question, so we first compile a subset of questions and answer them one by one in the Oozie source-code analysis that follows.
Oozie consists of two components, the Oozie client and the Oozie server. The Oozie server is a web application that runs in a Java servlet container (Tomcat). The Oozie client is used to submit jobs to the Oozie server, and it does so via HTTP requests.
In effect, the Oozie server is itself a Hadoop client. When a user needs to run several related MR jobs, they only have to write the MR execution order into workflow.xml and submit the job through the Oozie server, which then manages the job flow on their behalf.
What the Oozie server concretely operates on is the workflow: Oozie mainly maintains workflow execution, i.e. the chaining of and transitions between the Actions inside a workflow.
The actual execution of an Action is handled by Yarn, which assigns the Action to a node with sufficient resources. Actions run asynchronously, so when an Action finishes it notifies Oozie of the result via a callback; Oozie also polls for the Action result (to improve reliability).
大體提交流程以下:json
Oozie client ------> Oozie Server -------> Yarn ------> Hadoop
Oozie's characteristics are as follows:
Oozie is composed mainly of the following functional modules:
Let's now walk through, from scratch, how a Workflow runs from submission onwards. Assume that after this workflow starts it enters a hive action, and that this hive action is configured to run on the Tez engine. A simplified version of the code follows.
```xml
<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive-wf">
    <start to="hive-node"/>
    <action name="hive-node">
        <hive xmlns="uri:oozie:hive-action:0.5">
            <script>hive.sql</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Hive failed, error message</message>
    </kill>
    <end name="end"/>
</workflow-app>
```
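For completeness, the hive.sql referenced by the action could be as small as a single statement. A sketch, with made-up table names:

```sql
-- Hypothetical contents of hive.sql; any valid HiveQL works here.
INSERT OVERWRITE TABLE target_table
SELECT * FROM source_table;
```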
The Oozie client is the user's channel for submitting jobs to the Oozie server. It can start, stop, and submit jobs, and inspect their execution status. For example, starting a job looks like this:
```shell
oozie job -oozie oozie_url -config job.properties_address -run
```
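The job.properties passed with -config typically carries at least the cluster addresses and the HDFS path of the workflow application. A minimal sketch, with placeholder values:

```properties
# All values below are placeholders for illustration.
nameNode=hdfs://namenode-host:8020
jobTracker=resourcemanager-host:8032
# HDFS directory containing workflow.xml
oozie.wf.application.path=${nameNode}/user/someuser/apps/hive-wf
oozie.use.system.libpath=true
```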
Since there is a launch script, let's go straight into it to find the program entry point.
```shell
${JAVA_BIN} ${OOZIE_CLIENT_OPTS} -cp ${OOZIECPPATH} org.apache.oozie.cli.OozieCLI "${@}"
```
This reveals the client's entry class; let's take a look.
```java
public class OozieCLI {
    public static void main(String[] args) {
        if (!System.getProperties().containsKey(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP)) {
            System.setProperty(AuthOozieClient.USE_AUTH_TOKEN_CACHE_SYS_PROP, "true");
        }
        System.exit(new OozieCLI().run(args));
    }
}
```
As we can see, after that check the program goes straight from main into run.
```java
public class OozieCLI {
    public synchronized int run(String[] args) {
        final CLIParser parser = getCLIParser();
        try {
            final CLIParser.Command command = parser.parse(args);
            String doAsUser = command.getCommandLine().getOptionValue(DO_AS_OPTION);
            if (doAsUser != null) {
                OozieClient.doAs(doAsUser, new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        processCommand(parser, command);
                        return null;
                    }
                });
            }
            else {
                processCommand(parser, command);
            }
            return 0;
        }
    }
}
```
The real work happens in processCommand, which invokes the handler matching each command. From command.getName() we can see exactly which kinds of commands Oozie currently supports, e.g. JOB_CMD, JOBS_CMD, PIG_CMD, SQOOP_CMD, MR_CMD.
```java
public void processCommand(CLIParser parser, CLIParser.Command command) throws Exception {
    switch (command.getName()) {
        case JOB_CMD:
            jobCommand(command.getCommandLine());
            break;
        case JOBS_CMD:
            jobsCommand(command.getCommandLine());
            break;
        case HIVE_CMD:
            scriptLanguageCommand(command.getCommandLine(), HIVE_CMD);
            break;
        ......
        default:
            parser.showHelp(command.getCommandLine());
    }
}
```
Let's take Hive as an example of how a command is handled: HIVE_CMD is dispatched to scriptLanguageCommand.
```java
private void scriptLanguageCommand(CommandLine commandLine, String jobType) {
    List<String> args = commandLine.getArgList();
    try {
        XOozieClient wc = createXOozieClient(commandLine);
        Properties conf = getConfiguration(wc, commandLine);
        String script = commandLine.getOptionValue(SCRIPTFILE_OPTION);
        List<String> paramsList = new ArrayList<>();
        ......
        System.out.println(JOB_ID_PREFIX + wc.submitScriptLanguage(conf, script,
                args.toArray(new String[args.size()]),
                paramsList.toArray(new String[paramsList.size()]), jobType));
    }
}
```
The key call here is wc.submitScriptLanguage, so we need to look at XOozieClient.submitScriptLanguage. Its comment states that it submits a Pig or Hive job via HTTP.
```java
public String submitScriptLanguage(Properties conf, String scriptFile, String[] args,
        String[] params, String jobType) throws IOException, OozieClientException {
    switch (jobType) {
        case OozieCLI.HIVE_CMD:
            script = XOozieClient.HIVE_SCRIPT;
            options = XOozieClient.HIVE_OPTIONS;
            scriptParams = XOozieClient.HIVE_SCRIPT_PARAMS;
            break;
        case OozieCLI.PIG_CMD:
            ......
    }
    conf.setProperty(script, readScript(scriptFile));
    setStrings(conf, options, args);
    setStrings(conf, scriptParams, params);
    return (new HttpJobSubmit(conf, jobType)).call();
}
```
HttpJobSubmit is what actually posts the job to the Oozie server, so ultimately we have to continue the investigation on the server side.
```java
private class HttpJobSubmit extends ClientCallable<String> {
    @Override
    protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
        conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE);
        writeToXml(conf, conn.getOutputStream());
        if (conn.getResponseCode() == HttpURLConnection.HTTP_CREATED) {
            JSONObject json = (JSONObject) JSONValue.parse(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8));
            return (String) json.get(JsonTags.JOB_ID);
        }
        return null;
    }
}
```
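Stripped of the ClientCallable plumbing, the submission boils down to one HTTP POST against the server's jobs endpoint. A self-contained sketch of the same idea (host, port, and body content are placeholders, not the client's exact request):

```java
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RawSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical host/port; /v1/jobs is served by V1JobsServlet (see below).
        URL url = new URL("http://oozie-host:11000/oozie/v1/jobs?action=start");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("content-type", "application/xml;charset=UTF-8");

        // The request body is the job configuration rendered as Hadoop-style XML.
        String body = "<configuration>"
                + "<property><name>user.name</name><value>someuser</value></property>"
                + "</configuration>";
        try (OutputStream os = conn.getOutputStream()) {
            os.write(body.getBytes(StandardCharsets.UTF_8));
        }

        // On success the server answers 201 CREATED with a JSON body carrying the job id.
        System.out.println("HTTP " + conn.getResponseCode());
    }
}
```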
As mentioned earlier, the Oozie server is a web application running in a Java servlet container (Tomcat), so its startup and related configuration live in web.xml. It has been a long time since I last saw a web.xml; it suddenly feels quite unfamiliar, heh.
```xml
<!-- Servlets -->
<servlet>
    <servlet-name>callback</servlet-name>
    <display-name>Callback Notification</display-name>
    <servlet-class>org.apache.oozie.servlet.CallbackServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>
<servlet>
    <servlet-name>v1jobs</servlet-name>
    <display-name>WS API for Workflow Jobs</display-name>
    <servlet-class>org.apache.oozie.servlet.V1JobsServlet</servlet-class>
    <load-on-startup>1</load-on-startup>
</servlet>
......
```
Much of Oozie's groundwork is done by Services, and every service is a singleton. These services are configured in oozie-default.xml:
```xml
<property>
    <name>oozie.services</name>
    <value>
        org.apache.oozie.service.HadoopAccessorService,
        org.apache.oozie.service.LiteWorkflowAppService,
        org.apache.oozie.service.JPAService,
        org.apache.oozie.service.DBLiteWorkflowStoreService,
        org.apache.oozie.service.CallbackService,
        org.apache.oozie.service.ActionService,
        org.apache.oozie.service.CallableQueueService,
        org.apache.oozie.service.CoordinatorEngineService,
        org.apache.oozie.service.BundleEngineService,
        org.apache.oozie.service.DagEngineService,
        ......
    </value>
</property>
```
The ServicesLoader class starts and loads all configured services.
```java
public class ServicesLoader implements ServletContextListener {
    private static Services services;

    /**
     * Initialize Oozie services.
     */
    public void contextInitialized(ServletContextEvent event) {
        services = new Services();
        services.init();
    }
}
```
The init function initializes all configured services; if several services share the same interface type, the one loaded last wins.
```java
public class Services {
    public void init() throws ServiceException {
        loadServices();
    }

    private void loadServices() throws ServiceException {
        try {
            Map<Class<?>, Service> map = new LinkedHashMap<Class<?>, Service>();
            Class<?>[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES);
            Class<?>[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES);
            List<Service> list = new ArrayList<Service>();
            loadServices(classes, list);
            loadServices(classesExt, list);
            // removing duplicate services, strategy: last one wins
            for (Service service : list) {
                if (map.containsKey(service.getInterface())) {
                    // log that the earlier implementation is being replaced by service.getClass()
                }
                map.put(service.getInterface(), service);
            }
            for (Map.Entry<Class<?>, Service> entry : map.entrySet()) {
                setService(entry.getValue().getClass());
            }
        }
    }
}
```
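Because of the "last one wins" strategy, a deployment can override a built-in service by listing a replacement under oozie.services.ext (the CONF_SERVICE_EXT_CLASSES read above) in oozie-site.xml. A sketch, with a hypothetical class name:

```xml
<property>
    <name>oozie.services.ext</name>
    <value>
        com.example.oozie.MyReplacementCallbackService
    </value>
</property>
```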
After the user submits a job through the oozie script, control enters org.apache.oozie.cli.OozieCLI. An OozieClient is created, and the job command posts the run information to V1JobsServlet's doPost endpoint. Inside doPost, Oozie calls the submitJob() method; at that point a DAG engine object is obtained and DagEngine.submitJob(conf, startJob) is invoked.
We start from V1JobsServlet.doPost; the method lives in the base class.
```java
public abstract class BaseJobsServlet extends JsonRestServlet {
    protected void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        JSONObject json = submitJob(request, conf);
    }
}
```
Then we come back to V1JobsServlet.submitJob:
```java
@Override
protected JSONObject submitJob(HttpServletRequest request, Configuration conf)
        throws XServletException, IOException {
    String jobType = request.getParameter(RestConstants.JOBTYPE_PARAM);
    if (jobType == null) {
        String wfPath = conf.get(OozieClient.APP_PATH);
        if (wfPath != null) {
            json = submitWorkflowJob(request, conf); // this is our target
        }
        else if (coordPath != null) {
            json = submitCoordinatorJob(request, conf);
        }
        else {
            json = submitBundleJob(request, conf);
        }
    }
    else { // This is a http submission job
        ......
    }
    return json;
}
```
This ends up calling DagEngine.submitJob. As its comment puts it, "The DagEngine provides all the DAG engine functionality for WS calls." With that, we have formally entered the world of the DAG.
```java
private JSONObject submitWorkflowJob(HttpServletRequest request, Configuration conf)
        throws XServletException {
    try {
        String action = request.getParameter(RestConstants.ACTION_PARAM);
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
        if (action != null) {
            dryrun = (action.equals(RestConstants.JOB_ACTION_DRYRUN));
        }
        if (dryrun) {
            id = dagEngine.dryRunSubmit(conf);
        }
        else {
            id = dagEngine.submitJob(conf, startJob); // we are here
        }
        json.put(JsonTags.JOB_ID, id);
    }
    return json;
}
```
Oozie has three core engines, all of which extend the abstract class BaseEngine.
These engines are DagEngine, CoordinatorEngine, and BundleEngine, handling workflow, coordinator, and bundle jobs respectively (matching the DagEngineService, CoordinatorEngineService, and BundleEngineService entries in the service list above).
As mentioned before, these belong to the system Services and are all singletons; they are registered into Services at Oozie startup and retrieved with get() when needed.
```java
public class Services {
    private Map<Class<? extends Service>, Service> services =
            new LinkedHashMap<Class<? extends Service>, Service>();

    public <T extends Service> T get(Class<T> serviceKlass) {
        return (T) services.get(serviceKlass);
    }
}
```
A concrete call site in V1JobsServlet:
```java
String user = conf.get(OozieClient.USER_NAME);
DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
```
Oozie abstracts every operation into a Command, so internally execution is driven entirely by Commands, somewhat like message-driven processing.
Commands come in synchronous and asynchronous flavors; both share the base class XCommand, which provides the following shape:
```java
public abstract class XCommand<T> implements XCallable<T> {
    ......
    private String key;
    private String name;
    private String type;
    private AtomicBoolean used = new AtomicBoolean(false);
    private Map<Long, List<XCommand<?>>> commandQueue;
    protected Instrumentation instrumentation;
    ......
}
```
XCommand's parent interface, XCallable, extends java.util.concurrent.Callable. The ultimate purpose is that when commands run asynchronously, their execution can be scheduled by priority.
So XCommand's key methods are queue, call, and execute, as sketched below.
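Roughly: queue() defers a command to the CallableQueueService for asynchronous, priority-ordered execution; call() is the synchronous entry point; execute() holds the subclass-specific business logic. A much-simplified sketch of how call() and execute() relate (not the real class, which additionally handles locking, instrumentation, and eager loading):

```java
import java.util.concurrent.Callable;

// Simplified sketch of the XCommand template-method shape; illustration only.
abstract class XCommandSketch<T> implements Callable<T> {
    @Override
    public final T call() throws Exception {
        loadState();          // e.g. read the job/action beans from the database
        verifyPrecondition(); // bail out if the current state no longer allows this command
        return execute();     // subclass-specific business logic
    }

    protected abstract void loadState() throws Exception;
    protected abstract void verifyPrecondition() throws Exception;
    protected abstract T execute() throws Exception;
}
```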
Taking the familiar SubmitXCommand as an example, its inheritance chain looks like this:
```java
public class SubmitXCommand extends WorkflowXCommand<String>
public abstract class WorkflowXCommand<T> extends XCommand<T>
public abstract class XCommand<T> implements XCallable<T>
public interface XCallable<T> extends Callable<T>
```
And, for comparison, TransitionXCommand's inheritance chain:
```java
abstract class TransitionXCommand<T> extends XCommand<T>
public abstract class SubmitTransitionXCommand extends TransitionXCommand<String>
```
As the earlier components suggest, a job carries the notion of a state machine: prepared, started, running, failed/finished, and so on. Commands that operate on a job therefore also have to handle state-machine transitions, so every job-handling command in Oozie extends the abstract class TransitionXCommand, whose parent is XCommand.
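A loose sketch of the extra contract TransitionXCommand layers on top of XCommand (the method names mirror the real class, but the body is simplified; the real execute() also deals with SLA events and batched writes):

```java
import java.util.concurrent.Callable;

// Simplified sketch of a state-transition command; illustration only.
abstract class TransitionSketch<T> implements Callable<T> {
    @Override
    public T call() throws Exception {
        transitToNext(); // compute the job's next state-machine state
        updateJob();     // stage the database updates reflecting the new state
        notifyParent();  // propagate the change to the parent job, if any
        return null;
    }

    protected abstract void transitToNext();
    protected abstract void updateJob();
    protected abstract void notifyParent() throws Exception;
}
```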
As mentioned before, doPost eventually reaches id = dagEngine.submitJob(conf, startJob);
Let's see how DagEngine handles the submitted job.
First, the job is submitted by directly running SubmitXCommand's call():
```java
public String submitJob(Configuration conf, boolean startJob) throws DagEngineException {
    validateSubmitConfiguration(conf);
    try {
        String jobId;
        SubmitXCommand submit = new SubmitXCommand(conf);
        jobId = submit.call();
        if (startJob) {
            start(jobId);
        }
        return jobId;
    }
}
```
Then the job is started via StartXCommand. As the comment shows, this is still executed synchronously (by invoking call() directly):
```java
public void start(String jobId) throws DagEngineException {
    // Changing to synchronous call from asynchronous queuing to prevent the
    // loss of command if the queue is full or the queue is lost in case of
    // failure.
    new StartXCommand(jobId).call();
}
```
SubmitXCommand handles the submission step: it parses the user-submitted job and writes the result to the database.
The main logic is implemented in execute.
Code excerpt:
```java
protected String execute() throws CommandException {
    WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
    try {
        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
        FileSystem fs = has.createFileSystem(user, uri, fsConf);

        // Parse the configuration and obtain the WorkflowApp
        WorkflowApp app = wps.parseDef(conf, defaultConf);

        // Create the WorkflowInstance
        WorkflowInstance wfInstance;
        wfInstance = workflowLib.createInstance(app, conf);

        // Build the WorkflowJobBean
        WorkflowJobBean workflow = new WorkflowJobBean();
        workflow.setId(wfInstance.getId());
        workflow.setAppName(ELUtils.resolveAppName(app.getName(), conf));
        workflow.setAppPath(conf.get(OozieClient.APP_PATH));
        workflow.setConf(XmlUtils.prettyPrint(conf).toString());
        ......
        workflow.setWorkflowInstance(wfInstance);
        workflow.setExternalId(conf.get(OozieClient.EXTERNAL_ID));

        if (!dryrun) {
            workflow.setSlaXml(jobSlaXml);
            // Add to the temporary list
            insertList.add(workflow);
            JPAService jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                // Persist the WorkflowJobBean into WF_JOBS
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
            }
        }
        return workflow.getId();
    }
}
```
Here insertList is used to temporarily hold the WorkflowJobBean:
```java
private List<JsonBean> insertList = new ArrayList<JsonBean>();
```
WorkflowJobBean maps to the database table WF_JOBS:
```java
public class WorkflowJobBean implements Writable, WorkflowJob, JsonBean {
    ...... // other fields omitted
    @Transient
    private List<WorkflowActionBean> actions;
}
```
To make user-defined Actions and Workflows easy to manage, Oozie persists this data to a database via JPA underneath. Concretely, it calls executeBatchInsertUpdateDelete to insert the records through JPA:
```java
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
```
The relevant BatchQueryExecutor code:
```java
public class BatchQueryExecutor {
    public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList,
            Collection<UpdateEntry> updateList, Collection<JsonBean> deleteList) {
        List<QueryEntry> queryList = new ArrayList<QueryEntry>();
        JPAService jpaService = Services.get().get(JPAService.class);
        EntityManager em = jpaService.getEntityManager();

        if (updateList != null) {
            for (UpdateEntry entry : updateList) {
                Query query = null;
                JsonBean bean = entry.getBean();
                if (bean instanceof WorkflowJobBean) {
                    // our program goes through here
                    query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery(
                            (WorkflowJobQuery) entry.getQueryName(),
                            (WorkflowJobBean) entry.getBean(), em);
                }
                else if (bean instanceof WorkflowActionBean) {
                    query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery(
                            (WorkflowActionQuery) entry.getQueryName(),
                            (WorkflowActionBean) entry.getBean(), em);
                }
                // ...... many other bean types omitted here
                queryList.add(new QueryEntry(entry.getQueryName(), query));
            }
        }
        // here the database write happens
        jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em);
    }
}
```
A JPAService excerpt:
```java
public class JPAService implements Service, Instrumentable {
    private OperationRetryHandler retryHandler;

    public void executeBatchInsertUpdateDelete(final Collection<JsonBean> insertBeans,
            final List<QueryEntry> updateQueryList, final Collection<JsonBean> deleteBeans,
            final EntityManager em) {
        try {
            retryHandler.executeWithRetry(new Callable<Void>() {
                public Void call() throws Exception {
                    ......
                    if (CollectionUtils.isNotEmpty(insertBeans)) {
                        for (final JsonBean bean : insertBeans) {
                            em.persist(bean);
                        }
                    }
                    ......
                }
            });
        }
    }
}
```
With that, a workflow job has been stored in the database.
First, a quick look at the workflow lifecycle; the code below will shortly use the PREP state.
prep: a newly created workflow job is in the PREP state, meaning it has been created but has not yet run.
running: a workflow job enters the RUNNING state when it starts executing. It stays there until it reaches its end state, terminates on an error, or is suspended.
suspended: a RUNNING workflow job can be moved to the SUSPENDED state, and it remains there until the job is resumed or killed.
killed: a workflow job becomes KILLED when it is killed while in the created (PREP), RUNNING, or SUSPENDED state.
failed: a workflow job becomes FAILED when it terminates due to an unexpected error.
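The same lifecycle expressed as a sketch (the real enum is WorkflowJob.Status, which also includes SUCCEEDED for normal completion; the transition rules below are simplified from the prose above):

```java
import java.util.EnumSet;
import java.util.Set;

// Illustration only; simplified from the lifecycle description above.
enum WfStatusSketch {
    PREP, RUNNING, SUSPENDED, SUCCEEDED, KILLED, FAILED;

    Set<WfStatusSketch> allowedNext() {
        switch (this) {
            case PREP:      return EnumSet.of(RUNNING, KILLED);
            case RUNNING:   return EnumSet.of(SUSPENDED, SUCCEEDED, KILLED, FAILED);
            case SUSPENDED: return EnumSet.of(RUNNING, KILLED);
            default:        return EnumSet.noneOf(WfStatusSketch.class); // terminal states
        }
    }
}
```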
Once SubmitXCommand has been processed, the Oozie server immediately processes StartXCommand.
StartXCommand's job is to start the command; since it extends SignalXCommand, StartXCommand(jobId).call() ends up in SignalXCommand's call.
```java
public class StartXCommand extends SignalXCommand
```
The relevant code follows.
First, StartXCommand calls the base-class constructor:
```java
public StartXCommand(String id) {
    super("start", 1, id);
    InstrumentUtils.incrJobCounter(getName(), 1, getInstrumentation());
}
```
SignalXCommand thereby receives the jobId, the one that SubmitXCommand generated and returned earlier:
```java
public SignalXCommand(String name, int priority, String jobId) {
    super(name, name, priority);
    this.jobId = ParamChecker.notEmpty(jobId, "jobId");
}
```
call() first reaches SignalXCommand.loadState, which reads the workflow job information from the database by jobId:
```java
protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            this.wfJob = WorkflowJobQueryExecutor.getInstance()
                    .get(WorkflowJobQuery.GET_WORKFLOW, jobId);
            if (actionId != null) {
                this.wfAction = WorkflowActionQueryExecutor.getInstance()
                        .get(WorkflowActionQuery.GET_ACTION_SIGNAL, actionId);
            }
        }
    }
}
```
The backing query is defined as follows:
```java
@NamedQuery(name = "GET_WORKFLOW", query = "select OBJECT(w) from WorkflowJobBean w where w.id = :id"),
```
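Under the hood this is plain JPA. A sketch of how such a named query is typically executed (in Oozie the EntityManager comes from JPAService; here it is passed in directly):

```java
import javax.persistence.EntityManager;
import javax.persistence.Query;

// Sketch: running the GET_WORKFLOW named query through standard JPA.
public class NamedQuerySketch {
    public static Object fetchWorkflow(EntityManager em, String jobId) {
        Query q = em.createNamedQuery("GET_WORKFLOW");
        q.setParameter("id", jobId); // binds the :id parameter in the query
        return q.getSingleResult();  // the matching WorkflowJobBean row
    }
}
```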
call() then invokes SignalXCommand.execute(). Judging from the numbered comments in the code below, it does roughly the following:
1) If the job is still in PREP, start the WorkflowInstance and move the job to RUNNING.
2) Collect the actions that should start next and add them to insertList.
3) Persist the new WorkflowActionBeans to the database in one batch.
4) Directly run the first (synchronous) action via ActionStartXCommand.call().
The code:
```java
protected Void execute() throws CommandException {
    WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
    workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
    WorkflowJob.Status prevStatus = wfJob.getStatus();
    WorkflowActionBean syncAction = null;
    List<WorkflowActionBean> workflowActionBeanListForForked = new ArrayList<WorkflowActionBean>();

    if (wfAction == null) {
        if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
            // step 1) above
            completed = workflowInstance.start();
            wfJob.setStatus(WorkflowJob.Status.RUNNING);
            wfJob.setStartTime(new Date());
            wfJob.setWorkflowInstance(workflowInstance);
            generateEvent = true;
            queue(new WorkflowNotificationXCommand(wfJob));
        }
    }
    else {
        ......
    }

    if (completed) {
        ......
    }
    else {
        // step 2) above (outermost else branch)
        for (WorkflowActionBean newAction : WorkflowStoreService.getActionsToStart(workflowInstance)) {
            insertList.add(newAction);
            if (wfAction != null) { // null during wf job submit
                // per the comment, this branch is not taken on wf job submission
                .....
            }
            else {
                syncAction = newAction; // first action after wf submit should always be sync
            }
        }
    }

    // persist the WorkflowActionBeans, step 3) above
    BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);

    if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
        ......
    }
    else if (syncAction != null) {
        // call() invoked directly, step 4) above
        new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
    }
}
```
The workflow's database layout can be read off WorkflowActionBean. The field to highlight here is transition, which Oozie uses to record where this Action should jump next. A WorkflowActionBean excerpt:
```java
public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
    @Id
    private String id;

    @Basic
    @Index
    @Column(name = "wf_id")
    private String wfId = null;

    @Basic
    @Index
    @Column(name = "status")
    private String statusStr = WorkflowAction.Status.PREP.toString();

    @Basic
    @Column(name = "execution_path", length = 1024)
    private String executionPath = null;

    @Basic
    @Column(name = "transition")
    private String transition = null;

    @Basic
    @Column(name = "data")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob data;
}
```
Synchronous transitions are realized mainly in LiteWorkflowInstance.signal: if, after a command finishes, further synchronous transitions remain, execution simply continues with them.
```java
public synchronized boolean signal(String executionPath, String signalValue) throws WorkflowException {
    // signal all new synch transitions
    for (String pathToStart : pathsToStart) {
        signal(pathToStart, "::synch::");
    }
}
```
At this point, job submission is complete. What follows is the job's execution phase inside Oozie, which begins with ActionStartXCommand.