工做流調度引擎---Oozie

Oozie使用教程java

一.   Oozie簡介node

Apache Oozie是用於Hadoop平臺的一種工做流調度引擎。linux

  1. 做用

- 統一調度hadoop系統中常見的mr任務啓動hdfs操做、shell調度、hive操做等。web

- 使得複雜的依賴關係時間觸發事件觸發使用xml語言進行表達開發效率提升。sql

- 一組任務使用一個DAG來表示,使用圖形表達流程邏輯更加清晰。shell

- 支持不少種任務調度,能完成大部分hadoop任務處理。apache

- 程序定義支持EL常量和函數,表達更加豐富。瀏覽器

  1. 架構

 

 

  1. 訪問

- 經過瀏覽器訪問 http://master:11000/oozie/bash

 

 

- 經過HUE訪問架構

 

 

  1. 概念

- workflow:工做流

- coordinator:多個workflow能夠組成一個coordinator,能夠把前幾個workflow的輸出做爲後- 一個workflow的輸入,也能夠定義workflow的觸發條件,來作定時觸發

- bundle:是對一堆coordinator的抽象

二.   Oozie操做

  1. Oozie shell

- 編寫job.properties文件

 

 

- 編寫workflow.xml文件

<workflow-app xmlns="uri:oozie:workflow:0.4" name="shell-wf">

    <start to="shell-node"/>

    <action name="shell-node">

        <shell xmlns="uri:oozie:shell-action:0.2">

            <job-tracker>${jobTracker}</job-tracker>

            <name-node>${nameNode}</name-node>

            <configuration>

                <property>

                    <name>mapred.job.queue.name</name>

                    <value>${queueName}</value>

                </property>

            </configuration>

            <exec>echo</exec>

            <argument>my_output=Hello Oozie</argument>

            <capture-output/>

        </shell>

        <ok to="check-output"/>

        <error to="fail"/>

    </action>

    <decision name="check-output">

        <switch>

            <case to="end">

                ${wf:actionData('shell-node')['my_output'] eq 'Hello Oozie'}

            </case>

            <default to="fail-output"/>

        </switch>

    </decision>

    <kill name="fail">

        <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>

    </kill>

    <kill name="fail-output">

        <message>Incorrect output, expected [Hello Oozie] but was [${wf:actionData('shell-node')['my_output']}]</message>

    </kill>

    <end name="end"/>

</workflow-app>

 

 

- 執行oozie cli命令

 

 

執行命令後會返回一個job的id,在web的監控頁面或Hue的頁面能夠查看其信息。

 

 

Job的有向無環圖:

 

 

  1. Oozie fs

- 編寫job.properties文件

 

 

- 編寫workflow.xml文件

<workflow-app xmlns="uri:oozie:workflow:0.2" name="fs">
    <start to="fs-node"/>
    <action name="fs-node">
      <fs>
       <delete path='/home/kongc/oozie'/>
         <mkdir path='/home/kongc/oozie1'/>
         <move source='/home/kongc/spark-application' target='/home/kongc/oozie1'/>
      </fs>
      <ok to="end"/>
      <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- 執行oozie cli命令

  1. Oozie Sqoop

- 編寫job.properties文件

 

 

- 編寫配置文件

#HSQL Database Engine 1.8.0.5
#Tue Oct 05 11:20:19 SGT 2010
hsqldb.script_format=0
runtime.gc_interval=0
sql.enforce_strict_size=false
hsqldb.cache_size_scale=8
readonly=false
hsqldb.nio_data_file=true
hsqldb.cache_scale=14
version=1.8.0
hsqldb.default_table_type=memory
hsqldb.cache_file_scale=1
hsqldb.log_size=200
modified=no
hsqldb.cache_version=1.7.0
hsqldb.original_version=1.8.0
hsqldb.compatible_version=1.8.0

 

 

- 編寫sql文件

