elasticjob-2.0.5-SNAPSHOT源碼分析

本文討論均基於elasticjob的2.0.5-SNAPSHOT版本java

先從官網elastic-job-example-lite-java的demo開始分析,從com.dangdang.ddframe.job.example.JavaMain開始分析。linux

根據配置建立註冊中心

註冊中心uml

如今看來註冊中心的實現只有zk的實現,對於zk的操做使用了Apache的curator,demo中使用了內嵌TestingServergit

建立事件跟蹤寫庫config

  1. 定義了一個數據源DataSource
  2. 建立一個時間configJobEventRdbConfiguration

暫且不是很清楚其做用,從類名能夠大致猜出是把job的運行記錄寫入數據庫中。github

事件config

建立核心config

很喜歡Builder模式啊 :)數據庫

核心config包含建立出下述中不一樣job類型的公有屬性,是對config的更高一層抽象。根據其newBuilder方法可知,有三個參數是必須的apache

屬性名 類型 是否必須 說明 備註
jobName String 做業名稱 暫時不清楚是否須要惟一
cron String cron表達式
shardingTotalCount int 做業分片總數
shardingItemParameters String 自定義分片參數 後續詳細介紹
jobParameter String job參數
failover boolean 是否失敗重試 之前看到的是利用zk實現?
misfire boolean 是否啞火重試 利用quartz實現?
description String 描述
jobProperties JobProperties job額外的參數一個Map

建立不一樣job類型的config

jobtype

從上圖可知要建立出不一樣類型的job,先要建立不一樣類型的jobconfig,這裏先以SimpleJob爲例,建立SimpleJobConfiguration須要兩個參數,一個是上述中的JobCoreConfiguration;另外一個是你寫的業務邏輯的job實現類的權限定類名,由於這裏以SimpleJob爲例,因此這個類必須繼承SimpleJob,類繼承圖以下:json

jobextends

SimpleJobConfiguration屬性以下:服務器

屬性名 類型 是否必須 說明 備註
coreConfig JobCoreConfiguration 核心屬性
jobType JobType job類型 以Simple爲例則爲JobType.SIMPLE
jobClass String job的全限定類名

建立任務調度JobScheduler

建立JobScheduler須要註冊中心CoordinatorRegistryCenter,任務相關配置LiteJobConfiguration,事件配置相關JobEventConfiguration,job監聽器ElasticJobListener多線程

說說其中的LiteJobConfiguration,合分合,這樣作多是語義和邏輯上的清晰,其屬性具體意義可參考其Builder中的註釋。異步

建立過程過於複雜,不得不貼代碼分析了。

private JobScheduler(...) {
        jobName = liteJobConfig.getJobName();
        //1
        jobExecutor = new JobExecutor(regCenter, liteJobConfig, elasticJobListeners);
        //2
        jobFacade = new LiteJobFacade(regCenter, jobName, Arrays.asList(elasticJobListeners), jobEventBus);
        //3
        jobRegistry = JobRegistry.getInstance();
    }

建立了做業執行器JobExecutor,內部服務門面類LiteJobFacade以及做業註冊表JobRegistry

