分佈式開源調度框架TBSchedule

TBSchedule使用

1、下載TBSchedule 源碼java

http://code.taobao.org/svn/tbschedule/sql

2、編譯TBSchedule源碼造成jar包數據庫

mvn package/直接mvn deploye/install 一步完成服務器

3、安裝到本地的maven倉庫session

mvn deploy/installjvm

4、在項目pom配置文件中引用這個依賴文件maven

<dependency>ide

<groupId>com.taobao.pamirs.schedule</groupId>svn

<artifactId>tbschedule</artifactId>fetch

<version>3.3.3.2</version>

</dependency>

5、配置TBSchedule依賴的zookeeper配置,TBSchedule的調度依賴於zookeeper的節點配置和心跳連接。

 

# zooker service address

schedule.zookeeper.address=120.25.87.176:2181

# root path

schedule.root.path=/bbb/dd

# session timeout

schedule.session.timeout=60000

# userName

schedule.zookeeper.username=ScheduleAdmin

# password

schedule.zookeeper.password=password

 

<context:property-placeholder location="classpath:schedule.properties" ignore-resource-not-found="true" ignore-unresolvable="true"/>

<bean id="scheduleManagerFactory" class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"

init-method="init">

<property name="zkConfig">

<map>

<entry key="zkConnectString" value="${schedule.zookeeper.address}" />

<entry key="rootPath" value="${schedule.root.path}" />//在控制檯鏈接zookeeper的路徑要和該配置的路徑一致,因爲配置時默認要檢查父節點是否爲空,因此最好先檢查該路徑是否存在。

<entry key="zkSessionTimeout" value="${schedule.session.timeout}" />

<entry key="userName" value="${schedule.zookeeper.username} 必須和控制檯的用戶名同樣" />

<entry key="password" value="${schedule.zookeeper.password} 必須和" />

<entry key="isCheckParentPath" value="true" />

</map>

</property>

</bean>

6、配置任務項和任務策略,將任務分片,查詢任務

六-1、經過代碼實現任務的分配

private Logger log = LoggerFactory

.getLogger(getClass());

TBScheduleManagerFactory scheduleManagerFactory;

/**

* 心跳鏈接時間

*/

private int heartBeatRate = 5*1000;

 

private int judgeDeadInterval = 1*60*1000;

 

private String taskParameter="";

 

private String strategyTaskParameter="";

 

/**

* jvm最大單線程數量

*/

private int numOfSingleServer = 2;

 

/**

* 最大線程組數量 總 最大線程數量 = jvm最大單線程數量 * 最大的線程組總量

* 將任務列表分紅幾個隊列來處理

*/

private int assignNum = 2;

 

private String[] iPLists={"127.0.0.1"};

 

private String[] taskItems={"0","1","2","3","4","5","6","7","8","9"};

 

private String[] baseTaskTypeNames;

 

private String[] dealBeanNames;

 

/**

* 開始時間

*/

private String permitRunStartTime;

 

/**

* 結束時間

*/

private String permitRunEndTime;

 

/**

* 批處理時 每次處理任務的數量

*/

private int executeNumber = 1;

 

/**

* 處理完一批數據後的休息時間

*/

private int sleepTimeInterval = 0;

 

/**

* 採集不到數據的休眠時間

*/

private int sleepTimeNoData = 500;

 

/**

* 每次採集任務的數量

*/

private int fetchDataNumber = 500;

 

@Autowired

public void setScheduleManagerFactory(

TBScheduleManagerFactory tbScheduleManagerFactory) {

this.scheduleManagerFactory = tbScheduleManagerFactory;

}

 

@Test

public void initialConfigData() throws Exception {

}

 

@Override

public void afterPropertiesSet() throws Exception {

if(baseTaskTypeNames.length != dealBeanNames.length) {

throw new RuntimeException("task definition error, baseTaskTypeNames length not equals dealBeanNames length");

}

String baseTaskTypeName;

String dealBeanName;

for (int i = 0; i < baseTaskTypeNames.length; i++) {

baseTaskTypeName = baseTaskTypeNames[i];

dealBeanName= dealBeanNames[i];

configTasks(baseTaskTypeName, dealBeanName);

}

}

 

private void configTasks(String baseTaskTypeName, String dealBeanName)