CREATE SCHEMA PUBLIC AUTHORIZATION DBA
CREATE MEMORY TABLE TT(I INTEGER NOT NULL PRIMARY KEY,S VARCHAR(256))
CREATE USER SA PASSWORD ""
GRANT DBA TO SA
SET WRITE_DELAY 10
SET SCHEMA PUBLIC
INSERT INTO TT VALUES(1,'a')
INSERT INTO TT VALUES(2,'a')
INSERT INTO TT VALUES(3,'a')

 

 

- 編寫workflow.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="sqoop-wf">
    <start to="sqoop-node"/>
 
    <action name="sqoop-node">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/sqoop"/>
                <mkdir path="${nameNode}/user/oozie/${examplesRoot}/output-data"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <command>import --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir /user/oozie/${examplesRoot}/output-data/sqoop -m 1</command>
            <file>db.hsqldb.properties#db.hsqldb.properties</file>
            <file>db.hsqldb.script#db.hsqldb.script</file>
        </sqoop>
        <ok to="end"/>
        <error to="fail"/>
    </action>
 
    <kill name="fail">
        <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- 執行oozie cli命令

  1. Oozie Java

- 編寫job.properties文件

 

 

- 編寫workflow.xml文件

<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-kc">
    <start to="java-node"/>
    <action name="java-node">
        <java>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <main-class>org.apache.oozie.example.DemoJavaMain</main-class>
            <arg>Hello</arg>
            <arg>Oozie!</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- 執行oozie cli命令

  1. Oozie Hive

- 編寫job.properties文件

 

 

- 編寫workflow.xml文件

<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive2-wf">
    <start to="hive2-node"/>
 
    <action name="hive2-node">
        <hive2 xmlns="uri:oozie:hive2-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/hive2"/>
                <mkdir path="${nameNode}/user/oozie/${examplesRoot}/output-data"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
            </configuration>
            <jdbc-url>${jdbcURL}</jdbc-url>
            <script>script.q</script>
            <param>INPUT=/user/oozie/${examplesRoot}/input-data/table</param>
            <param>OUTPUT=/user/oozie/${examplesRoot}/output-data/hive2</param>
        </hive2>
        <ok to="end"/>
        <error to="fail"/>
    </action>
 
    <kill name="fail">
        <message>Hive2 (Beeline) action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- 編寫hive腳本

INSERT OVERWRITE DIRECTORY '${OUTPUT}' SELECT * FROM test_machine;

 

 

- 執行oozie cli命令

  1. Oozie Impala

- 編寫job.properties文件

 

 

- 編寫workflow.xml文件

<workflow-app name="shell-impala" xmlns="uri:oozie:workflow:0.4">
      <start to="shell-impala-invalidate"/>
      <action name="shell-impala-invalidate">
               <shell xmlns="uri:oozie:shell-action:0.1">
                         <job-tracker>${jobTracker}</job-tracker>
                         <name-node>${nameNode}</name-node>
                         <configuration>
                                  <property>
                                            <name>mapred.job.queue.name</name>
                                            <value>${queueName}</value>
                                  </property>
                         </configuration>
                         <exec>${EXEC}</exec>
                         <file>${EXEC}#${EXEC}</file>
               </shell>
               <ok to="end"/>
               <error to="kill"/>
      </action>
      <kill name="kill">
               <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
      </kill>
      <end name="end"/>
</workflow-app>

 

 

- 編寫impala腳本文件

#!/bin/bash
impala-shell -i slave2:21000 -q "
select count(*) from test_machine"
echo 'Hello Shell'

 

 

- 執行oozie cli命令

  1. Oozie MapReduce

- 編寫job.properties文件

 

 

- 編寫workflow.xml文件

<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wyl">
    <start to="mr-node"/>
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/${outputDir}"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.oozie.example.SampleMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.oozie.example.SampleReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/oozie/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/oozie/${examplesRoot}/output-data/${outputDir}</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

 

 

- 執行oozie cli命令

  1. Oozie Spark

- 編寫job.properties文件

 

 

- 編寫workflow.xml文件

