一個簡單的使用Quartz和Oozie調度做業給大數據計算平臺執行

 

一,介紹html

Oozie是一個基於Hadoop的工做流調度器,它能夠經過Oozie Client 以編程的形式提交不一樣類型的做業,如MapReduce做業和Spark做業給底層的計算平臺(如 Cloudera Hadoop)執行。java

Quartz是一個開源的調度軟件,它爲任務的調度執行提供了各類觸發器以及監聽器git

下面使用Quartz + Oozie 將一個MapReduce程序提交給Cloudera Hadoop執行github

二,調度思路編程

①爲何要用Quartz呢?主要是藉助Quartz強大的觸發器功能。它能夠容許知足不一樣的調度需求,如每週執行做業一次、重複執行做業多少次。這裏有一個重要的問題:假設我有一個做業須要重複執行,當第一次把該做業提交到CDH上執行後,之後須要執行該做業時再也不是又一次把該做業上傳到CDH上而後執行,而是把提交過的做業記錄下來,下次須要運行時,直接讓CDH再運行該做業。ide

②使用Quartz還有一個好處就是:在做業提交的時候能夠作一些控制。好比,某種類型的做業提交的頻率很高,或者運行時間較短(根據它上次執行完的狀況來判斷),那麼下次運行它時,讓它具備更高的優先級。函數

③使用Oozie的目的很明確,就是讓它把做業發送給底層的計算平臺,如CDH去執行做業。oop

三,Eclipse開發環境搭建測試

主要是須要Quartz和Oozie的依賴包。具體以下:ui

四,實現思路

a) 調度系統目前只考慮調度兩種類型的做業:Mapreduce做業和Spark做業。先把這二種做業經過Quartz傳遞給Oozie,而後再讓Oozie把做業提交給CDH計算平臺去執行。

b) Quartz提供了一個公共的Job接口。裏面只有一個execute()方法,該方法負責完成Quartz所調度的做業的具體功能:把做業傳遞給Oozie

c) 定義一個抽象類BaseJob,它裏面定義了二個方法。這二個方法主要是用來作一些準備工做,即便用Quartz把做業傳遞給Oozie時須要找到做業在HDFS上的存儲目錄,並將之複製執行目錄下。

d) 最後是兩個具體的實現類,MRJob和SparkJob,它們分別表明Mapreduce做業和Spark做業。在實現類裏面完成做業的配置,而後將做業提交到CDH計算平臺上執行。

相關類圖以下:

 

 

五,具體代碼分析

MRJob.java

實現了org.quartz.Job接口的execute(),該方法當觸發器被觸發時,會自動地被Quartz Schedule 調度執行。這樣,就能夠根據須要定義觸發器,控制做業什麼時候提交給Oozie。

 

@Override
	public void execute(JobExecutionContext arg0) throws JobExecutionException {
		try{
			String jobId = wc.run(conf);//submit job to oozie and get the jobId
			System.out.println("Workflow job submitted");
			
			//wait until the workflow job finishes
			while(wc.getJobInfo(jobId).getStatus() == Status.RUNNING){
				System.out.println("Workflow job running...");
				try{
					Thread.sleep(10*1000);
				}catch(InterruptedException e){e.printStackTrace();}
			}
			System.out.println("Workflow job completed!");
			System.out.println(wc.getJobId(jobId));
		}catch(OozieClientException e){e.printStackTrace();}
	}

 

 

測試的main函數程序以下:能夠看出對於客戶端而言,只須要按照編寫常規的Quartz做業方式,就能夠調試MapReduce做業了。要想運行該程序,固然還得提早準備到做業的運行環境。具體參考

 

import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;

import java.util.Date;

import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.quartz.job.MRJob;


public class QuartzOozieJobTest {
	public static void main(String[] args) throws Exception{
		QuartzOozieJobTest test = new QuartzOozieJobTest();
		test.run();
	}
	
	public void run() throws Exception{
		Logger log = LoggerFactory.getLogger(QuartzOozieJobTest.class);

		log.info("------- Initializing ----------------------");

		SchedulerFactory sf = new StdSchedulerFactory();
		Scheduler sched = sf.getScheduler();
		
		long startTime = System.currentTimeMillis() + 20000L;
		Date startTriggerTime = new Date(startTime);
		
		JobDetail jobDetail = newJob(MRJob.class).withIdentity("job", "group1").build();
	    SimpleTrigger trigger = (SimpleTrigger) newTrigger().withIdentity("trigger", "group1").startAt(startTriggerTime).build();
	    
	    Date ft = sched.scheduleJob(jobDetail, trigger);
	    
	    log.info(jobDetail.getKey() + " will submit at " + ft + " only once.");
	    
	    sched.start();
//	    sched.shutdown(true);
	}
}

整個項目的源代碼下載 :https://github.com/hapjin/JAVA/tree/master/oozie-quartz

相關文章
相關標籤/搜索