throws Exception, InterruptedException {

while(this.scheduleManagerFactory.isZookeeperInitialSucess() == false){

Thread.sleep(1000);

}

scheduleManagerFactory.stopServer(null);

Thread.sleep(1000);

try {

this.scheduleManagerFactory.getScheduleDataManager()

.deleteTaskType(baseTaskTypeName);

} catch (Exception e) {

 

}

ScheduleTaskType baseTaskType = new ScheduleTaskType();

baseTaskType.setBaseTaskType(baseTaskTypeName);

baseTaskType.setDealBeanName(dealBeanName);

baseTaskType.setHeartBeatRate(heartBeatRate);

baseTaskType.setJudgeDeadInterval(judgeDeadInterval);

baseTaskType.setTaskParameter(taskParameter);

baseTaskType.setTaskItems(taskItems);

baseTaskType.setPermitRunStartTime(permitRunStartTime);

baseTaskType.setPermitRunEndTime(permitRunEndTime);

baseTaskType.setExecuteNumber(executeNumber);

baseTaskType.setSleepTimeInterval(sleepTimeInterval);

baseTaskType.setSleepTimeNoData(sleepTimeNoData);

baseTaskType.setFetchDataNumber(fetchDataNumber);

this.scheduleManagerFactory.getScheduleDataManager()

.createBaseTaskType(baseTaskType);

String taskName = baseTaskTypeName + "$TEST";

String strategyName = baseTaskTypeName +"-Strategy";

try {

this.scheduleManagerFactory.getScheduleStrategyManager()

.deleteMachineStrategy(strategyName,true);

} catch (Exception e) {

e.printStackTrace();

}

ScheduleStrategy strategy = new ScheduleStrategy();

strategy.setStrategyName(strategyName);

strategy.setKind(ScheduleStrategy.Kind.Schedule);

strategy.setTaskName(taskName);

strategy.setTaskParameter(strategyTaskParameter);

strategy.setNumOfSingleServer(numOfSingleServer);

strategy.setAssignNum(assignNum);

strategy.setIPList(iPLists);

this.scheduleManagerFactory.getScheduleStrategyManager()

.createScheduleStrategy(strategy);

log.info("建立調度任務成功" + strategy.toString());

}

六-2、經過TBSchedule自帶的控制檯來實現任務的配置

 

7、任務的理解

一、同一個jvm中,不一樣線程之間如何防止任務被重複執行?一個scheduleServer的內部線程間如何進行任務分片?

答覆:一、數據分片是在不一樣的jvm,獲知同一個jvm中不一樣的線程組間起做用。在同一個線程組內的10個線程,是經過一個同步的任務隊列來實現的。二、每一個線程從隊列中取任務執行,若是沒有任務了,則由一個線程負責調用selectTasks方法再獲取一批新的任務。

主要是設置休眠時間:即selectTasks方法返回列表的size爲0後,進入休眠。休眠完成以後從新執行該定時任務

二、任務項設置的意義和selectTasks方法的參數含義

答覆:一、 任務項(0,1,2,3,4,5,6,7,8,9)就是任務分片的策略。這個配置就是把數據分紅10片。能夠表示 ID的最後一位,也能夠是一個獨立的字段。根據你的業務來定。

二、 若是隻有1組線程,則全部的任務片都分配給他。這時selectTasks方法的參數:taskItemNum =10, queryCondition由10個元素,分別對應0,1,2,3,4,5,6,7,8,9。

a) 若是隻有2組線程,則任務片被分紅兩份。這時

b) 一個線程組的selectTasks方法的參數:taskItemNum =10, queryCondition有5個元素(0,2,4,6,8)

三、 另一個線程組的selectTasks方法的參數:taskItemNum =10, queryCondition有5個元素(1,3,5,7,9)

四、 若是有10個線程組。則每組線程只會獲取到1個任務片。這時selectTasks方法的參數:taskItemNum =10, queryCondition只有一個元素,對應0到9中的一個。

一、 執行期間和時間的修改功能 a) 在建立任務和修改任務的時候,有兩個屬性(執行開始時間,執行結算時間)用於控制任務的執行時間。 b) 時間格式遵循標準的cron格式 http://dogstar.javaeye.com/blog/116130 還加強了原來不支持的倒數第幾天的能力。 c) 當時間到底開始時間的時候,就開始執行任務,到達結束時間則終止調度(不論是否全部的任務都處理完)。若是沒有設置執行結束時間。則一直運行,直到selectTasks返回的記錄數爲0,就終止執行。等待下個開始運行時間在啓動。 d) 若是要動態修改任務的執行時間區間,則先 點擊「暫停」按鈕,等全部的服務器都中止完畢(大概須要幾秒時間)。當再次單擊任務,出現以下情形表示中止完畢。而後修改執行開始時間,執行結算時間。在恢復任務調度,就能夠實現調度時間的修改 二、 任務處理的問題 a) Schedule主要是提供任務調度的分配管理。每個任務是否執行成功,是經過業務方的bean來實現的。 b) 你需求的例子,我理解的解決方案以下: i. 你從雲梯拉下來100萬數據放到保險應用的數據庫中。這個表中有兩個關鍵字段USER_ID和STS(狀態 0-未發送,1-已發送) ii. 在bean的selectTasks方法的查詢sql中除了根據任務進行分片外,還須要增長狀態條件。例如 USER_ID % 10 in( ?,?,?) AND sts =0 iii. 在bean的execute方法中,在發送完消息後,你還須要 修改數據狀態 update table STS =1 where USER_ID =? 。這樣下次就不會取到這條數據了。 iv. 這樣就能夠保障機器從新啓動後,也不會出現問題。你能夠參考DBDemoSingle.java的實現模式。你使用的接口應該是IScheduleTaskDealSingle。 若是旺旺的接口支持批量發送消息的時候,你才須要使用IScheduleTaskDealMulti接口。

相關文章
相關標籤/搜索