<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'>
    <start to='spark-node' />
 
    <action name='spark-node'>
        <spark xmlns="uri:oozie:spark-action:0.1">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/spark"/>
            </prepare>
            <master>${master}</master>
            <name>Spark-FileCopy</name>
            <class>org.apache.oozie.example.SparkFileCopy</class>
            <jar>${nameNode}/user/oozie/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>
            <arg>${nameNode}/user/oozie/${examplesRoot}/input-data/text/data.txt</arg>
            <arg>${nameNode}/user/oozie/${examplesRoot}/output-data/spark</arg>
        </spark>
        <ok to="end" />
        <error to="fail" />
    </action>
 
    <kill name="fail">
        <message>Workflow failed, error
            message[${wf:errorMessage(wf:lastErrorNode())}]
        </message>
    </kill>
    <end name='end' />
</workflow-app>

 

 

- 執行oozie cli命令

  1. Oozie 定時任務

-    定義job.properties

 nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples
 
oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/aggregator/coordinator.xml
start=2010-01-01T01:00Z
end=2010-01-01T03:00Z

 

 

-    定義coordinator.xml

 <coordinator-app name="aggregator-coord" frequency="${coord:hours(1)}" start="${start}" end="${end}" timezone="UTC"
                 xmlns="uri:oozie:coordinator:0.2">
    <controls>
        <concurrency>1</concurrency>
    </controls>
 
    <datasets>
        <dataset name="raw-logs" frequency="${coord:minutes(20)}" initial-instance="2010-01-01T00:00Z" timezone="UTC">
            <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
        </dataset>
        <dataset name="aggregated-logs" frequency="${coord:hours(1)}" initial-instance="2010-01-01T01:00Z" timezone="UTC">
            <uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/output-data/aggregator/aggregatedLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
        </dataset>
    </datasets>
 
    <input-events>
        <data-in name="input" dataset="raw-logs">
            <start-instance>${coord:current(-2)}</start-instance>
            <end-instance>${coord:current(0)}</end-instance>
        </data-in>
    </input-events>
 
    <output-events>
        <data-out name="output" dataset="aggregated-logs">
            <instance>${coord:current(0)}</instance>
        </data-out>
    </output-events>
 
    <action>
        <workflow>
            <app-path>${nameNode}/user/${coord:user()}/${examplesRoot}/apps/aggregator</app-path>
            <configuration>
                <property>
                    <name>jobTracker</name>
                    <value>${jobTracker}</value>
                </property>
                <property>
                    <name>nameNode</name>
                    <value>${nameNode}</value>
                </property>
                <property>
                    <name>queueName</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>inputData</name>
                    <value>${coord:dataIn('input')}</value>
                </property>
                <property>
                    <name>outputData</name>
                    <value>${coord:dataOut('output')}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>

 

 

三.   Oozie實例

  1. 設計工做流

 

 

  1. 編寫job.properties文件
 nameNode=hdfs://localhost:8020
jobTracker=localhost:8021
queueName=default
examplesRoot=examples
streamingMapper=/bin/cat
streamingReducer=/usr/bin/wc
 
oozie.use.system.libpath=true
 
oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/demo
 

 

  1. 在workflow.xml文件定義節點
 <workflow-app xmlns="uri:oozie:workflow:0.2" name="demo-wf">
 
    <start to="cleanup-node"/>
 
    <action name="cleanup-node">
        <fs>
            <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/demo"/>
        </fs>
        <ok to="fork-node"/>
        <error to="fail"/>
    </action>
 
    <fork name="fork-node">
        <path start="pig-node"/>
        <path start="streaming-node"/>
    </fork>
 
    <action name="pig-node">
        <pig>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/demo/pig-node"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                <property>
                    <name>mapred.map.output.compress</name>
                    <value>false</value>
                </property>
            </configuration>
            <script>id.pig</script>
            <param>INPUT=/user/oozie/${examplesRoot}/input-data/text</param>
            <param>OUTPUT=/user/oozie/${examplesRoot}/output-data/demo/pig-node</param>
        </pig>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>
 
    <action name="streaming-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/demo/streaming-node"/>
            </prepare>
            <streaming>
                <mapper>${streamingMapper}</mapper>
                <reducer>${streamingReducer}</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
 
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/oozie/${examplesRoot}/input-data/text</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/oozie/${examplesRoot}/output-data/demo/streaming-node</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join-node"/>
        <error to="fail"/>
    </action>
 
    <join name="join-node" to="mr-node"/>
    
    
    <action name="mr-node">
        <map-reduce>
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <prepare>
                <delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/demo/mr-node"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
 
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.oozie.example.DemoMapper</value>
                </property>
                <property>
                    <name>mapred.mapoutput