一、JobExecutor`做業啓動器

public JobExecutor(...) {
		//...
        //1
        setGuaranteeServiceForElasticJobListeners(regCenter, elasticJobListenerList);
        //2
        schedulerFacade = new SchedulerFacade(regCenter, liteJobConfig.getJobName(), elasticJobListenerList);
    }

1處給全部實現AbstractDistributeOnceElasticJobListener的類添加GuaranteeService,每次執行先後** 全部 **分片項都註冊到zk上,若是都註冊成功,才執行實現類裏面的doBeforeJobExecutedAtLastStarted方法或者doAfterJobExecutedAtLastCompleted,而後清除zk上剛纔註冊的節點。

2處建立爲調度器提供內部服務的門面類SchedulerFacade

1.一、SchedulerFacade建立

public SchedulerFacade(...) {
		//1
        configService = new ConfigurationService(regCenter, jobName);
        //2
        leaderElectionService = new LeaderElectionService(regCenter, jobName);
        //3
        serverService = new ServerService(regCenter, jobName);
        //4
        shardingService = new ShardingService(regCenter, jobName);
        //5
        executionService = new ExecutionService(regCenter, jobName);
        //6
        monitorService = new MonitorService(regCenter, jobName);
        //7
        listenerManager = new ListenerManager(regCenter, jobName, elasticJobListeners);
    }

建立了各類服務類,這些服務大部分都在zk上有對應的節點信息。zk節點詳細解釋參考官網說明

  1. 配置服務,在zk上有具體的節點config,值爲json格式的LiteJobConfiguration
  2. 選主服務,在zk上有具體的節點leader,用於選擇到底哪臺服務器執行
  3. 服務器信息服務,在zk上有具體節點servers,先暫時用ip區分,記錄服務hostname,狀態及分片信息。
  4. 分片服務
  5. 執行服務,節點execution,根據分片號區分,存儲具體的分片執行狀況。 此處可看出misfire還要藉助zk
  6. 監控服務,外界能夠開一個socket鏈接,而後輸入dump獲取zk的全部註冊節點,linux下命令echo dump|nc ip port
  7. 監聽管理,監聽zk節點變化,好比選主、failover、分片等。** 這裏面的代碼也不少,到執行的時候應該還有,因此先不看了 **

1.二、LiteJobFacade

public LiteJobFacade(...) {
        configService = new ConfigurationService(regCenter, jobName);
        shardingService = new ShardingService(regCenter, jobName);
        serverService = new ServerService(regCenter, jobName);
        executionContextService = new ExecutionContextService(regCenter, jobName);
        executionService = new ExecutionService(regCenter, jobName);
        failoverService = new FailoverService(regCenter, jobName);
        this.elasticJobListeners = elasticJobListeners;
        this.jobEventBus = jobEventBus;
    }

這也是一個門面類,相對於上面的SchedulerFacade,多了幾個服務

  1. FailoverService失敗重試服務,在zk的根節點爲failover,一旦該節點出現,相應的節點監聽器就會執行。
  2. JobEventBus異步的事件總線,如今來看只註冊了記錄任務執行蹤影的寫數據庫事件

1.三、JobRegistry做業註冊表

用map將jobName爲key,做業調度控制器JobScheduleController爲value存於內存中,後期還能夠再用key取出控制器進行job的觸發、暫停等操做。可見這只是針對於某臺機器的,而對於分佈式job是有限制的,而且上述中提出的jobName是否可重複,在此處能夠推斷出設計上是要保證惟一性的

執行JobScheduler#init()方法,執行任務調度

public void init() {
        jobExecutor.init();
        JobTypeConfiguration jobTypeConfig = jobExecutor.getSchedulerFacade().loadJobConfiguration().getTypeConfig();
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName);
        jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron());
        jobRegistry.addJobScheduleController(jobName, jobScheduleController);
    }

一、jobExecutor初始化

public void init() {
        schedulerFacade.clearPreviousServerStatus();
        regCenter.addCacheData("/" + liteJobConfig.getJobName());
        schedulerFacade.registerStartUpInfo(liteJobConfig);
    }
  1. 清除servers/ip/status和servers/ip/shutdown
  2. 生成以/jobName爲根節點的TreeCache
  3. 註冊啓動信息

1.一、註冊啓動信息

/**
     * 註冊Elastic-Job啓動信息.
     * 
     * @param liteJobConfig 做業配置
     */
    public void registerStartUpInfo(final LiteJobConfiguration liteJobConfig) {
        listenerManager.startAllListeners();
        leaderElectionService.leaderForceElection();
        configService.persist(liteJobConfig);
        serverService.persistServerOnline(!liteJobConfig.isDisabled());
        serverService.clearJobPausedStatus();
        shardingService.setReshardingFlag();
        monitorService.listen();
        listenerManager.setCurrentShardingTotalCount(configService.load(false).getTypeConfig().getCoreConfig().getShardingTotalCount());
    }
  1. 開啓監聽,只要根節點下的節點發生變化就會觸發對應的監聽
  • 選主監聽:LeaderElectionJobListener->leaderElectionService leader/host=ip
  • 分片監聽:總分片ShardingTotalCountChangedJobListener和servers節點變化ListenServersChangedJobListener
  • 執行監聽:config節點變化監聽`MonitorExecutionChangedJobListener
  • 失敗重試監聽:JobCrashedJobListener必須知足變化的節點以running結尾,而且是刪除事件&&execution/分片號/completed節點存在,配置信息failover爲true;FailoverJobCrashedJobListener必須知足變化的節點以failover結尾,而且是刪除事件&&execution/分片號/completed節點存在,配置信息failover爲true;FailoverSettingsChangedJobListener觸發條件config變化
  • jobOperationListenerManager.start();
  • configurationListenerManager.start();
  • guaranteeListenerManager.start(); 太多了,本身看代碼吧。 :(
  1. 選主服務強制選主,建立一個臨時節點leader/host=ip
  2. config節點註冊,若是config的overwirte爲true會強制重置config節點的信息
  3. 註冊執行機器上線節點,能夠經過控制disabled屬性來動態控制某臺執行機器的上下線,另外還註冊了一個臨時節點servers/ip/status來表示機器的狀態
  4. 清除job的pause標記
  5. 設置resharding標誌
  6. 若是config中的monitorPort大於0,則開啓一個ServerSocket
  7. 設置ListenerManager的總分片數。

一、建立JobScheduleController

public void init() {
        //...
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(jobTypeConfig.getCoreConfig().isMisfire()), createJobDetail(jobTypeConfig.getJobClass()), jobExecutor.getSchedulerFacade(), jobName);
       //...
    }

