Elastic-job-lite 官方概述
Quartz-scheduler java
Job的執行器:LiteJob的execute方法中實例化AbstractElasticJobExecutor。
/**
 * Distributed job contract for simple, single-step jobs.
 *
 * <p>The framework calls {@link #execute(ShardingContext)} with the sharding context
 * of the current execution.
 */
public interface SimpleJob extends ElasticJob {

    /**
     * Executes the job.
     *
     * @param shardingContext sharding context
     */
    void execute(ShardingContext shardingContext);
}
/**
 * Distributed job contract for data-flow jobs: data is fetched first and then processed.
 *
 * @param <T> type of the data items
 */
public interface DataflowJob<T> extends ElasticJob {

    /**
     * Fetches the data to be processed.
     *
     * @param shardingContext sharding context
     * @return the collection of data items to process
     */
    List<T> fetchData(ShardingContext shardingContext);

    /**
     * Processes the given data.
     *
     * @param shardingContext sharding context
     * @param data            collection of data items to process
     */
    void processData(ShardingContext shardingContext, List<T> data);
}
DataflowJobExecutor: @Override protected void process(final ShardingContext shardingContext) { DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) getJobRootConfig().getTypeConfig(); if (dataflowConfig.isStreamingProcess()) { streamingExecute(shardingContext); } else { oneOffExecute(shardingContext); } } private void streamingExecute(final ShardingContext shardingContext) { List<Object> data = fetchData(shardingContext); while (null != data && !data.isEmpty()) { processData(shardingContext, data); if (!getJobFacade().isEligibleForJobRunning()) { break; } data = fetchData(shardingContext); } } private void oneOffExecute(final ShardingContext shardingContext) { List<Object> data = fetchData(shardingContext); if (null != data && !data.isEmpty()) { processData(shardingContext, data); } }
namespace一定不能以斜槓(/)開頭:
初始化CuratorFramework實例中的NamespaceImpl對象時,會執行PathUtils.validatePath("/" + namespace)。若namespace本身以"/"開頭,拼接後的路徑會以"//"開頭,校驗失敗並拋出如下異常:
java.lang.IllegalArgumentException: Invalid namespace: /zookeeper/scheduler/namespace/local at org.apache.curator.framework.imps.NamespaceImpl.<init>(NamespaceImpl.java:48) at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:116) at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145) at com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter.init(ZookeeperRegistryCenter.java:97)
上述異常堆疊中顯示的<init>代表class文件中對象的構造方法,在類實例化時調用。
/**
 * Initializes the job.
 *
 * <p>Synchronizes the configuration with the registry center, then uses the returned
 * configuration to:
 * <ul>
 *   <li>record the sharding total count;</li>
 *   <li>build and register the schedule controller;</li>
 *   <li>register start-up info;</li>
 *   <li>schedule the cron trigger.</li>
 * </ul>
 */