.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.IntWritable</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.oozie.example.DemoReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/user/oozie/${examplesRoot}/output-data/demo/pig-node,/user/oozie/${examplesRoot}/output-data/demo/streaming-node</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/user/oozie/${examplesRoot}/output-data/demo/mr-node</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="decision-node"/>
        <error to="fail"/>
    </action>
 
    <decision name="decision-node">
        <switch>
            <case to="hdfs-node">${fs:exists(concat(nameNode, '/user/oozie/examples/output-data/demo/mr-node')) == "true"}</case>
            <default to="end"/>
        </switch>
    </decision>
 
    <action name="hdfs-node">
        <fs>
            <move source="${nameNode}/user/oozie/${examplesRoot}/output-data/demo/mr-node"
                  target="/user/oozie/${examplesRoot}/output-data/demo/final-data"/>
        </fs>
        <ok to="end"/>
        <error to="fail"/>
    </action>
 
    <kill name="fail">
        <message>Demo workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
 
    <end name="end"/>
 
</workflow-app>

 

 

  1. 提交命令

Hue頁面提交做業

 

 

 

  1. 查看執行狀態

 

 

四.   總結

  1. EL函數

- 基本的EL函數

String firstNotNull(String value1, String value2)

String concat(String s1, String s2)

String replaceAll(String src, String regex, String replacement)

String appendAll(String src, String append, String delimeter)

String trim(String s)

String urlEncode(String s)

String timestamp()

String toJsonStr(Map) (since Oozie 3.3)

String toPropertiesStr(Map) (since Oozie 3.3)

String toConfigurationStr(Map) (since Oozie 3.3)

 

- WorkFlow EL

String wf:id() – 返回當前workflow做業ID

String wf:name() – 返回當前workflow做業NAME

String wf:appPath() – 返回當前workflow的路徑

String wf:conf(String name) – 獲取當前workflow的完整配置信息

String wf:user() – 返回啓動當前job的用戶

String wf:callback(String stateVar) – 返回結點的回調URL,其中參數爲動做指定的退出狀態

int wf:run() – 返回workflow的運行編號,正常狀態爲0

Map wf:actionData(String node) – 返回當前節點完成時輸出的信息

int wf:actionExternalStatus(String node) – 返回當前節點的狀態

String wf:lastErrorNode() – 返回最後一個ERROR狀態推出的節點名稱

String wf:errorCode(String node) – 返回指定節點執行job的錯誤碼,沒有則返回空

String wf:errorMessage(String message) – 返回執行節點執行job的錯誤信息,沒有則返回空

- HDFS EL

boolean fs:exists(String path)

boolean fs:isDir(String path)

long fs:dirSize(String path) – 目錄則返回目錄下全部文件字節數;不然返回-1

long fs:fileSize(String path) – 文件則返回文件字節數;不然返回-1

long fs:blockSize(String path) – 文件則返回文件塊的字節數;不然返回

  1. 注意事項

- job.properties文件能夠不上傳到hdfs中,是在執行oozie job ...... -config時,批定的linux本地路徑

- workflow.xml文件,必定要上傳到job.properties的oozie.wf.application.path對應的hdfs目錄下。

- job.properties中的oozie.use.system.libpath=true指定oozie使用系統的共享目錄。

- job.properties中的oozie.libpath=${nameNode}/user/${user.name}/apps/mymr,能夠用來執行mr時,做業導出的jar包存放位置,不然可能報找不到類的錯誤。

- oozie調度做業時,本質也是啓動一個mapreduce做業來調度,workflow.xml中設置的隊列名稱爲調度做業mr的隊列名稱。因此若是想讓做業運行在指定的隊列時,須要在mr或hive中指定好。

相關文章
相關標籤/搜索