Oozie支持Java Action,所以能夠自定義不少的功能。本篇就從理論和實踐兩方面介紹下Java Action的妙用,另外還涉及到oozie中action之間的參數傳遞。html
本文大體分爲如下幾個部分:java
若是你即將或者想要使用oozie,那麼本篇的文章將會爲你提供不少參考的價值。node
java action
會自動執行提供的java class
的public static void main
方法, 而且會在hadoop集羣啓動一個單獨的map-reduce的map任務來執行的。所以,若是你自定義了一個java程序,它會提交到集羣的某一個節點執行,不會每一個節點都執行一遍。shell
workflow任務會等待java程序執行完繼續執行下一個action。當java類正確執行退出後,將會進入ok控制流;當發生異常時,將會進入error控制流。Java程序絕對不能使用System.exit(int n)
將會致使action進入error控制流。apache
在action的配置中,也支持EL表達式。而且使用<capture-output>
也能夠把數據輸出出來,而後後面的action就能夠基於EL表達式使用了。app
<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1"> ... <action name="[NODE-NAME]"> <java> <job-tracker>[JOB-TRACKER]</job-tracker> <name-node>[NAME-NODE]</name-node> <prepare> <delete path="[PATH]"/> ... <mkdir path="[PATH]"/> ... </prepare> <job-xml>[JOB-XML]</job-xml> <configuration> <property> <name>[PROPERTY-NAME]</name> <value>[PROPERTY-VALUE]</value> </property> ... </configuration> <main-class>[MAIN-CLASS]</main-class> <java-opts>[JAVA-STARTUP-OPTS]</java-opts> <arg>ARGUMENT</arg> ... <file>[FILE-PATH]</file> ... <archive>[FILE-PATH]</archive> ... <capture-output /> </java> <ok to="[NODE-NAME]"/> <error to="[NODE-NAME]"/> </action> ... </workflow-app>
prepare
元素,支持建立或者刪除指定的文件內容。在delete
時,支持通配的方式指定特定的路徑。java-opts
以及java-opt
參數提供了執行java應用時分配的JVM。ide
舉個例子:oop
<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1"> ... <action name="myfirstjavajob"> <java> <job-tracker>foo:8021</job-tracker> <name-node>bar:8020</name-node> <prepare> <delete path="${jobOutput}"/> </prepare> <configuration> <property> <name>mapred.queue.name</name> <value>default</value> </property> </configuration> <main-class>org.apache.oozie.MyFirstMainClass</main-class> <java-opts>-Dblah</java-opts> <arg>argument1</arg> <arg>argument2</arg> </java> <ok to="myotherjob"/> <error to="errorcleanup"/> </action> ... </workflow-app>
oozie中的不少action都支持這個功能,在configure
中指定classpath下的一個類方法,它會覆蓋當前action的main方法。這在不想從新編譯jar包,而想替換程序時,很是有用。源碼分析
Java程序能夠任意定義,好比寫一個最簡單的hellword,而後打包成lib。ui
而後須要定義oozie腳本:
<action name="java-7cbb"> <java> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>default</value> </property> </configuration> <main-class>a.b.c.Main</main-class> <arg>arg1</arg> <arg>arg2</arg> <file>/oozie/lib/ojdbc7.jar#ojdbc7.jar</file> <capture-output/> </java> <ok to="end"/> <error to="Kill"/> </action>
其中幾個比較重要的屬性,千萬不能拉下:
mapred.job.queue.name
<main-class>
<file>
<capture-output>
若是使用HUE圖形化配置,就比較簡單了:
點擊右上角的齒輪,配置其餘的屬性信息:
先從表象來講一下shell action如何傳遞參數:
你只須要定義一個普通的shell,在裏面使用echo把屬性輸出出來便可,後面的action自動就能夠基於EL表達式使用。
test='test123' echo "test=$test"
這樣後面的action就能夠直接使用了:
${wf:actionData('action-name').test}或者${wf:actionData('action-name')['test']}
很簡單是吧!
在Java裏面就沒這麼容易了:
不管是 System.out.println() 仍是 logger.info/error,都沒法捕獲到數據
從中抄了一段代碼:
private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties"; ... String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES); if (oozieProp != null) { File propFile = new File(oozieProp); Properties props = new Properties(); props.setProperty(propKey0, propVal0); props.setProperty(propKey1, propVal1); OutputStream os = new FileOutputStream(propFile); props.store(os, ""); os.close(); } else throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES + " System property not defined");
果真就好用了....
爲了理解其中的原因,咱們來看看代碼。首先在shell action中發現一句話:
<<< Invocation of Main class completed <<< Oozie Launcher, capturing output data: =======================
因而全局搜索,果真找到對應的代碼,在org.apache.oozie.action.hadoop.LuancherMapper.java
中,line275開始:
if (errorMessage == null) { handleActionData(); if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) { System.out.println(); System.out.println("Oozie Launcher, capturing output data:"); System.out.println("======================="); System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS)); System.out.println(); System.out.println("======================="); System.out.println(); } 。。。 }
這裏的actionData其實就是個普通的MAP
private Map<String,String> actionData; public LauncherMapper() { actionData = new HashMap<String,String>(); }
Map裏面保存了不少屬性值,其中就包括咱們想要捕獲的輸出內容:
static final String ACTION_PREFIX = "oozie.action."; static final String ACTION_DATA_OUTPUT_PROPS = "output.properties"; ... String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS); if (outputProp != null) { File actionOutputData = new File(outputProp); if (actionOutputData.exists()) { int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024); actionData.put(ACTION_DATA_OUTPUT_PROPS, getLocalFileContentStr(actionOutputData, "Output", maxOutputData)); } } .... public static String getLocalFileContentStr(File file, String type, int maxLen) throws LauncherException, IOException { StringBuffer sb = new StringBuffer(); FileReader reader = new FileReader(file); char[] buffer = new char[2048]; int read; int count = 0; while ((read = reader.read(buffer)) > -1) { count += read; if (maxLen > -1 && count > maxLen) { throw new LauncherException(type + " data exceeds its limit ["+ maxLen + "]"); } sb.append(buffer, 0, read); } reader.close(); return sb.toString(); }
能夠看到其實就是從oozie.action.output.properties指定的目錄裏面去讀內容,而後輸出出來,後面的action就能夠用了。這就是爲何上面抄的那段代碼能夠使用的緣由。
那麼問題是,shell爲何直接echo就行,java裏面卻要這麼費勁?
別急,先來看看java action的啓動邏輯:
public static void main(String[] args) throws Exception { run(JavaMain.class, args); } @Override protected void run(String[] args) throws Exception { ... Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class); ... Method mainMethod = klass.getMethod("main", String[].class); try { mainMethod.invoke(null, (Object) args); } catch(InvocationTargetException ex) { // Get rid of the InvocationTargetException and wrap the Throwable throw new JavaMainException(ex.getCause()); } }
它什麼也沒作,就是啓動了目標類的main方法而已。
再來看看shell:
private int execute(Configuration actionConf) throws Exception { ... //判斷是否要捕獲輸出 boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false); //執行命令 Process p = builder.start(); //處理進程 Thread[] thrArray = handleShellOutput(p, captureOutput); ... return exitValue; } protected Thread[] handleShellOutput(Process p, boolean captureOutput) throws IOException { BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream())); BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream())); // 捕獲標準輸出 OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput); thrStdout.setDaemon(true); thrStdout.start(); OutputWriteThread thrStderr = new OutputWriteThread(error, false, false); thrStderr.setDaemon(true); thrStderr.start(); return new Thread[]{ thrStdout, thrStderr }; } class OutputWriteThread extends Thread { ... @Override public void run() { String line; BufferedWriter os = null; //讀取數據保存在目標文件中 try { if (needCaptured) { File file = new File(System.getProperty(LauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_OUTPUT_PROPS)); os = new BufferedWriter(new FileWriter(file)); } while ((line = reader.readLine()) != null) { if (isStdout) { // For stdout // 1. Writing to LM STDOUT System.out.println("Stdoutput " + line); // 2. Writing for capture output if (os != null) { if (Shell.WINDOWS) { line = line.replace("\\u", "\\\\u"); } os.write(line); os.newLine(); } } else { System.err.println(line); // 1. Writing to LM STDERR } } } catch (IOException e) { ... }finally { ... } } }
這樣就很清晰了,shell自動幫咱們把輸出的內容寫入了oozie.action.output.properties文件中。而在java中則須要用戶本身來定義寫入的過程。