public void init() {
    // From here on, the configuration returned by the registry center is the one in effect.
    LiteJobConfiguration regCenterConfig = schedulerFacade.updateJobConfiguration(liteJobConfig);
    String jobName = regCenterConfig.getJobName();
    JobRegistry registry = JobRegistry.getInstance();
    registry.setCurrentShardingTotalCount(jobName, regCenterConfig.getTypeConfig().getCoreConfig().getShardingTotalCount());
    // Scheduler first, then job detail: same creation order as before.
    JobScheduleController controller = new JobScheduleController(
            createScheduler(),
            createJobDetail(regCenterConfig.getTypeConfig().getJobClass()),
            jobName);
    registry.registerJob(jobName, controller, regCenter);
    schedulerFacade.registerStartUpInfo(!regCenterConfig.isDisabled());
    controller.scheduleJob(regCenterConfig.getTypeConfig().getCoreConfig().getCron());
}
LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,層層嵌套。其中設置jobName、cron、shardingTotalCount、shardingItemParameters、failover、misfire、jobClass、monitorExecution等作業屬性。
JobScheduler.init() -> schedulerFacade.updateJobConfiguration(liteJobConfig) 中註冊config節點時,通過configService.persist(liteJobConfig)根據overwrite屬性判定是否需要覆蓋zk上的配置:config節點不存在或overwrite為true時,寫入本地配置;若設置overwrite為false且config節點已存在,則保留並使用zookeeper上的配置數據。
ConfigurationService: public void persist(final LiteJobConfiguration liteJobConfig) { checkConflictJob(liteJobConfig); if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) { jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig)); } }
啟動任務時初始化zk節點信息,並開啟zk節點事件監聽;同時開啟分佈式作業分片的一致性檢測服務(包含對離線作業實例的檢測)。
終止調度時,刪除leader/election/instance臨時節點,關閉監控ServerSocket,並關閉一致性檢測服務。
/**
 * Registers the job's start-up information.
 *
 * <p>In order, this:
 * <ol>
 *   <li>starts all listeners;</li>
 *   <li>runs leader election;</li>
 *   <li>persists the server online state (with the enabled flag);</li>
 *   <li>persists the instance online state;</li>
 *   <li>sets the re-sharding flag;</li>
 *   <li>starts the monitor listener;</li>
 *   <li>starts the reconcile service unless it is already running.</li>
 * </ol>
 *
 * @param enabled whether the job is enabled
 */
public void registerStartUpInfo(final boolean enabled) {
    listenerManager.startAllListeners();
    leaderService.electLeader();
    serverService.persistOnline(enabled);
    instanceService.persistOnline();
    shardingService.setReshardingFlag();
    monitorService.listen();
    if (reconcileService.isRunning()) {
        return;
    }
    reconcileService.startAsync();
}
在JobRunShell初始化時,Quartz通過PropertySettingJobFactory.setBeanProps方法,以反射方式將JobDetail.getJobDataMap()中的值注入到LiteJob中。
在任務運行中提供對節點信息獲取或更新的服務。
JobDetail中指定任務的執行類(實現org.quartz.Job的LiteJob)。成員變量的值按名稱存放在JobDetail的JobDataMap中;Job實例通過SimpleJobFactory構建,再由PropertySettingJobFactory設置成員屬性值。
/**
 * Builds the Quartz {@link JobDetail} whose job class is {@link LiteJob}.
 *
 * <p>The job facade and the elastic job instance are stored in the job data map so Quartz
 * can inject them into {@code LiteJob}. The elastic job instance comes from
 * {@code createElasticJobInstance()} when present. Otherwise, unless the job is a script job,
 * it is created reflectively from {@code jobClass} through its no-arg constructor.
 *
 * @param jobClass fully qualified class name of the elastic job
 * @return the job detail
 * @throws JobConfigurationException if the job class cannot be instantiated; the underlying
 *         reflective exception is attached as the cause
 */
