Elastic-Job 2.1.3 Overview

Elastic-Job-Lite 2.1.3 source code walkthrough

Framework

Elastic-Job is a distributed scheduling solution composed of two independent sub-projects: Elastic-Job-Lite and Elastic-Job-Cloud.

Elastic-Job-Lite: a lightweight, decentralized solution that provides coordination for distributed jobs in the form of a jar package.

Elastic-Job-Cloud: a Mesos + Docker based solution that additionally provides resource management, application distribution, and process isolation.

 

Basic principles

Elastic-Job has no central job-scheduling node. Each deployed program schedules its jobs with its own Quartz instance and fires when the trigger time arrives. ZooKeeper is used for job registration, information storage, marking the state of task execution, and so on; the leader job instance produced by the election is responsible for computing the job sharding.
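To make the first point concrete, here is a minimal, self-contained sketch of what each deployed node effectively does: it runs its own local Quartz scheduler against the shared cron expression, and coordination only happens afterwards through ZooKeeper. Class names and the cron value are illustrative; this is not the framework's internal wiring.

import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class LocalTriggerSketch {

    public static void main(String[] args) throws SchedulerException {
        // Every node builds the same JobDetail/Trigger locally from the shared cron expression.
        JobDetail jobDetail = JobBuilder.newJob(MyQuartzJob.class).withIdentity("myJob").build();
        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("myJob_Trigger")
                .withSchedule(CronScheduleBuilder.cronSchedule("0/10 * * * * ?"))
                .build();
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.scheduleJob(jobDetail, trigger);
        scheduler.start();
    }

    // Stand-in for LiteJob: in Elastic-Job this is where the sharding context is resolved before execution.
    public static class MyQuartzJob implements Job {
        @Override
        public void execute(JobExecutionContext context) {
            System.out.println("fired on this node at " + context.getFireTime());
        }
    }
}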

A node tree is created on ZooKeeper to hold the job configuration. Every TreeCacheListener is held in the ListenerContainer of the TreeCache rooted at "/${jobName}". When the ZooKeeper node tree changes (add, remove, update, ...), TreeCache and TreeNode (which implement org.apache.zookeeper.Watcher and org.apache.curator.framework.api.BackgroundCallback) handle the WatchedEvent, and TreeCache calls publishEvent to asynchronously wake up all TreeCacheListeners.

At the same time, the current TreeNode re-registers itself as the watcher and background callback for its path:
client.checkExists().usingWatcher(this).inBackground(this).forPath(path);
client.getData().usingWatcher(this).inBackground(this).forPath(path);
client.getChildren().usingWatcher(this).inBackground(this).forPath(path);

Corroboration:

ZooKeeper lets an AsyncCallback be supplied for create, delete, setData, exists, getData, getACL, and getChildren; but a Watcher can only be registered in the ZooKeeper constructor and in exists, getData, and getChildren.
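As a reference point for the source walk-through below, a minimal Curator 2.10 sketch of watching a job subtree with TreeCache and a TreeCacheListener (the connection string and path are illustrative):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TreeCacheSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        // Cache the whole subtree under the job node and get notified on every change.
        TreeCache cache = new TreeCache(client, "/dd-job/myJob");
        cache.getListenable().addListener(new TreeCacheListener() {
            @Override
            public void childEvent(CuratorFramework c, TreeCacheEvent event) {
                String path = event.getData() == null ? "" : event.getData().getPath();
                System.out.println(event.getType() + " -> " + path);
            }
        });
        cache.start();
        Thread.sleep(Long.MAX_VALUE); // keep the process alive to observe events
    }
}

Back in Elastic-Job-Lite itself, AbstractJobListener adapts TreeCacheListener for the job listeners: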

 

package com.dangdang.ddframe.job.lite.internal.listener;

import com.google.common.base.Charsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

/**
 * 做業註冊中心的監聽器.
 * 
 * @author zhangliang
 */
public abstract class AbstractJobListener implements TreeCacheListener {
    
    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }
    
    protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
TreeCache:

 	private void publishEvent(final TreeCacheEvent event) {
		if (treeState.get() != TreeState.CLOSED) {
			LOG.debug("publishEvent: {}", event);
			executorService.submit(() -> {
				try {
					callListeners(event);
				} catch (Exception e) {
					ThreadUtils.checkInterrupted(e);
					handleException(e);
				}
			});
		}
	}

 private void callListeners(final TreeCacheEvent event)
 {
     listeners.forEach(new Function<TreeCacheListener, Void>()
     {
         @Override
         public Void apply(TreeCacheListener listener)
         {
             try
             {
                 listener.childEvent(client, event);
             }
             catch ( Exception e )
             {
                 ThreadUtils.checkInterrupted(e);
                 handleException(e);
             }
             return null;
         }
     });
 }

 

How to use

Maven

Elastic-Job-Lite manipulates ZooKeeper nodes through the Curator framework (2.10.0), a wrapper built on top of zookeeper-3.4.6.jar.

When building your project, all Curator artifacts should use the same version:

<dependencies>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-core</artifactId>
        <version>2.1.3</version>
    </dependency>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-spring</artifactId>
        <version>2.1.3</version>
    </dependency>
</dependencies>

<quartz.version>2.2.1</quartz.version>
<curator.version>2.10.0</curator.version>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
</dependency>

Spring integration

The spring.handlers and spring.schemas files in the elastic-job-lite-spring jar declare the XML namespaces and the tags they map to; RegNamespaceHandler and JobNamespaceHandler both extend NamespaceHandlerSupport.
The job-ref attribute takes precedence over the class attribute: JobScheduler's createJobDetail method decides where the elasticJob instance held by LiteJob comes from.
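A self-contained sketch of that precedence rule (names are illustrative; this models the decision, it is not the framework's createJobDetail source): an explicitly provided job instance, such as the job-ref bean handed over by SpringJobScheduler, wins, and only otherwise is the class attribute instantiated by reflection.

public final class JobInstancePrecedenceSketch {

    static Object resolveElasticJob(final Object jobRefBean, final String jobClassName) throws Exception {
        if (jobRefBean != null) {
            return jobRefBean;                              // job-ref attribute wins
        }
        return Class.forName(jobClassName).newInstance();   // fall back to the class attribute
    }

    public static void main(final String[] args) throws Exception {
        System.out.println(resolveElasticJob(new StringBuilder("job-ref bean"), "ignored.ClassName").getClass());
        System.out.println(resolveElasticJob(null, "java.lang.StringBuilder").getClass());
    }
}

The Spring namespace configuration below shows both styles (class and job-ref):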

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
	xmlns:job="http://www.dangdang.com/schema/ddframe/job"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.dangdang.com/schema/ddframe/reg
                        http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                        http://www.dangdang.com/schema/ddframe/job
                        http://www.dangdang.com/schema/ddframe/job/job.xsd
                        ">
	<!-- Registry center configuration -->
	<reg:zookeeper id="regCenter" server-lists=" yourhost:2181"
		namespace="dd-job" base-sleep-time-milliseconds="1000"
		max-sleep-time-milliseconds="3000" max-retries="3" />

	<!-- Simple job configuration -->
	<job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

	<bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob">
		<property name="fooService" ref="xxx.FooService" />
	</bean>

	<!-- Job configured through a referenced bean -->
	<job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

	<!-- Dataflow job configuration -->
	<job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

	<!-- Script job configuration -->
	<job:script id="scriptElasticJob" registry-center-ref="regCenter"
		cron="0/10 * * * * ?" sharding-total-count="3"
		sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" />

	<!-- Simple job with listeners -->
	<job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
		<job:listener class="xx.MySimpleJobListener" />
		<job:distributed-listener class="xx.MyOnceSimpleJobListener"
			started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
	</job:simple>

	<!-- Simple job with RDB event tracing -->
	<job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob"
		registry-center-ref="regCenter" cron="0/10 * * * * ?"
		sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C"
		event-trace-rdb-data-source="yourDataSource">
	</job:simple>


</beans>
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;

public class SmsNoticeTask implements SimpleJob {

	private static final Logger logger = LoggerFactory.getLogger(SmsNoticeTask.class);

	@Override
	public void execute(ShardingContext shardingContext) {
		logger.info("Sharding context for this execution: {}", shardingContext);
		// TODO do something with shardingContext.getShardingItem() / getShardingParameter()
	}
}
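The <job:listener> element above expects an implementation of ElasticJobListener; a minimal sketch matching the xx.MySimpleJobListener class referenced in the XML (the method bodies are illustrative):

import com.dangdang.ddframe.job.executor.ShardingContexts;
import com.dangdang.ddframe.job.lite.api.listener.ElasticJobListener;

public class MySimpleJobListener implements ElasticJobListener {

    @Override
    public void beforeJobExecuted(final ShardingContexts shardingContexts) {
        // Runs on each node before its shards are executed.
        System.out.println("before " + shardingContexts.getJobName());
    }

    @Override
    public void afterJobExecuted(final ShardingContexts shardingContexts) {
        // Runs on each node after its shards have finished.
        System.out.println("after " + shardingContexts.getJobName());
    }
}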

 

Node overview

Once a job has started successfully, its jobName must not be modified; renaming it is treated as a brand-new job.

Under ${namespace}/${jobName}, the persistent top-level nodes config, leader, servers, instances, and sharding are created.
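Putting the nodes discussed in the rest of this section together, the tree under the job root looks roughly like this (a summary sketch, not exhaustive):

${namespace}/${jobName}
 ├── config                                  # job configuration (JSON)
 ├── leader
 │    ├── election/{instance, latch}
 │    ├── sharding/{necessary, processing}
 │    └── failover/items/${itemIndex}
 ├── servers/${ip}
 ├── instances/${jobInstanceId}
 └── sharding/${itemIndex}/{instance, failover}

SchedulerFacade's registerStartUpInfo populates most of these nodes when the job starts up: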

SchedulerFacade:
/**
 * 註冊做業啓動信息.
 * 
 * @param enabled 做業是否啓用
 */
public void registerStartUpInfo(final boolean enabled) {
    listenerManager.startAllListeners();
    leaderService.electLeader();
    serverService.persistOnline(enabled);
    instanceService.persistOnline();
    shardingService.setReshardingFlag();
    monitorService.listen();
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}

config

ConfigurationService
A persistent node that stores the job's parameter configuration. If a configuration has already been persisted on zk and overwrite is not set to true, the zk copy takes precedence.
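The same overwrite switch can also be set through the Java API instead of the Spring namespace; a minimal sketch, assuming a local ZooKeeper and the xxx.MySimpleElasticJob class from the XML example above:

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

public class JobConfigSketch {

    public static void main(String[] args) {
        ZookeeperRegistryCenter regCenter =
                new ZookeeperRegistryCenter(new ZookeeperConfiguration("localhost:2181", "dd-job"));
        regCenter.init();
        JobCoreConfiguration coreConfig =
                JobCoreConfiguration.newBuilder("mySimpleJob", "0/10 * * * * ?", 3).build();
        // Job class name as in the XML example; illustrative only.
        SimpleJobConfiguration simpleConfig = new SimpleJobConfiguration(coreConfig, "xxx.MySimpleElasticJob");
        // overwrite(true) makes this local configuration win over whatever is already on ZooKeeper.
        LiteJobConfiguration liteConfig = LiteJobConfiguration.newBuilder(simpleConfig).overwrite(true).build();
        new JobScheduler(regCenter, liteConfig).init();
    }
}

The persistence path inside the framework is: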

JobScheduler.init() 
        -> schedulerFacade.updateJobConfiguration(liteJobConfig)
                ->  configService.persist(liteJobConfig)

/**
 * 持久化分佈式做業配置信息.
 * 
 * @param liteJobConfig
 *            做業配置
 */
public void persist(final LiteJobConfiguration liteJobConfig) {
	checkConflictJob(liteJobConfig); // validate the job class; if the config node exists on zk but its data is null, delete the whole job node
	if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
		jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT,
				LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
	}
}

leader

LeaderService

Persists the election, sharding, and failover child nodes.

election

/**
 * 主節點路徑.
 * 
 */
public final class LeaderNode {
    
    /**
     * 主節點根路徑.
     */
    public static final String ROOT = "leader";
    
    static final String ELECTION_ROOT = ROOT + "/election";
    
    static final String INSTANCE = ELECTION_ROOT + "/instance";
    
    static final String LATCH = ELECTION_ROOT + "/latch";
    
    private final JobNodePath jobNodePath;
    
   ..........
}


Leader election is triggered when a job instance registers at initialization or when the current leader instance goes offline.
See LeaderElectionJobListener and LeaderAbdicationJobListener.

  • latch: a persistent child node. During election, the distributed instances create ephemeral sequential children under it to act as a lock.

    The election is serialized through Curator's LeaderLatch: each instance creates an ephemeral sequential node, and the one with the smallest sequence number wins the right to execute. LeaderLatch.start() resets leadership; checkLeadership() determines whether the instance's own node has the smallest sequence number and, if so, sets leadership = true, otherwise it registers a Watcher and a BackgroundCallback again to re-check.
    LeaderLatch.await() calls Object.wait() while leadership == false.

    LeaderService.electLeader() -> jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback()) 
    LeaderService:
    
     /**
      * 選舉主節點.
      */
    public void electLeader() {
        log.debug("Elect a new leader now.");
        jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }
    
    
    @RequiredArgsConstructor
    class LeaderElectionExecutionCallback implements LeaderExecutionCallback {
    
    	@Override
    	public void execute() {
    		if (!hasLeader()) {
    			jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE,
    					JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
    		}
    	}
    }
    
    -----------------------------------------------------------------------------------------
    
    JobNodeStorage:
    
    /**
     * 在主節點執行操做.
     * 
     * @param latchNode 分佈式鎖使用的做業節點名稱
     * @param callback 執行操做的回調
     */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) {
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }
    LeaderLatch:
    void reset() throws Exception
    {
        setLeadership(false);
        setNode(null);
    
        BackgroundCallback callback = new BackgroundCallback()
        {
            @Override
            public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
            {
                if ( debugResetWaitLatch != null )
                {
                    debugResetWaitLatch.await();
                    debugResetWaitLatch = null;
                }
    
                if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                {
                    setNode(event.getName());
                    if ( state.get() == State.CLOSED )
                    {
                        setNode(null);
                    }
                    else
                    {
                        getChildren();
                    }
                }
                else
                {
                    log.error("getChildren() failed. rc = " + event.getResultCode());
                }
            }
        };
        client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
    		.inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
    }
    
    
    public void await() throws InterruptedException, EOFException {
        synchronized(this) {
            while(this.state.get() == LeaderLatch.State.STARTED && !this.hasLeadership.get()) {
                this.wait();
            }
        }
    
        if(this.state.get() != LeaderLatch.State.STARTED) {
            throw new EOFException();
        }
    }
  • instance: an ephemeral child node. Created once election completes; it stores the leader's jobInstanceId (the job running-instance id).
  • sharding

    ShardingService

    Persists the necessary node, which is deleted once resharding completes.
    Sets the resharding flag when the job starts, when the sharding total count changes, when job servers change, or when job running instances change.

    ShardingListenerManager:
    
    class ShardingTotalCountChangedJobListener extends AbstractJobListener {
    
    	@Override
    	protected void dataChanged(final String path, final Type eventType, final String data) {
    		if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
    			int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig()
    					.getCoreConfig().getShardingTotalCount();
    			if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
    				shardingService.setReshardingFlag();
    				JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
    			}
    		}
    	}
    }
    
    class ListenServersChangedJobListener extends AbstractJobListener {
    
    	@Override
    	protected void dataChanged(final String path, final Type eventType, final String data) {
    		if (!JobRegistry.getInstance().isShutdown(jobName)
    				&& (isInstanceChange(eventType, path) || isServerChange(path))) {
    			shardingService.setReshardingFlag();
    		}
    	}
    }

failover

  • FailoverService
  • FailoverListenerManager
    • FailoverSettingsChangedJobListener:
       When failover is set to false, removes the sharding/${itemIndex}/failover node of every shard.
    • JobCrashedJobListener:
        Fires when: failover == true
                  && the event is the removal of a job-instance child under the instances node (an instance went offline)
                  && the offline jobInstanceId is not this machine's own
      It then fetches the shards of the crashed jobInstanceId that carry the sharding/${itemIndex}/failover marker; if there are none, it fetches all the shards normally assigned to that crashed jobInstanceId. Each of these shards is flagged under leader/failover/items/${itemIndex}.
      FailoverLeaderExecutionCallback is then executed under a LeaderLatch (the cluster handles one failover at a time; the handling threads on the various instances contend for the distributed lock and wait). A configuration sketch for enabling failover follows this list.
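For context, the failover flag those listeners react to is part of the job configuration itself; a minimal sketch of enabling it through the Java API (names illustrative; per the official documentation, failover only takes effect together with monitorExecution):

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;

public class FailoverConfigSketch {

    public static LiteJobConfiguration build() {
        // failover lives in the core configuration; monitorExecution is a Lite-level switch.
        JobCoreConfiguration coreConfig = JobCoreConfiguration
                .newBuilder("mySimpleJob", "0/10 * * * * ?", 3)
                .failover(true)
                .build();
        return LiteJobConfiguration
                .newBuilder(new SimpleJobConfiguration(coreConfig, "xxx.MySimpleElasticJob"))
                .monitorExecution(true)
                .build();
    }
}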
FailoverListenerManager

private boolean isFailoverEnabled() {
	LiteJobConfiguration jobConfig = configService.load(true);
	return null != jobConfig && jobConfig.isFailover();
}

class JobCrashedJobListener extends AbstractJobListener {

	@Override
	protected void dataChanged(final String path, final Type eventType, final String data) {
		if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
			String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
			if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
				return;
			}
			List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
			if (!failoverItems.isEmpty()) {
				for (int each : failoverItems) {
					failoverService.setCrashedFailoverFlag(each);
					failoverService.failoverIfNecessary();
				}
			} else {
				for (int each : shardingService.getShardingItems(jobInstanceId)) {
					failoverService.setCrashedFailoverFlag(each);
					failoverService.failoverIfNecessary();
				}
			}
		}
	}
}

servers

ServerService

When a job server registers, a persistent node named after the server's IP is created, so job servers are managed by IP.

instances

InstanceService

When a job server registers, an ephemeral node named after the job running-instance id is created. The node name follows this pattern:

e.g. 192.168.42.1@-@6260

package com.dangdang.ddframe.job.lite.api.strategy;

import com.dangdang.ddframe.job.util.env.IpUtils;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.lang.management.ManagementFactory;

/**
 * 做業運行實例.
 * 
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode(of = "jobInstanceId")
public final class JobInstance {

	private static final String DELIMITER = "@-@";

	/**
	 * 做業實例主鍵.
	 */
	private final String jobInstanceId;

	public JobInstance() {
		jobInstanceId = IpUtils.getIp() + DELIMITER + 
                       ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
	}

	/**
	 * 獲取做業服務器IP地址.
	 * 
	 * @return 做業服務器IP地址
	 */
	public String getIp() {
		return jobInstanceId.substring(0, jobInstanceId.indexOf(DELIMITER));
	}
}

sharding

  • ShardingService
  • ShardingListenerManager
    • ShardingTotalCountChangedJobListener
    • ListenServersChangedJobListener

Resharding is triggered when a service registers for the first time, when job instances change, or when the sharding total count changes. The resharding itself runs at the next job trigger; only the leader node may reshard, and the non-leader nodes block while it does.

  1. Check that there are available job instances and that the resharding flag is set.
  2. Check whether a leader currently exists; if not, trigger an election and wait until it completes.
  3. Check whether this node is the leader; if not, wait until the leader finishes sharding.
  4. Check whether any shards are still executing; if so, wait for them to finish.
  5. Create the ephemeral leader/sharding/processing node.
  6. Clear the old sharding nodes and create a persistent sharding/${itemIndex} node for each current item.
  7. Shard according to the configured strategy: in a single transaction, create each sharding/${itemIndex}/instance node filled with the jobInstanceId, then delete the leader/sharding/necessary and leader/sharding/processing nodes (a custom-strategy sketch follows the ShardingService code below).
ShardingService:

/**
 * 若是須要分片且當前節點爲主節點, 則做業分片.
 * 
 * <p>
 * 若是當前無可用節點則不分片.
 * </p>
 */
public void shardingIfNecessary() {
	List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
	if (!isNeedSharding() || availableJobInstances.isEmpty()) {
		return;
	}
	if (!leaderService.isLeaderUntilBlock()) {
		blockUntilShardingCompleted();
		return;
	}
	waitingOtherJobCompleted();
	LiteJobConfiguration liteJobConfig = configService.load(false);
	int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
	log.debug("Job '{}' sharding begin.", jobName);
	jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
	resetShardingInfo(shardingTotalCount);
	JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory
			.getStrategy(liteJobConfig.getJobShardingStrategyClass());
	jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(
			jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
	log.debug("Job '{}' sharding complete.", jobName);
}
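Step 7 delegates the actual item-to-instance assignment to the JobShardingStrategy configured via liteJobConfig.getJobShardingStrategyClass() (the default is AverageAllocationJobShardingStrategy). A purely illustrative custom strategy sketch that assigns items round-robin:

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import com.dangdang.ddframe.job.lite.api.strategy.JobInstance;
import com.dangdang.ddframe.job.lite.api.strategy.JobShardingStrategy;

// Illustrative only: assigns sharding items to the available instances round-robin.
public final class RoundRobinJobShardingStrategy implements JobShardingStrategy {

    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances,
                                                    final String jobName, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(jobInstances.size(), 1);
        for (JobInstance each : jobInstances) {
            result.put(each, new ArrayList<Integer>());
        }
        for (int item = 0; item < shardingTotalCount; item++) {
            result.get(jobInstances.get(item % jobInstances.size())).add(item);
        }
        return result;
    }
}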