1.一、先建立quartz的Scheduler

private Scheduler createScheduler(final boolean isMisfire) {
        Scheduler result;
        try {
            StdSchedulerFactory factory = new StdSchedulerFactory();
            factory.initialize(getBaseQuartzProperties(isMisfire));
            result = factory.getScheduler();
            result.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
        } catch (final SchedulerException ex) {
            throw new JobSystemException(ex);
        }
        return result;
    }

從這裏能夠看出misfire的具體流程是quartz監聽到某任務發生misfire,觸發監聽器JobTriggerListener的triggerMisfired方法,而後將misfire節點註冊到zk上,zk的TreeCach監聽到節點變化作出相應處理。

1.三、建立quartz的JobDetail

private JobDetail createJobDetail(final String jobClass) {
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        //這個地方有什麼用????
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) {
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
            try {
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) {
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }

LiteJob是繼承自quartz的Job類,實現了execute方法,有兩個屬性一個ElasticJob是你本身實現的業務邏輯的類,另外一個是JobFacade門面類。execute方法其實調用了executor的execute方法,下面是executor的繼承類。

executor.png

分片等處理都已經在抽象類中實現好了,須要實現的是 protected abstract void process(ShardingContext shardingContext);

對於每一個分片是多線程調用,代碼以下:

for (final int each : items) {
            final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
            if (executorService.isShutdown()) {
                return;
            }
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        process(shardingContexts, each, jobExecutionEvent);
                    } finally {
                        latch.countDown();
                    }
                }
            });
        }
        try {
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }

1.四、任務調度執行

public void init() {
      	//...
        jobScheduleController.scheduleJob(jobTypeConfig.getCoreConfig().getCron());
        jobRegistry.addJobScheduleController(jobName, jobScheduleController);
    }

根據cron表達式開始啓動任務調度,而且把控制器放入註冊表中。

相關文章
相關標籤/搜索