Elastic-job-lite 2.1.3 代碼詳解

Elastic-job-lite 官方概述
Quartz-scheduler java

Job類型

Job的執行器: LiteJob中execute方法實例化AbstractElasticJobExecutor。git

  • ScriptJob  :  ScriptJobExecutor
  • SimpleJob  :   SimpleJobExecutor
    /**
     * 簡單分佈式做業接口.
     * 
     */
    public interface SimpleJob extends ElasticJob {
        
        /**
         * 執行做業.
         *
         * @param shardingContext 分片上下文
         */
        void execute(ShardingContext shardingContext);
    }
  • DataflowJob: DataflowJobExecutor。當開啓streamingProcess時,當fetchData方法獲取數據不爲空時,將循環執行。
  • /**
     * 數據流分佈式做業接口.
     * 
     * 
     * @param <T> 數據類型
     */
    public interface DataflowJob<T> extends ElasticJob {
        
        /**
         * 獲取待處理數據.
         *
         * @param shardingContext 分片上下文
         * @return 待處理的數據集合
         */
        List<T> fetchData(ShardingContext shardingContext);
        
        /**
         * 處理數據.
         *
         * @param shardingContext 分片上下文
         * @param data 待處理數據集合
         */
        void processData(ShardingContext shardingContext, List<T> data);
    }
    DataflowJobExecutor:
       @Override
        protected void process(final ShardingContext shardingContext) {
            DataflowJobConfiguration dataflowConfig = (DataflowJobConfiguration) getJobRootConfig().getTypeConfig();
            if (dataflowConfig.isStreamingProcess()) {
                streamingExecute(shardingContext);
            } else {
                oneOffExecute(shardingContext);
            }
        }
    
    private void streamingExecute(final ShardingContext shardingContext) {
        List<Object> data = fetchData(shardingContext);
        while (null != data && !data.isEmpty()) {
            processData(shardingContext, data);
            if (!getJobFacade().isEligibleForJobRunning()) {
                break;
            }
            data = fetchData(shardingContext);
        }
    }
    
    private void oneOffExecute(final ShardingContext shardingContext) {
        List<Object> data = fetchData(shardingContext);
        if (null != data && !data.isEmpty()) {
            processData(shardingContext, data);
        }
    }

初始化過程

ZookeeperRegistryCenter

  • ZookeeperConfiguration:設置serverLists<包括IP地址和端口號,多個地址用逗號分隔>、namespace、digest等鏈接zookeeper的屬性。
  • CuratorFramework: 鏈接zk客戶端

namespace必定不能反斜槓開頭:
初始化curatorFramework實例NamespaceImpl對象時:PathUtils.validatePath("/" + namespace)。
github

java.lang.IllegalArgumentException: Invalid namespace: /zookeeper/scheduler/namespace/local
	at org.apache.curator.framework.imps.NamespaceImpl.<init>(NamespaceImpl.java:48)
	at org.apache.curator.framework.imps.CuratorFrameworkImpl.<init>(CuratorFrameworkImpl.java:116)
	at org.apache.curator.framework.CuratorFrameworkFactory$Builder.build(CuratorFrameworkFactory.java:145)
	at com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter.init(ZookeeperRegistryCenter.java:97)

上圖中顯示的<init>標識的是class文件中表明對象的構造方法,在類實例化時調用。apache

JobScheduler<SpringJobScheduler>

  1. 建立config節點,保存配置信息。若是沒有設置overwrite爲true, 以zk爲準。
  2. 依quartz框架建立scheduler、JobDetail 實例對象,並封裝入JobScheduleController。
  3. JobRegistry保存任務的當前分片總數,保存<jobName, jobInstance>和<jobName, jobScheduleController>等映射。
  4. 持久化任務各功能節點,並給指定節點路徑爲"/${jobName}"的TreeCache添加各功能的監控TreeCacheListener。
  5. JobScheduleController調用scheduler.scheduleJob(jobDetail, createTrigger(cron))開始任務
public void init() {
	LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
	JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(),
			liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
	JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(),
			createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()),
			liteJobConfigFromRegCenter.getJobName());
	JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController,
			regCenter);
	schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
	jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
}