private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        try {
            // Unlike the deprecated Class.newInstance(), Constructor.newInstance() wraps
            // exceptions thrown by the job's constructor in InvocationTargetException,
            // so those failures are reported through the catch block below as well.
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).getDeclaredConstructor().newInstance());
        } catch (final ReflectiveOperationException ex) {
            JobConfigurationException failure = new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            // Keep the real reason (class not found, no no-arg constructor, constructor threw, ...)
            // rather than discarding it.
            failure.initCause(ex);
            throw failure;
        }
    }
    return result;
}
JobScheduleController保存scheduler、jobDetail、jobName之間的關係,負責作業調度的啟動、重新調度、暫停、恢復、立即觸發及關閉等控制操作。
package com.dangdang.ddframe.job.lite.internal.schedule; import com.dangdang.ddframe.job.exception.JobSystemException; import lombok.RequiredArgsConstructor; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobDetail; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.TriggerBuilder; import org.quartz.TriggerKey; /** * 做業調度控制器. * * @author zhangliang */ @RequiredArgsConstructor public final class JobScheduleController { private final Scheduler scheduler; // quartzScheduler private final JobDetail jobDetail; // LiteJob private final String triggerIdentity; // JobName /** * 調度做業. * * @param cron CRON表達式 */ public void scheduleJob(final String cron) { try { // RAMJobStore保存了Key的唯一信息 if (!scheduler.checkExists(jobDetail.getKey())) { scheduler.scheduleJob(jobDetail, createTrigger(cron)); } scheduler.start(); } catch (final SchedulerException ex) { throw new JobSystemException(ex); } } /** * 從新調度做業. * * @param cron CRON表達式 */ public void rescheduleJob(final String cron) { try { CronTrigger trigger = (CronTrigger) scheduler.getTrigger(TriggerKey.triggerKey(triggerIdentity)); if (!scheduler.isShutdown() && null != trigger && !cron.equals(trigger.getCronExpression())) { scheduler.rescheduleJob(TriggerKey.triggerKey(triggerIdentity), createTrigger(cron)); } } catch (final SchedulerException ex) { throw new JobSystemException(ex); } } private CronTrigger createTrigger(final String cron) { return TriggerBuilder.newTrigger().withIdentity(triggerIdentity).withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()).build(); } /** * 暫停做業. */ public void pauseJob() { try { if (!scheduler.isShutdown()) { scheduler.pauseAll(); } } catch (final SchedulerException ex) { throw new JobSystemException(ex); } } /** * 恢復做業. 
*/ public void resumeJob() { try { if (!scheduler.isShutdown()) { scheduler.resumeAll(); } } catch (final SchedulerException ex) { throw new JobSystemException(ex); } } /** * 馬上啓動做業. */ public void triggerJob() { try { if (!scheduler.isShutdown()) { scheduler.triggerJob(jobDetail.getKey()); } } catch (final SchedulerException ex) { throw new JobSystemException(ex); } } /** * 關閉調度器. */ public void shutdown() { try { if (!scheduler.isShutdown()) { scheduler.shutdown(); } } catch (final SchedulerException ex) { throw new JobSystemException(ex); } } }
init:
running:
org.quartz.core.QuartzSchedulerThread:
QuartzSchedulerThread通過Object.wait(long timeout)的方式阻塞等待,並循環創建org.quartz.core.JobRunShell對象、初始化JobExecutionContextImpl。JobExecutionContextImpl對象中包含通過JobFactory創建出來的org.quartz.Job實例對象LiteJob。LiteJob使用通過JobDetail.getJobDataMap()反射注入的elasticJob和JobFacade,調用JobExecutorFactory創建AbstractElasticJobExecutor實例,並執行其execute()。
AbstractElasticJobExecutor:

