分佈式定時任務框架Elastic-Job的使用

1、前言java

    Elastic-Job是一個優秀的分佈式做業調度框架。框架

    Elastic-Job是一個分佈式調度解決方案,由兩個相互獨立的子項目Elastic-Job-Lite和Elastic-Job-Cloud組成。運維

    Elastic-Job-Lite定位爲輕量級無中心化解決方案,使用jar包的形式提供分佈式任務的協調服務。分佈式

    Elastic-Job-Cloud使用Mesos + Docker的解決方案,額外提供資源治理、應用分發以及進程隔離等服務。ide

1. Elastic-Job-Liteui

  • 分佈式調度協調this

  • 彈性擴容縮容spa

  • 失效轉移線程

  • 錯過執行做業重觸發code

  • 做業分片一致性,保證同一分片在分佈式環境中僅一個執行實例

  • 自診斷並修復分佈式不穩定形成的問題

  • 支持並行調度

  • 支持做業生命週期操做

  • 豐富的做業類型

  • Spring整合以及命名空間提供

  • 運維平臺

2. Elastic-Job-Cloud

  • 應用自動分發

  • 基於Fenzo的彈性資源分配

  • 分佈式調度協調

  • 彈性擴容縮容

  • 失效轉移

  • 錯過執行做業重觸發

  • 做業分片一致性,保證同一分片在分佈式環境中僅一個執行實例

  • 支持並行調度

  • 支持做業生命週期操做

  • 豐富的做業類型

  • Spring整合

  • 運維平臺

  • 基於Docker的進程隔離(TBD)

2、導讀

    一、Elastic-Job的核心思想

    二、Elastic-Job的基本使用

3、Elastic-Job的核心思想

    對於分佈式計算而言,分片是最基本的思想,Elastic-Job也是沿用了這個思想,每一個job跑部分數據,全部job執行完成,即是全量數據,官網給出的SimpleJob例子以下:

public class MyElasticJob implements SimpleJob {
    
    @Override
    public void execute(ShardingContext context) {
        switch (context.getShardingItem()) {
            case 0: 
                // do something by sharding item 0
                break;
            case 1: 
                // do something by sharding item 1
                break;
            case 2: 
                // do something by sharding item 2
                break;
            // case n: ...
        }
    }
}

    用switch case循環來對應分片的業務邏輯,case分片的index,進入業務邏輯執行。固然這裏也有不適應的場景,相似於MapReduce須要shuffle的場景就不適合了,比方說,要根據某一個字段全局分組聚合求結果,這時候怎麼分片均可能會不合理,由於每一個分片只能處理N分之一的數據,沒辦法shuffle再聚合,這一點,也要根據具體的業務來使用。

   那麼ShardingContext能夠拿到那些信息呢?源碼以下

    

public final class ShardingContext {
    
    /**
     * 做業名稱.
     */
    private final String jobName;
    
    /**
     * 做業任務ID.
     */
    private final String taskId;
    
    /**
     * 分片總數.
     */
    private final int shardingTotalCount;
    
    /**
     * 做業自定義參數.
     * 能夠配置多個相同的做業, 可是用不一樣的參數做爲不一樣的調度實例.
     */
    private final String jobParameter;
    
    /**
     * 分配於本做業實例的分片項.
     */
    private final int shardingItem;
    
    /**
     * 分配於本做業實例的分片參數.
     */
    private final String shardingParameter;
    
    public ShardingContext(final ShardingContexts shardingContexts, final int shardingItem) {
        jobName = shardingContexts.getJobName();
        taskId = shardingContexts.getTaskId();
        shardingTotalCount = shardingContexts.getShardingTotalCount();
        jobParameter = shardingContexts.getJobParameter();
        this.shardingItem = shardingItem;
        shardingParameter = shardingContexts.getShardingItemParameters().get(shardingItem);
    }
}

    以上代碼,jobParameter和shardingItem是最有用的參數,shardingItem決定switch case循環的走向,shardingParameter能夠用業務的查詢條件,也能夠用字符串拼接的方式組裝很複雜的參數用於特定的業務。

4、Elastic-Job的基本使用

    一、Job配置項

public class ElasticJobConfig {
	private static CoordinatorRegistryCenter createRegistryCenter() {

		ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job");
		CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
		regCenter.init();
		return regCenter;
	}

	private static LiteJobConfiguration createJobConfiguration() {

		JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("jobdemo", "0/5 * * * * ?", 3)
				.shardingItemParameters("0=A,1=A,2=B").failover(true).misfire(true).build();
		SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig,
				MyElasticJob.class.getCanonicalName());
		LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true)
				.build();
		return simpleJobRootConfig;
	}

	public static void main(String[] args) {
		new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
	}
}

    幾點說明:

    註冊中心配置項,設置zookeeper集羣地址,我這裏用的本地單節點,因此只有一個,固然能夠配置任務名稱,命名空間(namespace,本質上會在zk裏生成一個目錄),超時時間,最大重試次數等等

    LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build()中,overwrite參數很是重要,設置這個參數爲true,修改過job配置信息纔會覆蓋zookeeper裏的數據,要否則不會生效。

    二、SimpleJob的實現

public class MyElasticJob implements SimpleJob {

	@Override
	public void execute(ShardingContext shardingContext) {
		switch (shardingContext.getShardingItem()) {
		case 0: {
			System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		case 1: {
			System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		case 2: {
			System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		default: {
			System.out.println("當前分片:" + shardingContext.getShardingItem() + "=====" + "參數:"
					+ shardingContext.getShardingParameter() + " =====" + Thread.currentThread());
			break;
		}
		}
	}
}

    上面設置每5秒鐘執行一次,執行ElasticJobConfig的main方法,執行結果以下:

    

    從上面的結果,能夠看出,執行每一個分片的任務,實際上是放到一個線程池去執行的,對應的分片信息和參數信息在shardingContext能夠拿到,實現業務很是方便。

    最後,若是啓動多個JVM,那麼這些任務就分散到各個節點裏,若是一個節點宕機,下次觸發任務時,將把該分片任務丟到健康機器執行,這裏作到了節點容錯。可是某個分片的任務在執行過程當中失敗了,那麼這裏是不會從新觸發改分片任務的執行的。

相關文章
相關標籤/搜索