LiteJobConfiguration

使用JobTypeConfiguration,JobTypeConfiguration使用JobCoreConfiguration,層層嵌套。設置jobName、cron、shardingTotalCount、shardingItemParameters、failover、misfire、jobClass、monitorExecution等做業屬性。服務器

JobScheduler.init() -> schedulerFacade.registerStartUpInfo(liteJobConfig) 中註冊config節點時,經過overwrite屬性configService.persist(liteJobConfig)斷定是否須要覆蓋zk上的配置。若是設置overwrite爲false,將從zookeeper上獲取配置數據。框架

ConfigurationService:

public void persist(final LiteJobConfiguration liteJobConfig) {
        checkConflictJob(liteJobConfig);
        if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
            jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
        }
    }

SchedulerFacade

啓動任務初始化zk節點信息,開啓zk節點事件監控; 開啓檢測分佈式做業服務分片含離線做業實例;
終止調度時,刪除leader/election/instance臨時節點,關閉監控ServerSocket,關閉檢測一致性服務。分佈式

/**
 * 註冊做業啓動信息.
 * 
 * @param enabled 做業是否啓用
 */
public void registerStartUpInfo(final boolean enabled) {
    listenerManager.startAllListeners();
    leaderService.electLeader();
    serverService.persistOnline(enabled);
    instanceService.persistOnline();
    shardingService.setReshardingFlag();
    monitorService.listen();
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

LiteJobFacade

在JobRunShell初始化時經過quartz運行原理PropertySettingJobFactory.setBeanProps方法將JobDetail.getJobDataMap()被反射注入到LiteJob中。ide

在任務運行中提供對節點信息獲取或更新的服務。post

LiteJob

JobDetail中指定任務的執行類:quartz.Job。將成員變量的值按名稱存在JobDetail中的JobDataMap中。經過SimpleJobFactory構建,PropertySettingJobFactory設置成員屬性值。fetch

private JobDetail createJobDetail(final String jobClass) {
     JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
     result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
     Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
     if (elasticJobInstance.isPresent()) {
         result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
     } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
         try {
             result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
         } catch (final ReflectiveOperationException ex) {
             throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
         }
     }
     return result;
 }

JobScheduleController

保存scheduler、jobDetail、JobName間的關係。控制做業調度啓動、從新觸發、關閉調度能控制操做。

package com.dangdang.ddframe.job.lite.internal.schedule;

import com.dangdang.ddframe.job.exception.JobSystemException;
import lombok.RequiredArgsConstructor;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;

/**
 * 做業調度控制器.
 * 
 * @author zhangliang
 */
@RequiredArgsConstructor
public final class JobScheduleController {
    
    private final Scheduler scheduler;  // quartzScheduler
    
    private final JobDetail jobDetail;  // LiteJob
    
    private final String triggerIdentity; // JobName
    
