一,介紹html
Oozie是一個基於Hadoop的工做流調度器,它能夠經過Oozie Client 以編程的形式提交不一樣類型的做業,如MapReduce做業和Spark做業給底層的計算平臺(如 Cloudera Hadoop)執行。java
Quartz是一個開源的調度軟件,它爲任務的調度執行提供了各類觸發器以及監聽器git
下面使用Quartz + Oozie 將一個MapReduce程序提交給Cloudera Hadoop執行github
二,調度思路編程
①爲何要用Quartz呢?主要是藉助Quartz強大的觸發器功能。它能夠容許知足不一樣的調度需求,如每週執行做業一次、重複執行做業多少次。這裏有一個重要的問題:假設我有一個做業須要重複執行,當第一次把該做業提交到CDH上執行後,之後須要執行該做業時再也不是又一次把該做業上傳到CDH上而後執行,而是把提交過的做業記錄下來,下次須要運行時,直接讓CDH再運行該做業。ide
②使用Quartz還有一個好處就是:在做業提交的時候能夠作一些控制。好比,某種類型的做業提交的頻率很高,或者運行時間較短(根據它上次執行完的狀況來判斷),那麼下次運行它時,讓它具備更高的優先級。函數
③使用Oozie的目的很明確,就是讓它把做業發送給底層的計算平臺,如CDH去執行做業。oop
三,Eclipse開發環境搭建測試
主要是須要Quartz和Oozie的依賴包。具體以下:ui
四,實現思路
a) 調度系統目前只考慮調度兩種類型的做業:Mapreduce做業和Spark做業。先把這二種做業經過Quartz傳遞給Oozie,而後再讓Oozie把做業提交給CDH計算平臺去執行。
b) Quartz提供了一個公共的Job接口。裏面只有一個execute()方法,該方法負責完成Quartz所調度的做業的具體功能:把做業傳遞給Oozie
c) 定義一個抽象類BaseJob,它裏面定義了二個方法。這二個方法主要是用來作一些準備工做,即便用Quartz把做業傳遞給Oozie時須要找到做業在HDFS上的存儲目錄,並將之複製執行目錄下。
d) 最後是兩個具體的實現類,MRJob和SparkJob,它們分別表明Mapreduce做業和Spark做業。在實現類裏面完成做業的配置,而後將做業提交到CDH計算平臺上執行。
相關類圖以下:
五,具體代碼分析
MRJob.java
實現了org.quartz.Job接口的execute(),該方法當觸發器被觸發時,會自動地被Quartz Schedule 調度執行。這樣,就能夠根據須要定義觸發器,控制做業什麼時候提交給Oozie。
@Override public void execute(JobExecutionContext arg0) throws JobExecutionException { try{ String jobId = wc.run(conf);//submit job to oozie and get the jobId System.out.println("Workflow job submitted"); //wait until the workflow job finishes while(wc.getJobInfo(jobId).getStatus() == Status.RUNNING){ System.out.println("Workflow job running..."); try{ Thread.sleep(10*1000); }catch(InterruptedException e){e.printStackTrace();} } System.out.println("Workflow job completed!"); System.out.println(wc.getJobId(jobId)); }catch(OozieClientException e){e.printStackTrace();} }
測試的main函數程序以下:能夠看出對於客戶端而言,只須要按照編寫常規的Quartz做業方式,就能夠調試MapReduce做業了。要想運行該程序,固然還得提早準備到做業的運行環境。具體參考
import static org.quartz.JobBuilder.newJob; import static org.quartz.TriggerBuilder.newTrigger; import java.util.Date; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerFactory; import org.quartz.SimpleTrigger; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.quartz.job.MRJob; public class QuartzOozieJobTest { public static void main(String[] args) throws Exception{ QuartzOozieJobTest test = new QuartzOozieJobTest(); test.run(); } public void run() throws Exception{ Logger log = LoggerFactory.getLogger(QuartzOozieJobTest.class); log.info("------- Initializing ----------------------"); SchedulerFactory sf = new StdSchedulerFactory(); Scheduler sched = sf.getScheduler(); long startTime = System.currentTimeMillis() + 20000L; Date startTriggerTime = new Date(startTime); JobDetail jobDetail = newJob(MRJob.class).withIdentity("job", "group1").build(); SimpleTrigger trigger = (SimpleTrigger) newTrigger().withIdentity("trigger", "group1").startAt(startTriggerTime).build(); Date ft = sched.scheduleJob(jobDetail, trigger); log.info(jobDetail.getKey() + " will submit at " + ft + " only once."); sched.start(); // sched.shutdown(true); } }
整個項目的源代碼下載 :https://github.com/hapjin/JAVA/tree/master/oozie-quartz