Schedulerx2.0是阿里中間件自研的基於akka架構的新一代分佈式任務調度平臺,提供定時、任務編排、分佈式跑批等功能,具備高可靠、海量任務、秒級調度等能力。java
Schedulerx2.0提供可視化的工做流進行任務編排,該文章將詳細介紹如何使用schedulerx2.0的工做流進行上下游任務的數據傳輸。架構
當前只有java任務支持數據傳輸,網格計算請使用MapReduce模型進行數據傳輸。less
/** * * @param status * @param result, the size should less than 1000 bytes * @throws Exception */ public ProcessResult(boolean status, String result) throws Exception;
在Processor結尾,經過該結果替代ProcessResult(boolean status),能夠返回執行結果。分佈式
result的長度不能超過1000個字節(注意,不是String的長度,若是有中文字符,可能會超過1000個字節!),若是超過1000個字節,任務會失敗。ide
List<JobInstanceData> upstreamDatas = JobContext.getUpstreamData();
在Processor裏,能夠經過該接口從JobContext中拿到上游的數據。上游的數據是一個list(可能有多個父節點),JobInstanceData裏有兩個屬性,分別是jobName和data(String類型)。url
首先咱們寫三個jobProcessorspa
public class TestSimpleJobA extends JavaProcessor { @Override public ProcessResult process(JobContext context) throws Exception { System.out.println("TestSimpleJobA " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss")); return new ProcessResult(true, String.valueOf(1)); } }
public class TestSimpleJobB extends JavaProcessor { @Override public ProcessResult process(JobContext context) throws Exception { System.out.println("TestSimpleJobB " + DateTime.now().toString("yyyy-MM-dd HH:mm:ss")); return new ProcessResult(true, String.valueOf(2)); } }
public class TestSimpleJobC extends JavaProcessor { @Override public ProcessResult process(JobContext context) throws Exception { List<JobInstanceData> upstreamDatas = context.getUpstreamData(); int sum = 0; for (JobInstanceData jobInstanceData : upstreamDatas) { System.out.println("jobName=" + jobInstanceData.getJobName() + ", data=" + jobInstanceData.getData()); sum += Integer.valueOf(jobInstanceData.getData()); } System.out.println("TestSimpleJobC sum=" + sum); return new ProcessResult(true, String.valueOf(sum)); } }
經過控制檯配置工做流以下圖所示:code
觸發一次該工做流,而後進入工做流實例圖,右鍵jobA的實例,進入詳情,能夠看到jobA實例結果=1,以下圖中間件
同理,能夠看到jobB的實例結果=2, jobC的實例結果=3blog
控制檯也能看到jobC的機器打印
jobName=jobB, data=2 jobName=jobA, data=1 TestSimpleJobC sum=3
原文連接 本文爲雲棲社區原創內容,未經容許不得轉載。