本文基於 Elastic-Job V2.1.5 版本分享
Elastic-Job-Cloud 源碼分析系列(6篇)傳送門html
🙂🙂🙂關注微信公衆號:【芋道源碼】有福利: java
- RocketMQ / MyCAT / Sharding-JDBC 全部源碼分析文章列表
- RocketMQ / MyCAT / Sharding-JDBC 中文註釋源碼 GitHub 地址
- 您對於源碼的疑問每條留言都將獲得認真回覆。甚至不知道如何讀源碼也能夠請教噢。
- 新的源碼解析文章實時收到通知。每週更新一篇左右。
- 認真的源碼交流微信羣。
本文主要分享 Elastic-Job-Cloud 調度主流程。對應到 Elastic-Job-Lite 源碼解析文章以下:git
若是你閱讀過如下文章,有助於對本文的理解:github
😈 另外,筆者假設你已經對 《Elastic-Job-Lite 源碼分析系列》 有必定的瞭解。算法
本文涉及到主體類的類圖以下( 打開大圖 ):spring
你行好事會由於獲得讚揚而愉悅
同理,開源項目貢獻者會由於 Star 而更加有動力
爲 Elastic-Job 點贊!傳送門shell
Elastic-Job-Cloud 基於 Mesos 實現分佈式做業調度,或者說 Elastic-Job-Cloud 是 Mesos 上的 框架( Framework )。apache
一個 Mesos 框架由兩部分組成:segmentfault
Elastic-Job-Cloud 由兩個項目組成:緩存
com.dangdang.ddframe.job.cloud.scheduler.mesos.SchedulerEngine
。com.dangdang.ddframe.job.cloud.executor.TaskExecutor
。本文略微「囉嗦」,請保持耐心。搭配《用Mesos框架構建分佈式應用》一塊兒閱讀,理解難度下降 99%。OK,開始咱們的 Cloud 之旅。
在 Elastic-Job-Cloud,做業執行分紅兩種類型:
常駐做業是做業一旦啓動,不管運行與否均佔用系統資源;
常駐做業適合初始化時間長、觸發間隔短、實時性要求高的做業,要求資源配備充足。
瞬時做業是在做業啓動時佔用資源,運行完成後釋放資源。
瞬時做業適合初始化時間短、觸發間隔長、容許延遲的做業,通常用於資源不太充分,或做業要求的資源多,適合資源錯峯使用的場景。
Elastic-Job-Cloud 不一樣於 Elastic-Job-Lite 去中心化執行調度,轉變爲 Mesos Framework 的中心節點調度。這裏不太理解,不要緊,下文看到具體代碼就能明白了。
常駐做業、瞬時做業在調度中會略有不一樣,大致粗略流程以下:
下面,咱們針對每一個過程一節一節解析。
在上文《Elastic-Job-Cloud 源碼分析 —— 做業配置》的「3.1.1 操做雲做業配置」能夠看到添加雲做業配置後,Elastic-Job-Cloud-Scheduler 會執行做業調度,實現代碼以下:
// ProducerManager.java
/** * 調度做業. * * @param jobConfig 做業配置 */
public void schedule(final CloudJobConfiguration jobConfig) {
// 應用 或 做業 被禁用,不調度
if (disableAppService.isDisabled(jobConfig.getAppName()) || disableJobService.isDisabled(jobConfig.getJobName())) {
return;
}
if (CloudJobExecutionType.TRANSIENT == jobConfig.getJobExecutionType()) { // 瞬時做業
transientProducerScheduler.register(jobConfig);
} else if (CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType()) { // 常駐做業
readyService.addDaemon(jobConfig.getJobName());
}
}複製代碼
常駐做業在調度時,直接添加到待執行做業隊列。What?豈不是立刻就運行了!No No No,答案在「5. TaskExecutor 執行任務」,這裏先打住。
// ReadyService.java
/** * 將常駐做業放入待執行隊列. * * @param jobName 做業名稱 */
public void addDaemon(final String jobName) {
if (regCenter.getNumChildren(ReadyNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
log.warn("Cannot add daemon job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
return;
}
Optional<CloudJobConfiguration> cloudJobConfig = configService.load(jobName);
if (!cloudJobConfig.isPresent() || CloudJobExecutionType.DAEMON != cloudJobConfig.get().getJobExecutionType() || runningService.isJobRunning(jobName)) {
return;
}
// 添加到待執行隊列
regCenter.persist(ReadyNode.getReadyJobNodePath(jobName), "1");
}
// ReadyNode.java
final class ReadyNode {
static final String ROOT = StateNode.ROOT + "/ready";
private static final String READY_JOB = ROOT + "/%s"; // %s = ${JOB_NAME}
}複製代碼
待執行做業隊列存儲在註冊中心( Zookeeper )的持久數據節點 /${NAMESPACE}/state/ready/${JOB_NAME}
,存儲值爲待執行次數。例如此處,待執行次數爲 1
。使用 zkClient 查看以下:
[zk: localhost:2181(CONNECTED) 4] ls /elastic-job-cloud/state/ready
[test_job_simple]
[zk: localhost:2181(CONNECTED) 5] get /elastic-job-cloud/state/ready/test_job_simple
1複製代碼
在運維平臺,咱們能夠看到待執行做業隊列:
從官方的 RoadMap 來看,待執行做業隊列將來會使用 Redis 存儲以提升性能。
FROM elasticjob.io/docs/elasti…
Redis Based Queue Improvement
瞬時做業在調度時,使用發佈瞬時做業任務的調度器( TransientProducerScheduler )調度做業。當瞬時做業到達做業執行時間,添加到待執行做業隊列。
TransientProducerScheduler,發佈瞬時做業任務的調度器,基於 Quartz 實現對瞬時做業的調度。初始化代碼以下:
// TransientProducerScheduler.java
void start() {
scheduler = getScheduler();
try {
scheduler.start();
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
private Scheduler getScheduler() {
StdSchedulerFactory factory = new StdSchedulerFactory();
try {
factory.initialize(getQuartzProperties());
return factory.getScheduler();
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
private Properties getQuartzProperties() {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", SimpleThreadPool.class.getName());
result.put("org.quartz.threadPool.threadCount", Integer.toString(Runtime.getRuntime().availableProcessors() * 2)); // 線程池數量
result.put("org.quartz.scheduler.instanceName", "ELASTIC_JOB_CLOUD_TRANSIENT_PRODUCER");
result.put("org.quartz.plugin.shutdownhook.class", ShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
return result;
}複製代碼
調用 TransientProducerScheduler#register(...)
方法,註冊瞬時做業。實現代碼以下:
// TransientProducerScheduler.java
private final TransientProducerRepository repository;
synchronized void register(final CloudJobConfiguration jobConfig) {
String cron = jobConfig.getTypeConfig().getCoreConfig().getCron();
// 添加 cron 做業集合
JobKey jobKey = buildJobKey(cron);
repository.put(jobKey, jobConfig.getJobName());
// 調度 做業
try {
if (!scheduler.checkExists(jobKey)) {
scheduler.scheduleJob(buildJobDetail(jobKey), buildTrigger(jobKey.getName()));
}
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}複製代碼
#buildJobKey(...)
方法,建立 Quartz JobKey。你會發現頗有意思的使用的是 cron
參數做爲主鍵。Why?在看下 !scheduler.checkExists(jobKey)
處,相同 JobKey( cron
) 的做業不重複註冊到 Quartz Scheduler。Why?此處是一個優化,相同 cron
使用同一個 Quartz Job,Elastic-Job-Cloud-Scheduler 可能會註冊大量的瞬時做業,若是一個瞬時做業建立一個 Quartz Job 太過浪費,特別是 cron
每分鐘、每5分鐘、每小時、天天已經覆蓋了大量的瞬時做業的狀況。所以,相同 cron
使用同一個 Quartz Job。調用 TransientProducerRepository#put(...)
以 Quartz JobKey 爲主鍵聚合做業。
final class TransientProducerRepository {
/** * cron 做業集合 * key:做業Key */
private final ConcurrentHashMap<JobKey, List<String>> cronTasks = new ConcurrentHashMap<>(256, 1);
synchronized void put(final JobKey jobKey, final String jobName) {
remove(jobName);
List<String> taskList = cronTasks.get(jobKey);
if (null == taskList) {
taskList = new CopyOnWriteArrayList<>();
taskList.add(jobName);
cronTasks.put(jobKey, taskList);
return;
}
if (!taskList.contains(jobName)) {
taskList.add(jobName);
}
}
}複製代碼
調用 #buildJobDetail(...)
建立 Quartz Job 信息。實現代碼以下:
private JobDetail buildJobDetail(final JobKey jobKey) {
JobDetail result = JobBuilder.newJob(ProducerJob.class) // ProducerJob.java
.withIdentity(jobKey).build();
result.getJobDataMap().put("repository", repository);
result.getJobDataMap().put("readyService", readyService);
return result;
}複製代碼
JobBuilder#newJob(...)
的參數是 ProducerJob,下文會講解到。調用 #buildTrigger(...)
建立 Quartz Trigger。實現代碼以下:
private Trigger buildTrigger(final String cron) {
return TriggerBuilder.newTrigger()
.withIdentity(cron)
.withSchedule(CronScheduleBuilder.cronSchedule(cron) // cron
.withMisfireHandlingInstructionDoNothing())
.build();
}複製代碼
ProducerJob,當 Quartz Job 到達 cron
執行時間( 即做業執行時間),將相應的瞬時做業添加到待執行做業隊列。實現代碼以下:
public static final class ProducerJob implements Job {
private TransientProducerRepository repository;
private ReadyService readyService;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
List<String> jobNames = repository.get(context.getJobDetail().getKey());
for (String each : jobNames) {
readyService.addTransient(each);
}
}
}複製代碼
調用 TransientProducerRepository#get(...)
方法,得到該 Job 對應的做業集合。實現代碼以下:
final class TransientProducerRepository {
/** * cron 做業集合 * key:做業Key */
private final ConcurrentHashMap<JobKey, List<String>> cronTasks = new ConcurrentHashMap<>(256, 1);
List<String> get(final JobKey jobKey) {
List<String> result = cronTasks.get(jobKey);
return null == result ? Collections.<String>emptyList() : result;
}
}複製代碼
調用 ReadyService#addTransient(...)
方法,添加瞬時做業到待執行做業隊列。實現代碼以下:
/** * 將瞬時做業放入待執行隊列. * * @param jobName 做業名稱 */
public void addTransient(final String jobName) {
//
if (regCenter.getNumChildren(ReadyNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
log.warn("Cannot add transient job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
return;
}
//
Optional<CloudJobConfiguration> cloudJobConfig = configService.load(jobName);
if (!cloudJobConfig.isPresent() || CloudJobExecutionType.TRANSIENT != cloudJobConfig.get().getJobExecutionType()) {
return;
}
//
String readyJobNode = ReadyNode.getReadyJobNodePath(jobName);
String times = regCenter.getDirectly(readyJobNode);
if (cloudJobConfig.get().getTypeConfig().getCoreConfig().isMisfire()) {
regCenter.persist(readyJobNode, Integer.toString(null == times ? 1 : Integer.parseInt(times) + 1));
} else {
regCenter.persist(ReadyNode.getReadyJobNodePath(jobName), "1");
}
}複製代碼
不管是常駐做業仍是瞬時做業,都會加入到待執行做業隊列。目前咱們看到瞬時做業的每次調度是 TransientProducerScheduler 負責。那麼常駐做業的每次調度呢?「5. TaskExecutor 執行任務」會看到它的調度,這是 Elastic-Job-Cloud 設計巧妙有趣的地方。
TaskLaunchScheduledService,任務提交調度服務。它繼承 Guava AbstractScheduledService 實現定時將待執行做業隊列的做業提交到 Mesos 進行調度執行。實現定時代碼以下:
public final class TaskLaunchScheduledService extends AbstractScheduledService {
@Override
protected String serviceName() {
return "task-launch-processor";
}
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule(2, 10, TimeUnit.SECONDS);
}
@Override
protected void runOneIteration() throws Exception {
// .... 省略代碼
}
// ... 省略部分方法
}複製代碼
#runOneIteration()
)。對 Guava AbstractScheduledService 不瞭解的同窗,能夠閱讀完本文後 Google 下。#runOneIteration()
方法相對比較複雜,咱們一塊一塊拆解,耐心理解。實現代碼以下:
@Override
protected void runOneIteration() throws Exception {
try {
System.out.println("runOneIteration:" + new Date());
// 建立 Fenzo 任務請求
LaunchingTasks launchingTasks = new LaunchingTasks(facadeService.getEligibleJobContext());
List<TaskRequest> taskRequests = launchingTasks.getPendingTasks();
// 獲取全部正在運行的雲做業App https://github.com/Netflix/Fenzo/wiki/Constraints
if (!taskRequests.isEmpty()) {
AppConstraintEvaluator.getInstance().loadAppRunningState();
}
// 將任務請求分配到 Mesos Offer
Collection<VMAssignmentResult> vmAssignmentResults = taskScheduler.scheduleOnce(taskRequests, LeasesQueue.getInstance().drainTo()).getResultMap().values();
// 建立 Mesos 任務請求
List<TaskContext> taskContextsList = new LinkedList<>(); // 任務運行時上下文集合
Map<List<Protos.OfferID>, List<Protos.TaskInfo>> offerIdTaskInfoMap = new HashMap<>(); // Mesos 任務信息集合
for (VMAssignmentResult each: vmAssignmentResults) {
List<VirtualMachineLease> leasesUsed = each.getLeasesUsed();
List<Protos.TaskInfo> taskInfoList = new ArrayList<>(each.getTasksAssigned().size() * 10);
taskInfoList.addAll(getTaskInfoList(
launchingTasks.getIntegrityViolationJobs(vmAssignmentResults), // 得到做業分片不完整的做業集合
each, leasesUsed.get(0).hostname(), leasesUsed.get(0).getOffer()));
for (Protos.TaskInfo taskInfo : taskInfoList) {
taskContextsList.add(TaskContext.from(taskInfo.getTaskId().getValue()));
}
offerIdTaskInfoMap.put(getOfferIDs(leasesUsed), // 得到 Offer ID 集合
taskInfoList);
}
// 遍歷任務運行時上下文
for (TaskContext each : taskContextsList) {
// 將任務運行時上下文放入運行時隊列
facadeService.addRunning(each);
// 發佈做業狀態追蹤事件(State.TASK_STAGING)
jobEventBus.post(createJobStatusTraceEvent(each));
}
// 從隊列中刪除已運行的做業
facadeService.removeLaunchTasksFromQueue(taskContextsList);
// 提交任務給 Mesos
for (Entry<List<OfferID>, List<TaskInfo>> each : offerIdTaskInfoMap.entrySet()) {
schedulerDriver.launchTasks(each.getKey(), each.getValue());
}
} catch (Throwable throwable) {
log.error("Launch task error", throwable);
} finally {
// 清理 AppConstraintEvaluator 全部正在運行的雲做業App
AppConstraintEvaluator.getInstance().clearAppRunningState();
}
}複製代碼
// #runOneIteration()
LaunchingTasks launchingTasks = new LaunchingTasks(facadeService.getEligibleJobContext());
List<TaskRequest> taskRequests = launchingTasks.getPendingTasks();複製代碼
調用 FacadeService#getEligibleJobContext()
方法,獲取有資格運行的做業。
// FacadeService.java
/** * 獲取有資格運行的做業. * * @return 做業上下文集合 */
public Collection<JobContext> getEligibleJobContext() {
// 從失效轉移隊列中獲取全部有資格執行的做業上下文
Collection<JobContext> failoverJobContexts = failoverService.getAllEligibleJobContexts();
// 從待執行隊列中獲取全部有資格執行的做業上下文
Collection<JobContext> readyJobContexts = readyService.getAllEligibleJobContexts(failoverJobContexts);
// 合併
Collection<JobContext> result = new ArrayList<>(failoverJobContexts.size() + readyJobContexts.size());
result.addAll(failoverJobContexts);
result.addAll(readyJobContexts);
return result;
}複製代碼
FailoverService#getAllEligibleJobContexts()
方法,從失效轉移隊列中獲取全部有資格執行的做業上下文。TaskLaunchScheduledService 提交的任務還可能來自失效轉移隊列。本文暫時不解析失效轉移隊列相關實現,避免增長複雜度影響你們的理解,在《Elastic-Job-Cloud 源碼分析 —— 做業失效轉移》詳細解析。調用 ReadyService#getAllEligibleJobContexts(...)
方法,從待執行隊列中獲取全部有資格執行的做業上下文。
// ReadyService.java
/** * 從待執行隊列中獲取全部有資格執行的做業上下文. * * @param ineligibleJobContexts 無資格執行的做業上下文 * @return 有資格執行的做業上下文集合 */
public Collection<JobContext> getAllEligibleJobContexts(final Collection<JobContext> ineligibleJobContexts) {
// 不存在 待執行隊列
if (!regCenter.isExisted(ReadyNode.ROOT)) {
return Collections.emptyList();
}
// 無資格執行的做業上下文 轉換成 無資格執行的做業集合
Collection<String> ineligibleJobNames = Collections2.transform(ineligibleJobContexts, new Function<JobContext, String>() {
@Override
public String apply(final JobContext input) {
return input.getJobConfig().getJobName();
}
});
// 獲取 待執行隊列 有資格執行的做業上下文
List<String> jobNames = regCenter.getChildrenKeys(ReadyNode.ROOT);
List<JobContext> result = new ArrayList<>(jobNames.size());
for (String each : jobNames) {
if (ineligibleJobNames.contains(each)) {
continue;
}
// 排除 做業配置 不存在的做業
Optional<CloudJobConfiguration> jobConfig = configService.load(each);
if (!jobConfig.isPresent()) {
regCenter.remove(ReadyNode.getReadyJobNodePath(each));
continue;
}
if (!runningService.isJobRunning(each)) { // 排除 運行中 的做業
result.add(JobContext.from(jobConfig.get(), ExecutionType.READY));
}
}
return result;
}複製代碼
JobContext,做業運行上下文。實現代碼以下:
// JobContext.java
public final class JobContext {
private final CloudJobConfiguration jobConfig;
private final List<Integer> assignedShardingItems;
private final ExecutionType type;
/** * 經過做業配置建立做業運行上下文. * * @param jobConfig 做業配置 * @param type 執行類型 * @return 做業運行上下文 */
public static JobContext from(final CloudJobConfiguration jobConfig, final ExecutionType type) {
int shardingTotalCount = jobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount();
// 分片項
List<Integer> shardingItems = new ArrayList<>(shardingTotalCount);
for (int i = 0; i < shardingTotalCount; i++) {
shardingItems.add(i);
}
return new JobContext(jobConfig, shardingItems, type);
}
}複製代碼
LaunchingTasks,分配任務行爲包。建立 LaunchingTasks 代碼以下:
public final class LaunchingTasks {
/** * 做業上下文集合 * key:做業名 */
private final Map<String, JobContext> eligibleJobContextsMap;
public LaunchingTasks(final Collection<JobContext> eligibleJobContexts) {
eligibleJobContextsMap = new HashMap<>(eligibleJobContexts.size(), 1);
for (JobContext each : eligibleJobContexts) {
eligibleJobContextsMap.put(each.getJobConfig().getJobName(), each);
}
}
}複製代碼
調用 LaunchingTasks#getPendingTasks()
方法,得到待執行任務集合。這裏要注意,每一個做業若是有多個分片,則會生成多個待執行任務,即此處完成了做業分片。實現代碼以下:
// LaunchingTasks.java
/** * 得到待執行任務 * * @return 待執行任務 */
List<TaskRequest> getPendingTasks() {
List<TaskRequest> result = new ArrayList<>(eligibleJobContextsMap.size() * 10);
for (JobContext each : eligibleJobContextsMap.values()) {
result.addAll(createTaskRequests(each));
}
return result;
}
/** * 建立待執行任務集合 * * @param jobContext 做業運行上下文 * @return 待執行任務集合 */
private Collection<TaskRequest> createTaskRequests(final JobContext jobContext) {
Collection<TaskRequest> result = new ArrayList<>(jobContext.getAssignedShardingItems().size());
for (int each : jobContext.getAssignedShardingItems()) {
result.add(new JobTaskRequest(new TaskContext(jobContext.getJobConfig().getJobName(), Collections.singletonList(each), jobContext.getType()), jobContext.getJobConfig()));
}
return result;
}
// TaskContext.java
public final class TaskContext {
/** * 任務編號 */
private String id;
/** * 任務元信息 */
private final MetaInfo metaInfo;
/** * 執行類型 */
private final ExecutionType type;
/** * Mesos Slave 編號 */
private String slaveId;
/** * 是否閒置 */
@Setter
private boolean idle;
public static class MetaInfo {
/** * 做業名 */
private final String jobName;
/** * 做業分片項 */
private final List<Integer> shardingItems;
}
// ... 省略部分方法
}
// JobTaskRequest.JAVA
public final class JobTaskRequest implements TaskRequest {
private final TaskContext taskContext;
private final CloudJobConfiguration jobConfig;
@Override
public String getId() {
return taskContext.getId();
}
@Override
public double getCPUs() {
return jobConfig.getCpuCount();
}
@Override
public double getMemory() {
return jobConfig.getMemoryMB();
}
// ... 省略部分方法
}複製代碼
#createTaskRequests(...)
方法,將單個做業按照其做業分片總數拆分紅一個或多個待執行任務集合。LaunchingTasks#getPendingTasks()
方法的返回結果。友情提示,代碼可能比較多,請耐心觀看。
在說 AppConstraintEvaluator 以前,咱們先一塊兒了簡單解下 Netflix Fenzo。
FROM dockone.io/article/636
Fenzo是一個在Mesos框架上應用的通用任務調度器。它可讓你經過實現各類優化策略的插件,來優化任務調度,同時這也有利於集羣的自動縮放。
Elastic-Job-Cloud-Scheduler 基於 Fenzo 實現對 Mesos 的彈性資源分配。
例如,AppConstraintEvaluator,App 目標 Mesos Slave 適配度限制器,選擇 Slave 時須要考慮其上是否運行有 App 的 Executor,若是沒有運行 Executor 須要將其資源消耗考慮進適配計算算法中。它是 Fenzo ConstraintEvaluator 接口 在 Elastic-Job-Cloud-Scheduler 的自定義任務約束實現。經過這個任務約束,在下文調用 TaskScheduler#scheduleOnce(...)
方法調度任務所需資源時,會將 AppConstraintEvaluator 考慮進去。
那麼做業任務請求( JobTaskRequest ) 是怎麼關聯上 AppConstraintEvaluator 的呢?
// JobTaskRequest.java
public final class JobTaskRequest implements TaskRequest {
@Override
public List<? extends ConstraintEvaluator> getHardConstraints() {
return Collections.singletonList(AppConstraintEvaluator.getInstance());
}
}複製代碼
#getHardConstraints()
方法,關聯上 TaskRequest 和 ConstraintEvaluator。關聯上以後,任務匹配 Mesos Slave 資源時,調用 ConstraintEvaluator#evaluate(...)
實現方法判斷是否符合約束:
public interface ConstraintEvaluator {
public static class Result {
private final boolean isSuccessful;
private final String failureReason;
}
/** * Inspects a target to decide whether or not it meets the constraints appropriate to a particular task. * * @param taskRequest a description of the task to be assigned * @param targetVM a description of the host that is a potential match for the task * @param taskTrackerState the current status of tasks and task assignments in the system at large * @return a successful Result if the target meets the constraints enforced by this constraint evaluator, or * an unsuccessful Result otherwise */
public Result evaluate(TaskRequest taskRequest, VirtualMachineCurrentState targetVM, TaskTrackerState taskTrackerState);
}複製代碼
OK,簡單瞭解結束,有興趣瞭解更多的同窗,請點擊《Fenzo Wiki —— Constraints》。下面來看看 Elastic-Job-Cloud-Scheduler 自定義實現的任務約束 AppConstraintEvaluator。
調用 AppConstraintEvaluator#loadAppRunningState()
方法,加載當前運行中的雲做業App,爲 AppConstraintEvaluator#evaluate(...)
方法提供該數據。代碼實現以下:
// AppConstraintEvaluator.java
private final Set<String> runningApps = new HashSet<>();
void loadAppRunningState() {
try {
for (MesosStateService.ExecutorStateInfo each : facadeService.loadExecutorInfo()) {
runningApps.add(each.getId());
}
} catch (final JSONException | UniformInterfaceException | ClientHandlerException e) {
clearAppRunningState();
}
}複製代碼
調用 FacadeService#loadExecutorInfo()
方法,從 Mesos 獲取全部正在運行的 Mesos 執行器( Executor )的信息。執行器和雲做業App有啥關係?每一個雲做業App 便是一個 Elastic-Job-Cloud-Executor 實例。。FacadeService#loadExecutorInfo()
方法這裏就不展開了,有興趣的同窗本身看下,主要是對 Mesos 的 API操做,咱們來看下 runningApps
的結果:
調用 TaskScheduler#scheduleOnce(...)
方法調度提交任務所需資源時,會調用 ConstraintEvaluator#loadAppRunningState()
檢查分配的資源是否符合任務的約束條件。AppConstraintEvaluator#loadAppRunningState()
實現代碼以下:
// AppConstraintEvaluator.java
@Override
public Result evaluate(final TaskRequest taskRequest, final VirtualMachineCurrentState targetVM, final TaskTrackerState taskTrackerState) {
double assigningCpus = 0.0d;
double assigningMemoryMB = 0.0d;
final String slaveId = targetVM.getAllCurrentOffers().iterator().next().getSlaveId().getValue();
try {
// 判斷當前分配的 Mesos Slave 是否運行着該做業任務請求對應的雲做業App
if (isAppRunningOnSlave(taskRequest.getId(), slaveId)) {
return new Result(true, "");
}
// 判斷當前分配的 Mesos Slave 啓動雲做業App 是否超過資源限制
Set<String> calculatedApps = new HashSet<>(); // 已計算做業App集合
List<TaskRequest> taskRequests = new ArrayList<>(targetVM.getTasksCurrentlyAssigned().size() + 1);
taskRequests.add(taskRequest);
for (TaskAssignmentResult each : targetVM.getTasksCurrentlyAssigned()) { // 當前已經分配做業請求
taskRequests.add(each.getRequest());
}
for (TaskRequest each : taskRequests) {
assigningCpus += each.getCPUs();
assigningMemoryMB += each.getMemory();
if (isAppRunningOnSlave(each.getId(), slaveId)) { // 做業App已經啓動
continue;
}
CloudAppConfiguration assigningAppConfig = getAppConfiguration(each.getId());
if (!calculatedApps.add(assigningAppConfig.getAppName())) { // 是否已經計算該App
continue;
}
assigningCpus += assigningAppConfig.getCpuCount();
assigningMemoryMB += assigningAppConfig.getMemoryMB();
}
} catch (final LackConfigException ex) {
log.warn("Lack config, disable {}", getName(), ex);
return new Result(true, "");
}
if (assigningCpus > targetVM.getCurrAvailableResources().cpuCores()) { // cpu
log.debug("Failure {} {} cpus:{}/{}", taskRequest.getId(), slaveId, assigningCpus, targetVM.getCurrAvailableResources().cpuCores());
return new Result(false, String.format("cpu:%s/%s", assigningCpus, targetVM.getCurrAvailableResources().cpuCores()));
}
if (assigningMemoryMB > targetVM.getCurrAvailableResources().memoryMB()) { // memory
log.debug("Failure {} {} mem:{}/{}", taskRequest.getId(), slaveId, assigningMemoryMB, targetVM.getCurrAvailableResources().memoryMB());
return new Result(false, String.format("mem:%s/%s", assigningMemoryMB, targetVM.getCurrAvailableResources().memoryMB()));
}
log.debug("Success {} {} cpus:{}/{} mem:{}/{}", taskRequest.getId(), slaveId, assigningCpus, targetVM.getCurrAvailableResources()
.cpuCores(), assigningMemoryMB, targetVM.getCurrAvailableResources().memoryMB());
return new Result(true, String.format("cpus:%s/%s mem:%s/%s", assigningCpus, targetVM.getCurrAvailableResources()
.cpuCores(), assigningMemoryMB, targetVM.getCurrAvailableResources().memoryMB()));
}複製代碼
#isAppRunningOnSlave()
方法,判斷當前分配的 Mesos Slave 是否運行着該做業任務請求對應的雲做業App。若雲做業App未運行,則該做業任務請求提交給 Mesos 後,該 Mesos Slave 會啓動該雲做業 App,App 自己會佔用必定的 CloudAppConfiguration#cpu
和 CloudAppConfiguration#memory
,計算時須要統計,避免超過當前 Mesos Slave 剩餘 cpu
和 memory
。Result(true, ...)
;不然,返回 Result(false, ...)
。咱們先簡單瞭解下 Elastic-Job-Cloud-Scheduler 實現的 Mesos Scheduler 類 com.dangdang.ddframe.job.cloud.scheduler.mesos.SchedulerEngine
。調度器的主要職責之一:在接受到的 Offer 上啓動任務。SchedulerEngine 接收到資源 Offer,先存儲到資源預佔隊列( LeasesQueue ),等到做業被調度須要啓動任務時進行使用。存儲到資源預佔隊列實現代碼以下:
public final class SchedulerEngine implements Scheduler {
@Override
public void resourceOffers(final SchedulerDriver schedulerDriver, final List<Protos.Offer> offers) {
for (Protos.Offer offer: offers) {
log.trace("Adding offer {} from host {}", offer.getId(), offer.getHostname());
LeasesQueue.getInstance().offer(offer);
}
}
}複製代碼
org.apache.mesos.Scheduler
,Mesos 調度器接口,實現該接口成爲自定義 Mesos 調度器。實現 #resourceOffers(...)
方法,有新的資源 Offer 時,會進行調用。在 SchedulerEngine 會調用 #offer(...)
方法,存儲 Offer 到資源預佔隊列,實現代碼以下:
public final class LeasesQueue {
/** * 單例 */
private static final LeasesQueue INSTANCE = new LeasesQueue();
private final BlockingQueue<VirtualMachineLease> queue = new LinkedBlockingQueue<>();
/** * 獲取實例. * * @return 單例對象 */
public static LeasesQueue getInstance() {
return INSTANCE;
}
/** * 添加資源至隊列預佔. * * @param offer 資源 */
public void offer(final Protos.Offer offer) {
queue.offer(new VMLeaseObject(offer));
}
// ... 省略 #drainTo() 方法,下文解析。
}複製代碼
另外,可能有同窗對 Mesos Offer 理解比較生澀,Offer 定義以下:
FROM segmentfault.com/a/119000000…
Offer是Mesos資源的抽象,好比說有多少CPU、多少memory,disc是多少,都放在Offer裏,打包給一個Framework,而後Framework來決定到底怎麼用這個Offer。
OK,知識鋪墊完成,回到本小節的重心:
// #runOneIteration()
Collection<VMAssignmentResult> vmAssignmentResults = taskScheduler.scheduleOnce(taskRequests, LeasesQueue.getInstance().drainTo()).getResultMap().values();
// LeasesQueue.java
public final class LeasesQueue {
private final BlockingQueue<VirtualMachineLease> queue = new LinkedBlockingQueue<>();
public List<VirtualMachineLease> drainTo() {
List<VirtualMachineLease> result = new ArrayList<>(queue.size());
queue.drainTo(result);
return result;
}
}複製代碼
調用 TaskScheduler#scheduleOnce(...)
方法,將任務請求分配到 Mesos Offer。經過 Fenzo TaskScheduler 實現對多個任務分配到多個 Mesos Offer 的合理優化分配。這是一個相對複雜的問題。爲何這麼說呢?
FROM 《Mesos 框架構建分佈式應用》 P76
將任務匹配到 offer 上,首次適配一般是最好的算法。你可能會想,若是在更多的工做裏嘗試計算出匹配該 offer 的優化組合,可能比首次適配更能高效地利用 offer。這絕對是正確的,可是要考慮以下這些方面:對於啓動全部等待運行的任務來講,集羣裏要麼有充足的資源要麼沒有。若是資源不少,那麼首次適配確定一直都能保證每一個任務的啓動。若是資源不夠,怎麼都沒法啓動全部任務。所以,編寫代碼選擇接下來會運行哪一個任務是很天然的,這樣才能保證服務的質量。只有當資源剛夠用時,才須要更爲精細的打包算法。不幸的是,這裏的問題 —— 一般稱爲揹包問題( Knapsack problem ) —— 是一個衆所周知的 NP 徹底問題。NP 徹底問題指的是須要至關長時間才能找到最優解決方案的問題,而且沒有任何已知道技巧可以快速解決這類問題。
舉個簡單的例子,只考慮 memory
資源狀況下,有一臺 Slave 內存爲 8GB ,如今要運行三個 1GB 的做業和 5GB 的做業。其中 5GB 的做業在 1GB 運行屢次以後才執行。
實際狀況會比圖更加複雜的多的多。經過使用 Fenzo ,能夠很方便的,而且使人滿意的分配。爲了讓你對 Fenzo 有更加透徹的理解,這裏再引用一段對其的介紹:
FROM 《Mesos 框架構建分佈式應用》 P80
調用庫函數 Fenzo
Fenzo 是 Nettflix 在 2015 年夏天發佈的庫函數。Fenzo 爲基於 java 的調度器提供了完整的解決方案,完成 offer 緩衝,多任務啓動,以及軟和硬約束條件的匹配。就算不是全部的,也是不少調度器都可以受益於使用 Fenzo 來完成計算任務分配,而不用本身編寫 offer 緩衝、打包和放置路由等。
下面,來看兩次 TaskScheduler#scheduleOnce(...)
的返回:
com.netflix.fenzo.VMAssignmentResult
,每臺主機分配任務結果。實現代碼以下:
public class VMAssignmentResult {
/** * 主機 */
private final String hostname;
/** * 使用的 Mesos Offer */
private final List<VirtualMachineLease> leasesUsed;
/** * 分配的任務 */
private final Set<TaskAssignmentResult> tasksAssigned;
}複製代碼
受限於筆者的能力,建議你能夠在閱讀以下文章,更透徹的理解 TaskScheduler :
// #runOneIteration()
List<TaskContext> taskContextsList = new LinkedList<>(); // 任務運行時上下文集合
Map<List<Protos.OfferID>, List<Protos.TaskInfo>> offerIdTaskInfoMap = new HashMap<>(); // Mesos 任務信息集合
for (VMAssignmentResult each: vmAssignmentResults) {
List<VirtualMachineLease> leasesUsed = each.getLeasesUsed();
List<Protos.TaskInfo> taskInfoList = new ArrayList<>(each.getTasksAssigned().size() * 10);
taskInfoList.addAll(getTaskInfoList(
launchingTasks.getIntegrityViolationJobs(vmAssignmentResults), // 得到做業分片不完整的做業集合
each, leasesUsed.get(0).hostname(), leasesUsed.get(0).getOffer()));
for (Protos.TaskInfo taskInfo : taskInfoList) {
taskContextsList.add(TaskContext.from(taskInfo.getTaskId().getValue()));
}
offerIdTaskInfoMap.put(getOfferIDs(leasesUsed), // 得到 Offer ID 集合
taskInfoList);
}複製代碼
offerIdTaskInfoMap
,Mesos 任務信息集合。key 和 value 都爲相同 Mesos Slave Offer 和 任務。爲何?調用 SchedulerDriver#launchTasks(...)
方法提交一次任務時,必須保證全部任務和 Offer 在相同 Mesos Slave 上。
FROM FROM 《Mesos 框架構建分佈式應用》 P61
組合 offer
latchTasks 接受 offer 列表爲輸入,這就容許用戶將一些相同 slave 的 offer 組合起來,從而將這些 offer 的資源放到池裏。它還能接受任務列表爲輸入,這樣就可以啓動適合給定 offer 的足夠多的任務。注意全部任務和 offer 都必須是同一臺 slave —— 若是不在同一臺 slave 上,launchTasks 就會失敗。若是想在多臺 slave 上啓動任務,屢次調用 latchTasks 便可。
調用 LaunchingTasks#getIntegrityViolationJobs(...)
方法,得到做業分片不完整的做業集合。一個做業有多個分片,由於 Mesos Offer 不足,致使有部分分片不能執行,則整個做業都不進行執行。代碼實現以下:
// LaunchingTasks.java
/** * 得到做業分片不完整的做業集合 * * @param vmAssignmentResults 主機分配任務結果集合 * @return 做業分片不完整的做業集合 */
Collection<String> getIntegrityViolationJobs(final Collection<VMAssignmentResult> vmAssignmentResults) {
Map<String, Integer> assignedJobShardingTotalCountMap = getAssignedJobShardingTotalCountMap(vmAssignmentResults);
Collection<String> result = new HashSet<>(assignedJobShardingTotalCountMap.size(), 1);
for (Map.Entry<String, Integer> entry : assignedJobShardingTotalCountMap.entrySet()) {
JobContext jobContext = eligibleJobContextsMap.get(entry.getKey());
if (ExecutionType.FAILOVER != jobContext.getType() // 不包括 FAILOVER 執行類型的做業
&& !entry.getValue().equals(jobContext.getJobConfig().getTypeConfig().getCoreConfig().getShardingTotalCount())) {
log.warn("Job {} is not assigned at this time, because resources not enough to run all sharding instances.", entry.getKey());
result.add(entry.getKey());
}
}
return result;
}
/** * 得到每一個做業分片數集合 * key:做業名 * value:分片總數 * * @param vmAssignmentResults 主機分配任務結果集合 * @return 每一個做業分片數集合 */
private Map<String, Integer> getAssignedJobShardingTotalCountMap(final Collection<VMAssignmentResult> vmAssignmentResults) {
Map<String, Integer> result = new HashMap<>(eligibleJobContextsMap.size(), 1);
for (VMAssignmentResult vmAssignmentResult: vmAssignmentResults) {
for (TaskAssignmentResult tasksAssigned: vmAssignmentResult.getTasksAssigned()) {
String jobName = TaskContext.from(tasksAssigned.getTaskId()).getMetaInfo().getJobName();
if (result.containsKey(jobName)) {
result.put(jobName, result.get(jobName) + 1);
} else {
result.put(jobName, 1);
}
}
}
return result;
}複製代碼
調用 #getTaskInfoList(...)
方法,建立單個主機的 Mesos 任務信息集合。實現代碼以下:
private List<Protos.TaskInfo> getTaskInfoList(final Collection<String> integrityViolationJobs, final VMAssignmentResult vmAssignmentResult, final String hostname, final Protos.Offer offer) {
List<Protos.TaskInfo> result = new ArrayList<>(vmAssignmentResult.getTasksAssigned().size());
for (TaskAssignmentResult each: vmAssignmentResult.getTasksAssigned()) {
TaskContext taskContext = TaskContext.from(each.getTaskId());
String jobName = taskContext.getMetaInfo().getJobName();
if (!integrityViolationJobs.contains(jobName) // 排除做業分片不完整的任務
&& !facadeService.isRunning(taskContext) // 排除正在運行中的任務
&& !facadeService.isJobDisabled(jobName)) { // 排除被禁用的任務
// 建立 Mesos 任務
Protos.TaskInfo taskInfo = getTaskInfo(offer, each);
if (null != taskInfo) {
result.add(taskInfo);
// 添加任務主鍵和主機名稱的映射
facadeService.addMapping(taskInfo.getTaskId().getValue(), hostname);
// 通知 TaskScheduler 主機分配了這個任務
taskScheduler.getTaskAssigner().call(each.getRequest(), hostname);
}
}
}
return result;
}複製代碼
#getTaskInfo(...)
方法,建立單個 Mesos 任務,在「4.4.1 建立單個 Mesos 任務信息」詳細解析。調用 FacadeService#addMapping(...)
方法,添加任務主鍵和主機名稱的映射。經過該映射,能夠根據任務主鍵查詢到對應的主機名。實現代碼以下:
// FacadeService.java
/** * 添加任務主鍵和主機名稱的映射. * * @param taskId 任務主鍵 * @param hostname 主機名稱 */
public void addMapping(final String taskId, final String hostname) {
runningService.addMapping(taskId, hostname);
}
// RunningService.java
/** * 任務主鍵和主機名稱的映射 * key: 任務主鍵 * value: 主機名稱 */
private static final ConcurrentHashMap<String, String> TASK_HOSTNAME_MAPPER = new ConcurrentHashMap<>(TASK_INITIAL_SIZE);
public void addMapping(final String taskId, final String hostname) {
TASK_HOSTNAME_MAPPER.putIfAbsent(taskId, hostname);
}複製代碼
調用 TaskScheduler#getTaskAssigner()#call(...)
方法,通知 TaskScheduler 任務被確認分配到這個主機。TaskScheduler 作任務和 Offer 的匹配,對哪些任務運行在哪些主機是有依賴的,否則怎麼作匹配優化呢。在《Fenzo Wiki —— Notify the Scheduler of Assigns and UnAssigns of Tasks》能夠進一步瞭解。
調用 #getOfferIDs(...)
方法,得到 Offer ID 集合。實現代碼以下:
private List<Protos.OfferID> getOfferIDs(final List<VirtualMachineLease> leasesUsed) {
List<Protos.OfferID> result = new ArrayList<>();
for (VirtualMachineLease virtualMachineLease: leasesUsed) {
result.add(virtualMachineLease.getOffer().getId());
}
return result;
}複製代碼
調用 #getTaskInfo()
方法,建立單個 Mesos 任務信息。實現代碼以下:
以下會涉及大量的 Mesos API
private Protos.TaskInfo getTaskInfo(final Protos.Offer offer, final TaskAssignmentResult taskAssignmentResult) {
// 校驗 做業配置 是否存在
TaskContext taskContext = TaskContext.from(taskAssignmentResult.getTaskId());
Optional<CloudJobConfiguration> jobConfigOptional = facadeService.load(taskContext.getMetaInfo().getJobName());
if (!jobConfigOptional.isPresent()) {
return null;
}
CloudJobConfiguration jobConfig = jobConfigOptional.get();
// 校驗 做業配置 是否存在
Optional<CloudAppConfiguration> appConfigOptional = facadeService.loadAppConfig(jobConfig.getAppName());
if (!appConfigOptional.isPresent()) {
return null;
}
CloudAppConfiguration appConfig = appConfigOptional.get();
// 設置 Mesos Slave ID
taskContext.setSlaveId(offer.getSlaveId().getValue());
// 得到 分片上下文集合
ShardingContexts shardingContexts = getShardingContexts(taskContext, appConfig, jobConfig);
// 瞬時的腳本做業,使用 Mesos 命令行執行,無需使用執行器
boolean isCommandExecutor = CloudJobExecutionType.TRANSIENT == jobConfig.getJobExecutionType() && JobType.SCRIPT == jobConfig.getTypeConfig().getJobType();
String script = appConfig.getBootstrapScript();
if (isCommandExecutor) {
script = ((ScriptJobConfiguration) jobConfig.getTypeConfig()).getScriptCommandLine();
}
// 建立 啓動命令
Protos.CommandInfo.URI uri = buildURI(appConfig, isCommandExecutor);
Protos.CommandInfo command = buildCommand(uri, script, shardingContexts, isCommandExecutor);
// 建立 Mesos 任務信息
if (isCommandExecutor) {
return buildCommandExecutorTaskInfo(taskContext, jobConfig, shardingContexts, offer, command);
} else {
return buildCustomizedExecutorTaskInfo(taskContext, appConfig, jobConfig, shardingContexts, offer, command);
}
}複製代碼
調用 #getShardingContexts(...)
方法, 得到分片上下文集合。實現代碼以下:
private ShardingContexts getShardingContexts(final TaskContext taskContext, final CloudAppConfiguration appConfig, final CloudJobConfiguration jobConfig) {
Map<Integer, String> shardingItemParameters = new ShardingItemParameters(jobConfig.getTypeConfig().getCoreConfig().getShardingItemParameters()).getMap();
Map<Integer, String> assignedShardingItemParameters = new HashMap<>(1, 1);
int shardingItem = taskContext.getMetaInfo().getShardingItems().get(0); // 單個做業分片
assignedShardingItemParameters.put(shardingItem, shardingItemParameters.containsKey(shardingItem) ? shardingItemParameters.get(shardingItem) : "");
return new ShardingContexts(taskContext.getId(), jobConfig.getJobName(), jobConfig.getTypeConfig().getCoreConfig().getShardingTotalCount(),
jobConfig.getTypeConfig().getCoreConfig().getJobParameter(), assignedShardingItemParameters, appConfig.getEventTraceSamplingCount());
}複製代碼
調用 #buildURI(...)
方法,建立執行器的二進制文件下載地址。試下代碼以下:
private Protos.CommandInfo.URI buildURI(final CloudAppConfiguration appConfig, final boolean isCommandExecutor) {
Protos.CommandInfo.URI.Builder result = Protos.CommandInfo.URI.newBuilder()
.setValue(appConfig.getAppURL())
.setCache(appConfig.isAppCacheEnable()); // cache
if (isCommandExecutor && !SupportedExtractionType.isExtraction(appConfig.getAppURL())) {
result.setExecutable(true); // 是否可執行
} else {
result.setExtract(true); // 是否須要解壓
}
return result.build();
}複製代碼
CloudAppConfiguration.appURL
,經過 Mesos 實現文件的下載。雲做業應用配置 CloudAppConfiguration.appCacheEnable
,應用文件下載是否緩存。
FROM 《Mesos 框架構建分佈式應用》 P99
Fetcher 緩存
Mesos 0.23 裏發佈稱爲 fetcher 緩存的新功能。fetcher 緩存確保每一個 artifact 在每一個 slave 只會下載一次,即便多個執行器請求同一個 artifact,也只須要等待單詞下載完成便可。
調用 #buildCommand(...)
方法,建立執行器啓動命令。實現代碼以下:
private Protos.CommandInfo buildCommand(final Protos.CommandInfo.URI uri, final String script, final ShardingContexts shardingContexts, final boolean isCommandExecutor) {
Protos.CommandInfo.Builder result = Protos.CommandInfo.newBuilder().addUris(uri).setShell(true);
if (isCommandExecutor) {
CommandLine commandLine = CommandLine.parse(script);
commandLine.addArgument(GsonFactory.getGson().toJson(shardingContexts), false);
result.setValue(Joiner.on(" ").join(commandLine.getExecutable(), Joiner.on(" ").join(commandLine.getArguments())));
} else {
result.setValue(script);
}
return result.build();
}複製代碼
調用 #buildCommandExecutorTaskInfo(...)
方法,爲瞬時的腳本做業建立 Mesos 任務信息。實現代碼以下:
private Protos.TaskInfo buildCommandExecutorTaskInfo(final TaskContext taskContext, final CloudJobConfiguration jobConfig, final ShardingContexts shardingContexts, final Protos.Offer offer, final Protos.CommandInfo command) {
Protos.TaskInfo.Builder result = Protos.TaskInfo.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue(taskContext.getId()).build())
.setName(taskContext.getTaskName()).setSlaveId(offer.getSlaveId())
.addResources(buildResource("cpus", jobConfig.getCpuCount(), offer.getResourcesList()))
.addResources(buildResource("mem", jobConfig.getMemoryMB(), offer.getResourcesList()))
.setData(ByteString.copyFrom(new TaskInfoData(shardingContexts, jobConfig).serialize())); //
return result.setCommand(command).build();
}複製代碼
調用 #buildCustomizedExecutorTaskInfo(...)
方法,建立 Mesos 任務信息。實現代碼以下:
private Protos.TaskInfo buildCustomizedExecutorTaskInfo(final TaskContext taskContext, final CloudAppConfiguration appConfig, final CloudJobConfiguration jobConfig, final ShardingContexts shardingContexts, final Protos.Offer offer, final Protos.CommandInfo command) {
Protos.TaskInfo.Builder result = Protos.TaskInfo.newBuilder().setTaskId(Protos.TaskID.newBuilder().setValue(taskContext.getId()).build())
.setName(taskContext.getTaskName()).setSlaveId(offer.getSlaveId())
.addResources(buildResource("cpus", jobConfig.getCpuCount(), offer.getResourcesList()))
.addResources(buildResource("mem", jobConfig.getMemoryMB(), offer.getResourcesList()))
.setData(ByteString.copyFrom(new TaskInfoData(shardingContexts, jobConfig).serialize()));
// ExecutorInfo
Protos.ExecutorInfo.Builder executorBuilder = Protos.ExecutorInfo.newBuilder().setExecutorId(Protos.ExecutorID.newBuilder()
.setValue(taskContext.getExecutorId(jobConfig.getAppName()))) // 執行器 ID
.setCommand(command)
.addResources(buildResource("cpus", appConfig.getCpuCount(), offer.getResourcesList()))
.addResources(buildResource("mem", appConfig.getMemoryMB(), offer.getResourcesList()));
if (env.getJobEventRdbConfiguration().isPresent()) {
executorBuilder.setData(ByteString.copyFrom(SerializationUtils.serialize(env.getJobEventRdbConfigurationMap()))).build();
}
return result.setExecutor(executorBuilder.build()).build();
}複製代碼
調用 Protos.ExecutorInfo.Builder#setValue(...)
方法,設置執行器編號。大多數在 Mesos 實現的執行器,一個任務對應一個執行器。而 Elastic-Job-Cloud-Executor 不一樣於大多數在 Mesos 上的執行器,一個執行器能夠對應多個做業。什麼意思?在一個 Mesos Slave,相同做業應用,只會啓動一個 Elastic-Job-Cloud-Scheduler。當該執行器不存在時,啓動一個。當該執行器已經存在,複用該執行器。那麼是如何實現該功能的呢?相同做業應用,在同一個 Mesos Slave,使用相同執行器編號。實現代碼以下:
/** * 獲取任務執行器主鍵. * * @param appName 應用名稱 * @return 任務執行器主鍵 */
public String getExecutorId(final String appName) {
return Joiner.on(DELIMITER).join(appName, slaveId);
}複製代碼
調用 FacadeService#addRunning(...)
方法,將任務運行時上下文放入運行時隊列。實現代碼以下:
// FacadeService.java
/** * 將任務運行時上下文放入運行時隊列. * * @param taskContext 任務運行時上下文 */
public void addRunning(final TaskContext taskContext) {
runningService.add(taskContext);
}
// RunningService.java
/** * 將任務運行時上下文放入運行時隊列. * * @param taskContext 任務運行時上下文 */
public void add(final TaskContext taskContext) {
if (!configurationService.load(taskContext.getMetaInfo().getJobName()).isPresent()) {
return;
}
// 添加到運行中的任務集合
getRunningTasks(taskContext.getMetaInfo().getJobName()).add(taskContext);
// 判斷是否爲常駐任務
if (!isDaemon(taskContext.getMetaInfo().getJobName())) {
return;
}
// 添加到運行中隊列
String runningTaskNodePath = RunningNode.getRunningTaskNodePath(taskContext.getMetaInfo().toString());
if (!regCenter.isExisted(runningTaskNodePath)) {
regCenter.persist(runningTaskNodePath, taskContext.getId());
}
}
// RunningNode.java
final class RunningNode {
static final String ROOT = StateNode.ROOT + "/running";
private static final String RUNNING_JOB = ROOT + "/%s"; // %s = ${JOB_NAME}
private static final String RUNNING_TASK = RUNNING_JOB + "/%s"; // %s = ${TASK_META_INFO}。${TASK_META_INFO}=${JOB_NAME}@-@${ITEM_ID}。
}複製代碼
調用 #getRunningTasks()
方法,得到運行中的任務集合,並將當前任務添加到其中。實現代碼以下:
public Collection<TaskContext> getRunningTasks(final String jobName) {
Set<TaskContext> taskContexts = new CopyOnWriteArraySet<>();
Collection<TaskContext> result = RUNNING_TASKS.putIfAbsent(jobName, taskContexts);
return null == result ? taskContexts : result;
}複製代碼
在運維平臺,咱們能夠看到當前任務正在運行中:
常駐做業會存儲在運行中做業隊列。運行中做業隊列存儲在註冊中心( Zookeeper )的持久數據節點 /${NAMESPACE}/state/running/${JOB_NAME}/${TASK_META_INFO}
,存儲值爲任務編號。使用 zkClient 查看以下:
[zk: localhost:2181(CONNECTED) 14] ls /elastic-job-cloud/state/running/test_job_simple
[test_job_simple@-@0, test_job_simple@-@1, test_job_simple@-@2]
[zk: localhost:2181(CONNECTED) 15] get /elastic-job-cloud/state/running/test_job_simple/test_job_simple@-@0
test_job_simple@-@0@-@READY@-@400197d9-76ca-464b-b2f0-e0fba5c2a598-S0@-@9780ed12-9612-45e3-ac14-feb2911896ff複製代碼
// #runOneIteration()
facadeService.removeLaunchTasksFromQueue(taskContextsList);
// FacadeService.java
/** * 從隊列中刪除已運行的做業. * * @param taskContexts 任務上下文集合 */
public void removeLaunchTasksFromQueue(final List<TaskContext> taskContexts) {
List<TaskContext> failoverTaskContexts = new ArrayList<>(taskContexts.size());
Collection<String> readyJobNames = new HashSet<>(taskContexts.size(), 1);
for (TaskContext each : taskContexts) {
switch (each.getType()) {
case FAILOVER:
failoverTaskContexts.add(each);
break;
case READY:
readyJobNames.add(each.getMetaInfo().getJobName());
break;
default:
break;
}
}
// 從失效轉移隊列中刪除相關任務
failoverService.remove(Lists.transform(failoverTaskContexts, new Function<TaskContext, TaskContext.MetaInfo>() {
@Override
public TaskContext.MetaInfo apply(final TaskContext input) {
return input.getMetaInfo();
}
}));
// 從待執行隊列中刪除相關做業
readyService.remove(readyJobNames);
}複製代碼
// #runOneIteration()
for (Entry<List<OfferID>, List<TaskInfo>> each : offerIdTaskInfoMap.entrySet()) {
schedulerDriver.launchTasks(each.getKey(), each.getValue());
}複製代碼
SchedulerDriver#launchTasks(...)
方法,提交任務給 Mesos Master。由 Mesos Master 調度任務給 Mesos Slave。Mesos Slave 提交執行器執行任務。TaskExecutor,實現了 Mesos Executor 接口 org.apache.mesos.Executor
。執行器的主要職責之一:執行調度器所請求的任務。TaskExecutor 接收到 Mesos Slave 提交的任務,調用 #launchTask(...)
方法,處理任務。實現代碼以下:
// DaemonTaskScheduler.java
@Override
public void launchTask(final ExecutorDriver executorDriver, final Protos.TaskInfo taskInfo) {
executorService.submit(new TaskThread(executorDriver, taskInfo));
}複製代碼
ExecutorService#submit(...)
方法,提交 TaskThread 到線程池,執行任務。@RequiredArgsConstructor
class TaskThread implements Runnable {
private final ExecutorDriver executorDriver;
private final TaskInfo taskInfo;
@Override
public void run() {
// 更新 Mesos 任務狀態,運行中。
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_RUNNING).build());
//
Map<String, Object> data = SerializationUtils.deserialize(taskInfo.getData().toByteArray());
ShardingContexts shardingContexts = (ShardingContexts) data.get("shardingContext");
@SuppressWarnings("unchecked")
JobConfigurationContext jobConfig = new JobConfigurationContext((Map<String, String>) data.get("jobConfigContext"));
try {
// 得到 分佈式做業
ElasticJob elasticJob = getElasticJobInstance(jobConfig);
// 調度器提供內部服務的門面對象
final CloudJobFacade jobFacade = new CloudJobFacade(shardingContexts, jobConfig, jobEventBus);
// 執行做業
if (jobConfig.isTransient()) {
// 執行做業
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
// 更新 Mesos 任務狀態,已完成。
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_FINISHED).build());
} else {
// 初始化 常駐做業調度器
new DaemonTaskScheduler(elasticJob, jobConfig, jobFacade, executorDriver, taskInfo.getTaskId()).init();
}
// CHECKSTYLE:OFF
} catch (final Throwable ex) {
// CHECKSTYLE:ON
log.error("Elastic-Job-Cloud-Executor error", ex);
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskInfo.getTaskId()).setState(Protos.TaskState.TASK_ERROR).setMessage(ExceptionUtil.transform(ex)).build());
executorDriver.stop();
throw ex;
}
}
}複製代碼
TaskInfo.data
屬性中,能夠得到提交任務附帶的數據,例如分片上下文集合( ShardingContexts ),內部的做業配置上下文( JobConfigurationContext )。調用 #getElasticJobInstance()
方法,得到任務須要執行的分佈式做業( Elastic-Job )。實現代碼以下:
private ElasticJob getElasticJobInstance(final JobConfigurationContext jobConfig) {
if (!Strings.isNullOrEmpty(jobConfig.getBeanName()) && !Strings.isNullOrEmpty(jobConfig.getApplicationContext())) { // spring 環境
return getElasticJobBean(jobConfig);
} else {
return getElasticJobClass(jobConfig);
}
}
/** * 從 Spring 容器中得到做業對象 * * @param jobConfig 做業配置 * @return 做業對象 */
private ElasticJob getElasticJobBean(final JobConfigurationContext jobConfig) {
String applicationContextFile = jobConfig.getApplicationContext();
if (null == applicationContexts.get(applicationContextFile)) {
synchronized (applicationContexts) {
if (null == applicationContexts.get(applicationContextFile)) {
applicationContexts.put(applicationContextFile, new ClassPathXmlApplicationContext(applicationContextFile));
}
}
}
return (ElasticJob) applicationContexts.get(applicationContextFile).getBean(jobConfig.getBeanName());
}
/** * 建立做業對象 * * @param jobConfig 做業配置 * @return 做業對象 */
private ElasticJob getElasticJobClass(final JobConfigurationContext jobConfig) {
String jobClass = jobConfig.getTypeConfig().getJobClass();
try {
Class<?> elasticJobClass = Class.forName(jobClass);
if (!ElasticJob.class.isAssignableFrom(elasticJobClass)) {
throw new JobSystemException("Elastic-Job: Class '%s' must implements ElasticJob interface.", jobClass);
}
if (elasticJobClass != ScriptJob.class) {
return (ElasticJob) elasticJobClass.newInstance();
}
return null;
} catch (final ReflectiveOperationException ex) {
throw new JobSystemException("Elastic-Job: Class '%s' initialize failure, the error message is '%s'.", jobClass, ex.getMessage());
}
}複製代碼
AbstractElasticJobExecutor#execute(...)
執行做業邏輯,並調用 ExecutorDriver#sendStatusUpdate(...)
發送狀態,更新 Mesos 任務已完成( Protos.TaskState.TASK_FINISHED )。AbstractElasticJobExecutor#execute(...)
實現代碼,在 Elastic-Job-Lite 和 Elastic-Job-Cloud 基本一致,在《Elastic-Job-Lite 源碼分析 —— 做業執行》有詳細解析。DaemonTaskScheduler#init()
方法,初始化做業調度,在「5.2 DaemonTaskScheduler」詳細解析。瞬時做業,經過 Elastic-Job-Cloud-Scheduler 調度任務,提交 Elastic-Job-Cloud-Executor 執行後,等待 Elastic-Job-Scheduler 進行下次調度。
常駐做業,經過 Elastic-Job-Scheduler 提交 Elastic-Job-Cloud-Executor 進行調度。Elastic-Job-Cloud-Executor 使用 DaemonTaskScheduler 不斷對常駐做業進行調度而無需 Elastic-Job-Cloud-Scheduler 參與其中。
這就是瞬時做業和常駐做業不一樣之處。
DaemonTaskScheduler,常駐做業調度器。調用 DaemonTaskScheduler#init()
方法,對一個做業初始化調度,實現代碼以下:
/** * 初始化做業. */
public void init() {
// Quartz JobDetail
JobDetail jobDetail = JobBuilder.newJob(DaemonJob.class)
.withIdentity(jobRootConfig.getTypeConfig().getCoreConfig().getJobName()).build();
jobDetail.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJob);
jobDetail.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
jobDetail.getJobDataMap().put(EXECUTOR_DRIVER_DATA_MAP_KEY, executorDriver);
jobDetail.getJobDataMap().put(TASK_ID_DATA_MAP_KEY, taskId);
try {
scheduleJob(initializeScheduler(), jobDetail, taskId.getValue(), jobRootConfig.getTypeConfig().getCoreConfig().getCron());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
private Scheduler initializeScheduler() throws SchedulerException {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
return factory.getScheduler();
}
private Properties getBaseQuartzProperties() {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
result.put("org.quartz.threadPool.threadCount", "1"); // 線程數:1
result.put("org.quartz.scheduler.instanceName", taskId.getValue());
if (!jobRootConfig.getTypeConfig().getCoreConfig().isMisfire()) {
result.put("org.quartz.jobStore.misfireThreshold", "1");
}
result.put("org.quartz.plugin.shutdownhook.class", ShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
return result;
}
private void scheduleJob(final Scheduler scheduler, final JobDetail jobDetail, final String triggerIdentity, final String cron) {
try {
if (!scheduler.checkExists(jobDetail.getKey())) {
scheduler.scheduleJob(jobDetail, createTrigger(triggerIdentity, cron));
}
scheduler.start();
RUNNING_SCHEDULERS.putIfAbsent(scheduler.getSchedulerName(), scheduler);
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
private CronTrigger createTrigger(final String triggerIdentity, final String cron) {
return TriggerBuilder.newTrigger()
.withIdentity(triggerIdentity)
.withSchedule(CronScheduleBuilder.cronSchedule(cron)
.withMisfireHandlingInstructionDoNothing())
.build();
}複製代碼
DaemonJob 實現代碼以下:
public static final class DaemonJob implements Job {
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
@Setter
private ExecutorDriver executorDriver;
@Setter
private Protos.TaskID taskId;
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
int jobEventSamplingCount = shardingContexts.getJobEventSamplingCount();
int currentJobEventSamplingCount = shardingContexts.getCurrentJobEventSamplingCount();
if (jobEventSamplingCount > 0 && ++currentJobEventSamplingCount < jobEventSamplingCount) {
shardingContexts.setCurrentJobEventSamplingCount(currentJobEventSamplingCount);
//
jobFacade.getShardingContexts().setAllowSendJobEvent(false);
// 執行做業
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
} else {
//
jobFacade.getShardingContexts().setAllowSendJobEvent(true);
//
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("BEGIN").build());
// 執行做業
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
//
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("COMPLETE").build());
//
shardingContexts.setCurrentJobEventSamplingCount(0);
}
}
}複製代碼
AbstractElasticJobExecutor#execute(...)
執行做業邏輯。AbstractElasticJobExecutor#execute(...)
實現代碼,在 Elastic-Job-Lite 和 Elastic-Job-Cloud 基本一致,在《Elastic-Job-Lite 源碼分析 —— 做業執行》有詳細解析。jobEventSamplingCount
來自應用配置 (CloudAppConfiguration.eventTraceSamplingCount
) 屬性,常駐做業事件採樣率統計條數,默認採樣所有記錄。爲避免數據量過大,可對頻繁調度的常駐做業配置採樣率,即做業每執行N次,纔會記錄做業執行及追蹤相關數據。
當知足採樣條件時,調用 ShardingContexts#setAllowSendJobEvent(true)
,標記要記錄做業事件。不然,調用 ShardingContexts#setAllowSendJobEvent(false)
,標記不記錄做業時間。做業事件追蹤在《Elastic-Job-Lite 源碼分析 —— 做業事件追蹤》有詳細解析。
另外,當知足採樣調試時,也會調用 ExecutorDriver#sendStatusUpdate(...)
方法,更新 Mesos 任務狀態爲運行中,並附帶 "BEGIN"
或 "COMPLETE"
消息。
Mesos 調度器的職責之一,處理任務的狀態,特別是響應任務和故障。所以在 Elastic-Job-Cloud-Executor 調用 ExecutorDriver#sendStatusUpdate(...)
方法,更新 Mesos 任務狀態時,觸發調用 Elastic-Job-Cloud-Scheduler 的 SchedulerEngine 的 #statusUpdate(...)
方法,實現代碼以下:
@Override
public void statusUpdate(final SchedulerDriver schedulerDriver, final Protos.TaskStatus taskStatus) {
String taskId = taskStatus.getTaskId().getValue();
TaskContext taskContext = TaskContext.from(taskId);
String jobName = taskContext.getMetaInfo().getJobName();
log.trace("call statusUpdate task state is: {}, task id is: {}", taskStatus.getState(), taskId);
jobEventBus.post(new JobStatusTraceEvent(jobName, taskContext.getId(), taskContext.getSlaveId(), Source.CLOUD_SCHEDULER,
taskContext.getType(), String.valueOf(taskContext.getMetaInfo().getShardingItems()), State.valueOf(taskStatus.getState().name()), taskStatus.getMessage()));
switch (taskStatus.getState()) {
case TASK_RUNNING:
if (!facadeService.load(jobName).isPresent()) {
schedulerDriver.killTask(Protos.TaskID.newBuilder().setValue(taskId).build());
}
if ("BEGIN".equals(taskStatus.getMessage())) {
facadeService.updateDaemonStatus(taskContext, false);
} else if ("COMPLETE".equals(taskStatus.getMessage())) {
facadeService.updateDaemonStatus(taskContext, true);
statisticManager.taskRunSuccessfully();
}
break;
case TASK_FINISHED:
facadeService.removeRunning(taskContext);
unAssignTask(taskId);
statisticManager.taskRunSuccessfully();
break;
case TASK_KILLED:
log.warn("task id is: {}, status is: {}, message is: {}, source is: {}", taskId, taskStatus.getState(), taskStatus.getMessage(), taskStatus.getSource());
facadeService.removeRunning(taskContext);
facadeService.addDaemonJobToReadyQueue(jobName);
unAssignTask(taskId);
break;
case TASK_LOST:
case TASK_DROPPED:
case TASK_GONE:
case TASK_GONE_BY_OPERATOR:
case TASK_FAILED:
case TASK_ERROR:
log.warn("task id is: {}, status is: {}, message is: {}, source is: {}", taskId, taskStatus.getState(), taskStatus.getMessage(), taskStatus.getSource());
facadeService.removeRunning(taskContext);
facadeService.recordFailoverTask(taskContext);
unAssignTask(taskId);
statisticManager.taskRunFailed();
break;
case TASK_UNKNOWN:
case TASK_UNREACHABLE:
log.error("task id is: {}, status is: {}, message is: {}, source is: {}", taskId, taskStatus.getState(), taskStatus.getMessage(), taskStatus.getSource());
statisticManager.taskRunFailed();
break;
default:
break;
}
}複製代碼
當更新 Mesos 任務狀態爲 TASK_RUNNING
時,根據附帶消息爲 "BEGIN"
或 "COMPLETE"
,分別調用 FacadeService#updateDaemonStatus(false / true)
方法,更新做業閒置狀態。實現代碼以下:
// FacadeService.java
/** * 更新常駐做業運行狀態. * * @param taskContext 任務運行時上下文 * @param isIdle 是否空閒 */
public void updateDaemonStatus(final TaskContext taskContext, final boolean isIdle) {
runningService.updateIdle(taskContext, isIdle);
}
// RunningService.java
/** * 更新做業閒置狀態. * @param taskContext 任務運行時上下文 * @param isIdle 是否閒置 */
public void updateIdle(final TaskContext taskContext, final boolean isIdle) {
synchronized (RUNNING_TASKS) {
Optional<TaskContext> taskContextOptional = findTask(taskContext);
if (taskContextOptional.isPresent()) {
taskContextOptional.get().setIdle(isIdle);
} else {
add(taskContext);
}
}
}複製代碼
若做業配置不存在時,調用 SchedulerDriver#killTask(...)
方法,殺死該 Mesos 任務。在《Elastic-Job-Cloud 源碼分析 —— 做業調度(二)》進一步解析。
當更新 Mesos 任務狀態爲 TASK_FINISHED
時,調用 FacadeService#removeRunning(...)
方法,將任務從運行時隊列刪除。實現代碼以下:
// FacadeService.java
/** * 將任務從運行時隊列刪除. * * @param taskContext 任務運行時上下文 */
public void removeRunning(final TaskContext taskContext) {
runningService.remove(taskContext);
}
// RunningService.java
/** * 將任務從運行時隊列刪除. * * @param taskContext 任務運行時上下文 */
public void remove(final TaskContext taskContext) {
// 移除運行中的任務集合
getRunningTasks(taskContext.getMetaInfo().getJobName()).remove(taskContext);
// 判斷是否爲常駐任務
if (!isDaemonOrAbsent(taskContext.getMetaInfo().getJobName())) {
return;
}
// 將任務從運行時隊列刪除
regCenter.remove(RunningNode.getRunningTaskNodePath(taskContext.getMetaInfo().toString()));
String jobRootNode = RunningNode.getRunningJobNodePath(taskContext.getMetaInfo().getJobName());
if (regCenter.isExisted(jobRootNode) && regCenter.getChildrenKeys(jobRootNode).isEmpty()) {
regCenter.remove(jobRootNode);
}
}複製代碼
當該做業對應的全部 Mesos 任務狀態都更新爲 TASK_FINISHED
後,做業能夠再次被 Elastic-Job-Cloud-Scheduler 調度。
調用 #unAssignTask(...)
方法,通知 TaskScheduler 任務被確認未分配到這個主機。TaskScheduler 作任務和 Offer 的匹配,對哪些任務運行在哪些主機是有依賴的,否則怎麼作匹配優化呢。在《Fenzo Wiki —— Notify the Scheduler of Assigns and UnAssigns of Tasks》能夠進一步瞭解。實現代碼以下:
private void unAssignTask(final String taskId) {
String hostname = facadeService.popMapping(taskId);
if (null != hostname) {
taskScheduler.getTaskUnAssigner().call(TaskContext.getIdForUnassignedSlave(taskId), hostname);
}
}複製代碼
當更新 Mesos 任務狀態爲 TASK_KILLED
時,調用 FacadeService#addDaemonJobToReadyQueue(...)
方法,將常駐做業放入待執行隊列。在《Elastic-Job-Cloud 源碼分析 —— 做業調度(二)》進一步解析。TODO
另外會調用 FacadeService#removeRunning(...)
、#unAssignTask(...)
方法。
當更新 Mesos 任務狀態爲 TASK_ERROR
等等時,調用 FacadeService#recordFailoverTask(...)
方法,在 《Elastic-Job-Cloud 源碼分析 —— 做業失效轉移》詳細解析。
另外會調用 FacadeService#removeRunning(...)
和 #unAssignTask(...)
方法。
旁白君:真的真的真的,好長好長好長啊。可是真的真的真的,乾貨!
芋道君:那必須的!
道友,趕忙上車,分享一波朋友圈!