Oozie分佈式工做流——從理論和實踐分析使用節點間的參數傳遞

Oozie支持Java Action,所以能夠自定義不少的功能。本篇就從理論和實踐兩方面介紹下Java Action的妙用,另外還涉及到oozie中action之間的參數傳遞。html

本文大體分爲如下幾個部分:java

  • Java Action教程文檔
  • 自定義Java Action實踐
  • 從源碼的角度講解Java Action與Shell Action的參數傳遞。

若是你即將或者想要使用oozie,那麼本篇的文章將會爲你提供不少參考的價值。node

Java Action文檔

java action會自動執行提供的java classpublic static void main方法, 而且會在hadoop集羣啓動一個單獨的map-reduce的map任務來執行的。所以,若是你自定義了一個java程序,它會提交到集羣的某一個節點執行,不會每一個節點都執行一遍。shell

workflow任務會等待java程序執行完繼續執行下一個action。當java類正確執行退出後,將會進入ok控制流;當發生異常時,將會進入error控制流。Java程序絕對不能使用System.exit(int n)將會致使action進入error控制流。apache

在action的配置中,也支持EL表達式。而且使用<capture-output>也能夠把數據輸出出來,而後後面的action就能夠基於EL表達式使用了。app

語法規則

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <java>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
               <delete path="[PATH]"/>
               ...
               <mkdir path="[PATH]"/>
               ...
            </prepare>
            <job-xml>[JOB-XML]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <main-class>[MAIN-CLASS]</main-class>
            <java-opts>[JAVA-STARTUP-OPTS]</java-opts>
            <arg>ARGUMENT</arg>
            ...
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
            <capture-output />
        </java>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

prepare元素,支持建立或者刪除指定的文件內容。在delete時,支持通配的方式指定特定的路徑。java-opts以及java-opt參數提供了執行java應用時分配的JVM。ide

舉個例子:oop

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirstjavajob">
        <java>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${jobOutput}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <main-class>org.apache.oozie.MyFirstMainClass</main-class>
            <java-opts>-Dblah</java-opts>
            <arg>argument1</arg>
            <arg>argument2</arg>
        </java>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

覆蓋Main方法

oozie中的不少action都支持這個功能,在configure中指定classpath下的一個類方法,它會覆蓋當前action的main方法。這在不想從新編譯jar包,而想替換程序時,很是有用。源碼分析

自定義Java action程序以及部署

Java程序能夠任意定義,好比寫一個最簡單的hellword,而後打包成lib。ui

而後須要定義oozie腳本:

<action name="java-7cbb">
    <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <main-class>a.b.c.Main</main-class>
            <arg>arg1</arg>
            <arg>arg2</arg>
            <file>/oozie/lib/ojdbc7.jar#ojdbc7.jar</file>
            <capture-output/>
        </java>
        <ok to="end"/>
        <error to="Kill"/>
    </action>

其中幾個比較重要的屬性,千萬不能拉下:

  • 1 須要指定Map-reduce的隊列:mapred.job.queue.name
  • 2 指定Main class<main-class>
  • 3 若是依賴其餘的jar,須要添加<file>
  • 4 若是想要捕獲輸出,須要設置<capture-output>

若是使用HUE圖形化配置,就比較簡單了:

點擊右上角的齒輪,配置其餘的屬性信息:

基於源碼分析參數傳遞

先從表象來講一下shell action如何傳遞參數:

你只須要定義一個普通的shell,在裏面使用echo把屬性輸出出來便可,後面的action自動就能夠基於EL表達式使用。

test='test123'
echo "test=$test"

這樣後面的action就能夠直接使用了:

${wf:actionData('action-name').test}或者${wf:actionData('action-name')['test']}

很簡單是吧!

在Java裏面就沒這麼容易了:

不管是 System.out.println() 仍是 logger.info/error,都沒法捕獲到數據

上網找了一篇文章,備受啓發

從中抄了一段代碼:

private static final String OOZIE_ACTION_OUTPUT_PROPERTIES = "oozie.action.output.properties";
...
String oozieProp = System.getProperty(OOZIE_ACTION_OUTPUT_PROPERTIES);
        if (oozieProp != null) {
            File propFile = new File(oozieProp);
            Properties props = new Properties();
            props.setProperty(propKey0, propVal0);
            props.setProperty(propKey1, propVal1);
            OutputStream os = new FileOutputStream(propFile);
            props.store(os, "");
            os.close();
        } else
            throw new RuntimeException(OOZIE_ACTION_OUTPUT_PROPERTIES + " System property not defined");

果真就好用了....

爲了理解其中的原因,咱們來看看代碼。首先在shell action中發現一句話:

<<< Invocation of Main class completed <<<


Oozie Launcher, capturing output data:
=======================