/**
 * Executes the job.
 *
 * <p>Flow, as written below:
 * <ol>
 *   <li>check the execution environment;</li>
 *   <li>obtain the sharding contexts;</li>
 *   <li>post a TASK_STAGING trace event (when job events are allowed);</li>
 *   <li>return early if the sharding items are still running;</li>
 *   <li>run the before-execution hook;</li>
 *   <li>execute as a normal trigger;</li>
 *   <li>re-execute as MISFIRE while misfires are pending;</li>
 *   <li>run failover if necessary;</li>
 *   <li>run the after-execution hook.</li>
 * </ol>
 *
 * <p>Exceptions from the environment check and from the before/after hooks are passed to
 * {@code jobExceptionHandler}, and the method then continues with the next step.
 */
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        // Delegated to the exception handler; this method does not return here.
        jobExceptionHandler.handleException(jobName, cause);
    }
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
    }
    // If the assigned sharding items are still running, this run is skipped. Per the trace
    // message, it is treated as a misfire to start after the previous run completes.
    // NOTE(review): presumably picked up by the isExecuteMisfired loop of the running execution.
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    try {
        jobFacade.beforeJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
    // Re-run while misfires are pending for these sharding items; the flag is cleared before each run.
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    jobFacade.failoverIfNecessary();
    try {
        jobFacade.afterJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}
配置信息保存在zk的${namespaces}/${jobUniqueId}/config節點上。
當config節點數據變更時,觸發CronSettingAndJobEventChangedJobListener事件響應,重新生成org.quartz.impl.StdScheduler的CronTrigger,並調用scheduler.rescheduleJob方法。
public class QuartzScheduler implements RemotableQuartzScheduler: public Date rescheduleJob(TriggerKey triggerKey, Trigger newTrigger) throws SchedulerException { validateState(); if (triggerKey == null) { throw new IllegalArgumentException("triggerKey cannot be null"); } if (newTrigger == null) { throw new IllegalArgumentException("newTrigger cannot be null"); } OperableTrigger trig = (OperableTrigger)newTrigger; Trigger oldTrigger = getTrigger(triggerKey); if (oldTrigger == null) { return null; } else { trig.setJobKey(oldTrigger.getJobKey()); } trig.validate(); Calendar cal = null; if (newTrigger.getCalendarName() != null) { cal = resources.getJobStore().retrieveCalendar( newTrigger.getCalendarName()); } Date ft = trig.computeFirstFireTime(cal); if (ft == null) { throw new SchedulerException( "Based on configured schedule, the given trigger will never fire."); } if (resources.getJobStore().replaceTrigger(triggerKey, trig)) { notifySchedulerThread(newTrigger.getNextFireTime().getTime()); notifySchedulerListenersUnscheduled(triggerKey); notifySchedulerListenersSchduled(newTrigger); } else { return null; } return ft; }
TriggerListenerManager管理JobTriggerStatusJobListener來響應手動觸發任務的執行。
class JobTriggerStatusJobListener extends AbstractJobListener { @Override protected void dataChanged(final String path, final Type eventType, final String data) { //斷定事件類型、觸發類型、觸發的執行實例是不是本機服務的實例 if (!InstanceOperation.TRIGGER.name().equals(data) || !instanceNode.isLocalInstancePath(path) || Type.NODE_UPDATED != eventType) { return; } instanceService.clearTriggerFlag(); if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().isJobRunning(jobName)) { // TODO 目前是做業運行時不能觸發, 將來改成堆積式觸發 JobRegistry.getInstance().getJobScheduleController(jobName).triggerJob(); } } }
在AbstractElasticJobExecutor.execute()中
JobFacade

/**
 * Hook invoked before the job is executed.
 *
 * @param shardingContexts sharding contexts
 */
void beforeJobExecuted(ShardingContexts shardingContexts);

/**
 * Hook invoked after the job has been executed.
 *
 * @param shardingContexts sharding contexts
 */
void afterJobExecuted(ShardingContexts shardingContexts);

/**
 * Posts a job execution event.
 *
 * @param jobExecutionEvent job execution event
 */
void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);

/**
 * Posts a job status trace event.
 *
 * @param taskId task id
 * @param state job execution state
 * @param message job execution message
 */
void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State state, String message);
作業服務按IP在${namespaces}/${jobUniqueId}/servers節點下註冊。同名的作業只會生成唯一的jobInstanceId,生成規則與當前作業服務器JVM的進程ID有關。JobRegistry用ConcurrentHashMap保存jobName與JobInstance、JobScheduleController的鍵值對關係。這些映射會被更新:雖然對象是新分配的,但jobInstanceId和scheduler仍是同一個。
作業框架只負責將分片合理地分配給相關的作業服務器,作業服務器需根據所分配的分片匹配數據進行處理。將真實數據和邏輯分片對應,用於解耦作業框架和數據的關係。當發現服務器波動或分片總數被修改時,框架只會標記一個需要重新分片的狀態,而非立即重新分片。
設置shardingTotalCount、shardingItemParameters 信息。
將分片總數設置為1,並使用多於1台的服務器執行作業,作業將會以1主n從的方式執行。一旦執行作業的服務器崩潰,等待執行的服務器將會在下次作業啟動時替補執行。若開啟失效轉移和監控功能(monitorExecution = true && failover = true),則可以保證:本次作業執行過程中發生崩潰時,備機立即啟動替補執行。
將任務拆分為n個任務項後,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集群,或現有服務器下線,在保留本次任務執行不變的情況下,下次任務開始前觸發任務重分片。