【Oozie Learning Part 1】Oozie

Environment
  VM: VMware 10
  Linux: CentOS-6.5-x86_64
  SSH client: Xshell 4
  FTP: Xftp 4
  JDK 8
  CM 5.4

 

Similar product: Azkaban

 

1. Overview
Oozie is an open-source, workflow-engine-based framework that Cloudera contributed to Apache. It is an open-source workflow scheduling engine for the Hadoop platform, used to manage Hadoop jobs. Oozie is a web application made up of two components, the Oozie client and the Oozie server; the Oozie server is a web program that runs in a Java servlet container (Tomcat).

Features:
(1) Oozie is not limited to chaining multiple MapReduce jobs: a single workflow can mix different kinds of programs. For example, it can run an MR job, then a Java program, then a shell script, then a Hive script, then a Pig script, and finally another MR job. Oozie handles this kind of heterogeneous workflow easily. If one task fails, the tasks after it are not scheduled.
(2) An Oozie workflow must be a directed acyclic graph (DAG). Oozie is essentially a client of Hadoop: when you need to run several dependent MR jobs, you write their execution order into workflow.xml, submit the job with Oozie, and Oozie manages the whole task flow.
(3) Oozie defines control flow nodes (Control Flow Nodes) and action nodes (Action Nodes). Control flow nodes define the start and end of a workflow and control its execution path (decision, fork, join, and so on); action nodes cover Hadoop map-reduce, the Hadoop file system, Pig, SSH, HTTP, eMail, and Oozie sub-workflows.

Architecture:


- workflow: a flow composed of the individual jobs to be executed, processed in order as one pipeline.
- coordinator: coordinates multiple workflows into a single one: several workflows can make up a coordinator, the output of earlier workflows can feed a later one, and trigger conditions (such as a schedule) can be defined to fire a workflow periodically.
- bundle: aggregates a set of coordinators for joint handling; it is an abstraction over a group of coordinators (a minimal sketch follows this list).
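As an illustration only (the bundle name and app paths below are placeholders, not taken from this article), a minimal bundle.xml that groups two coordinators could look like this:

<bundle-app name='my-bundle' xmlns='uri:oozie:bundle:0.2'>
  <!-- each coordinator element points at a coordinator application on HDFS -->
  <coordinator name='coord-1'>
    <app-path>${nameNode}/user/oozie/apps/coord-1</app-path>
  </coordinator>
  <coordinator name='coord-2'>
    <app-path>${nameNode}/user/oozie/apps/coord-2</app-path>
  </coordinator>
</bundle-app>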

2. Installation and Configuration
Install the Oozie service through CM, or install it manually.


1. Fixing the disabled Oozie web console


Unzip ext-2.2 into /var/lib/oozie: unzip ext-2.2.zip -d /var/lib/oozie
Enable the web console in the Oozie service configuration.
Save, then restart the Oozie service.

Oozie configuration (see the oozie-site.xml sketch after this list)
1. Node memory settings
2. oozie.service.CallableQueueService.callable.concurrency (callable concurrency)
3. oozie.service.CallableQueueService.queue.size (queue size)
4. oozie.service.ActionService.executor.ext.classes (extension action executors)
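A sketch of how the queue-related properties above appear in oozie-site.xml (the values shown are the stock Oozie defaults, used here only for illustration):

<!-- illustrative oozie-site.xml fragment -->
<property>
  <name>oozie.service.CallableQueueService.callable.concurrency</name>
  <!-- maximum callables of the same type that may run concurrently -->
  <value>3</value>
</property>
<property>
  <name>oozie.service.CallableQueueService.queue.size</name>
  <!-- maximum size of the internal callable queue -->
  <value>10000</value>
</property>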

Oozie share lib
– /user/oozie/share/lib

Web console addresses
Oozie's built-in web UI:
http://oozie_host_ip:11000/oozie/

Hue UI:

3. Common Client Commands
Oozie CLI commands

 

#Run a job: [root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -config job.properties -run
#Submit a job: [root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -config job.properties -submit
#Start a job: [root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -start 0000003-150713234209387-oozie-oozi-W
#Kill a job: [root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -kill 0000002-150713234209387-oozie-oozi-W
#Check job status: [root@node1 oozie] oozie job -oozie http://ip:11000/oozie/ -info 0000003-150713234209387-oozie-oozi-W

Note: running a job (-run) actually combines the other two commands into one: it submits the job and then starts it.

 

4. Oozie Job Configuration
1. Working with workflows in Hue

References:

Running MR through an Oozie workflow in Hue
Submitting a scheduled Oozie job through Hue

 

2. Using configuration files
2.1 Two important configuration files:
job.properties (a minimal sketch follows)
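A minimal job.properties, for illustration (the host names and the application path are placeholders mirroring the examples later in this article):

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
# HDFS directory that contains workflow.xml
oozie.wf.application.path=${nameNode}/user/workflow/oozie/shell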

2.2 workflow.xml
(1) Version information
–<workflow-app xmlns="uri:oozie:workflow:0.4" name="workflow name">
(2) EL functions
– Basic EL functions
•String firstNotNull(String value1, String value2)
•String concat(String s1, String s2)
•String replaceAll(String src, String regex, String replacement)
•String appendAll(String src, String append, String delimiter)
•String trim(String s)
•String urlEncode(String s)
•String timestamp()
•String toJsonStr(Map) (since Oozie 3.3)
•String toPropertiesStr(Map) (since Oozie 3.3)
•String toConfigurationStr(Map) (since Oozie 3.3)

WorkFlow EL
•String wf:id() – returns the job ID of the current workflow
•String wf:name() – returns the application name of the current workflow
•String wf:appPath() – returns the application path of the current workflow
•String wf:conf(String name) – returns the value of the named property in the current workflow configuration
•String wf:user() – returns the user who started the current job
•String wf:callback(String stateVar) – returns the callback URL for the current action node, with stateVar as the exit state reported by the action
•int wf:run() – returns the run number of the current workflow; normally 0
•Map wf:actionData(String node) – returns the output data emitted when the given action node completed
•int wf:actionExternalStatus(String node) – returns the external status of the given action node
•String wf:lastErrorNode() – returns the name of the last node that exited with ERROR status
•String wf:errorCode(String node) – returns the error code of the job run by the given node, or empty if there is none
•String wf:errorMessage(String message) – returns the error message of the job run by the given node, or empty if there is none

– HDFS EL (an example follows this list)
•boolean fs:exists(String path)
•boolean fs:isDir(String path)
•long fs:dirSize(String path) – for a directory, returns the total bytes of all files inside it; otherwise returns -1
•long fs:fileSize(String path) – for a file, returns its size in bytes; otherwise returns -1
•long fs:blockSize(String path) – for a file, returns its block size in bytes; otherwise returns -1
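For instance, an HDFS EL predicate can drive a decision node; a sketch (the node and path names here are made up for illustration):

<decision name="check-input">
  <switch>
    <!-- branch on whether the input directory already exists on HDFS -->
    <case to="process-data">${fs:exists('/user/oozie/input-data')}</case>
    <default to="end"/>
  </switch>
</decision>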

(3) Nodes
– A. Control flow nodes
•start – marks the beginning of the workflow
•end – marks the end of the workflow
•decision – implements switch-style branching
•sub-workflow – invokes a child workflow
•kill – kills the workflow
•fork – runs branches of the workflow in parallel
•join – ends the parallel branches (used together with fork)

– B. Action nodes
•shell
•java
•fs
•MR
•hive
•sqoop

<decision name="[NODE-NAME]">
<switch>
<case to="[NODE_NAME]">[PREDICATE]</case>
...
<case to="[NODE_NAME]">[PREDICATE]</case>
<default to="[NODE_NAME]" />
</switch>
</decision>


<fork name="[FORK-NODE-NAME]">
<path start="[NODE-NAME]" />
...
<path start="[NODE-NAME]" />
</fork>
...
<join name="[JOIN-NODE-NAME]" to="[NODE-NAME]" />

5. Examples
1. Oozie shell

(1) Write job.properties

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# HDFS directory that contains workflow.xml
oozie.wf.application.path=${nameNode}/user/workflow/oozie/shell

 

Note: job.properties does not have to be uploaded to HDFS; it is the local Linux path specified with -config when you run oozie job ...

(2) Write workflow.xml

<workflow-app xmlns="uri:oozie:workflow:0.4" name="shell-wf">
  <start to="shell-node"/>
  <action name="shell-node">
  <shell xmlns="uri:oozie:shell-action:0.2">
  <job-tracker>${jobTracker}</job-tracker>
  <name-node>${nameNode}</name-node>
  <configuration>
  <property>
  <name>mapred.job.queue.name</name>
  <value>${queueName}</value>
  </property>
  </configuration>
  <exec>echo</exec>
  <argument>my_output=Hello Oozie</argument>
  <capture-output/>
  </shell>
  <ok to="check-output"/>
  <error to="fail"/>
  </action>
  <decision name="check-output">
  <switch>
  <case to="end">  ${wf:actionData('shell-node')['my_output'] eq 'Hello Oozie'}  </case>
  <default to="fail-output"/>
  </switch>
  </decision>
  <kill name="fail">
  <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
  </kill>
  <kill name="fail-output">
  <message>Incorrect output, expected [Hello Oozie] but was [${wf:actionData('shell-node')['my_output']}]</message>
  </kill>
  <end name="end"/>
  </workflow-app>

 

Upload the file to the HDFS path hdfs://master:8020/user/workflow/oozie/shell, or create and edit workflow.xml directly in Hue's file browser.

(3) Run the start command from the CLI; it returns a job ID.
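For this example, the command would look like the following (the Oozie server host is a placeholder); it prints the ID of the new workflow job:

[root@node1 oozie] oozie job -oozie http://master:11000/oozie/ -config job.properties -run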

View it in the UI:

Click a job to view its details:

View the job DAG:


2. Oozie fs

(1) Write job.properties

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share lib
oozie.use.system.libpath=true
# path to the workflow application (here the workflow.xml file itself)
oozie.wf.application.path=${nameNode}/user/examples/apps/fs/workflow.xml

 

(2) Write workflow.xml

<workflow-app xmlns="uri:oozie:workflow:0.2" name="fs">
<start to="fs-node"/>
<action name="fs-node">
<fs>
<delete path='/home/kongc/oozie'/>
<mkdir path='/home/kongc/oozie1'/>
<move source='/home/kongc/spark-application' target='/home/kongc/oozie1'/>
</fs>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

 

3. Oozie Sqoop

(1) Write job.properties

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share lib
oozie.use.system.libpath=true
# HDFS directory that contains workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/sqoop

 

#Write the HSQLDB configuration file (db.hsqldb.properties):

#HSQL Database Engine 1.8.0.5
#Tue Oct 05 11:20:19 SGT 2010
hsqldb.script_format=0
runtime.gc_interval=0
sql.enforce_strict_size=false
hsqldb.cache_size_scale=8
readonly=false
hsqldb.nio_data_file=true
hsqldb.cache_scale=14
version=1.8.0
hsqldb.default_table_type=memory
hsqldb.cache_file_scale=1
hsqldb.log_size=200
modified=no
hsqldb.cache_version=1.7.0
hsqldb.original_version=1.8.0
hsqldb.compatible_version=1.8.0

 

#Write the database script (db.hsqldb.script):

CREATE SCHEMA PUBLIC AUTHORIZATION DBA
CREATE MEMORY TABLE TT(I INTEGER NOT NULL PRIMARY KEY,S VARCHAR(256))
CREATE USER SA PASSWORD ""
GRANT DBA TO SA
SET WRITE_DELAY 10
SET SCHEMA PUBLIC
INSERT INTO TT VALUES(1,'a')
INSERT INTO TT VALUES(2,'a')
INSERT INTO TT VALUES(3,'a')

 

(2) Write workflow.xml

<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="sqoop-wf">
<start to="sqoop-node"/>
<action name="sqoop-node">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/sqoop"/>
<mkdir path="${nameNode}/user/oozie/${examplesRoot}/output-data"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<command>import --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir /user/oozie/${examplesRoot}/output-data/sqoop -m 1</command>
<file>db.hsqldb.properties#db.hsqldb.properties</file>
<file>db.hsqldb.script#db.hsqldb.script</file>
</sqoop>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

4. Oozie Java

(1) Write job.properties

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share lib
oozie.use.system.libpath=true
# HDFS directory that contains workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/java-main

(2) Write workflow.xml

<workflow-app xmlns="uri:oozie:workflow:0.2" name="java-main-kc">
<start to="java-node"/>
<action name="java-node">
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>org.apache.oozie.example.DemoJavaMain</main-class>
<arg>Hello</arg>
<arg>Oozie!</arg>
</java>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

 

5. Oozie Hive

(1) Write job.properties

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share lib
oozie.use.system.libpath=true
# HDFS directory that contains workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/hive

 

(2) Write workflow.xml

<workflow-app xmlns="uri:oozie:workflow:0.5" name="hive2-wf">
<start to="hive2-node"/>
<action name="hive2-node">
<hive2 xmlns="uri:oozie:hive2-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/hive2"/>
<mkdir path="${nameNode}/user/oozie/${examplesRoot}/output-data"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<jdbc-url>${jdbcURL}</jdbc-url>
<script>script.q</script>
<param>INPUT=/user/oozie/${examplesRoot}/input-data/table</param>
<param>OUTPUT=/user/oozie/${examplesRoot}/output-data/hive2</param>
</hive2>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Hive2 (Beeline) action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>


Write the Hive script (script.q):

INSERT OVERWRITE DIRECTORY '${OUTPUT}' SELECT * FROM test_machine;
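Note that the hive2 action above references ${jdbcURL}, which the job.properties shown earlier does not define; it has to be added there. A hedged example (host, port, and database are placeholders for a HiveServer2 instance):

# JDBC URL of HiveServer2; host/port/database are illustrative
jdbcURL=jdbc:hive2://master:10000/default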

6. Oozie Impala

(1) Write job.properties

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share lib
oozie.use.system.libpath=true
# HDFS directory that contains workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/impala
EXEC=impala.sh

 

(2) Write workflow.xml

<workflow-app name="shell-impala" xmlns="uri:oozie:workflow:0.4">
<start to="shell-impala-invalidate"/>
<action name="shell-impala-invalidate">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<exec>${EXEC}</exec>
<file>${EXEC}#${EXEC}</file>
</shell>
<ok to="end"/>
<error to="kill"/>
</action>
<kill name="kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

 

(3) impala.sh

#!/bin/bash
impala-shell -i slave2:21000 -q "select count(*) from test_machine"
echo 'Hello Shell'

 

7. Oozie MapReduce

(1) Write job.properties

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# path to the workflow application (here the workflow.xml file itself)
oozie.wf.application.path=${nameNode}/user/examples/apps/map-reduce/workflow.xml
outputDir=map-reduce

 

(2) Write workflow.xml

<workflow-app xmlns="uri:oozie:workflow:0.2" name="map-reduce-wyl">
<start to="mr-node"/>
<action name="mr-node">
<map-reduce>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/${outputDir}"/>
</prepare>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
<property>
<name>mapred.mapper.class</name>
<value>org.apache.oozie.example.SampleMapper</value>
</property>
<property>
<name>mapred.reducer.class</name>
<value>org.apache.oozie.example.SampleReducer</value>
</property>
<property>
<name>mapred.map.tasks</name>
<value>1</value>
</property>
<property>
<name>mapred.input.dir</name>
<value>/user/oozie/${examplesRoot}/input-data/text</value>
</property>
<property>
<name>mapred.output.dir</name>
<value>/user/oozie/${examplesRoot}/output-data/${outputDir}</value>
</property>
</configuration>
</map-reduce>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Map/Reduce failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>

 

8. Oozie Spark

(1) Write job.properties

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
# use the Oozie system share lib
oozie.use.system.libpath=true
# HDFS directory that contains workflow.xml
oozie.wf.application.path=${nameNode}/user/examples/apps/spark

 

(2) Write workflow.xml

<workflow-app xmlns='uri:oozie:workflow:0.5' name='SparkFileCopy'>
<start to='spark-node' />
<action name='spark-node'>
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<prepare>
<delete path="${nameNode}/user/oozie/${examplesRoot}/output-data/spark"/>
</prepare>
<master>${master}</master>
<name>Spark-FileCopy</name>
<class>org.apache.oozie.example.SparkFileCopy</class>
<jar>${nameNode}/user/oozie/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>
<arg>${nameNode}/user/oozie/${examplesRoot}/input-data/text/data.txt</arg>
<arg>${nameNode}/user/oozie/${examplesRoot}/output-data/spark</arg>
</spark>
<ok to="end" />
<error to="fail" />
</action>
<kill name="fail">
<message>Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message>
</kill>
<end name='end' />
</workflow-app>

 

9. Oozie Scheduled Jobs

 

(1) Write job.properties

nameNode=hdfs://master:8020
jobTracker=master:8032
queueName=default
examplesRoot=examples
oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/aggregator/coordinator.xml
start=2019-01-01T01:00Z
end=2019-01-01T03:00Z

 

(2) Write coordinator.xml

<coordinator-app name="aggregator-coord" frequency="${coord:hours(1)}" start="${start}" end="${end}" timezone="UTC" xmlns="uri:oozie:coordinator:0.2">
<controls>
<concurrency>1</concurrency>
</controls>
<datasets>
<dataset name="raw-logs" frequency="${coord:minutes(20)}" initial-instance="2010-01-01T00:00Z" timezone="UTC">
<uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
</dataset>
<dataset name="aggregated-logs" frequency="${coord:hours(1)}" initial-instance="2010-01-01T01:00Z" timezone="UTC">
<uri-template>${nameNode}/user/${coord:user()}/${examplesRoot}/output-data/aggregator/aggregatedLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template>
</dataset>
</datasets>
<input-events>
<data-in name="input" dataset="raw-logs">
<start-instance>${coord:current(-2)}</start-instance>
<end-instance>${coord:current(0)}</end-instance>
</data-in>
</input-events>
<output-events>
<data-out name="output" dataset="aggregated-logs">
<instance>${coord:current(0)}</instance>
</data-out>
</output-events>
<action>
<workflow>
<app-path>${nameNode}/user/${coord:user()}/${examplesRoot}/apps/aggregator</app-path>
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>queueName</name>
<value>${queueName}</value>
</property>
<property>
<name>inputData</name>
<value>${coord:dataIn('input')}</value>
</property>
<property>
<name>outputData</name>
<value>${coord:dataOut('output')}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>

 

Notes:
- job.properties does not have to be uploaded to HDFS; it is the local Linux path given with -config when running oozie job ...

- workflow.xml must be uploaded to the HDFS directory that oozie.wf.application.path in job.properties points to.

- Setting oozie.use.system.libpath=true in job.properties tells Oozie to use the system share lib.

- oozie.libpath=${nameNode}/user/${user.name}/apps/mymr in job.properties can specify where the jar exported for an MR job is stored; without it you may get class-not-found errors.

- When Oozie schedules a job, it essentially launches a MapReduce job of its own to do the scheduling, and the queue name set in workflow.xml is the queue of that scheduling MR job. So if you want the actual job to run in a specific queue, the queue must also be set inside the MR or Hive job itself.

 

 

References:
Workflow scheduling engine --- Oozie
Getting started with Oozie

Hadoop scheduling frameworks
Azkaban overview
Azkaban

A basic introduction to Azkaban
