Elastic-Job-Lite 2.1.3 Source Code Walkthrough
Elastic-Job is a distributed scheduling solution made up of two independent sub-projects: Elastic-Job-Lite and Elastic-Job-Cloud.
Elastic-Job-Lite: a lightweight, decentralized solution that provides coordination for distributed jobs in the form of a jar package.
Elastic-Job-Cloud: a Mesos + Docker based solution that additionally provides resource governance, application distribution, and process isolation.
Elastic-Job has no central job-scheduling node. Instead, every deployed instance triggers its own scheduling through Quartz when the configured time point is reached. ZooKeeper is used for job registration, information storage, and marking the state of task execution; the leader job instance produced by the election is responsible for computing the job sharding.
A node tree is created on ZooKeeper to hold the job configuration. Each TreeCacheListener is held in the ListenerContainer of the TreeCache object rooted at "/${jobName}". When the ZooKeeper node tree changes (add, remove, update, ...), TreeCache & TreeNode (which implement org.apache.zookeeper.Watcher and org.apache.curator.framework.api.BackgroundCallback) handle the WatchedEvent response, and TreeCache calls its publishEvent method to asynchronously wake up all TreeCacheListeners.
At the same time, the current TreeNode re-registers itself as the watcher for the TreeCache path:
client.checkExists().usingWatcher(this).inBackground(this).forPath(path);
client.getData().usingWatcher(this).inBackground(this).forPath(this.path);
client.getChildren().usingWatcher(this).inBackground(this).forPath(this.path);
This is corroborated by the ZooKeeper API:
ZooKeeper lets you supply an AsyncCallback for create, delete, setData, exists, getData, getACL, and getChildren; but a Watcher can only be registered in the ZooKeeper constructor and in exists, getData, and getChildren.
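To make that concrete, here is a small illustrative sketch against the raw ZooKeeper client (the connection string and paths are placeholders, not Elastic-Job code): exists, getData, and getChildren take both a Watcher and an AsyncCallback, while create only takes a callback.

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class RawZkWatchDemo {

    public static void main(String[] args) throws Exception {
        Watcher watcher = (WatchedEvent event) -> System.out.println("watched: " + event);

        // A Watcher can be registered in the constructor...
        ZooKeeper zk = new ZooKeeper("localhost:2181", 30000, watcher);

        // ...and in exists/getData/getChildren, each of which also accepts an AsyncCallback.
        zk.exists("/demo", watcher,
                (AsyncCallback.StatCallback) (rc, path, ctx, stat) -> System.out.println("exists rc=" + rc), null);
        zk.getData("/demo", watcher,
                (AsyncCallback.DataCallback) (rc, path, ctx, data, stat) -> System.out.println("getData rc=" + rc), null);
        zk.getChildren("/demo", watcher,
                (AsyncCallback.ChildrenCallback) (rc, path, ctx, children) -> System.out.println("children rc=" + rc), null);

        // create accepts an AsyncCallback but has no Watcher parameter.
        zk.create("/demo/child", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,
                (AsyncCallback.StringCallback) (rc, path, ctx, name) -> System.out.println("create rc=" + rc), null);
    }
}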
package com.dangdang.ddframe.job.lite.internal.listener;

import com.google.common.base.Charsets;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

/**
 * Listener for the job registry center.
 *
 * @author zhangliang
 */
public abstract class AbstractJobListener implements TreeCacheListener {

    @Override
    public final void childEvent(final CuratorFramework client, final TreeCacheEvent event) throws Exception {
        ChildData childData = event.getData();
        if (null == childData) {
            return;
        }
        String path = childData.getPath();
        if (path.isEmpty()) {
            return;
        }
        dataChanged(path, event.getType(), null == childData.getData() ? "" : new String(childData.getData(), Charsets.UTF_8));
    }

    protected abstract void dataChanged(final String path, final Type eventType, final String data);
}
TreeCache:

private void publishEvent(final TreeCacheEvent event) {
    if (treeState.get() != TreeState.CLOSED) {
        LOG.debug("publishEvent: {}", event);
        executorService.submit(() -> {
            try {
                callListeners(event);
            } catch (Exception e) {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
        });
    }
}

private void callListeners(final TreeCacheEvent event) {
    listeners.forEach(new Function<TreeCacheListener, Void>() {
        @Override
        public Void apply(TreeCacheListener listener) {
            try {
                listener.childEvent(client, event);
            } catch (Exception e) {
                ThreadUtils.checkInterrupted(e);
                handleException(e);
            }
            return null;
        }
    });
}
Elastic-Job-Lite operates on ZooKeeper nodes through the Curator framework (2.10.0), a wrapper built on top of zookeeper-3.4.6.jar.
When building the project, all Curator artifacts should use the same version:
<dependencies>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-core</artifactId>
        <version>2.1.3</version>
    </dependency>
    <dependency>
        <groupId>com.dangdang</groupId>
        <artifactId>elastic-job-lite-spring</artifactId>
        <version>2.1.3</version>
    </dependency>
</dependencies>
<quartz.version>2.2.1</quartz.version>
<curator.version>2.10.0</curator.version>

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-client</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
</dependency>
<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
</dependency>
The spring.handlers and spring.schemas files in the elastic-job-lite-spring package declare the XML namespaces and their corresponding tags. RegNamespaceHandler and JobNamespaceHandler both extend NamespaceHandlerSupport.
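For reference, such a handler is just a NamespaceHandlerSupport that registers one BeanDefinitionParser per tag. A minimal sketch of RegNamespaceHandler follows; the parser class name is recalled from the elastic-job-lite-spring module and should be checked against the 2.1.3 sources.

import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

public final class RegNamespaceHandler extends NamespaceHandlerSupport {

    @Override
    public void init() {
        // Maps the <reg:zookeeper/> tag to the parser that builds the registry-center bean definition.
        registerBeanDefinitionParser("zookeeper", new ZookeeperBeanDefinitionParser());
    }
}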
The job-ref attribute takes precedence over the class attribute; JobScheduler's createJobDetail method decides where the elasticJob property of the LiteJob class gets its instance from.
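That precedence can be read from createJobDetail. Paraphrased from memory of the 2.1.3 sources (data-map keys and exception wording may differ slightly), the logic is roughly:

// JobScheduler (paraphrased sketch): a bean supplied via job-ref wins; otherwise the class attribute is instantiated by reflection.
private JobDetail createJobDetail(final String jobClass) {
    JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
    result.getJobDataMap().put("jobFacade", jobFacade);
    // Present only when job-ref supplied a bean (SpringJobScheduler overrides createElasticJobInstance).
    Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
    if (elasticJobInstance.isPresent()) {
        result.getJobDataMap().put("elasticJob", elasticJobInstance.get());
    } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
        try {
            result.getJobDataMap().put("elasticJob", Class.forName(jobClass).newInstance());
        } catch (final ReflectiveOperationException ex) {
            throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
        }
    }
    return result;
}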
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
       xmlns:job="http://www.dangdang.com/schema/ddframe/job"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
                           http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.dangdang.com/schema/ddframe/reg
                           http://www.dangdang.com/schema/ddframe/reg/reg.xsd
                           http://www.dangdang.com/schema/ddframe/job
                           http://www.dangdang.com/schema/ddframe/job/job.xsd">

    <!-- Job registry center -->
    <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job"
                   base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />

    <!-- Simple job -->
    <job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter"
                cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

    <bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob">
        <property name="fooService" ref="xxx.FooService" />
    </bean>

    <!-- Simple job referencing an existing bean -->
    <job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId" registry-center-ref="regCenter"
                cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

    <!-- Dataflow job -->
    <job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob" registry-center-ref="regCenter"
                  cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />

    <!-- Script job -->
    <job:script id="scriptElasticJob" registry-center-ref="regCenter"
                cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C"
                script-command-line="/your/file/path/demo.sh" />

    <!-- Simple job with listeners -->
    <job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter"
                cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
        <job:listener class="xx.MySimpleJobListener" />
        <job:distributed-listener class="xx.MyOnceSimpleJobListener"
                                  started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
    </job:simple>

    <!-- Simple job with RDB event tracing -->
    <job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter"
                cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C"
                event-trace-rdb-data-source="yourDataSource" />
</beans>
public class SmsNoticeTask implements SimpleJob {

    private static final Logger logger = LoggerFactory.getLogger(SmsNoticeTask.class);

    @Override
    public void execute(ShardingContext shardingContext) {
        logger.info("Sharding context for this execution: {}", shardingContext);
        // TODO do something
    }
}
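Besides the Spring namespace, the same job can be started through the plain Java API. A minimal sketch (ZooKeeper address, namespace, and the overwrite flag are placeholders chosen for illustration):

import com.dangdang.ddframe.job.config.JobCoreConfiguration;
import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
import com.dangdang.ddframe.job.lite.api.JobScheduler;
import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
import com.dangdang.ddframe.job.reg.base.CoordinatorRegistryCenter;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperConfiguration;
import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;

public class SmsNoticeTaskBootstrap {

    public static void main(String[] args) {
        // Registry center pointing at ZooKeeper (host and namespace are placeholders).
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("yourhost:2181", "dd-job"));
        regCenter.init();

        // Job name, cron, and sharding total count mirror the XML attributes shown above.
        JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("smsNoticeTask", "0/10 * * * * ?", 3)
                .shardingItemParameters("0=A,1=B,2=C")
                .build();
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, SmsNoticeTask.class.getCanonicalName());
        LiteJobConfiguration liteJobConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();

        // init() registers the startup info and schedules the underlying Quartz job.
        new JobScheduler(regCenter, liteJobConfig).init();
    }
}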
Once a job has started successfully, its jobName must not be modified; changing the name is treated as a brand-new job instance.
Under ${namespace}/${jobName}, the persistent top-level nodes config, leader, servers, instances, and sharding are created.
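Putting the sections below together, the tree for one job looks roughly like this (only the nodes discussed in this article are shown):

/${namespace}
  /${jobName}
    /config                  # job configuration persisted as JSON
    /leader
      /election
        /instance            # current leader instance id (ephemeral)
        /latch               # LeaderLatch lock node
      /sharding
        /necessary           # resharding flag, removed once sharding is done
        /processing          # ephemeral marker while the leader is sharding
      /failover
    /servers
      /${ip}                 # one persistent node per job-server IP
    /instances
      /${ip}@-@${pid}        # one ephemeral node per running job instance
    /sharding                # per-item sharding assignments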
SchedulerFacade:

/**
 * Register job startup information.
 *
 * @param enabled whether the job is enabled
 */
public void registerStartUpInfo(final boolean enabled) {
    listenerManager.startAllListeners();
    leaderService.electLeader();
    serverService.persistOnline(enabled);
    instanceService.persistOnline();
    shardingService.setReshardingFlag();
    monitorService.listen();
    if (!reconcileService.isRunning()) {
        reconcileService.startAsync();
    }
}
ConfigurationService
A persistent node that stores the job's configuration parameters. If a configuration has already been persisted on ZooKeeper and overwrite is not set to true, the ZooKeeper copy takes precedence.
JobScheduler.init()
-> schedulerFacade.updateJobConfiguration(liteJobConfig)
-> configService.persist(liteJobConfig)
/**
 * Persist the distributed job configuration.
 *
 * @param liteJobConfig job configuration
 */
public void persist(final LiteJobConfiguration liteJobConfig) {
    // Validates the job class; if the config node exists on ZooKeeper but its data is null, the whole job node is removed.
    checkConflictJob(liteJobConfig);
    if (!jobNodeStorage.isJobNodeExisted(ConfigurationNode.ROOT) || liteJobConfig.isOverwrite()) {
        jobNodeStorage.replaceJobNode(ConfigurationNode.ROOT, LiteJobConfigurationGsonFactory.toJson(liteJobConfig));
    }
}
LeaderService
Persists the election, sharding, and failover child nodes under leader.
/**
 * Leader node paths.
 */
public final class LeaderNode {

    /**
     * Root path of the leader node.
     */
    public static final String ROOT = "leader";

    static final String ELECTION_ROOT = ROOT + "/election";

    static final String INSTANCE = ELECTION_ROOT + "/instance";

    static final String LATCH = ELECTION_ROOT + "/latch";

    private final JobNodePath jobNodePath;

    ..........
}
A leader election is triggered when a job registers at initialization or when the current leader instance goes offline.
LeaderElectionJobListener, LeaderAbdicationJobListener
LeaderService:

/**
 * Elect the leader node.
 */
public void electLeader() {
    log.debug("Elect a new leader now.");
    jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
    log.debug("Leader election completed.");
}

@RequiredArgsConstructor
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {

    @Override
    public void execute() {
        if (!hasLeader()) {
            jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        }
    }
}

-----------------------------------------------------------------------------------------

JobNodeStorage:

/**
 * Execute an operation as the leader node.
 *
 * @param latchNode job node name used for the distributed lock
 * @param callback  callback to execute
 */
public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) {
    try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) {
        latch.start();
        latch.await();
        callback.execute();
    //CHECKSTYLE:OFF
    } catch (final Exception ex) {
    //CHECKSTYLE:ON
        handleException(ex);
    }
}
LeaderLatch:

void reset() throws Exception {
    setLeadership(false);
    setNode(null);

    BackgroundCallback callback = new BackgroundCallback() {
        @Override
        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
            if ( debugResetWaitLatch != null ) {
                debugResetWaitLatch.await();
                debugResetWaitLatch = null;
            }
            if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) {
                setNode(event.getName());
                if ( state.get() == State.CLOSED ) {
                    setNode(null);
                } else {
                    getChildren();
                }
            } else {
                log.error("getChildren() failed. rc = " + event.getResultCode());
            }
        }
    };
    client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
        .inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
}

public void await() throws InterruptedException, EOFException {
    synchronized(this) {
        while(this.state.get() == LeaderLatch.State.STARTED && !this.hasLeadership.get()) {
            this.wait();
        }
    }
    if(this.state.get() != LeaderLatch.State.STARTED) {
        throw new EOFException();
    }
}
ShardingService
Persists the necessary node, which is deleted once sharding has completed.
The resharding flag is set when the job starts, the total shard count changes, job servers change, or running job instances change.
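The flag itself is nothing more than a marker node under leader/sharding. As far as I recall, setReshardingFlag in ShardingService is a one-liner along these lines (a sketch, not a verbatim copy):

// ShardingNode.NECESSARY resolves to leader/sharding/necessary.
public void setReshardingFlag() {
    jobNodeStorage.createJobNodeIfNeeded(ShardingNode.NECESSARY);
}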
ShardingListenerManager:

class ShardingTotalCountChangedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (configNode.isConfigPath(path) && 0 != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
            int newShardingTotalCount = LiteJobConfigurationGsonFactory.fromJson(data).getTypeConfig().getCoreConfig().getShardingTotalCount();
            if (newShardingTotalCount != JobRegistry.getInstance().getCurrentShardingTotalCount(jobName)) {
                shardingService.setReshardingFlag();
                JobRegistry.getInstance().setCurrentShardingTotalCount(jobName, newShardingTotalCount);
            }
        }
    }
}

class ListenServersChangedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (!JobRegistry.getInstance().isShutdown(jobName) && (isInstanceChange(eventType, path) || isServerChange(path))) {
            shardingService.setReshardingFlag();
        }
    }
}
FailoverListenerManager:

private boolean isFailoverEnabled() {
    LiteJobConfiguration jobConfig = configService.load(true);
    return null != jobConfig && jobConfig.isFailover();
}

class JobCrashedJobListener extends AbstractJobListener {

    @Override
    protected void dataChanged(final String path, final Type eventType, final String data) {
        if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) {
            String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
            if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) {
                return;
            }
            List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
            if (!failoverItems.isEmpty()) {
                for (int each : failoverItems) {
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            } else {
                for (int each : shardingService.getShardingItems(jobInstanceId)) {
                    failoverService.setCrashedFailoverFlag(each);
                    failoverService.failoverIfNecessary();
                }
            }
        }
    }
}
ServerService
When a job server registers, a persistent node named after the server IP is created, so job servers are managed per IP.
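A paraphrased sketch of how I understand ServerService writes that node in 2.1.3 (recalled from memory, not a verbatim copy; the node data records whether the server is disabled):

// ServerService (paraphrased): the node name is the local instance's IP.
public void persistOnline(final boolean enabled) {
    if (!JobRegistry.getInstance().isShutdown(jobName)) {
        jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()),
                enabled ? "" : ServerStatus.DISABLED.name());
    }
}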
InstanceService
When a job instance registers, an ephemeral node named after the running job instance ID is created. The node name follows this pattern:
e.g. 192.168.42.1@-@6260
package com.dangdang.ddframe.job.lite.api.strategy;

import com.dangdang.ddframe.job.util.env.IpUtils;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.lang.management.ManagementFactory;

/**
 * Running job instance.
 *
 * @author zhangliang
 */
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode(of = "jobInstanceId")
public final class JobInstance {

    private static final String DELIMITER = "@-@";

    /**
     * Primary key of the job instance.
     */
    private final String jobInstanceId;

    public JobInstance() {
        jobInstanceId = IpUtils.getIp() + DELIMITER + ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
    }

    /**
     * Get the job server's IP address.
     *
     * @return job server IP address
     */
    public String getIp() {
        return jobInstanceId.substring(0, jobInstanceId.indexOf(DELIMITER));
    }
}
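The ephemeral node itself is written by InstanceService when the instance registers; a paraphrased sketch of what I recall from 2.1.3:

// InstanceService (paraphrased): creates the ephemeral instances/${ip}@-@${pid} node for the local instance.
public void persistOnline() {
    jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
}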
Resharding is triggered when the service registers for the first time, when service instances change, or when the total shard count changes. The actual sharding is performed on the next job trigger; only the leader node may shard, and the other nodes block while sharding is in progress.
ShardingService:

/**
 * Shard the job if resharding is needed and the current node is the leader.
 *
 * <p>
 * No sharding is performed if there are no available nodes.
 * </p>
 */
public void shardingIfNecessary() {
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }
    waitingOtherJobCompleted();
    LiteJobConfiguration liteJobConfig = configService.load(false);
    int shardingTotalCount = liteJobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(shardingTotalCount);
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(liteJobConfig.getJobShardingStrategyClass());
    jobNodeStorage.executeInTransaction(new PersistShardingInfoTransactionExecutionCallback(
            jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}
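Instances that are not the leader simply wait until the sharding markers disappear. A paraphrased sketch of blockUntilShardingCompleted (recalled from the 2.1.3 sources, not verbatim):

// Non-leader instances block while leader/sharding/necessary or leader/sharding/processing still exists.
private void blockUntilShardingCompleted() {
    while (!leaderService.isLeaderUntilBlock()
            && (jobNodeStorage.isJobNodeExisted(ShardingNode.NECESSARY) || jobNodeStorage.isJobNodeExisted(ShardingNode.PROCESSING))) {
        log.debug("Job '{}' sleep short time until sharding completed.", jobName);
        BlockUtils.waitingShortTime();
    }
}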