Oozie is an open-source workflow-engine framework contributed to Apache by Cloudera. It is a workflow scheduling engine for the Hadoop platform, used to manage Hadoop jobs. This article is the second in the series and covers Oozie's internal execution phases.
The previous article, [源碼解析]Oozie的前因後果 --- (1)提交任務階段, showed what happens after a user submits an Oozie job. This article follows the execution flow of a Workflow and analyzes what Oozie does next.
大體以下:shell
咱們假設Workflow在start以後,就進入到了一個Hive命令。數據庫
The main job of ActionStartXCommand is to interact with Yarn and ultimately submit a Yarn Application Master.
ActionStartXCommand is a subclass of WorkflowXCommand. The key functions are loadState and execute.
public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext> {
    private String jobId = null;
    protected String actionId = null;
    protected WorkflowJobBean wfJob = null;
    protected WorkflowActionBean wfAction = null;
    private JPAService jpaService = null;
    private ActionExecutor executor = null;
    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
    private List<JsonBean> insertList = new ArrayList<JsonBean>();
    protected ActionExecutorContext context = null;
}
loadState retrieves the WorkflowJobBean and WorkflowActionBean from the database:
protected void loadState() throws CommandException {
    try {
        jpaService = Services.get().get(JPAService.class);
        if (jpaService != null) {
            if (wfJob == null) {
                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
            }
            this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION, actionId);
        }
    }
    catch (Exception ex) {
        ...... // error handling elided in this excerpt
    }
}
The execute function is shown below; its core work is the call executor.start(context, wfAction). Here the executor is HiveActionExecutor.
@Override
protected ActionExecutorContext execute() throws CommandException {
    Configuration conf = wfJob.getWorkflowInstance().getConf();
    try {
        if (!caught) {
            // the heart of the business logic: start the action
            executor.start(context, wfAction);
            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                } else {
                    wfAction.setPending();
                    if (!(executor instanceof ControlNodeActionExecutor)) {
                        queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType()));
                    } else {
                        execSynchronous = true;
                    }
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, wfAction));
        }
    }
    finally {
        BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
        ......
        if (execSynchronous) {
            // Changing to synchronous call from asynchronous queuing to prevent
            // undue delay from ::start:: to action due to queuing
            callActionEnd();
        }
    }
    return null;
}
ActionExecutor.start is asynchronous, so Oozie still has to check the action's execution status to drive the flow forward. Oozie checks for task completion in two ways:

Callback: when a task or computation is started, it is given a callback URL; when the task finishes, it invokes that URL to notify Oozie.

Polling: if the callback fails for any reason whatsoever, Oozie falls back to polling to query the task's status.

Oozie uses these two mechanisms to keep control of the task; we will come back to them later, and a minimal sketch of the polling idea follows.
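To make the polling side concrete, here is a minimal sketch (not Oozie's actual code) of polling a launcher application until it reaches a terminal state. YarnClient and ConverterUtils are the real Hadoop APIs also used later in this article; the loop and the sleep interval are purely illustrative, since Oozie schedules ActionCheckXCommand instead of spinning in a loop.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class PollingSketch {
    // Poll the state of a launcher application until it reaches a terminal state.
    public static void waitForCompletion(String appIdStr) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new Configuration());
        yarnClient.start();
        try {
            ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
            while (true) {
                ApplicationReport report = yarnClient.getApplicationReport(appId);
                YarnApplicationState state = report.getYarnApplicationState();
                if (state == YarnApplicationState.FINISHED
                        || state == YarnApplicationState.FAILED
                        || state == YarnApplicationState.KILLED) {
                    break; // terminal state reached; Oozie would now read the final status
                }
                Thread.sleep(10_000); // illustrative interval only
            }
        } finally {
            yarnClient.stop();
        }
    }
}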
In the code above, executor.start(context, wfAction) is what actually launches the task.
HiveActionExecutor extends ScriptLanguageActionExecutor, which in turn extends JavaActionExecutor, so many of the functions executed later are actually those of JavaActionExecutor.
public class HiveActionExecutor extends ScriptLanguageActionExecutor {}
So ActionExecutor.start ends up running JavaActionExecutor.start(), which checks the file system (for example, whether HDFS is supported and whether the action directory is ready) and then calls submitLauncher.
public void start(Context context, WorkflowAction action) throws ActionExecutorException {
    FileSystem actionFs = context.getAppFileSystem();
    prepareActionDir(actionFs, context);
    submitLauncher(actionFs, context, action); // this is the core business logic
    check(context, action);
}
As the code below shows, submitLauncher's main responsibilities are to build the action and launcher configurations, inject the callback, create a YarnClient, and submit the application to Yarn:
public void submitLauncher(final FileSystem actionFs, final Context context, final WorkflowAction action) throws ActionExecutorException {
    YarnClient yarnClient = null;
    try {
        // action job configuration
        Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
        setupActionConf(actionConf, context, actionXml, appPathRoot);
        addAppNameContext(context, action);
        setLibFilesArchives(context, actionXml, appPathRoot, actionConf);

        // configure the callback for the action
        injectActionCallback(context, actionConf);

        Configuration launcherConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
        yarnClient = createYarnClient(context, launcherConf);

        // continue configuring the various credentials
        if (UserGroupInformation.isSecurityEnabled()) {
            ......
        }

        if (alreadyRunning && !isUserRetry) {
            ......
        } else {
            YarnClientApplication newApp = yarnClient.createApplication();
            ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
            ApplicationSubmissionContext appContext =
                    createAppSubmissionContext(appId, launcherConf, context, actionConf, action, credentials, actionXml);
            // this is where Oozie formally interacts with Yarn
            yarnClient.submitApplication(appContext);

            launcherId = appId.toString();
            ApplicationReport appReport = yarnClient.getApplicationReport(appId);
            consoleUrl = appReport.getTrackingUrl();
        }

        String jobTracker = launcherConf.get(HADOOP_YARN_RM);
        context.setStartData(launcherId, jobTracker, consoleUrl);
    }
    catch (Exception ex) {
        ...... // error handling elided in this excerpt
    }
}

protected YarnClient createYarnClient(Context context, Configuration jobConf) throws HadoopAccessorException {
    String user = context.getWorkflow().getUser();
    return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
}
It is worth mentioning the old implementation here: LauncherMapper.
Many articles about Oozie online are based on old versions, so almost all of them mention LauncherMapper. For example:
Oozie is essentially a job coordination tool (under the hood it translates the XML into a MapReduce program, but one that does all its work on the map side only, avoiding the shuffle phase).
When Oozie executes an action through an ActionExecutor (the most important subclass is JavaActionExecutor; the hive, spark, and other action executors are subclasses of it), JavaActionExecutor first submits a LauncherMapper (a map task) to Yarn, which runs a LauncherMain (concrete actions use its subclasses, such as JavaMain and SparkMain). A spark action runs SparkMain, which calls org.apache.spark.deploy.SparkSubmit to submit the job. In other words, the map task merely identifies what kind of task you have (hive, shell, spark, etc.) and sets up the environment that kind of task needs in order to submit it (for a hive task, for example, it launches the hive client or beeline).
From the documentation, LauncherMapper was removed by OOZIE-2918 Delete LauncherMapper and its test (asasvari via pbacsko).
Let's take a rough look at the LauncherMapper implementation in the old code.
LauncherMapper implements org.apache.hadoop.mapred.Mapper and provides a map function whose body simply invokes the main function of the user code.
import org.apache.hadoop.mapred.Mapper;

public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {

    @Override
    public void map(K1 key, V1 value, OutputCollector<K2, V2> collector, Reporter reporter) throws IOException {
        SecurityManager initialSecurityManager = System.getSecurityManager();
        try {
            ...... // an if/else branch is elided in this excerpt; the main path follows
            String mainClass = getJobConf().get(CONF_OOZIE_ACTION_MAIN_CLASS);
            new LauncherSecurityManager();
            setupHeartBeater(reporter);
            setupMainConfiguration();
            // Propagating the conf to use by child job.
            propagateToHadoopConf();
            executePrepare();
            Class klass = getJobConf().getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
            Method mainMethod = klass.getMethod("main", String[].class);
            mainMethod.invoke(null, (Object) args);
        }
        catch (Exception ex) {
            ...... // error handling elided in this excerpt
        }
    }
}
In LauncherMapperHelper, LauncherMapper is set as the mapper class:
public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir,
        String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
    launcherConf.setMapperClass(LauncherMapper.class);
}
JavaActionExecutor then held an org.apache.hadoop.mapred.JobClient:
import org.apache.hadoop.mapred.JobClient;

public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
    jobClient = createJobClient(context, launcherJobConf);
    LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, prepareXML);
    // Set the launcher Main Class
    LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
    LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
    ......
    runningJob = jobClient.submitJob(launcherJobConf); // the actual submission happens here
}
To sum up: in old versions, LauncherMapper implemented an org.apache.hadoop.mapred.Mapper, and org.apache.hadoop.mapred.JobClient was responsible for the interaction with Hadoop.
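For reference, a minimal sketch of how a map-only launcher job could be submitted through the old mapred API might look like the following. JobClient, JobConf, and submitJob are the real Hadoop 1-style APIs; the input/output paths are made-up placeholders, and the mapper class is passed in rather than hard-coded to Oozie's LauncherMapper.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.RunningJob;

public class LauncherSubmitSketch {
    public static RunningJob submit(JobConf launcherConf,
            Class<? extends Mapper> mapperClass) throws Exception {
        // A launcher is a map-only job: one mapper, zero reducers.
        launcherConf.setMapperClass(mapperClass);
        launcherConf.setNumReduceTasks(0);
        launcherConf.setOutputKeyClass(NullWritable.class);
        launcherConf.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(launcherConf, new Path("/tmp/launcher/input"));   // placeholder paths
        FileOutputFormat.setOutputPath(launcherConf, new Path("/tmp/launcher/output"));
        JobClient jobClient = new JobClient(launcherConf);
        // Asynchronous: returns a handle while the job runs on the cluster.
        return jobClient.submitJob(launcherConf);
    }
}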
The new version of Oozie is deeply tied to Yarn, so we need to introduce Yarn first.
YARN is the resource management system in Hadoop 2.0. Its basic design idea is to split the JobTracker of MRv1 into two independent services: a global ResourceManager and a per-application ApplicationMaster. The ResourceManager handles resource management and allocation for the whole system, while an ApplicationMaster manages a single application.
YARN still follows a Master/Slave structure overall: within the resource management framework, the ResourceManager is the Master and the NodeManagers are the Slaves, and the ResourceManager manages and schedules the resources on the individual NodeManagers in a unified way.
When a user submits an application, they provide an ApplicationMaster to track and manage the program. It requests resources from the ResourceManager and asks NodeManagers to start tasks that occupy a certain amount of resources. Because different ApplicationMasters are distributed across different nodes, they do not interfere with one another.
Each application a user submits includes one AM, whose main functions include: negotiating resources (containers) from the ResourceManager; communicating with NodeManagers to start or stop tasks; and monitoring the status of all tasks, re-requesting resources for any that fail.
When a user submits an application to YARN, YARN runs it in two phases: in the first phase, the ResourceManager starts the ApplicationMaster; in the second, the ApplicationMaster creates the application, requests resources for it, and monitors its entire run until it finishes.
The workflow breaks down into the following steps (a minimal client-side submission sketch follows the list):

1. The user submits the application to YARN, including the AM program and the command to launch it.
2. The ResourceManager allocates the first container for the application and asks the corresponding NodeManager to start the AM in it.
3. The AM registers with the ResourceManager and then requests resources for its tasks.
4. Once the AM obtains resources, it asks NodeManagers to launch the tasks in containers.
5. The tasks report status and progress to the AM, so the AM can restart failed tasks.
6. When the application finishes, the AM unregisters from the ResourceManager and shuts itself down.
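As a concrete illustration of steps 1 and 2, here is a minimal client-side sketch using the real YarnClient API (the same calls Oozie's submitLauncher uses). The AM main class com.example.DemoAM and the resource sizes are made-up placeholders.

import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.util.Records;

public class YarnSubmitSketch {
    public static ApplicationId submit(Configuration conf) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(conf);
        yarnClient.start();

        // Step 1: ask the ResourceManager for a new application id.
        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("demo-am");

        // Describe the container that will run the ApplicationMaster.
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList(
                "$JAVA_HOME/bin/java com.example.DemoAM" // hypothetical AM main class
                        + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"
                        + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(Resource.newInstance(1024, 1)); // 1 GB / 1 vcore for the AM container

        // Step 2: the RM allocates a container on some NodeManager and launches the AM in it.
        return yarnClient.submitApplication(appContext);
    }
}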
LauncherAM is Oozie's ApplicationMaster implementation, and LauncherAM.main is where Yarn calls into it.
public class LauncherAM {
    public static void main(String[] args) throws Exception {
        final LocalFsOperations localFsOperations = new LocalFsOperations();
        final Configuration launcherConf = readLauncherConfiguration(localFsOperations);
        UserGroupInformation.setConfiguration(launcherConf);
        // MRAppMaster adds this call as well, but it's included only in Hadoop 2.9+
        // SecurityUtil.setConfiguration(launcherConf);
        UserGroupInformation ugi = getUserGroupInformation(launcherConf);

        // Executing code inside a doAs with an ugi equipped with correct tokens.
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                LauncherAM launcher = new LauncherAM(new AMRMClientAsyncFactory(),
                        new AMRMCallBackHandler(),
                        new HdfsOperations(new SequenceFileWriterFactory()),
                        new LocalFsOperations(),
                        new PrepareActionsHandler(new LauncherURIHandlerFactory(null)),
                        new LauncherAMCallbackNotifierFactory(),
                        new LauncherSecurityManager(),
                        sysenv.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()),
                        launcherConf);
                launcher.run();
                return null;
            }
        });
    }
}
launcher.run mainly does the following: it registers with the Resource Manager (registerWithRM uses AMRMClientAsync), runs the user code, uploads the action data to HDFS, and finally unregisters and fires the callback. The code is as follows:
public void run() throws Exception {
    try {
        actionDir = new Path(launcherConf.get(OOZIE_ACTION_DIR_PATH));
        registerWithRM(amrmCallBackHandler);
        // Run user code without the AM_RM_TOKEN so users can't request containers
        UserGroupInformation ugi = getUserGroupInformation(launcherConf, AMRMTokenIdentifier.KIND_NAME);
        ugi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
                executePrepare(errorHolder);
                setupMainConfiguration();
                runActionMain(errorHolder); // dispatches to the concrete main class per the configuration, e.g. HiveMain
                return null;
            }
        });
    }
    finally {
        try {
            actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString());
            hdfsOperations.uploadActionDataToHDFS(launcherConf, actionDir, actionData);
        }
        finally {
            try {
                unregisterWithRM(actionResult, errorHolder.getErrorMessage());
            }
            finally {
                LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
                cn.notifyURL(actionResult);
            }
        }
    }
}
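registerWithRM boils down to the standard AMRMClientAsync registration dance. Here is a minimal sketch of that dance, not LauncherAM's exact code: the heartbeat interval and the host/port/trackingUrl values are illustrative, while AMRMClientAsync.createAMRMClientAsync and registerApplicationMaster are the real Hadoop APIs.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class RegisterSketch {
    // Register with the ResourceManager, roughly as LauncherAM.registerWithRM does.
    public static AMRMClientAsync<ContainerRequest> register(Configuration conf,
            AMRMClientAsync.CallbackHandler callbackHandler) throws Exception {
        // Heartbeat every 60s; the handler receives allocation/completion upcalls.
        AMRMClientAsync<ContainerRequest> amrmClient =
                AMRMClientAsync.createAMRMClientAsync(60_000, callbackHandler);
        amrmClient.init(conf);
        amrmClient.start();
        // LauncherAM never requests containers, so registration is all it needs.
        amrmClient.registerApplicationMaster("localhost", 0, ""); // illustrative values
        return amrmClient;
    }
}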
But if you compare this against the responsibilities an ApplicationMaster is supposed to carry, you will notice that LauncherAM does surprisingly little. That is a real puzzle, and we will unravel it in the course of this article.
As mentioned above, runActionMain invokes a concrete main function based on the configuration. Assuming a hive action, that function belongs to HiveMain.
The entry class for a Hive job is configured under HIVE_MAIN_CLASS_NAME:
public class HiveActionExecutor extends ScriptLanguageActionExecutor {
    private static final String HIVE_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.HiveMain";

    @Override
    public List<Class<?>> getLauncherClasses() {
        List<Class<?>> classes = new ArrayList<Class<?>>();
        classes.add(Class.forName(HIVE_MAIN_CLASS_NAME)); // HiveMain is configured here
        return classes;
    }
}
HiveMain's subsequent call chain is:
HiveMain.main ----> run ----> runHive ----> CliDriver.main(args);
In the end it calls org.apache.hadoop.hive.cli.CliDriver to carry out the hive operations, roughly: setting up hive-site and log4j, assembling the command-line arguments, and handing the script or query over to CliDriver. The details are as follows:
public class HiveMain extends LauncherMain {
    public static void main(String[] args) throws Exception {
        run(HiveMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {
        Configuration hiveConf = setUpHiveSite();

        List<String> arguments = new ArrayList<String>();
        String logFile = setUpHiveLog4J(hiveConf);
        arguments.add("--hiveconf");
        arguments.add("hive.log4j.file=" + new File(HIVE_L4J_PROPS).getAbsolutePath());
        arguments.add("--hiveconf");
        arguments.add("hive.exec.log4j.file=" + new File(HIVE_EXEC_L4J_PROPS).getAbsolutePath());

        // setting oozie workflow id as caller context id for hive
        String callerId = "oozie:" + System.getProperty(LauncherAM.OOZIE_JOB_ID);
        arguments.add("--hiveconf");
        arguments.add("hive.log.trace.id=" + callerId);

        String scriptPath = hiveConf.get(HiveActionExecutor.HIVE_SCRIPT);
        String query = hiveConf.get(HiveActionExecutor.HIVE_QUERY);
        if (scriptPath != null) {
            ......
            // print out current directory & its contents
            File localDir = new File("dummy").getAbsoluteFile().getParentFile();
            String[] files = localDir.list();
            // Prepare the Hive Script
            String script = readStringFromFile(scriptPath);
            arguments.add("-f");
            arguments.add(scriptPath);
        } else if (query != null) {
            String filename = createScriptFile(query);
            arguments.add("-f");
            arguments.add(filename);
        }

        // Pass any parameters to Hive via arguments
        ......
        String[] hiveArgs = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
        for (String hiveArg : hiveArgs) {
            arguments.add(hiveArg);
        }

        LauncherMain.killChildYarnJobs(hiveConf);

        try {
            runHive(arguments.toArray(new String[arguments.size()]));
        }
        finally {
            writeExternalChildIDs(logFile, HIVE_JOB_IDS_PATTERNS, "Hive");
        }
    }
}
So we can see that after being called by Yarn, the Oozie ApplicationMaster just sends commands to Hive through org.apache.hadoop.hive.cli.CliDriver. There is no further interaction with the ResourceManager or the NodeManagers at all, which really is strange. The answer to this mystery lies with Tez, described below.
Tez is an open-source Apache computation framework that supports DAG jobs. It derives directly from the MapReduce framework. Its core idea is to decompose the Map and Reduce operations further: Map is split into Input, Processor, Sort, Merge, and Output, while Reduce is split into Input, Shuffle, Sort, Merge, Processor, and Output. These elementary operations can then be combined flexibly to produce new operations, which, once assembled by some control logic, form one large DAG job.
Tez有如下特色:
As you can see, Tez is also deeply tied to Yarn.
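To give a feel for how a DAG is defined and submitted with the real Tez client API, here is a minimal sketch. TezClient, DAG, Vertex, Edge, and OrderedPartitionedKVEdgeConfig are the actual Tez APIs; the processor class names com.example.TokenProcessor and com.example.SumProcessor are hypothetical placeholders.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;

public class TezDagSketch {
    public static void main(String[] args) throws Exception {
        TezConfiguration tezConf = new TezConfiguration();
        TezClient tezClient = TezClient.create("DagSketch", tezConf);
        tezClient.start();

        // A shuffle-style edge: keys are partitioned and sorted between the two vertices.
        OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
                .newBuilder(Text.class.getName(), IntWritable.class.getName(),
                        HashPartitioner.class.getName())
                .build();

        // Hypothetical processor classes standing in for real business logic.
        Vertex tokenizer = Vertex.create("Tokenizer",
                ProcessorDescriptor.create("com.example.TokenProcessor"), 2);
        Vertex summer = Vertex.create("Summer",
                ProcessorDescriptor.create("com.example.SumProcessor"), 1);

        DAG dag = DAG.create("WordCountLike");
        dag.addVertex(tokenizer)
           .addVertex(summer)
           .addEdge(Edge.create(tokenizer, summer, edgeConf.createDefaultEdgeProperty()));

        // The DAGAppMaster receives this plan via submitDAGToAppMaster.
        DAGClient dagClient = tezClient.submitDAG(dag);
        dagClient.waitForCompletion();
        tezClient.stop();
    }
}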
First of all, we find the Application Master that corresponds to Tez: the Tez DAG Application Master.
public class DAGAppMaster extends AbstractService {
    public String submitDAGToAppMaster(DAGPlan dagPlan, Map<String, LocalResource> additionalResources)
            throws TezException {
        startDAG(dagPlan, additionalResources);
        ......
    }
}
Here we can see the code that submits the Application Master:
public class TezYarnClient extends FrameworkClient {
    @Override
    public ApplicationId submitApplication(ApplicationSubmissionContext appSubmissionContext)
            throws YarnException, IOException, TezException {
        ApplicationId appId = yarnClient.submitApplication(appSubmissionContext);
        ApplicationReport appReport = getApplicationReport(appId);
        return appId;
    }
}
And this is the code that builds the Application Master context, setting the Application Master class and the container:
public static ApplicationSubmissionContext createApplicationSubmissionContext(
        ApplicationId appId, DAG dag, String amName,
        AMConfiguration amConfig, Map<String, LocalResource> tezJarResources,
        Credentials sessionCreds, boolean tezLrsAsArchive,
        TezApiVersionInfo apiVersionInfo,
        ServicePluginsDescriptor servicePluginsDescriptor,
        JavaOptsChecker javaOptsChecker) throws IOException, YarnException {

    // Setup the command to run the AM
    List<String> vargs = new ArrayList<String>(8);
    vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
    String amOpts = constructAMLaunchOpts(amConfig.getTezConfiguration(), capability);
    vargs.add(amOpts);

    // the Application Master class is set here
    vargs.add(TezConstants.TEZ_APPLICATION_MASTER_CLASS);

    // the command-line arguments are set here
    Vector<String> vargsFinal = new Vector<String>(8);
    // Final command
    StringBuilder mergedCommand = new StringBuilder();
    for (CharSequence str : vargs) {
        mergedCommand.append(str).append(" ");
    }
    vargsFinal.add(mergedCommand.toString());

    // the container is set up here
    // Setup ContainerLaunchContext for AM container
    ContainerLaunchContext amContainer =
            ContainerLaunchContext.newInstance(amLocalResources, environment,
                    vargsFinal, serviceData, securityTokens, acls);

    // Set up the ApplicationSubmissionContext
    ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
    appContext.setAMContainerSpec(amContainer);

    return appContext;
}
Only excerpts are shown here, but you can see that Tez implements the interaction with the Yarn Resource Manager.
YarnTaskSchedulerService implements AMRMClientAsync.CallbackHandler; its job is to process the messages received from the Resource Manager, and it implements the following methods:
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class YarnTaskSchedulerService extends TaskScheduler implements AMRMClientAsync.CallbackHandler {

    @Override
    public void onContainersAllocated(List<Container> containers) {
        synchronized (this) {
            if (!shouldReuseContainers) {
                List<Container> modifiableContainerList = Lists.newLinkedList(containers);
                assignedContainers = assignNewlyAllocatedContainers(modifiableContainerList);
            }
        }
        // upcall to app must be outside locks
        informAppAboutAssignments(assignedContainers);
    }

    @Override
    public void onContainersCompleted(List<ContainerStatus> statuses) {
        synchronized (this) {
            for (ContainerStatus containerStatus : statuses) {
                ContainerId completedId = containerStatus.getContainerId();
                HeldContainer delayedContainer = heldContainers.get(completedId);
                Object task = releasedContainers.remove(completedId);
                if (task != null) {
                    appContainerStatus.put(task, containerStatus);
                    continue;
                }
                // not found in released containers. check currently allocated containers
                // no need to release this container as the RM has already completed it
                task = unAssignContainer(completedId, false);
                if (delayedContainer != null) {
                    heldContainers.remove(completedId);
                    Resources.subtract(allocatedResources, delayedContainer.getContainer().getResource());
                }
                if (task != null) {
                    // completion of a container we have allocated currently
                    // an allocated container completed. notify app. This will cause attempt to get killed
                    appContainerStatus.put(task, containerStatus);
                    continue;
                }
            }
        }
        // upcall to app must be outside locks
        for (Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
            getContext().containerCompleted(entry.getKey(), entry.getValue());
        }
    }
}
From this we can see that Oozie is quite hands-off: it only takes care of launching Hive, and all the subsequent interaction with the RM is handled entirely by Tez. That answers all of our earlier questions.
Finally, a summary of the new flow:
Forgive me for drawing the diagram this way; nothing annoys me more than finding a good article only to discover its images are gone......
+---------+                         +----------+                        +-----------+
|         |  1-submit LauncherAM    |          |  2.CliDriver.main      |           |
|         |------------------------>| HiveMain |----------------------->|           |
|         |                         |          |                        |           |--+
| [Oozie] |                         |  [Yarn]  |                        |  [Hive]   |  | 3.Run
|         |                         |          |                        |           |  | Hive
|         |  5-notifyURL of Oozie   |          |  4-submit DAGAppMaster |           |<-+
|         |<------------------------|          |<---------------------->|    Tez    |
|         |                         |          |                        |           |
+---------+                         +----------+                        +-----------+
Next, let's look at how Oozie executes a Java program.
The main entry function for Java programs is JavaMain. This one is much simpler: it directly invokes the user's Java main function.
public class JavaMain extends LauncherMain {

    public static final String JAVA_MAIN_CLASS = "oozie.action.java.main";

    /**
     * @param args Invoked from LauncherAM:run()
     * @throws Exception in case of error when running the application
     */
    public static void main(String[] args) throws Exception {
        run(JavaMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {
        Configuration actionConf = loadActionConf();
        setYarnTag(actionConf);
        setApplicationTags(actionConf, TEZ_APPLICATION_TAGS);
        setApplicationTags(actionConf, SPARK_YARN_TAGS);

        LauncherMain.killChildYarnJobs(actionConf);

        Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
        Method mainMethod = klass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) args);
    }
}
As mentioned earlier, ActionExecutor.start is asynchronous, so Oozie still has to check the action's execution status to drive the flow forward. It does so through the two mechanisms described above, callback and polling, which is how Oozie keeps control of the task.
After the user program finishes, LauncherAM makes the following call to notify Oozie. This is the "callback" mechanism in action.
LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf);
cn.notifyURL(actionResult);
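Under the hood, notifyURL amounts to an HTTP request against the callback URL with the final status substituted in. Here is a rough sketch of the idea, not Oozie's actual notifier: the $jobStatus placeholder is an assumption about the URL template, and the retry behavior of the real notifier is omitted.

import java.net.HttpURLConnection;
import java.net.URL;

public class CallbackSketch {
    // Substitute the final status into the callback URL and fire an HTTP request.
    public static void notifyUrl(String callbackUrl, String finalStatus) throws Exception {
        // Assumption: the URL carries a status placeholder such as $jobStatus.
        String target = callbackUrl.replace("$jobStatus", finalStatus);
        HttpURLConnection conn = (HttpURLConnection) new URL(target).openConnection();
        conn.setConnectTimeout(5_000);
        conn.setReadTimeout(5_000);
        int code = conn.getResponseCode(); // Oozie's CallbackServlet handles this request
        if (code != HttpURLConnection.HTTP_OK) {
            // The real notifier retries on failure; polling covers anything that still slips through.
            System.err.println("callback failed with HTTP " + code);
        }
        conn.disconnect();
    }
}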
Oozie's CallbackServlet responds to this call. As you can see, DagEngine.processCallback is where Oozie handles the program's completion:
public class CallbackServlet extends JsonRestServlet {
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        String queryString = request.getQueryString();
        CallbackService callbackService = Services.get().get(CallbackService.class);
        String actionId = callbackService.getActionId(queryString);
        DagEngine dagEngine = Services.get().get(DagEngineService.class).getSystemDagEngine();
        dagEngine.processCallback(actionId, callbackService.getExternalStatus(queryString), null);
    }
}
DagEngine.processCallback works mainly through CompletedActionXCommand. As you can see, the command is placed on the CallableQueueService queue, so we need to introduce CallableQueueService next.
public void processCallback(String actionId, String externalStatus, Properties actionData) throws DagEngineException {
    XCallable<Void> command = new CompletedActionXCommand(actionId, externalStatus, actionData, HIGH_PRIORITY);
    if (!Services.get().get(CallableQueueService.class).queue(command)) {
        LOG.warn(XLog.OPS, "queue is full or system is in SAFEMODE, ignoring callback");
    }
}
Oozie uses CallableQueueService to execute operations asynchronously:
public class CallableQueueService implements Service, Instrumentable {
    private final Map<String, AtomicInteger> activeCallables = new HashMap<String, AtomicInteger>();
    private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>();
    private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>();
    private Set<String> interruptTypes;
    private int interruptMapMaxSize;
    private int maxCallableConcurrency;
    private int queueAwaitTerminationTimeoutSeconds;
    private int queueSize;
    private PriorityDelayQueue<CallableWrapper<?>> queue;
    private ThreadPoolExecutor executor;
    private Instrumentation instrumentation;
    private boolean newImpl = false;
    private AsyncXCommandExecutor asyncXCommandExecutor;

    public void init(Services services) {
        queue = new PollablePriorityDelayQueue<CallableWrapper<?>>(PRIORITIES,
                MAX_CALLABLE_WAITTIME_MS, TimeUnit.MILLISECONDS, queueSize) {
            @Override
            protected boolean eligibleToPoll(QueueElement<?> element) {
                if (element != null) {
                    CallableWrapper wrapper = (CallableWrapper) element;
                    if (element.getElement() != null) {
                        return callableReachMaxConcurrency(wrapper.getElement());
                    }
                }
                return false;
            }
        };
    }
}
特色:
線程池選取的隊列是oozie自定義的隊列 PriorityDelayQueue:
特色:
根據隊列中元素的延時時間以及其執行優先級出隊列:
實現策略:
PriorityDelayQueue 中爲每一個優先級別的任務設置一個 延時隊列 DelayQueue 由於使用的是jdk自帶的延時隊列 DelayQueue,能夠保證的是若是任務在該隊列中的延時時間知足條件,咱們 經過poll()方法便可獲得知足延時條件的任務,若是 poll()獲得的是null,說明該隊列的中任務沒有知足時間條件的任務。 如何編排多個優先級的隊列: 每次從PriorityDelayQueue去選取任務,都優先從最高優先級的隊列來poll出任務,若是最高的優先級隊列中沒有知足條件的任務,則次優先級隊列poll出任務,若是仍未獲取 將按照隊列優先等級以此類推。 餓死現象:假如高優先級中的任務在每次獲取的時候都知足條件,這樣容易將低優先級的隊列中知足條件的任務活活餓死,爲了防止這種狀況的產生,在每次選取任務以前,遍歷 低優先級隊列任務,若是任務早已經知足出隊列條件,若是超時時間超過了咱們設定的最大值,咱們會爲這個任務提升優先級,將這個任務優先級加一,添加到上個優先級隊列中進行 排隊。
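A bare-bones sketch of this multi-level DelayQueue idea, ignoring the locking and the exact promotion bookkeeping of the real PriorityDelayQueue:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;

public class PriorityDelaySketch<E extends Delayed> {
    private final DelayQueue<E>[] queues; // index 0 = lowest priority

    @SuppressWarnings("unchecked")
    public PriorityDelaySketch(int priorities) {
        queues = new DelayQueue[priorities];
        for (int i = 0; i < priorities; i++) {
            queues[i] = new DelayQueue<>();
        }
    }

    // Poll the highest priority first; fall through to lower priorities.
    public E poll() {
        for (int i = queues.length - 1; i >= 0; i--) {
            E e = queues[i].poll(); // non-null only if the element's delay has expired
            if (e != null) {
                return e;
            }
        }
        return null;
    }

    // Anti-starvation: promote an element that has waited too long at a lower priority.
    public void promote(E e, int currentPriority) {
        if (currentPriority < queues.length - 1 && queues[currentPriority].remove(e)) {
            queues[currentPriority + 1].offer(e);
        }
    }
}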
特色:
在從隊列中選取任務的時候,先判斷知足時間的任務是否知足併發等限制,若是知足再從隊列中取出,而不是像PriorityDelayQueue那樣,先取出若是不知足併發等限制,再將該任務從新放置回去。
Task types: the thread pool executes tasks asynchronously and in no particular order, but some business scenarios need execution units to run serially. Oozie therefore wraps work as either an ordinary XCallable or a CompositeCallable; the latter is a collection of XCallables, and it guarantees that the XCallables inside the collection execute in order (see the sketch below).
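A minimal sketch of the idea behind CompositeCallable (not Oozie's actual class):

import java.util.List;
import java.util.concurrent.Callable;

// Run a group of callables strictly in order, while distinct composites
// may still run concurrently on the thread pool.
public class CompositeCallableSketch implements Callable<Void> {
    private final List<Callable<Void>> callables;

    public CompositeCallableSketch(List<Callable<Void>> callables) {
        this.callables = callables;
    }

    @Override
    public Void call() throws Exception {
        for (Callable<Void> c : callables) {
            c.call(); // sequential within the composite
        }
        return null;
    }
}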
CompletedActionXCommand executes when the workflow action completes, and it executes only once. When the program has finished, it adds an ActionCheckXCommand to the async queue:
public class CompletedActionXCommand extends WorkflowXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        if (this.wfactionBean.getStatus() == WorkflowActionBean.Status.PREP) {
            .....
        } else { // RUNNING
            ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(this.wfactionBean.getType());
            // this is done because oozie notifications (of sub-wfs) is send
            // every status change, not only on completion.
            if (executor.isCompleted(externalStatus)) {
                queue(new ActionCheckXCommand(this.wfactionBean.getId(), getPriority(), -1));
            }
        }
        return null;
    }
}
The async queue then invokes ActionCheckXCommand. Its main jobs are to configure any retries, call executor.check to query the action's current status, update the action information in the database, and, once execution is complete, run ActionEndXCommand:
public class ActionCheckXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        ActionExecutorContext context = null;
        boolean execSynchronous = false;
        try {
            boolean isRetry = false;
            if (wfAction.getRetries() > 0) { // configure the retry mechanism if there is one
                isRetry = true;
            }
            boolean isUserRetry = false;
            context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
            executor.check(context, wfAction); // check the action's status

            if (wfAction.isExecutionComplete()) {
                if (!context.isExecuted()) {
                    failJob(context);
                    generateEvent = true;
                } else {
                    wfAction.setPending();
                    execSynchronous = true;
                }
            }
            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_CHECK, wfAction));
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
        }
        finally {
            // update the task information in the database
            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
            if (generateEvent && EventHandlerService.isEnabled()) {
                generateEvent(wfAction, wfJob.getUser());
            }
            if (execSynchronous) {
                // finish up via ActionEndXCommand
                new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call();
            }
        }
        return null;
    }
}
This calls into JavaActionExecutor.check:
@Override
public void check(Context context, WorkflowAction action) throws ActionExecutorException {
    boolean fallback = false;
    YarnClient yarnClient = null;
    try {
        Element actionXml = XmlUtils.parseXml(action.getConf());
        Configuration jobConf = createBaseHadoopConf(context, actionXml);
        FileSystem actionFs = context.getAppFileSystem();
        yarnClient = createYarnClient(context, jobConf); // build the client from the configuration
        FinalApplicationStatus appStatus = null;
        try {
            final String effectiveApplicationId = findYarnApplicationId(context, action);
            final ApplicationId applicationId = ConverterUtils.toApplicationId(effectiveApplicationId);
            final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); // fetch the application report
            final YarnApplicationState appState = appReport.getYarnApplicationState();
            if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
                    || appState == YarnApplicationState.KILLED) {
                appStatus = appReport.getFinalApplicationStatus();
            }
        }
        catch (Exception ye) {
            ...... // elided; on failure, fallback is set so the status is read from the action data file instead
        }

        if (appStatus != null || fallback) {
            Path actionDir = context.getActionDir();
            // load sequence file into object
            Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf); // fetch the action data
            if (fallback) {
                String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
                if (finalStatus != null) {
                    appStatus = FinalApplicationStatus.valueOf(finalStatus);
                } else {
                    context.setExecutionData(FAILED, null);
                }
            }

            // MapReduce was launched
            String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID);
            if (externalID != null) {
                context.setExternalChildIDs(externalID);
            }

            // Multiple child IDs - Pig or Hive action
            String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
            if (externalIDs != null) {
                context.setExternalChildIDs(externalIDs);
            }

            // record the various pieces of information
            context.setExecutionData(appStatus.toString(), null);
            if (appStatus == FinalApplicationStatus.SUCCEEDED) {
                if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) {
                    context.setExecutionData(SUCCEEDED,
                            PropertiesUtils.stringToProperties(actionData.get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
                } else {
                    context.setExecutionData(SUCCEEDED, null);
                }
                if (LauncherHelper.hasStatsData(actionData)) {
                    context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
                }
                getActionData(actionFs, action, context);
            } else {
                ......
                context.setExecutionData(FAILED_KILLED, null);
            }
        }
    }
    finally {
        if (yarnClient != null) {
            IOUtils.closeQuietly(yarnClient);
        }
    }
}
ActionEndXCommand performs the wrap-up and the transition to the next node:
public class ActionEndXCommand extends ActionXCommand<Void> {
    @Override
    protected Void execute() throws CommandException {
        Configuration conf = wfJob.getWorkflowInstance().getConf();
        if (!(executor instanceof ControlNodeActionExecutor)) {
            maxRetries = conf.getInt(OozieClient.ACTION_MAX_RETRIES, executor.getMaxRetries());
            retryInterval = conf.getLong(OozieClient.ACTION_RETRY_INTERVAL, executor.getRetryInterval());
        }
        executor.setMaxRetries(maxRetries);
        executor.setRetryInterval(retryInterval);

        boolean isRetry = false;
        if (wfAction.getStatus() == WorkflowActionBean.Status.END_RETRY
                || wfAction.getStatus() == WorkflowActionBean.Status.END_MANUAL) {
            isRetry = true;
        }
        boolean isUserRetry = false;
        ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry);
        try {
            executor.end(context, wfAction); // have the executor finish the action

            if (!context.isEnded()) {
                failJob(context);
            } else {
                wfAction.setRetries(0);
                wfAction.setEndTime(new Date());

                boolean shouldHandleUserRetry = false;
                Status slaStatus = null;
                switch (wfAction.getStatus()) {
                    case OK:
                        slaStatus = Status.SUCCEEDED;
                        break;
                    ......
                }
                if (!shouldHandleUserRetry || !handleUserRetry(context, wfAction)) {
                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
                            slaStatus, SlaAppType.WORKFLOW_ACTION);
                    if (slaEvent != null) {
                        insertList.add(slaEvent);
                    }
                }
            }

            WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
            DagELFunctions.setActionInfo(wfInstance, wfAction);
            wfJob.setWorkflowInstance(wfInstance);

            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END, wfAction));
            wfJob.setLastModifiedTime(new Date());
            updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob));
        }
        finally {
            try {
                // update the job information in the database
                BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
            }
            catch (Exception ex) {
                ...... // error handling elided in this excerpt
            }
            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                generateEvent(wfAction, wfJob.getUser());
            }
            new SignalXCommand(jobId, actionId).call(); // jump ahead and execute the next action
        }
        return null;
    }
}