1、下載TBSchedule 源碼java
http://code.taobao.org/svn/tbschedule/sql
2、編譯TBSchedule源碼造成jar包數據庫
mvn package/直接mvn deploye/install 一步完成服務器
3、安裝到本地的maven倉庫session
mvn deploy/installjvm
4、在項目pom配置文件中引用這個依賴文件maven
<dependency>ide
<groupId>com.taobao.pamirs.schedule</groupId>svn
<artifactId>tbschedule</artifactId>fetch
<version>3.3.3.2</version>
</dependency>
5、配置TBSchedule依賴的zookeeper配置,TBSchedule的調度依賴於zookeeper的節點配置和心跳連接。
# zooker service address
schedule.zookeeper.address=120.25.87.176:2181
# root path
schedule.root.path=/bbb/dd
# session timeout
schedule.session.timeout=60000
# userName
schedule.zookeeper.username=ScheduleAdmin
# password
schedule.zookeeper.password=password
<context:property-placeholder location="classpath:schedule.properties" ignore-resource-not-found="true" ignore-unresolvable="true"/>
<bean id="scheduleManagerFactory" class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
init-method="init">
<property name="zkConfig">
<map>
<entry key="zkConnectString" value="${schedule.zookeeper.address}" />
<entry key="rootPath" value="${schedule.root.path}" />//在控制檯鏈接zookeeper的路徑要和該配置的路徑一致,因爲配置時默認要檢查父節點是否爲空,因此最好先檢查該路徑是否存在。
<entry key="zkSessionTimeout" value="${schedule.session.timeout}" />
<entry key="userName" value="${schedule.zookeeper.username} 必須和控制檯的用戶名同樣" />
<entry key="password" value="${schedule.zookeeper.password} 必須和" />
<entry key="isCheckParentPath" value="true" />
</map>
</property>
</bean>
6、配置任務項和任務策略,將任務分片,查詢任務
六-1、經過代碼實現任務的分配
private Logger log = LoggerFactory
.getLogger(getClass());
TBScheduleManagerFactory scheduleManagerFactory;
/**
* 心跳鏈接時間
*/
private int heartBeatRate = 5*1000;
private int judgeDeadInterval = 1*60*1000;
private String taskParameter="";
private String strategyTaskParameter="";
/**
* jvm最大單線程數量
*/
private int numOfSingleServer = 2;
/**
* 最大線程組數量 總 最大線程數量 = jvm最大單線程數量 * 最大的線程組總量
* 將任務列表分紅幾個隊列來處理
*/
private int assignNum = 2;
private String[] iPLists={"127.0.0.1"};
private String[] taskItems={"0","1","2","3","4","5","6","7","8","9"};
private String[] baseTaskTypeNames;
private String[] dealBeanNames;
/**
* 開始時間
*/
private String permitRunStartTime;
/**
* 結束時間
*/
private String permitRunEndTime;
/**
* 批處理時 每次處理任務的數量
*/
private int executeNumber = 1;
/**
* 處理完一批數據後的休息時間
*/
private int sleepTimeInterval = 0;
/**
* 採集不到數據的休眠時間
*/
private int sleepTimeNoData = 500;
/**
* 每次採集任務的數量
*/
private int fetchDataNumber = 500;
@Autowired
public void setScheduleManagerFactory(
TBScheduleManagerFactory tbScheduleManagerFactory) {
this.scheduleManagerFactory = tbScheduleManagerFactory;
}
public void initialConfigData() throws Exception {
}
@Override
public void afterPropertiesSet() throws Exception {
if(baseTaskTypeNames.length != dealBeanNames.length) {
throw new RuntimeException("task definition error, baseTaskTypeNames length not equals dealBeanNames length");
}
String baseTaskTypeName;
String dealBeanName;
for (int i = 0; i < baseTaskTypeNames.length; i++) {
baseTaskTypeName = baseTaskTypeNames[i];
dealBeanName= dealBeanNames[i];
configTasks(baseTaskTypeName, dealBeanName);
}
}
private void configTasks(String baseTaskTypeName, String dealBeanName)
throws Exception, InterruptedException {
while(this.scheduleManagerFactory.isZookeeperInitialSucess() == false){
Thread.sleep(1000);
}
scheduleManagerFactory.stopServer(null);
Thread.sleep(1000);
try {
this.scheduleManagerFactory.getScheduleDataManager()
.deleteTaskType(baseTaskTypeName);
} catch (Exception e) {
}
ScheduleTaskType baseTaskType = new ScheduleTaskType();
baseTaskType.setBaseTaskType(baseTaskTypeName);
baseTaskType.setDealBeanName(dealBeanName);
baseTaskType.setHeartBeatRate(heartBeatRate);
baseTaskType.setJudgeDeadInterval(judgeDeadInterval);
baseTaskType.setTaskParameter(taskParameter);
baseTaskType.setTaskItems(taskItems);
baseTaskType.setPermitRunStartTime(permitRunStartTime);
baseTaskType.setPermitRunEndTime(permitRunEndTime);
baseTaskType.setExecuteNumber(executeNumber);
baseTaskType.setSleepTimeInterval(sleepTimeInterval);
baseTaskType.setSleepTimeNoData(sleepTimeNoData);
baseTaskType.setFetchDataNumber(fetchDataNumber);
this.scheduleManagerFactory.getScheduleDataManager()
.createBaseTaskType(baseTaskType);
String taskName = baseTaskTypeName + "$TEST";
String strategyName = baseTaskTypeName +"-Strategy";
try {
this.scheduleManagerFactory.getScheduleStrategyManager()
.deleteMachineStrategy(strategyName,true);
} catch (Exception e) {
e.printStackTrace();
}
ScheduleStrategy strategy = new ScheduleStrategy();
strategy.setStrategyName(strategyName);
strategy.setKind(ScheduleStrategy.Kind.Schedule);
strategy.setTaskName(taskName);
strategy.setTaskParameter(strategyTaskParameter);
strategy.setNumOfSingleServer(numOfSingleServer);
strategy.setAssignNum(assignNum);
strategy.setIPList(iPLists);
this.scheduleManagerFactory.getScheduleStrategyManager()
.createScheduleStrategy(strategy);
log.info("建立調度任務成功" + strategy.toString());
}
六-2、經過TBSchedule自帶的控制檯來實現任務的配置
7、任務的理解
一、同一個jvm中,不一樣線程之間如何防止任務被重複執行?一個scheduleServer的內部線程間如何進行任務分片?
答覆:一、數據分片是在不一樣的jvm,獲知同一個jvm中不一樣的線程組間起做用。在同一個線程組內的10個線程,是經過一個同步的任務隊列來實現的。二、每一個線程從隊列中取任務執行,若是沒有任務了,則由一個線程負責調用selectTasks方法再獲取一批新的任務。
主要是設置休眠時間:即selectTasks方法返回列表的size爲0後,進入休眠。休眠完成以後從新執行該定時任務
二、任務項設置的意義和selectTasks方法的參數含義
答覆:一、 任務項(0,1,2,3,4,5,6,7,8,9)就是任務分片的策略。這個配置就是把數據分紅10片。能夠表示 ID的最後一位,也能夠是一個獨立的字段。根據你的業務來定。
二、 若是隻有1組線程,則全部的任務片都分配給他。這時selectTasks方法的參數:taskItemNum =10, queryCondition由10個元素,分別對應0,1,2,3,4,5,6,7,8,9。
a) 若是隻有2組線程,則任務片被分紅兩份。這時
b) 一個線程組的selectTasks方法的參數:taskItemNum =10, queryCondition有5個元素(0,2,4,6,8)
三、 另一個線程組的selectTasks方法的參數:taskItemNum =10, queryCondition有5個元素(1,3,5,7,9)
四、 若是有10個線程組。則每組線程只會獲取到1個任務片。這時selectTasks方法的參數:taskItemNum =10, queryCondition只有一個元素,對應0到9中的一個。
一、 執行期間和時間的修改功能 a) 在建立任務和修改任務的時候,有兩個屬性(執行開始時間,執行結算時間)用於控制任務的執行時間。 b) 時間格式遵循標準的cron格式 http://dogstar.javaeye.com/blog/116130 還加強了原來不支持的倒數第幾天的能力。 c) 當時間到底開始時間的時候,就開始執行任務,到達結束時間則終止調度(不論是否全部的任務都處理完)。若是沒有設置執行結束時間。則一直運行,直到selectTasks返回的記錄數爲0,就終止執行。等待下個開始運行時間在啓動。 d) 若是要動態修改任務的執行時間區間,則先 點擊「暫停」按鈕,等全部的服務器都中止完畢(大概須要幾秒時間)。當再次單擊任務,出現以下情形表示中止完畢。而後修改執行開始時間,執行結算時間。在恢復任務調度,就能夠實現調度時間的修改 二、 任務處理的問題 a) Schedule主要是提供任務調度的分配管理。每個任務是否執行成功,是經過業務方的bean來實現的。 b) 你需求的例子,我理解的解決方案以下: i. 你從雲梯拉下來100萬數據放到保險應用的數據庫中。這個表中有兩個關鍵字段USER_ID和STS(狀態 0-未發送,1-已發送) ii. 在bean的selectTasks方法的查詢sql中除了根據任務進行分片外,還須要增長狀態條件。例如 USER_ID % 10 in( ?,?,?) AND sts =0 iii. 在bean的execute方法中,在發送完消息後,你還須要 修改數據狀態 update table STS =1 where USER_ID =? 。這樣下次就不會取到這條數據了。 iv. 這樣就能夠保障機器從新啓動後,也不會出現問題。你能夠參考DBDemoSingle.java的實現模式。你使用的接口應該是IScheduleTaskDealSingle。 若是旺旺的接口支持批量發送消息的時候,你才須要使用IScheduleTaskDealMulti接口。