因而全局搜索,果真找到對應的代碼,在org.apache.oozie.action.hadoop.LuancherMapper.java中,line275開始:

if (errorMessage == null) {
    handleActionData();
    if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
        System.out.println();
        System.out.println("Oozie Launcher, capturing output data:");
        System.out.println("=======================");
        System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
        System.out.println();
        System.out.println("=======================");
        System.out.println();
    }
    。。。
}

這裏的actionData其實就是個普通的MAP

private Map<String,String> actionData;

public LauncherMapper() {
    actionData = new HashMap<String,String>();
}

Map裏面保存了不少屬性值,其中就包括咱們想要捕獲的輸出內容:

static final String ACTION_PREFIX = "oozie.action.";
    static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";

    ...
    String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS);
    if (outputProp != null) {
        File actionOutputData = new File(outputProp);
        if (actionOutputData.exists()) {
            int maxOutputData = getJobConf().getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
            actionData.put(ACTION_DATA_OUTPUT_PROPS,
                getLocalFileContentStr(actionOutputData, "Output", maxOutputData));
        }
    }
    ....
    
    public static String getLocalFileContentStr(File file, String type, int maxLen) throws LauncherException, IOException {
        StringBuffer sb = new StringBuffer();
        FileReader reader = new FileReader(file);
        char[] buffer = new char[2048];
        int read;
        int count = 0;
        while ((read = reader.read(buffer)) > -1) {
            count += read;
            if (maxLen > -1 && count > maxLen) {
                throw new LauncherException(type + " data exceeds its limit ["+ maxLen + "]");
            }
            sb.append(buffer, 0, read);
        }
        reader.close();
        return sb.toString();
    }

能夠看到其實就是從oozie.action.output.properties指定的目錄裏面去讀內容,而後輸出出來,後面的action就能夠用了。這就是爲何上面抄的那段代碼能夠使用的緣由。

那麼問題是,shell爲何直接echo就行,java裏面卻要這麼費勁?

別急,先來看看java action的啓動邏輯:

public static void main(String[] args) throws Exception {
        run(JavaMain.class, args);
    }

    @Override
    protected void run(String[] args) throws Exception {
        ...
        Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
        ...
        Method mainMethod = klass.getMethod("main", String[].class);
        try {
            mainMethod.invoke(null, (Object) args);
        } catch(InvocationTargetException ex) {
            // Get rid of the InvocationTargetException and wrap the Throwable
            throw new JavaMainException(ex.getCause());
        }
    }

它什麼也沒作,就是啓動了目標類的main方法而已。

再來看看shell:

private int execute(Configuration actionConf) throws Exception {
        ...
        //判斷是否要捕獲輸出
        boolean captureOutput = actionConf.getBoolean(CONF_OOZIE_SHELL_CAPTURE_OUTPUT, false);

        //執行命令
        Process p = builder.start();

        //處理進程
        Thread[] thrArray = handleShellOutput(p, captureOutput);

        ...
        return exitValue;
    }
    protected Thread[] handleShellOutput(Process p, boolean captureOutput)
            throws IOException {
        BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
        BufferedReader error = new BufferedReader(new InputStreamReader(p.getErrorStream()));

        // 捕獲標準輸出
        OutputWriteThread thrStdout = new OutputWriteThread(input, true, captureOutput);
        thrStdout.setDaemon(true);
        thrStdout.start();

        OutputWriteThread thrStderr = new OutputWriteThread(error, false, false);
        thrStderr.setDaemon(true);
        thrStderr.start();

        return new Thread[]{ thrStdout, thrStderr };
    }
    
    class OutputWriteThread extends Thread {
        ...

        @Override
        public void run() {
            String line;
            BufferedWriter os = null;

            //讀取數據保存在目標文件中

            try {
                if (needCaptured) {
                    File file = new File(System.getProperty(LauncherMapper.ACTION_PREFIX + LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
                    os = new BufferedWriter(new FileWriter(file));
                }
                while ((line = reader.readLine()) != null) {
                    if (isStdout) { // For stdout
                        // 1. Writing to LM STDOUT
                        System.out.println("Stdoutput " + line);
                        // 2. Writing for capture output
                        if (os != null) {
                            if (Shell.WINDOWS) {
                                line = line.replace("\\u", "\\\\u");
                            }
                            os.write(line);
                            os.newLine();
                        }
                    }
                    else {
                        System.err.println(line); // 1. Writing to LM STDERR
                    }
                }
            }
            catch (IOException e) {
                ...
            }finally {
                ...
            }
        }
    }

這樣就很清晰了,shell自動幫咱們把輸出的內容寫入了oozie.action.output.properties文件中。而在java中則須要用戶本身來定義寫入的過程。

後續將會介紹一下oozie中比較高級的用法——EL表達式

相關文章
相關標籤/搜索