    /**
     * 調度做業.
     * 
     * @param cron CRON表達式
     */
    public void scheduleJob(final String cron) {
        try {
            // RAMJobStore保存了Key的唯一信息
            if (!scheduler.checkExists(jobDetail.getKey())) { 
                scheduler.scheduleJob(jobDetail, createTrigger(cron));
            }
            scheduler.start();
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    /**
     * 從新調度做業.
     * 
     * @param cron CRON表達式
     */
    public void rescheduleJob(final String cron) {
        try {
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(TriggerKey.triggerKey(triggerIdentity));
            if (!scheduler.isShutdown() && null != trigger && !cron.equals(trigger.getCronExpression())) {
                scheduler.rescheduleJob(TriggerKey.triggerKey(triggerIdentity), createTrigger(cron));
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    private CronTrigger createTrigger(final String cron) {
        return TriggerBuilder.newTrigger().withIdentity(triggerIdentity).withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()).build();
    }
    
    /**
     * 暫停做業.
     */
    public void pauseJob() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.pauseAll();
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    /**
     * 恢復做業.
     */
    public void resumeJob() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.resumeAll();
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    /**
     * 馬上啓動做業.
     */
    public void triggerJob() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.triggerJob(jobDetail.getKey());
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
    
    /**
     * 關閉調度器.
     */
    public void shutdown() {
        try {
            if (!scheduler.isShutdown()) {
                scheduler.shutdown();
            }
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
    }
}

 

執行過程

init:

running:


org.quartz.core.QuartzSchedulerThread:

經過Object.wait(long timeout)的方式阻塞執行,循環建立org.quartz.core.JobRunShell對象並初始化JobExecutionContextImpl的過程。JobExecutionContextImpl對象中包含經過JobFactory建立出來的org.quartz.Job實例對象LitJob。經過JobDetail.getJobDataMap()反射注入的elasticJob和JobFacade調用JobExecutorFactory建立AbstractElasticJobExecutor實例,執行execute()。

  1. 經過建立或更新「systemTime/current」節點獲取註冊中心建立時間。檢查本機與建立時間偏差秒數是否在容許範圍,不在則所拋出的異常;
  2. 斷定是否開啓失效轉移,若是開啓,獲取運行在本做業服務器的失效轉移序列號,不然執行分片過程,獲取過濾掉禁用的正常分片。
  3. 若是當前分片項仍在運行則設置任務被錯過執行的標記。
  4. 建立sharding/${itemIndex}/running節點。
  5. ExecutionSource.NORMAL_TRIGGER執行方式依據ShardingContexts來調用AbstractElasticJobExecutor的process方法,最終調用elasticJob的execute方法。當多個分片時,使用線程池處理,並CountDownLatch等待全部任務都執行完成;
  6. 斷定判斷做業是否須要執行錯過的任務,如果,則按ExecutionSource.MISFIRE執行方式,並清除任務被錯過執行的標記;
  7. 斷定是否須要分片轉移(leader/failover/items下存在分片節點,且當前服務空閒)。如果則執行FailoverLeaderExecutionCallback
    1. ${namespaces}/${jobUniqueId}/leader/failover/latch下生成失效節點。
    2. 填充sharding/${itemIndex}/failover數據爲當前執行的jobInstanceId;
    3. 清除leader/failover/items/${itemIndex}標記並從新觸發任務。
  8. finally:
    1. 刪除sharding/${itemIndex}/running節點;
    2. 如有failover,則刪除sharding/${itemIndex}/failover。
AbstractElasticJobExecutor:
/**
 * 執行做業.
 */
public final void execute() {
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobExceptionHandler.handleException(jobName, cause);
    }
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    if (shardingContexts.isAllowSendJobEvent()) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
    }
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        if (shardingContexts.isAllowSendJobEvent()) {
            jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
                    "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName, 
                    shardingContexts.getShardingItemParameters().keySet()));
        }
        return;
    }
    try {
        jobFacade.beforeJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
    execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
    }
    jobFacade.failoverIfNecessary();
    try {
        jobFacade.afterJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobExceptionHandler.handleException(jobName, cause);
    }
}

修改config配置

配置信息保存在zk的${namespaces}/${jobUniqueId}/config節點上。

當config節點數據變更時,觸發CronSettingAndJobEventChangedJobListener事件響應,並從新生成org.quartz.impl.StdScheduler的CronTrigger。調用scheduler.rescheduleJob方法。

public class QuartzScheduler implements RemotableQuartzScheduler:
 

public Date rescheduleJob(TriggerKey triggerKey,
            Trigger newTrigger) throws SchedulerException {
        validateState();

        if (triggerKey == null) {
            throw new IllegalArgumentException("triggerKey cannot be null");
        }
        if (newTrigger == null) {
            throw new IllegalArgumentException("newTrigger cannot be null");
        }

        OperableTrigger trig = (OperableTrigger)newTrigger;
        Trigger oldTrigger = getTrigger(triggerKey);
        if (oldTrigger == null) {
            return null;
        } else {
            trig.setJobKey(oldTrigger.getJobKey());
        }
        trig.validate();

        Calendar cal = null;
        if (newTrigger.getCalendarName() != null) {
            cal = resources.getJobStore().retrieveCalendar(
                    newTrigger.getCalendarName());
        }
        Date ft = trig.computeFirstFireTime(cal);

        if (ft == null) {
            throw new SchedulerException(
                    "Based on configured schedule, the given trigger will never fire.");
        }
        
        if (resources.getJobStore().replaceTrigger(triggerKey, trig)) {
            notifySchedulerThread(newTrigger.getNextFireTime().getTime());
            notifySchedulerListenersUnscheduled(triggerKey);
            notifySchedulerListenersSchduled(newTrigger);
        } else {
            return null;
        }

        return ft;
        
    }

 

