TBSchedule是來自淘寶的分佈式調度開源框架,基於Zookeeper純Java實現,其目的是讓一種批量任務或者不斷變化的任務,可以被動態的分配到多個主機的JVM中的不一樣線程組中並行執行。全部的任務可以被不重複,不遺漏的快速處理。這種框架任務的分配經過分片實現了不重複調度,又經過架構中Leader的選擇,存活的自我保證,完成了可用性和伸縮性的保障。
TBSchedule源碼地址:http://code.taobao.org/p/tbschedule/src/
1.安裝zookeeperhtml
(1)下載zookeeperjava
http://zookeeper.apache.org/releases.htmlnode
下載3.4.11版本:web
http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz算法
(2)解壓至c:/prog/zookeeper/zookeeper-3.4.11spring
複製conf下的zoo_sample.cfg爲zoo.cfg數據庫
修改dataDir爲:apache
dataDir=/prog/zookeeper/data數組
tickTime單位爲毫秒,爲心跳間隔和最小的心跳超時間隔瀏覽器
clientPort是監聽客戶端鏈接的端口,默認爲2181
(3)建立目錄:c:/prog/zookeeper/data
2.啓動zookeeper
運行bin/zkServer.cmd
若是在Linux下,則執行:
[root@192.168.1.5]$ ./zkServer start
3.下載TBSchedule
採用svn來Checkout TBSchedule
svn地址:http://code.taobao.org/svn/tbschedule/
4.在Eclipse中導入項目:
右鍵工程區域(Package Explorer)->Import...->Maven-Existing Maven Projects
注意:TBSchedule編碼爲GBK,但引用TBSchedule的工程編碼爲UTF-8時,此處也要將TBSchedule工程的編碼設置爲UTF-8。
5.安裝Tomcat
(1)下載Tomcat
地址:https://tomcat.apache.org/download-80.cgi#8.5.27
(2)解壓Tomcat 8.5至c:\prog\tomcat\apache-tomcat-8.5.11
6.配置TBSchedule控制檯
(1)將TBSchedule工程中的console\ScheduleConsole.war拷貝至tomcat/webapps中
(2)啓動tomcat
(3)瀏覽器中打開:
http://localhost:8080/ScheduleConsole/schedule/index.jsp?manager=true
點擊保存會提示:
錯誤信息:Zookeeper connecting ......localhost:2181
如配置正確則能夠忽略上述提示,直接進入「管理主頁...」。
7.查看zookeeper中節點
運行zookeeper下的bin/zkClient.cmd
輸入ls /app-schedule/demo,顯示:
[strategy, baseTaskType, factory]
說明已經建立znode成功。
查看TBSchedule控制檯中的「Zookeeper數據」,也能看到相同數據。
8.在項目中使用TBSchedule
Eclipse中新建一個maven工程tbsdemo
GroupId:com.jf
Artifact Id:tbsdemo
9.在pom.xml中引入Spring、TBSchedule、Zookeeper
pom.xml內容爲:
<project xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>
4.0
.
0
</modelVersion>
<groupId>com.jf</groupId>
<artifactId>tbsdemo</artifactId>
<version>
0.0
.
1
-SNAPSHOT</version>
<packaging>jar</packaging>
<name>tbsdemo</name>
<url>http:
//maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-
8
</project.build.sourceEncoding>
<!-- spring版本號 -->
<spring.version>
4.0
.
5
.RELEASE</spring.version>
<!-- mybatis版本號 -->
<mybatis.version>
3.3
.
0
</mybatis.version>
<!-- log4j日誌文件管理包版本 -->
<slf4j.version>
1.7
.
7
</slf4j.version>
<log4j.version>
1.2
.
17
</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>
4.11
</version>
<scope>test</scope>
</dependency>
<!-- spring核心包 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>
3.4
.
11
</version>
</dependency>
<dependency>
<groupId>com.taobao.pamirs.schedule</groupId>
<artifactId>tbschedule</artifactId>
<version>
3.3
.
3.2
</version>
</dependency>
</dependencies>
</project>
|
10.在src/main/resources下建立applicationContext.xml,輸入:
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<beans xmlns=
"http://www.springframework.org/schema/beans"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:p=
"http://www.springframework.org/schema/p"
xsi:schemaLocation="http:
//www.springframework.org/schema/beans
http:
//www.springframework.org/schema/beans/spring-beans-4.0.xsd
http:
//www.springframework.org/schema/context
http:
//www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-
package
=
"com.jf"
/>
<!-- 引入配置文件 -->
<bean id=
"propertyConfigurer"
class
=
"org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
>
<property name=
"locations"
>
<list>
<value>classpath:tbschedule.properties</value>
</list>
</property>
</bean>
<bean id=
"scheduleManagerFactory"
class
=
"com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
init-method=
"init"
>
<property name=
"zkConfig"
>
<map>
<entry key=
"zkConnectString"
value=
"${schedule.zookeeper.address}"
/>
<entry key=
"rootPath"
value=
"${schedule.root.catalog}"
/>
<entry key=
"zkSessionTimeout"
value=
"${schedule.timeout}"
/>
<entry key=
"userName"
value=
"${schedule.username}"
/>
<entry key=
"password"
value=
"${schedule.password}"
/>
<entry key=
"isCheckParentPath"
value=
"true"
/>
</map>
</property>
</bean>
</beans>
|
11.建立TBSchedule配置文件
在src/main/resources/中建立tbschedule.propertie
輸入:
#註冊中心地址
schedule.zookeeper.address=localhost:
2181
#定時任務根目錄,任意指定,調度控制檯配置時對應
schedule.root.catalog=/app-schedule/demo
#帳戶,任意指定,調度控制檯配置時對應
schedule.username=admin
#密碼,任意指定,調度控制檯配置時對應
schedule.password=password
#超時配置
schedule.timeout=
60000
|
注意schedule.username、schedule.password要與TBSchedule控制檯中設置的一致。
12.建立任務數據類TaskModel:
package
com.jf.tbsdemo.pojo;
public
class
TaskModel {
private
long
id;
private
String taskInfo;
public
TaskModel(
long
id, String taskInfo) {
this
.id = id;
this
.taskInfo = taskInfo;
}
public
long
getId() {
return
id;
}
public
void
setId(
long
id) {
this
.id = id;
}
public
String getTaskInfo() {
return
taskInfo;
}
public
void
setTaskInfo(String taskInfo) {
this
.taskInfo = taskInfo;
}
}
|
13.建立任務處理類IScheduleTaskDealSingleTest:
注意:任務處理分單任務和多任務(批處理),分別實現IScheduleTaskDealSingle<T>、IScheduleTaskDealMulti<T>接口,前者的execute()方法參數只有一個任務T,然後者的execute()方法參數爲List<T>,本文使用單任務模式。
package
com.jf.tbsdemo;
import
java.util.ArrayList;
import
java.util.Comparator;
import
java.util.Date;
import
java.util.List;
import
org.apache.log4j.Logger;
import
org.springframework.stereotype.Component;
import
com.jf.tbsdemo.pojo.TaskModel;
import
com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import
com.taobao.pamirs.schedule.TaskItemDefine;
public
class
IScheduleTaskDealSingleTest
implements
IScheduleTaskDealSingle<TaskModel> {
private
static
final
Logger logger = Logger.getLogger(IScheduleTaskDealSingleTest.
class
);
public
Comparator<TaskModel> getComparator() {
return
null
;
}
public
List<TaskModel> selectTasks(String taskParameter, String ownSign,
int
taskQueueNum,
List<TaskItemDefine> taskItemList,
int
eachFetchDataNum)
throws
Exception {
logger.info(
"IScheduleTaskDealSingleTest選擇任務列表開始.........."
);
List<TaskModel> models =
new
ArrayList<TaskModel>();
models.add(
new
TaskModel(
1
,
"task1"
));
models.add(
new
TaskModel(
2
,
"task2"
));
return
models;
}
public
boolean
execute(TaskModel model, String ownSign)
throws
Exception {
logger.info(
"IScheduleTaskDealSingleTest執行開始.........."
+
new
Date());
logger.info(
"任務"
+ model.getId() +
",內容:"
+ model.getTaskInfo());
return
true
;
}
}
|
其中,selectTasks()方法負責取得要處理的任務信息,execute()方法爲處理任務的方法。selectTasks()方法能夠理解爲生產者,execute()方法能夠理解爲消費者。
14.建立主程序類TaskCenter:
package
com.jf.tbsdemo;
import
org.apache.log4j.Logger;
import
org.springframework.context.ApplicationContext;
import
org.springframework.context.support.FileSystemXmlApplicationContext;
public
class
TaskCenter {
private
static
final
Logger logger = Logger.getLogger(TaskCenter.
class
);
public
static
void
main(String[] args)
throws
Exception {
// 初始化Spring
ApplicationContext ctx =
new
FileSystemXmlApplicationContext(
"classpath:applicationContext.xml"
);
logger.info(
"---------------task start------------------"
);
}
}
|
15.在Eclipse中運行主程序類TaskCenter
16.在TBSchedule中建立任務:
(1)進入TBSchedule的控制檯->任務管理
點擊「建立新任務…」
(2)配置任務屬性:
NOTSLEEP模式下線程在任務池中取不到任務時,將當即調用業務接口獲取待處理的任務。
SLEEP模式較爲簡單,由於取任務的線程同一時間只有一個,不易發生衝突,效率也會較低。NOTSLEEP模式開銷較大,也要防止發生重複獲取相同任務。
0,1,2,3,4,5,6,7,8,9
17.在TBSchedule中建立調度策略:
(1)進入TBSchedule的控制檯->調度策略
點擊「建立新策略…」
(2)填寫策略屬性:
注意任務名稱要與新建的任務名稱一致。
(3)點擊建立,將當即啓動調度任務
另外,除了在控制檯中配置調度策略、任務,還能夠經過經過代碼、Spring配置來設置任務調度參數,推薦採用Spring配置方式。
18.代碼方式
建立類TaskCenter:
package
com.jf.tbsdemo;
import
java.util.Properties;
import
javax.annotation.Resource;
import
org.apache.log4j.Logger;
import
org.springframework.context.ApplicationContext;
import
org.springframework.context.support.FileSystemXmlApplicationContext;
import
com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import
com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
import
com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
public
class
TaskCenter {
private
static
final
Logger logger = Logger.getLogger(TaskCenter.
class
);
// 初始化調度工廠
@Resource
TBScheduleManagerFactory scheduleManagerFactory =
new
TBScheduleManagerFactory();
private
void
startTask() {
// 初始化Spring
ApplicationContext ctx =
new
FileSystemXmlApplicationContext(
"classpath:applicationContext.xml"
);
Properties p =
new
Properties();
p.put(
"zkConnectString"
,
"localhost:2181"
);
p.put(
"rootPath"
,
"/app-schedule/demo"
);
p.put(
"zkSessionTimeout"
,
"60000"
);
p.put(
"userName"
,
"admin"
);
p.put(
"password"
,
"password"
);
p.put(
"isCheckParentPath"
,
"true"
);
scheduleManagerFactory.setApplicationContext(ctx);
try
{
scheduleManagerFactory.init(p);
// 建立任務調度任務的基本信息
String baseTaskTypeName =
"DemoTask"
;
ScheduleTaskType baseTaskType =
new
ScheduleTaskType();
baseTaskType.setBaseTaskType(baseTaskTypeName);
baseTaskType.setDealBeanName(
"demoTaskBean"
);
baseTaskType.setHeartBeatRate(
10000
);
baseTaskType.setJudgeDeadInterval(
100000
);
baseTaskType.setTaskParameter(
"AREA=BJ,YEAR>30"
);
baseTaskType.setTaskItems(ScheduleTaskType
.splitTaskItem(
"0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4},"
+
"4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8},"
+
"8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"
));
baseTaskType.setFetchDataNumber(
500
);
baseTaskType.setThreadNumber(
5
);
scheduleManagerFactory.getScheduleDataManager().createBaseTaskType(baseTaskType);
logger.info(
"建立調度任務成功:"
+ baseTaskType.toString());
// 建立任務的調度策略
String taskName = baseTaskTypeName;
String strategyName = taskName +
"-Strategy"
;
try
{
scheduleManagerFactory.getScheduleStrategyManager().deleteMachineStrategy(strategyName,
true
);
}
catch
(Exception e) {
e.printStackTrace();
}
ScheduleStrategy strategy =
new
ScheduleStrategy();
strategy.setStrategyName(strategyName);
strategy.setKind(ScheduleStrategy.Kind.Schedule);
strategy.setTaskName(taskName);
strategy.setTaskParameter(
"china"
);
strategy.setNumOfSingleServer(
1
);
strategy.setAssignNum(
10
);
strategy.setIPList(
"127.0.0.1"
.split(
","
));
scheduleManagerFactory.getScheduleStrategyManager().createScheduleStrategy(strategy);
logger.info(
"建立調度策略成功:"
+ strategy.toString());
logger.info(
"---------------task start------------------"
);
}
catch
(Exception e) {
logger.error(
"出現異常"
, e);
}
}
public
static
void
main(String[] args)
throws
Exception {
TaskCenter taskCenter =
new
TaskCenter();
taskCenter.startTask();
}
}
|
19.Spring配置文件方式
(1)增長類AbstractBaseScheduleTask:
package
com.jf.tbsdemo;
import
com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import
com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import
com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
public
abstract
class
AbstractBaseScheduleTask<T>
implements
IScheduleTaskDealSingle<T> {
/**
* 調度任務的配置
*/
private
ScheduleTaskType scheduleTaskType;
/**
* 調度策略的配置
*/
private
ScheduleStrategy scheduleStrategy;
public
ScheduleTaskType getScheduleTaskType() {
return
scheduleTaskType;
}
public
void
setScheduleTaskType(ScheduleTaskType scheduleTaskType) {
this
.scheduleTaskType = scheduleTaskType;
}
public
ScheduleStrategy getScheduleStrategy() {
return
scheduleStrategy;
}
public
void
setScheduleStrategy(ScheduleStrategy scheduleStrategy) {
this
.scheduleStrategy = scheduleStrategy;
}
}
|
(2)修改IScheduleTaskDealSingleTest:
類聲明改成:
public class IScheduleTaskDealSingleTest extends AbstractBaseScheduleTask<TaskModel> {
(3)在applicationContext.xml中對聲明IScheduleTaskDealSingleTest的Bean並注入參數,內容爲:
<?xml version=
"1.0"
encoding=
"UTF-8"
?>
<beans xmlns=
"http://www.springframework.org/schema/beans"
xsi:schemaLocation="http:
//www.springframework.org/schema/beans
http:
//www.springframework.org/schema/beans/spring-beans-4.0.xsd
http:
//www.springframework.org/schema/context
http:
//www.springframework.org/schema/context/spring-context-4.0.xsd">
<context:component-scan base-
package
=
"com.jf"
/>
<!-- 引入配置文件 -->
<bean id=
"propertyConfigurer"
class
=
"org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"
>
<property name=
"locations"
>
<list>
<value>classpath:tbschedule.properties</value>
</list>
</property>
</bean>
<!--tbschedule管理器初始化(配置zookeeper,註冊調度任務和調度策略)-->
<bean id=
"systemTBScheduleManagerFactory"
class
=
"com.jf.tbsdemo.SystemTBScheduleManagerFactory"
>
<property name=
"zkConfig"
>
<map>
<entry key=
"zkConnectString"
value=
"${schedule.zookeeper.address}"
/>
<entry key=
"rootPath"
value=
"${schedule.root.catalog}"
/>
<entry key=
"zkSessionTimeout"
value=
"${schedule.timeout}"
/>
<entry key=
"userName"
value=
"${schedule.username}"
/>
<entry key=
"password"
value=
"${schedule.password}"
/>
<entry key=
"isCheckParentPath"
value=
"true"
/>
</map>
</property>
</bean>
<bean name=
"scheduleTaskType"
class
=
"com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType"
>
<!-- 心跳頻率(毫秒) -->
<property name=
"heartBeatRate"
value=
"5000"
/>
<!-- 假定服務死亡間隔(毫秒) -->
<property name=
"judgeDeadInterval"
value=
"60000"
/>
<!-- 處理模式 -->
<property name=
"processorType"
value=
"SLEEP"
/>
<!-- 線程數 -->
<property name=
"threadNumber"
value=
"5"
/>
<!--容許執行的開始時間-->
<property name=
"permitRunStartTime"
value=
""
/>
<!--容許執行的結束時間-->
<property name=
"permitRunEndTime"
value=
""
/>
<!--當沒有數據的時候,休眠的時間-->
<property name=
"sleepTimeNoData"
value=
"3000"
/>
<!--在每次數據處理完後休眠的時間-->
<property name=
"sleepTimeInterval"
value=
"1000"
/>
<!--每次獲取數據的數量-->
<property name=
"fetchDataNumber"
value=
"10"
/>
<!--任務項數組-->
<property name=
"taskItems"
>
<list>
<value>
0
:{TYPE=A,KIND=
1
}</value>
<value>
1
:{TYPE=B,KIND=
2
}</value>
<value>
2
:{TYPE=C,KIND=
3
}</value>
</list>
</property>
</bean>
<bean name=
"scheduleStrategy"
class
=
"com.taobao.pamirs.schedule.strategy.ScheduleStrategy"
>
<!--最大線程組數量-->
<property name=
"assignNum"
value=
"9"
/>
<!--單個機器(JVM)的線程組數量-->
<property name=
"numOfSingleServer"
value=
"3"
/>
<!--策略運行的機器(JVM)IP-->
<property name=
"IPList"
>
<list>
<value>
127.0
.
0.1
</value>
</list>
</property>
</bean>
<!--任務simpleTask-->
<bean id=
"simpleTask"
class
=
"com.jf.tbsdemo.IScheduleTaskDealSingleTest"
>
<property name=
"scheduleTaskType"
ref=
"scheduleTaskType"
/>
<property name=
"scheduleStrategy"
ref=
"scheduleStrategy"
/>
</bean>
</beans>
|
(4)打開控制檯,刪除全部已有的任務、調度策略,再啓動TaskCenter,刷新頁面則可看到當前出現了正在運行的任務和調度策略。
(5)也能夠在控制檯中修改策略,但重啓TaskCenter以後會恢復Spring中的配置信息。
20.數據分片方法
爲了不TBSchedule管理的多線程重複處理數據,須要採用分片,實現方法以下:
(1)在selectTasks()方法中實現分片獲取待處理數據
(2) selectTasks()方法的taskItemList參數爲當前線程分配到的可處理任務分片信息,所有任務分片信息由配置文件中的taskItem定義,每項任務信息爲TaskItemDefine類型,其中taskItemId標明瞭分片ID(即0,1,2),parameter爲自定義參數(即{TYPE=A,KIND=1},{TYPE=B,KIND=2},{TYPE=C,KIND=3})。
(3)根據上面算出的分片ID來取得相應的待處理任務,例如selectTasks()方法從數據庫中獲取待處理的交易請求記錄,能夠將記錄的主鍵或者其餘字段HashCode值的餘數做爲分片ID,在selectTasks()方法中只獲取與taskItemList中指定分片ID相同的任務,避免不一樣線程重複獲取同一任務。
(4)在系統運行過程當中,線程數量會有所變化,所以要在每一個selectTasks()方法執行開始先獲取taskItemList。
(5) 每次執行selectTasks()方法取得記錄條數不要超過eachFetchDataNum
(6)典型的分片代碼實現:
/**
* 根據條件,查詢當前調度服務器可處理的任務
* @param taskParameter 任務的自定義參數
* @param ownSign 當前環境名稱
* @param taskItemNum 當前任務類型的任務隊列數量
* @param taskItemList 當前調度服務器,分配到的可處理隊列
* @param eachFetchDataNum 每次獲取數據的數量
* @return
* @throws Exception
*/
public
List<Date> selectTasks(String taskParameter, String ownSign,
int
taskItemNum, List<TaskItemDefine> taskItemList,
int
eachFetchDataNum)
throws
Exception {
List<Date> dateList =
new
ArrayList<>();
List<Long> taskIdList =
new
ArrayList<>();
for
(TaskItemDefine t : taskItemList){
//肯定當前任務處理器需處理的任務項id
taskIdList.add(Long.valueOf(t.getTaskItemId()));
}
for
(
int
i=
0
;i<eachFetchDataNum;i++){
// 添加最多指定數量的待處理數據
Date date =
new
Date();
//生成待處理數據
Long remainder = date.getTime() % taskItemNum ;
if
(taskIdList.contains(remainder)){
//根據數據取模,判斷當前待處理數據,是否應由當前任務處理器處理
dateList.add(date);
}
TimeUnit.SECONDS.sleep(
1
);
}
return
dateList;
//返回當前任務處理器須要處理的數據
}
|
21.參數說明
(1)zookeeper參數
zkConnectString:zookeeper註冊中心地址
rootPath:定時任務根目錄,任意指定,調度控制檯配置時對應
zkSessionTimeout:超時時間
userName:帳戶,任意指定,調度控制檯配置時對應
password:密碼,任意指定,調度控制檯配置時對應
isCheckParentPath:設置爲true會檢查上級目錄是否已經被用做TBSchedule調度,若是是則啓動任務失敗。
(2)任務參數:
heartBeatRate:心跳頻率(毫秒)
judgeDeadInterval:假定服務死亡間隔(毫秒)
sleepTimeNoData: 當沒有數據的時候,休眠的時間
sleepTimeInterval:在每次數據處理完後休眠的時間
processorType:處理模式,可爲SLEEP或NOTSLEEP。
permitRunStartTime:執行開始時間若是爲空則不定時,直接執行。
permitRunEndTime:執行結束時間,與執行開始時間之間的時間才能夠執行任務。
taskItems: 任務項數組,例如:0:{TYPE=A,KIND=1},1:{TYPE=B,KIND=2},2:{TYPE=C,KIND=3}
在調度過程當中,某線程分得了獲取數據的任務,假設獲取第1項任務,則在selectTasks()方法的taskItemList參數中包含第1項任務的信息,TaskItemDefine類型,包含:taskItemId、parameter成員變量,分別爲:一、{TYPE=B,KIND=2}。可根據該信息取得相應的數據。
fetchDataNumber:selectTasks()方法每次獲取數據的數量
executeNumber:每次執行數量,即execute()方法每次取得的任務數量,只在bean實現IScheduleTaskDealMulti才生效。
threadNumber:每一個線程組中的線程數
maxTaskItemsOfOneThreadGroup:每一組線程能分配的最大任務數量,避免在隨着機器的減小把正常的服務器壓死,0或者空表示不限制
taskParameter:任務的自定義參數,可做爲selectTasks中的參數傳入。
(3)調度策略參數:
strategyName:策略名稱,必須填寫,不能有中文和特殊字符。
kind:任務類型,Schedule,Java,Bean 大小寫敏感。
taskName:要配置調度策略的任務名稱,與這一任務配置的名稱要一致。
taskParameter:任務參數,逗號分隔的Key-Value。對任務類型爲Java、Bean的有效,對任務類型爲Schedule的無效,須要經過任務管理來配置。
assignNum:最大線程組數量,是全部機器(JVM)總共運行的線程組的最大數量。
numOfSingleServer單個機器(JVM)的線程組數量,若是是0,則表示無限制。
IPList:策略運行的機器(JVM)IP列表,127.0.0.1或者localhost會在全部機器上運行。
當在控制檯點擊中止任務的按鈕時。會將任務池中未處理的任務清除,而中止前的在處理的任務將繼續執行。
也能夠只設置permitRunStartTime,將permitRunEndTime設置爲空或者-1。
fetchDataNumber >= threadNumber * 最少循環次數10,不然TBSchedule日誌會提示:參數設置不合理,系統性能不佳。
10.理論上單臺機器最大線程數爲:
線程數threadNumber*單個機器的線程組數量numOfSingleServer,而numOfSingleServer並非上限,僅有1臺機器時,該機器的線程組數量能達到assignNum。
11.TBSchedule給各機器以線程組爲單位進行分配,全部機器的線程組總數不會超過最大線程組數量assignNum。
12.通常來講在selectTasks()中獲取任務,而後在execute()方法中處理,在SLEEP處理模式下,最後一個活動線程纔會去獲取任務,所以不會出現重複執行任務的狀況。但若是在selectTasks()或execute()中再新建線程或線程池來處理任務,會出現新建線程未處理完成,但TBSchedule框架認爲已處理結束從而進行下一次獲取任務的操做,可能會重複取出正在處理的任務,所以應儘可能避免新建線程和線程池。
13.在selectTasks()中獲取到任務後或者在execute()中處理完任務後應更改狀態,防止下次再次取到,形成重複處理。
14.在SLEEP處理模式下,配置的分片數量應合理,分片較多則同一線程組分配過多分片,對不一樣分片分別查詢獲取任務則效率會下降,而分片較少則不利於擴展機器。
15.在SLEEP處理模式下,同一時間只會有一個線程執行selectTasks(),其餘線程均處於休眠狀態,所以不宜在selectTasks()中進行過多操做而讓其餘線程等待時間過長,處理工做應儘可能在execute()中進行。或者採用NOTSLEEP模式,讓多個線程能夠同時運行selectTasks()獲取不一樣分片的任務。
NOTSLEEP模式須要實現getComparator(),防止從任務池中取出的某項任務正在被本進程中的其餘線程處理。原理是在取任務前先取得正在運行的任務放入maybeRepeatTaskList中,取得任務放入任務池後,再與maybeRepeatTaskList中的每項任務對比。同時取任務時加鎖保證只有一個線程在取任務。
只有在NotSleep模式下getComparator()纔有效,在Sleep模式下無效。
執行getComparator()時會遍歷正在處理的任務池。
16.複雜任務能夠拆分紅多項子任務,並配置不一樣的策略,爲操做最複雜的子任務分配較多線程,從而提升整體的處理效率。
若是不進行拆分,則會有單個線程處理時間較長,併發的線程數較少,處理時間長短不一, 且任務分配不均勻等問題。例如任務爲:從FTP中取得不一樣大小的文件進行解析,將每行數據寫入分庫中。
若是在selectTasks中取得的每一個任務對應一個文件,在execute()中處理任務時(解析文件併入庫),效率會很是低。可對解析文件的任務作改造:
改造方案1:在execute()中解析文件後入庫時採用線程池處理。但這樣仍不能解決任務分配不勻的問題,且引入線程池會增長線程數量。尤爲是會形成框架錯誤判斷任務已結束,致使重複處理,所以本方案不合理。
改造方案2:將任務拆分爲兩個子任務,文件解析和入分庫。
子任務1:在原execute()中對文件解析後不直接入分庫,而是取1000條合成1條記錄存入本地單庫的中間表,解析文件耗時較短且記錄數較少能夠較快完成,且時間不都可以忽略。
子任務2:對中間表記錄按照自增主鍵ID分片,selectTasks()中取得記錄,而後拆分紅原始單條記錄返回,在execute()中對單條記錄進行入庫處理。
改造方案2的線程數較少,且任務分配會比較均勻,同時避免了單線程處理一個大任務等待時間過長的問題。
17.Zookeeper存儲的數據:機器、策略定義、任務定義、任務分片(包含當前屬於哪一個機器處理)
18.在zookeeper中每臺機器都可保存不一樣的線程數等配置,說明不一樣機器可使用不一樣的線程數等配置,但不建議使用不一樣配置。
19.在多任務模式下,executeNumber不要設置的太大,較小一些能夠減小等待最後一個活躍線程的時間,而且若是fetchDataNumber<線程數*executeNumber,則會有線程沒法分得任務。任務分配在本進程中進行,並不會請求zookeeper,所以設置的較小一些效率更高。
20.當須要重啓應用時,要在控制檯上先把機器所有中止,等待線程組消失,不然直接重啓應用時會出現新的機器實例,舊的機器實例未能釋放分片,致使新的機器獲取不到任務分片沒法執行,控制檯上會顯示新、舊線程組均爲紅色。
21.使用同一zookeeper目錄的多臺機器中,先啓動的機器通常爲leader,負責分片的分配。
22.控制檯顯示某線程組紅色異常,長時間未取數時,多是取任務的selectTasks()運行異常,或者每次取的任務數量過大,致使長時間未會處理完,能夠適當調小eachFetchDataNum。
也有多是由於在SLEEP模式下任務處理時間過長。
23.分片按線程組進行分配,同一機器中有多個線程組時,該機器分得多個分片,也會均勻分配給線程組,每一個線程組各自獨立取任務調度,不會同時取任務。
24.當加入新機器時,會請求得到分片。框架10秒掃描一次,若是發現機器數量有變化,且佔用分片較多的機器完成任務則會自動從新分配分片。
25.若是每次從數據庫裏取待處理記錄生成任務時,若是總記錄數較多,即便取到的有效記錄數較少,則掃描整張表花費時間較長,除了創建必要的索引,也應該減小無數據時掃描頻次,即下降sleepTimeInterval,也可在selectTasks()中在取到記錄後檢查數量,若是較少則sleep一段時間再返回任務,也應加大sleepTimeNoData。
26.若是任務處理結束後還要合併結果再進入下一輪處理,則最慢的機器會減慢總體速度,所以要儘可能保證任務分片均勻分給不一樣機器,分片數量要能被機器數量整除,也能被最大線程組數量assignNum整除,這樣每臺機器處理的任務數量大體相同。
27.在Zookeeper鏈接配置中保存時提示:
錯誤信息:Zookeeper connecting ......localhost:2181
同時沒法進入其餘頁面,可能因爲採用不一樣的用戶名密碼配置過同一目錄造zookeeper數據異常,能夠在zookeeper中手動刪除目錄數據,或者更換新目錄後重啓應用。
在zookeeper中刪除目錄方法:
[root@192.168.1.5]$ ./zkClient.sh
[zk: localhost:2181(CONNECTED) 0] addauth digest admin:password
[zk: localhost:2181(CONNECTED) 1] rmr /app-schedule/demo
在控制檯沒法修改目錄的帳戶密碼,可在zookeeper客戶端中刪除目錄後重建目錄及帳戶密碼。
28.每位用戶登陸控制檯後打開的配置信息均保存在bin目錄下的pamirsScheduleConfig.properties,所以在同一Tomcat下操做不一樣的TBSchedule目錄時會衝突,已修改TBSchedule的代碼解決了這一問題。
29.selectTasks()方法從數據庫中取得記錄時,能夠在select語句中對某字段進行mod取餘,這樣只獲取本線程組所分配的分片。通常有多個分庫時,同時也會採用mycat,主鍵ID沒法採用自增,經常使用雪花算法來生成不重複的ID,但對這種ID取模通常不容易均勻,所以可增長建立時間戳字段來用於取模,通常各機器取得的任務數較爲均勻。
30.若是使用zookeeper集羣,則在tbschedule.properties中配置schedule.zookeeper.address時,格式以下:
IP1:Port1,IP2:Port2,IP3:Port3
31.TBSchedule沒法實現任務均衡的轉移,即當一臺機器處理任務較多,其餘機器較閒時,不會轉到其餘機器。
32.若是使用數據庫鏈接池,則單個機器中的線程數量不要比鏈接池數量大太多,或者不高於,以防出現線程獲取不到數據庫鏈接的狀況出現。
33.Sleep模式在實現邏輯上相對簡單清晰,但存在一個大任務處理時間長,致使其它線程不工做的狀況。
在NotSleep模式下,減小了線程休眠的時間,避免大任務阻塞的狀況,但爲了不數據被重複處理,增長了CPU在數據比較上的開銷。
同時要求業務接口實現對象的比較接口。
若是對任務處理不容許停頓的狀況下建議用NotSleep模式,其它狀況建議用Sleep模式。
34.主機編號最小的爲Leader,若是是Leader第一次啓動則會清除全部垃圾數據。
35.若是任務是輪詢類型,可將permitRunStartTime、permitRunEndTime均設置爲空,將一直運行,可設置sleepTimeNoData、sleepTimeInterval來sleep。
若是要設置在必定時間作內輪詢,則能夠同時設置permitRunStartTime、permitRunEndTime,在這一時間段內會執行selectTasks()及execute()。
在到達結束時間時,會將任務池清空,並設置中止運行標誌,此時將沒法再啓動新的線程運行execute(),所以若是selectTasks()運行時間略長於permitRunEndTime-permitRunStartTime,則execute()可能會永遠都沒法被執行到。
例如:permitRunStartTime設置爲:0/10 * * * * ?
permitRunEndTime設置爲:5/10 * * * * ?
而selectTasks()執行時間爲6秒,則在第6秒時execute()沒有機會被執行。
所以對於輪詢任務,最好將permitRunStartTime、permitRunEndTime均設置爲空.
將permitRunEndTime設置爲-1與爲空做用一致。
36.若是任務是定時任務,則能夠只設置permitRunStartTime,而將permitRunEndTime設置爲空或-1,這樣在selectTasks()取得任務爲空時會sleep(),直到下一個開始時間時纔會執行。
例如:permitRunStartTime設置爲:0/10 * * * * ?
permitRunEndTime設置爲:-1
則在每10秒的第0秒開始執行selectTasks()取任務,若是取到任務則會交給其餘線程執行execute(),若是未取到則會sleep(),直到下一個開始時間時纔會執行。
若是隻但願同一時間僅有一個線程處理任務,則能夠只設置一個分片,並採用SLEEP模式,numOfSingleServer、assignNum均設置爲1。
37.每一個心跳週期都會向zookeeper更新心跳信息,若是超過judgeDeadInterval(假定服務死亡間隔)未更新過,則清除zookeeper上的任務信息及Server信息。每一個心跳週期也會從新分配分片。
也會清除zookeeper中分配給已消失機器上的任務信息。
38.若是有定時任務執行出現故障,或者因重啓錯過了執行時間,若是要在下一次時間點前再次執行,則能夠在控制檯上臨時增長任務類型、策略,來臨時定時執行一次,月日也加上防止忘記刪除任務致使屢次重複執行。執行完成後再刪除該任務類型、策略。
39.有時應用啓動後日志顯示正常,但不執行任務,有多是zookeeper中數據出現錯誤,可刪除該目錄,重啓應用便可。
40.在控制檯上點擊機器的中止按鈕時,會將zookeeper中該機器的運行狀態設置爲false,並清除本機器的任務池中未被處理的任務。在每臺機器進程中每2秒刷新一次運行狀態,當檢測到false,則在任務執行完畢後再也不取任務處理。
41.SystemTBScheduleManagerFactory也可取消,改用@Bean註解,例如:
ScheduleJobConfiguration.java:
package
com.jfbank.schedule.monitor.alarm.tbs;
import
java.util.HashMap;
import
java.util.Map;
import
org.springframework.beans.factory.annotation.Value;
import
org.springframework.context.annotation.Bean;
import
org.springframework.context.annotation.Configuration;
import
com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
@Configuration
public
class
ScheduleJobConfiguration{
@Bean
(initMethod =
"init"
)
public
TBScheduleManagerFactory tbScheduleManagerFactory(
@Value
(
"${schedule.zookeeper.address}"
)String zkConnectString,
@Value
(
"${schedule.root.catalog}"
)String rootPath,
@Value
(
"${schedule.timeout}"
)String zkSessionTimeout,
@Value
(
"${schedule.username}"
)String userName,
@Value
(
"${schedule.password}"
)String password,
@Value
(
"${schedule.isCheckParentPath}"
)String isCheckParentPath) {
TBScheduleManagerFactory tbScheduleManagerFactory =
new
TBScheduleManagerFactory();
Map<String, String> zkConfig =
new
HashMap<String, String>();
zkConfig.put(
"zkConnectString"
, zkConnectString);
zkConfig.put(
"rootPath"
, rootPath);
zkConfig.put(
"zkSessionTimeout"
, zkSessionTimeout);
zkConfig.put(
"userName"
, userName);
zkConfig.put(
"password"
, password);
zkConfig.put(
"isCheckParentPath"
, isCheckParentPath);
tbScheduleManagerFactory.setZkConfig(zkConfig);
return
tbScheduleManagerFactory;
}
}
|