本文討論均基於elasticjob的2.0.5-SNAPSHOT版本java
先從官網elastic-job-example-lite-java的demo開始分析,從com.dangdang.ddframe.job.example.JavaMain開始分析。linux
如今看來註冊中心的實現只有zk的實現,對於zk的操做使用了Apache的curator,demo中使用了內嵌TestingServer
。git
DataSource
JobEventRdbConfiguration
暫且不是很清楚其做用,從類名能夠大致猜出是把job的運行記錄寫入數據庫中。github
很喜歡Builder模式啊 :)數據庫
核心config包含建立出下述中不一樣job類型的公有屬性,是對config的更高一層抽象。根據其newBuilder
方法可知,有三個參數是必須的apache
屬性名 | 類型 | 是否必須 | 說明 | 備註 |
---|---|---|---|---|
jobName | String | 是 | 做業名稱 | 暫時不清楚是否須要惟一 |
cron | String | 是 | cron表達式 | |
shardingTotalCount | int | 是 | 做業分片總數 | |
shardingItemParameters | String | 否 | 自定義分片參數 | 後續詳細介紹 |
jobParameter | String | 否 | job參數 | |
failover | boolean | 否 | 是否失敗重試 | 之前看到的是利用zk實現? |
misfire | boolean | 否 | 是否啞火重試 | 利用quartz實現? |
description | String | 否 | 描述 | |
jobProperties | JobProperties | 否 | job額外的參數一個Map |
從上圖可知要建立出不一樣類型的job,先要建立不一樣類型的jobconfig,這裏先以SimpleJob
爲例,建立SimpleJobConfiguration
須要兩個參數,一個是上述中的JobCoreConfiguration
;另外一個是你寫的業務邏輯的job實現類的權限定類名,由於這裏以SimpleJob
爲例,因此這個類必須繼承SimpleJob
,類繼承圖以下:json
SimpleJobConfiguration
屬性以下:服務器
屬性名 | 類型 | 是否必須 | 說明 | 備註 |
---|---|---|---|---|
coreConfig | JobCoreConfiguration | 是 | 核心屬性 | |
jobType | JobType | 是 | job類型 | 以Simple爲例則爲JobType.SIMPLE |
jobClass | String | 是 | job的全限定類名 |
建立JobScheduler
須要註冊中心CoordinatorRegistryCenter
,任務相關配置LiteJobConfiguration
,事件配置相關JobEventConfiguration
,job監聽器ElasticJobListener
。多線程
說說其中的LiteJobConfiguration
,合分合,這樣作多是語義和邏輯上的清晰,其屬性具體意義可參考其Builder中的註釋。異步
建立過程過於複雜,不得不貼代碼分析了。
private JobScheduler(...) { jobName = liteJobConfig.getJobName(); //1 jobExecutor = new JobExecutor(regCenter, liteJobConfig, elasticJobListeners); //2 jobFacade = new LiteJobFacade(regCenter, jobName, Arrays.asList(elasticJobListeners), jobEventBus); //3 jobRegistry = JobRegistry.getInstance(); }
建立了做業執行器JobExecutor
,內部服務門面類LiteJobFacade
以及做業註冊表JobRegistry
。
public JobExecutor(...) { //... //1 setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList); //2 schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList); }
1處給全部實現AbstractDistributeOnceElasticJobListener
的類添加GuaranteeService
,每次執行先後** 全部 **分片項都註冊到zk上,若是都註冊成功,才執行實現類裏面的doBeforeJobExecutedAtLastStarted
方法或者doAfterJobExecutedAtLastCompleted
,而後清除zk上剛纔註冊的節點。
2處建立爲調度器提供內部服務的門面類SchedulerFacade
SchedulerFacade
建立public SchedulerFacade(...) { //1 configService = new ConfigurationService(regCenter, jobName); //2 leaderElectionService = new LeaderElectionService(regCenter, jobName); //3 serverService = new ServerService(regCenter, jobName); //4 shardingService = new ShardingService(regCenter, jobName); //5 executionService = new ExecutionService(regCenter, jobName); //6 monitorService = new MonitorService(regCenter, jobName); //7 listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners); }
建立了各類服務類,這些服務大部分都在zk上有對應的節點信息。zk節點詳細解釋參考官網說明
config
,值爲json格式的LiteJobConfiguration
leader
,用於選擇到底哪臺服務器執行servers
,先暫時用ip區分,記錄服務hostname,狀態及分片信息。execution
,根據分片號區分,存儲具體的分片執行狀況。 此處可看出misfire還要藉助zkecho dump|nc ip port
LiteJobFacade
public LiteJobFacade(...) { configService = new ConfigurationService(regCenter, jobName); shardingService = new ShardingService(regCenter, jobName); serverService = new ServerService(regCenter, jobName); executionContextService = new ExecutionContextService(regCenter, jobName); executionService = new ExecutionService(regCenter, jobName); failoverService = new FailoverService(regCenter, jobName); this.elasticJobListeners = elasticJobListeners; this.jobEventBus = jobEventBus; }
這也是一個門面類,相對於上面的SchedulerFacade
,多了幾個服務
FailoverService
失敗重試服務,在zk的根節點爲failover
,一旦該節點出現,相應的節點監聽器就會執行。JobEventBus
異步的事件總線,如今來看只註冊了記錄任務執行蹤影的寫數據庫事件JobRegistry
做業註冊表用map將jobName爲key,做業調度控制器JobScheduleController
爲value存於內存中,後期還能夠再用key取出控制器進行job的觸發、暫停等操做。可見這只是針對於某臺機器的,而對於分佈式job是有限制的,而且上述中提出的jobName是否可重複,在此處能夠推斷出設計上是要保證惟一性的。
public void init() { jobExecutor.init(); JobTypeConfiguration jobTypeConfig = jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig(); JobScheduleController jobScheduleController = new JobScheduleController( createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName); jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron()); jobRegistry.addJobScheduleController(jobName, jobScheduleController); }
public void init() { schedulerFacade.clearPreviousServerStatus(); regCenter.addCacheData("/" + liteJobConfig.getJobName()); schedulerFacade.registerStartUpInfo(liteJobConfig); }
/** * 註冊Elastic-Job啓動信息. * * @param liteJobConfig 做業配置 */ public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) { listenerManager.startAllListeners(); leaderElectionService.leaderForceElection(); configService.persist(liteJobConfig); serverService.persistServerOnline(!liteJobConfig.isDisabled()); serverService.clearJobPausedStatus(); shardingService.setReshardingFlag(); monitorService.listen(); listenerManager.setCurrentShardingTotalCount(configService.load(false).getTypeConfig().getCoreConfig().getShardingTotalCount()); }
LeaderElectionJobListener
->leaderElectionService
leader/host=ipShardingTotalCountChangedJobListener
和servers節點變化ListenServersChangedJobListener
JobCrashedJobListener
必須知足變化的節點以running結尾,而且是刪除事件&&execution/分片號/completed節點存在,配置信息failover爲true;FailoverJobCrashedJobListener
必須知足變化的節點以failover結尾,而且是刪除事件&&execution/分片號/completed節點存在,配置信息failover爲true;FailoverSettingsChangedJobListener觸發條件config變化ListenerManager
的總分片數。JobScheduleController
public void init() { //... JobScheduleController jobScheduleController = new JobScheduleController( createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName); //... }
Scheduler
private Scheduler createScheduler(final boolean isMisfire) { Scheduler result; try { StdSchedulerFactory factory = new StdSchedulerFactory(); factory.initialize(getBaseQuartzProperties(isMisfire)); result = factory.getScheduler(); result.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener()); } catch (final SchedulerException ex) { throw new JobSystemException(ex); } return result; }
從這裏能夠看出misfire的具體流程是quartz監聽到某任務發生misfire,觸發監聽器JobTriggerListener
的triggerMisfired方法,而後將misfire節點註冊到zk上,zk的TreeCach監聽到節點變化作出相應處理。
JobDetail
private JobDetail createJobDetail(final String jobClass) { JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build(); result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade); //這個地方有什麼用???? Optional<ElasticJob> elasticJobInstance = createElasticJobInstance(); if (elasticJobInstance.isPresent()) { result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get()); } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) { try { result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance()); } catch (final ReflectiveOperationException ex) { throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass); } } return result; }
LiteJob
是繼承自quartz的Job類,實現了execute方法,有兩個屬性一個ElasticJob
是你本身實現的業務邏輯的類,另外一個是JobFacade
門面類。execute方法其實調用了executor的execute方法,下面是executor的繼承類。
分片等處理都已經在抽象類中實現好了,須要實現的是 protected abstract void process(ShardingContext shardingContext);
對於每一個分片是多線程調用,代碼以下:
for (final int each : items) { final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each); if (executorService.isShutdown()) { return; } executorService.submit(new Runnable() { @Override public void run() { try { process(shardingContexts, each, jobExecutionEvent); } finally { latch.countDown(); } } }); } try { latch.await(); } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); }
public void init() { //... jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron()); jobRegistry.addJobScheduleController(jobName, jobScheduleController); }
根據cron表達式開始啓動任務調度,而且把控制器放入註冊表中。