手動觸發任務

TriggerListenerManager管理JobTriggerStatusJobListener來響應手動觸發任務的執行。

class JobTriggerStatusJobListener extends AbstractJobListener {
    
    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
       //斷定事件類型、觸發類型、觸發的執行實例是不是本機服務的實例
	   if (!InstanceOperation.TRIGGER.name().equals(data) || !instanceNode.isLocalInstancePath(path) || Type.NODE_UPDATED != eventType) {
            return;
        }
        instanceService.clearTriggerFlag();
        if (!JobRegistry.getInstance().isShutdown(jobName) && !JobRegistry.getInstance().isJobRunning(jobName)) {
            // TODO 目前是做業運行時不能觸發, 將來改成堆積式觸發
            JobRegistry.getInstance().getJobScheduleController(jobName).triggerJob();
        }
    }
}

擴展接口

在AbstractElasticJobExecutor.execute()中   

  • ElasticJobListener  彈性化分佈式做業監聽器接口
    先執行JobFacade.beforeJobExecuted調用ElasticJobListener.beforeJobExecuted;再執行process();最後執行JobFacade.afterJobExecuted調用ElasticJobListener.afterJobExecuted。
  • JobEventBus 運行事件總線
    先註冊JobEventListener,方法上有@Subscribe的按< 參數對象(Event),Collection<實例JobEventListener對象,方法名稱>> 形式保存在EventBus中。在執行過程當中,提供JobExecutionEvent和JobStatusTraceEvent兩種JobEvent事件來publish到總線上進行處理。
JobFacade  

/**
   * 做業執行前的執行的方法.
   *
   * @param shardingContexts 分片上下文
   */
  void beforeJobExecuted(ShardingContexts shardingContexts);
  
  /**
   * 做業執行後的執行的方法.
   *
   * @param shardingContexts 分片上下文
   */
  void afterJobExecuted(ShardingContexts shardingContexts);
  
  /**
   * 發佈執行事件.
   *
   * @param jobExecutionEvent 做業執行事件
   */
  void postJobExecutionEvent(JobExecutionEvent jobExecutionEvent);
  
  /**
   * 發佈做業狀態追蹤事件.
   *
   * @param taskId 做業Id
   * @param state 做業執行狀態
   * @param message 做業執行消息
   */
  void postJobStatusTraceEvent(String taskId, JobStatusTraceEvent.State state, String message);

 

做業註冊不能同名

做業服務時按IP在${namespaces}/${jobUniqueId}/servers節點註冊。同名的做業只會生成唯一的jobInstanceId,生成規則與當前做業服務器JVM的進程ID有關。 JobRegistry用ConcurrentHashMap保存jobName與JobInstance、JobScheduleController 值對關係。會進行更行操做,雖然對象是新內存,但jobInstanceId和scheduler是同一個。

數據分片

做業框架只負責將分片合理的分配給相關的做業服務器,做業服務器需根據所分配的分片匹配數據進行處理。將真實數據和邏輯分片對應,用於解耦做業框架和數據的關係。分片是發現服務器波動,或修改分片總數,將標記一個狀態,而非直接分片。

設置shardingTotalCount、shardingItemParameters 信息。

做業高可用

將分片項設置爲1,並使用多於1臺的服務器執行做業,做業將會以1主n從的方式執行。一旦執行做業的服務器崩潰,等待執行的服務器將會在下次做業啓動時替補執行。開啓失效轉移和監控功能,能夠保證在本次做業執行時崩潰,備機當即啓動替補執行(monitorExecution = true && failover =true)。

彈性擴容縮容

將任務拆分爲n個任務項後,各個服務器分別執行各自分配到的任務項。一旦有新的服務器加入集羣,或現有服務器下線,在保留本次任務執行不變的狀況下,下次任務開始前觸發任務重分片。

相關文章
相關標籤/搜索