分佈式調度框架TBSchedule使用方法

1、TBSchedule簡介

TBSchedule是來自淘寶的分佈式調度開源框架,基於Zookeeper純Java實現,其目的是讓一種批量任務或者不斷變化的任務,可以被動態的分配到多個主機的JVM中的不一樣線程組中並行執行。全部的任務可以被不重複,不遺漏的快速處理。這種框架任務的分配經過分片實現了不重複調度,又經過架構中Leader的選擇,存活的自我保證,完成了可用性和伸縮性的保障。
TBSchedule源碼地址:http://code.taobao.org/p/tbschedule/src/

2、開發環境

  1. WIN10,也可換爲Linux
  2. JDK 1.7
  3. Tomcat 8.5
  4. 安裝zookeeper

3、配置步驟

1.安裝zookeeperhtml

(1)下載zookeeperjava

http://zookeeper.apache.org/releases.htmlnode

下載3.4.11版本:web

http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz算法

 

(2)解壓至c:/prog/zookeeper/zookeeper-3.4.11spring

複製conf下的zoo_sample.cfg爲zoo.cfg數據庫

修改dataDir爲:apache

dataDir=/prog/zookeeper/data數組

 

tickTime單位爲毫秒,爲心跳間隔和最小的心跳超時間隔瀏覽器

clientPort是監聽客戶端鏈接的端口,默認爲2181

 

(3)建立目錄:c:/prog/zookeeper/data

 

2.啓動zookeeper

運行bin/zkServer.cmd

若是在Linux下,則執行:

[root@192.168.1.5]$ ./zkServer start

 

3.下載TBSchedule

採用svn來Checkout TBSchedule

svn地址:http://code.taobao.org/svn/tbschedule/

 

4.在Eclipse中導入項目:

右鍵工程區域(Package Explorer)->Import...->Maven-Existing Maven Projects

 

注意:TBSchedule編碼爲GBK,但引用TBSchedule的工程編碼爲UTF-8時,此處也要將TBSchedule工程的編碼設置爲UTF-8。

 

5.安裝Tomcat

(1)下載Tomcat

地址:https://tomcat.apache.org/download-80.cgi#8.5.27

 

(2)解壓Tomcat 8.5至c:\prog\tomcat\apache-tomcat-8.5.11

 

6.配置TBSchedule控制檯

(1)將TBSchedule工程中的console\ScheduleConsole.war拷貝至tomcat/webapps中

 

(2)啓動tomcat

 

(3)瀏覽器中打開:

http://localhost:8080/ScheduleConsole/schedule/index.jsp?manager=true

點擊保存會提示:

錯誤信息:Zookeeper connecting ......localhost:2181

 

如配置正確則能夠忽略上述提示,直接進入「管理主頁...」。

 

7.查看zookeeper中節點

運行zookeeper下的bin/zkClient.cmd

輸入ls /app-schedule/demo,顯示:

[strategy, baseTaskType, factory]

 

說明已經建立znode成功。

 

查看TBSchedule控制檯中的「Zookeeper數據」,也能看到相同數據。

 

8.在項目中使用TBSchedule

Eclipse中新建一個maven工程tbsdemo

GroupId:com.jf

Artifact Id:tbsdemo

 

9.在pom.xml中引入Spring、TBSchedule、Zookeeper

pom.xml內容爲:

 

   <modelVersion> 4.0 . 0 </modelVersion>
  
   <groupId>com.jf</groupId>
   <artifactId>tbsdemo</artifactId>
   <version> 0.0 . 1 -SNAPSHOT</version>
   <packaging>jar</packaging>
  
   <name>tbsdemo</name>
   <url>http: //maven.apache.org</url>
  
   <properties>
     <project.build.sourceEncoding>UTF- 8 </project.build.sourceEncoding>
<!-- spring版本號 -->
     <spring.version> 4.0 . 5 .RELEASE</spring.version>
<!-- mybatis版本號 -->
     <mybatis.version> 3.3 . 0 </mybatis.version>
<!-- log4j日誌文件管理包版本 -->
     <slf4j.version> 1.7 . 7 </slf4j.version>
     <log4j.version> 1.2 . 17 </log4j.version>
   </properties>
  
   <dependencies>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version> 4.11 </version>
       <scope>test</scope>
     </dependency>
     <!-- spring核心包 -->
     <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
     </dependency>
     <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
     </dependency>
     <dependency>
        <groupId>org.springframework</groupId>
         <artifactId>spring-test</artifactId>
        <version>${spring.version}</version>
     </dependency>
     <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
     </dependency>
     <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version> 3.4 . 11 </version>
     </dependency>
     <dependency>
        <groupId>com.taobao.pamirs.schedule</groupId>
         <artifactId>tbschedule</artifactId>
         <version> 3.3 . 3.2 </version>
     </dependency>
   </dependencies>
</project>

 

 

10.在src/main/resources下建立applicationContext.xml,輸入:

 

<?xml version= "1.0"  encoding= "UTF-8" ?>
     xsi:schemaLocation="http: //www.springframework.org/schema/beans 
                         http: //www.springframework.org/schema/beans/spring-beans-4.0.xsd 
                         http: //www.springframework.org/schema/context 
                         http: //www.springframework.org/schema/context/spring-context-4.0.xsd">
  
     <context:component-scan base- package = "com.jf"  />
     <!-- 引入配置文件 -->
     <bean id= "propertyConfigurer"
        class = "org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" >
     <property name= "locations" >
            <list>
               <value>classpath:tbschedule.properties</value>
            </list>
        </property>
     </bean>
     <bean id= "scheduleManagerFactory"     class = "com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
        init-method= "init" >
        <property name= "zkConfig" >
            <map>
               <entry key= "zkConnectString"  value= "${schedule.zookeeper.address}"  />
               <entry key= "rootPath"  value= "${schedule.root.catalog}"  />
               <entry key= "zkSessionTimeout"  value= "${schedule.timeout}"  />
               <entry key= "userName"  value= "${schedule.username}"  />
               <entry key= "password"  value= "${schedule.password}"  />
               <entry key= "isCheckParentPath"  value= "true"  />
            </map>
        </property>
     </bean>
</beans>

 

 

11.建立TBSchedule配置文件

在src/main/resources/中建立tbschedule.propertie

輸入:

 

#註冊中心地址
schedule.zookeeper.address=localhost: 2181
#定時任務根目錄,任意指定,調度控制檯配置時對應
schedule.root.catalog=/app-schedule/demo
#帳戶,任意指定,調度控制檯配置時對應
schedule.username=admin
#密碼,任意指定,調度控制檯配置時對應
schedule.password=password
#超時配置
schedule.timeout= 60000

 

 

注意schedule.username、schedule.password要與TBSchedule控制檯中設置的一致。

 

12.建立任務數據類TaskModel:

 

package  com.jf.tbsdemo.pojo;
  
public  class  TaskModel {
     private  long  id;
     private  String taskInfo;
     public  TaskModel( long  id, String taskInfo) {
        this .id = id;
        this .taskInfo = taskInfo;
     }
     public  long  getId() {
        return  id;
     }
     public  void  setId( long  id) {
        this .id = id;
     }
     public  String getTaskInfo() {
         return  taskInfo;
     }
     public  void  setTaskInfo(String taskInfo) {
        this .taskInfo = taskInfo;
     }
}

 

 

13.建立任務處理類IScheduleTaskDealSingleTest:

注意:任務處理分單任務和多任務(批處理),分別實現IScheduleTaskDealSingle<T>、IScheduleTaskDealMulti<T>接口,前者的execute()方法參數只有一個任務T,然後者的execute()方法參數爲List<T>,本文使用單任務模式。

 

 

package  com.jf.tbsdemo;
  
import  java.util.ArrayList;
import  java.util.Comparator;
import  java.util.Date;
import  java.util.List;
  
import  org.apache.log4j.Logger;
import  org.springframework.stereotype.Component;
  
import  com.jf.tbsdemo.pojo.TaskModel;
import  com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import  com.taobao.pamirs.schedule.TaskItemDefine;
  
public  class  IScheduleTaskDealSingleTest  implements  IScheduleTaskDealSingle<TaskModel> {
     private  static  final  Logger logger = Logger.getLogger(IScheduleTaskDealSingleTest. class );
  
     public  Comparator<TaskModel> getComparator() {
         return  null ;
     }
  
     public  List<TaskModel> selectTasks(String taskParameter, String ownSign,  int  taskQueueNum,
             List<TaskItemDefine> taskItemList,  int  eachFetchDataNum)  throws  Exception {
  
         logger.info( "IScheduleTaskDealSingleTest選擇任務列表開始.........." );
         List<TaskModel> models =  new  ArrayList<TaskModel>();
         models.add( new  TaskModel( 1 "task1" ));
         models.add( new  TaskModel( 2 "task2" ));
  
         return  models;
     }
  
     public  boolean  execute(TaskModel model, String ownSign)  throws  Exception {
         logger.info( "IScheduleTaskDealSingleTest執行開始.........."  new  Date());
         logger.info( "任務"  + model.getId() +  ",內容:" + model.getTaskInfo());
         return  true ;
     }
}

 

 

其中,selectTasks()方法負責取得要處理的任務信息,execute()方法爲處理任務的方法。selectTasks()方法能夠理解爲生產者,execute()方法能夠理解爲消費者。

 

14.建立主程序類TaskCenter:

 

package  com.jf.tbsdemo;
  
import  org.apache.log4j.Logger;
import  org.springframework.context.ApplicationContext;
import  org.springframework.context.support.FileSystemXmlApplicationContext;
  
public  class  TaskCenter {
     private  static  final  Logger logger = Logger.getLogger(TaskCenter. class );
  
     public  static  void  main(String[] args)  throws  Exception {
        // 初始化Spring
        ApplicationContext ctx =  new  FileSystemXmlApplicationContext( "classpath:applicationContext.xml" );
        logger.info( "---------------task start------------------" );
     }
}

 

 

15.在Eclipse中運行主程序類TaskCenter

 

16.在TBSchedule中建立任務:

(1)進入TBSchedule的控制檯->任務管理

點擊「建立新任務…」

 

(2)配置任務屬性:

  • 在任務處理的SpringBean中輸入:iScheduleTaskDealSingleTest
  • 處理模式分爲:SLEEP、NOTSLEEP,其中SLEEP模式是指當一個線程處理完任務後在任務池中取不到其餘任務時,會檢查其餘線程是否活動,若是是則本身休眠,不然說明本身是最後一位,則調用業務接口取得待處理的任務放入任務池,並喚醒其餘線程處理。

NOTSLEEP模式下線程在任務池中取不到任務時,將當即調用業務接口獲取待處理的任務。

SLEEP模式較爲簡單,由於取任務的線程同一時間只有一個,不易發生衝突,效率也會較低。NOTSLEEP模式開銷較大,也要防止發生重複獲取相同任務。

  • 設置執行開始時間結束時間:與Crontab格式一致,在本時間段內任務纔會執行。
  • 添加任務項:

0,1,2,3,4,5,6,7,8,9

 

17.在TBSchedule中建立調度策略:

(1)進入TBSchedule的控制檯->調度策略

點擊「建立新策略…」

 

(2)填寫策略屬性:

注意任務名稱要與新建的任務名稱一致。

 

 

(3)點擊建立,將當即啓動調度任務

 

另外,除了在控制檯中配置調度策略、任務,還能夠經過經過代碼、Spring配置來設置任務調度參數,推薦採用Spring配置方式。

 

18.代碼方式

建立類TaskCenter:

 

 

package  com.jf.tbsdemo;
  
import  java.util.Properties;
  
import  javax.annotation.Resource;
  
import  org.apache.log4j.Logger;
import  org.springframework.context.ApplicationContext;
import  org.springframework.context.support.FileSystemXmlApplicationContext;
  
import  com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import  com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
import  com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
  
public  class  TaskCenter {
     private  static  final  Logger logger = Logger.getLogger(TaskCenter. class );
  
     // 初始化調度工廠
     @Resource
     TBScheduleManagerFactory scheduleManagerFactory =  new  TBScheduleManagerFactory();
     private  void  startTask() {
        // 初始化Spring
        ApplicationContext ctx =  new  FileSystemXmlApplicationContext( "classpath:applicationContext.xml" );
  
  
        Properties p =  new  Properties();
        p.put( "zkConnectString" "localhost:2181" );
        p.put( "rootPath" "/app-schedule/demo" );
        p.put( "zkSessionTimeout" "60000" );
        p.put( "userName" "admin" );
        p.put( "password" "password" );
        p.put( "isCheckParentPath" "true" );
  
        scheduleManagerFactory.setApplicationContext(ctx);
  
        try  {
            scheduleManagerFactory.init(p);
    
            // 建立任務調度任務的基本信息
            String baseTaskTypeName =  "DemoTask" ;
            ScheduleTaskType baseTaskType =  new  ScheduleTaskType();
            baseTaskType.setBaseTaskType(baseTaskTypeName);
            baseTaskType.setDealBeanName( "demoTaskBean" );
            baseTaskType.setHeartBeatRate( 10000 );
            baseTaskType.setJudgeDeadInterval( 100000 );
            baseTaskType.setTaskParameter( "AREA=BJ,YEAR>30" );
            baseTaskType.setTaskItems(ScheduleTaskType
                   .splitTaskItem( "0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4},"
                          "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8},"
                          "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}" ));
            baseTaskType.setFetchDataNumber( 500 );
            baseTaskType.setThreadNumber( 5 );
            scheduleManagerFactory.getScheduleDataManager().createBaseTaskType(baseTaskType);
            logger.info( "建立調度任務成功:"  + baseTaskType.toString());
    
            // 建立任務的調度策略
            String taskName = baseTaskTypeName;
            String strategyName = taskName +  "-Strategy" ;
            try  {
                scheduleManagerFactory.getScheduleStrategyManager().deleteMachineStrategy(strategyName,  true );
            catch  (Exception e) {
               e.printStackTrace();
            }
            ScheduleStrategy strategy =  new  ScheduleStrategy();
            strategy.setStrategyName(strategyName);
            strategy.setKind(ScheduleStrategy.Kind.Schedule);
            strategy.setTaskName(taskName);
            strategy.setTaskParameter( "china" );
    
            strategy.setNumOfSingleServer( 1 );
            strategy.setAssignNum( 10 );
            strategy.setIPList( "127.0.0.1" .split( "," ));
            scheduleManagerFactory.getScheduleStrategyManager().createScheduleStrategy(strategy);
  
            logger.info( "建立調度策略成功:"  + strategy.toString());
  
            logger.info( "---------------task start------------------" );
        catch (Exception e) {
            logger.error( "出現異常" , e);
        }
     }
  
     public  static  void  main(String[] args)  throws  Exception {
        TaskCenter taskCenter =  new  TaskCenter();
        taskCenter.startTask();
     }
}

 

 

19.Spring配置文件方式

(1)增長類AbstractBaseScheduleTask:

 

package  com.jf.tbsdemo;
  
import  com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
import  com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
import  com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
  
public  abstract  class  AbstractBaseScheduleTask<T>  implements  IScheduleTaskDealSingle<T> {
     /**
      * 調度任務的配置
      */
     private  ScheduleTaskType scheduleTaskType;
     /**
      * 調度策略的配置
      */
     private  ScheduleStrategy scheduleStrategy;
  
     public  ScheduleTaskType getScheduleTaskType() {
         return  scheduleTaskType;
     }
  
     public  void  setScheduleTaskType(ScheduleTaskType scheduleTaskType) {
         this .scheduleTaskType = scheduleTaskType;
     }
  
     public  ScheduleStrategy getScheduleStrategy() {
         return  scheduleStrategy;
     }
  
     public  void  setScheduleStrategy(ScheduleStrategy scheduleStrategy) {
         this .scheduleStrategy = scheduleStrategy;
     }
}

 

 

(2)修改IScheduleTaskDealSingleTest:

類聲明改成:

public class IScheduleTaskDealSingleTest extends AbstractBaseScheduleTask<TaskModel> {

 

(3)在applicationContext.xml中對聲明IScheduleTaskDealSingleTest的Bean並注入參數,內容爲:

 

 

<?xml version= "1.0"  encoding= "UTF-8" ?>
     xsi:schemaLocation="http: //www.springframework.org/schema/beans 
                         http: //www.springframework.org/schema/beans/spring-beans-4.0.xsd 
                         http: //www.springframework.org/schema/context 
                         http: //www.springframework.org/schema/context/spring-context-4.0.xsd">
  
     <context:component-scan base- package = "com.jf"  />
     <!-- 引入配置文件 -->
     <bean id= "propertyConfigurer"
        class = "org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" >
        <property name= "locations" >
            <list>
               <value>classpath:tbschedule.properties</value>
            </list>
        </property>
     </bean>
  
     <!--tbschedule管理器初始化(配置zookeeper,註冊調度任務和調度策略)-->
<bean id= "systemTBScheduleManagerFactory"  class = "com.jf.tbsdemo.SystemTBScheduleManagerFactory" >
    <property name= "zkConfig" >
             <map>
                 <entry key= "zkConnectString"  value= "${schedule.zookeeper.address}"  />
               <entry key= "rootPath"  value= "${schedule.root.catalog}"  />
               <entry key= "zkSessionTimeout"  value= "${schedule.timeout}"  />
               <entry key= "userName"  value= "${schedule.username}"  />
               <entry key= "password"  value= "${schedule.password}"  />
               <entry key= "isCheckParentPath"  value= "true"  />
             </map>
         </property>
     </bean>
    
<bean name= "scheduleTaskType"  class = "com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType" >
<!-- 心跳頻率(毫秒) -->
         <property name= "heartBeatRate"  value= "5000"  />
         <!-- 假定服務死亡間隔(毫秒) -->
    <property name= "judgeDeadInterval"  value= "60000"  />
     <!-- 處理模式 -->
         <property name= "processorType"  value= "SLEEP"  />
         <!-- 線程數 -->
         <property name= "threadNumber"  value= "5"  />
<!--容許執行的開始時間-->
         <property name= "permitRunStartTime"  value= ""  />
         <!--容許執行的結束時間-->
         <property name= "permitRunEndTime"  value= ""   />
<!--當沒有數據的時候,休眠的時間-->
<property name= "sleepTimeNoData"  value= "3000"   />
<!--在每次數據處理完後休眠的時間-->
<property name= "sleepTimeInterval"  value= "1000"   />
<!--每次獲取數據的數量-->
<property name= "fetchDataNumber"  value= "10"   />
<!--任務項數組-->
         <property name= "taskItems" >
             <list>
                 <value> 0 :{TYPE=A,KIND= 1 }</value>
                 <value> 1 :{TYPE=B,KIND= 2 }</value>
                 <value> 2 :{TYPE=C,KIND= 3 }</value>
             </list>
</property>
     </bean>
     <bean name= "scheduleStrategy"  class = "com.taobao.pamirs.schedule.strategy.ScheduleStrategy" >
         <!--最大線程組數量-->
         <property name= "assignNum"  value= "9"  />
         <!--單個機器(JVM)的線程組數量-->
         <property name= "numOfSingleServer"  value= "3"  />
         <!--策略運行的機器(JVM)IP-->
         <property name= "IPList" >
             <list>
                 <value> 127.0 . 0.1 </value>
             </list>
         </property>
</bean>
<!--任務simpleTask-->
     <bean id= "simpleTask"  class = "com.jf.tbsdemo.IScheduleTaskDealSingleTest"  >
         <property name= "scheduleTaskType"  ref= "scheduleTaskType"  />
         <property name= "scheduleStrategy"  ref= "scheduleStrategy"  />
     </bean>
</beans>

 

 

(4)打開控制檯,刪除全部已有的任務、調度策略,再啓動TaskCenter,刷新頁面則可看到當前出現了正在運行的任務和調度策略。

 

(5)也能夠在控制檯中修改策略,但重啓TaskCenter以後會恢復Spring中的配置信息。

 

20.數據分片方法

爲了不TBSchedule管理的多線程重複處理數據,須要採用分片,實現方法以下:

(1)在selectTasks()方法中實現分片獲取待處理數據

(2) selectTasks()方法的taskItemList參數爲當前線程分配到的可處理任務分片信息,所有任務分片信息由配置文件中的taskItem定義,每項任務信息爲TaskItemDefine類型,其中taskItemId標明瞭分片ID(即0,1,2),parameter爲自定義參數(即{TYPE=A,KIND=1},{TYPE=B,KIND=2},{TYPE=C,KIND=3})。

(3)根據上面算出的分片ID來取得相應的待處理任務,例如selectTasks()方法從數據庫中獲取待處理的交易請求記錄,能夠將記錄的主鍵或者其餘字段HashCode值的餘數做爲分片ID,在selectTasks()方法中只獲取與taskItemList中指定分片ID相同的任務,避免不一樣線程重複獲取同一任務。

(4)在系統運行過程當中,線程數量會有所變化,所以要在每一個selectTasks()方法執行開始先獲取taskItemList。

(5) 每次執行selectTasks()方法取得記錄條數不要超過eachFetchDataNum

(6)典型的分片代碼實現:

 

/**
  * 根據條件,查詢當前調度服務器可處理的任務
  * @param taskParameter 任務的自定義參數
  * @param ownSign 當前環境名稱
  * @param taskItemNum 當前任務類型的任務隊列數量
  * @param taskItemList 當前調度服務器,分配到的可處理隊列
  * @param eachFetchDataNum 每次獲取數據的數量
  * @return
  * @throws Exception
  */
public  List<Date> selectTasks(String taskParameter, String ownSign,  int  taskItemNum, List<TaskItemDefine> taskItemList,  int  eachFetchDataNum)  throws  Exception {
     List<Date> dateList =  new  ArrayList<>();
  
     List<Long> taskIdList =  new  ArrayList<>();
     for (TaskItemDefine t : taskItemList){  //肯定當前任務處理器需處理的任務項id
         taskIdList.add(Long.valueOf(t.getTaskItemId()));
     }
  
for ( int  i= 0 ;i<eachFetchDataNum;i++){  // 添加最多指定數量的待處理數據
     Date date =  new  Date();  //生成待處理數據
         Long remainder = date.getTime() % taskItemNum ;
         if (taskIdList.contains(remainder)){   //根據數據取模,判斷當前待處理數據,是否應由當前任務處理器處理
             dateList.add(date);
         }
         TimeUnit.SECONDS.sleep( 1 );
     }
     return  dateList;   //返回當前任務處理器須要處理的數據
}

 

 

21.參數說明

(1)zookeeper參數

zkConnectString:zookeeper註冊中心地址

rootPath:定時任務根目錄,任意指定,調度控制檯配置時對應

zkSessionTimeout:超時時間

userName:帳戶,任意指定,調度控制檯配置時對應

password:密碼,任意指定,調度控制檯配置時對應

isCheckParentPath:設置爲true會檢查上級目錄是否已經被用做TBSchedule調度,若是是則啓動任務失敗。

(2)任務參數:

heartBeatRate:心跳頻率(毫秒)

judgeDeadInterval:假定服務死亡間隔(毫秒)

sleepTimeNoData: 當沒有數據的時候,休眠的時間

sleepTimeInterval:在每次數據處理完後休眠的時間

processorType:處理模式,可爲SLEEP或NOTSLEEP。

permitRunStartTime:執行開始時間若是爲空則不定時,直接執行。

permitRunEndTime:執行結束時間,與執行開始時間之間的時間才能夠執行任務。

taskItems: 任務項數組,例如:0:{TYPE=A,KIND=1},1:{TYPE=B,KIND=2},2:{TYPE=C,KIND=3}

在調度過程當中,某線程分得了獲取數據的任務,假設獲取第1項任務,則在selectTasks()方法的taskItemList參數中包含第1項任務的信息,TaskItemDefine類型,包含:taskItemId、parameter成員變量,分別爲:一、{TYPE=B,KIND=2}。可根據該信息取得相應的數據。

fetchDataNumber:selectTasks()方法每次獲取數據的數量

executeNumber:每次執行數量,即execute()方法每次取得的任務數量,只在bean實現IScheduleTaskDealMulti才生效。

threadNumber:每一個線程組中的線程數

maxTaskItemsOfOneThreadGroup:每一組線程能分配的最大任務數量,避免在隨着機器的減小把正常的服務器壓死,0或者空表示不限制

taskParameter:任務的自定義參數,可做爲selectTasks中的參數傳入。

(3)調度策略參數:

strategyName:策略名稱,必須填寫,不能有中文和特殊字符。

kind:任務類型,Schedule,Java,Bean 大小寫敏感。

taskName:要配置調度策略的任務名稱,與這一任務配置的名稱要一致。

taskParameter:任務參數,逗號分隔的Key-Value。對任務類型爲Java、Bean的有效,對任務類型爲Schedule的無效,須要經過任務管理來配置。

assignNum:最大線程組數量,是全部機器(JVM)總共運行的線程組的最大數量。

numOfSingleServer單個機器(JVM)的線程組數量,若是是0,則表示無限制。

IPList:策略運行的機器(JVM)IP列表,127.0.0.1或者localhost會在全部機器上運行。

 

4、注意事項

  1. 若是分配給某線程的任務還未執行完,重啓該線程所屬進程後,這些任務將會丟失,所以要自行實現冪等,且不要直接kill進程,而是發消息通知各線程執行完畢後安全退出。

當在控制檯點擊中止任務的按鈕時。會將任務池中未處理的任務清除,而中止前的在處理的任務將繼續執行。

  1. 若是要設置任務間隔必定時間運行一次,假設爲10秒,能夠將permitRunEndTime、permitRunStartTime設置爲空,將sleepTimeNoData、sleepTimeInterval均設置爲10000,這樣每一個線程運行完畢後無論有沒有任務均休眠10秒。

也能夠只設置permitRunStartTime,將permitRunEndTime設置爲空或者-1。

  1. 通常來講沒有任務時線程休眠時間間隔較大,而有任務時休眠時間間隔要較小,所以sleepTimeNoData通常都大於sleepTimeInterval。
  2. 使用同一個zookeeper的不一樣項目若是使用同一個zookeeper實例時,所使用的zookeeper根目錄不能有父子關係,即便是同一項目的不一樣實例(例如測試環境、開發環境、準生產環境各部署一套實例)也要使用不具備父子關係的不一樣根目錄。
  3. 任務中配置的每次獲取數據量(fetchDataNumber)要大於10倍的線程數(threadNumber),即:

fetchDataNumber >= threadNumber * 最少循環次數10,不然TBSchedule日誌會提示:參數設置不合理,系統性能不佳。

  1. 假定服務死亡間隔judgeDeadInterval至少要大於心跳頻率heartBeatRate的5倍。
  2. 任務配置出錯時,在控制檯會對該任務加紅色高亮底色標識。
  3. 當線程組運行出現故障未及時取數時,在控制檯會對該線程組加紅色高亮底色標識。
  4. 當運行過程當中增長節點或修改配置,日誌中可能會出現提示Zookeeper節點不存在的NullPointerException異常,不用理會。

10.理論上單臺機器最大線程數爲:

線程數threadNumber*單個機器的線程組數量numOfSingleServer,而numOfSingleServer並非上限,僅有1臺機器時,該機器的線程組數量能達到assignNum。

11.TBSchedule給各機器以線程組爲單位進行分配,全部機器的線程組總數不會超過最大線程組數量assignNum。

12.通常來講在selectTasks()中獲取任務,而後在execute()方法中處理,在SLEEP處理模式下,最後一個活動線程纔會去獲取任務,所以不會出現重複執行任務的狀況。但若是在selectTasks()或execute()中再新建線程或線程池來處理任務,會出現新建線程未處理完成,但TBSchedule框架認爲已處理結束從而進行下一次獲取任務的操做,可能會重複取出正在處理的任務,所以應儘可能避免新建線程和線程池。

13.在selectTasks()中獲取到任務後或者在execute()中處理完任務後應更改狀態,防止下次再次取到,形成重複處理。

14.在SLEEP處理模式下,配置的分片數量應合理,分片較多則同一線程組分配過多分片,對不一樣分片分別查詢獲取任務則效率會下降,而分片較少則不利於擴展機器。

15.在SLEEP處理模式下,同一時間只會有一個線程執行selectTasks(),其餘線程均處於休眠狀態,所以不宜在selectTasks()中進行過多操做而讓其餘線程等待時間過長,處理工做應儘可能在execute()中進行。或者採用NOTSLEEP模式,讓多個線程能夠同時運行selectTasks()獲取不一樣分片的任務。

NOTSLEEP模式須要實現getComparator(),防止從任務池中取出的某項任務正在被本進程中的其餘線程處理。原理是在取任務前先取得正在運行的任務放入maybeRepeatTaskList中,取得任務放入任務池後,再與maybeRepeatTaskList中的每項任務對比。同時取任務時加鎖保證只有一個線程在取任務。

只有在NotSleep模式下getComparator()纔有效,在Sleep模式下無效。

執行getComparator()時會遍歷正在處理的任務池。

16.複雜任務能夠拆分紅多項子任務,並配置不一樣的策略,爲操做最複雜的子任務分配較多線程,從而提升整體的處理效率。

若是不進行拆分,則會有單個線程處理時間較長,併發的線程數較少,處理時間長短不一, 且任務分配不均勻等問題。例如任務爲:從FTP中取得不一樣大小的文件進行解析,將每行數據寫入分庫中。

若是在selectTasks中取得的每一個任務對應一個文件,在execute()中處理任務時(解析文件併入庫),效率會很是低。可對解析文件的任務作改造:

改造方案1:在execute()中解析文件後入庫時採用線程池處理。但這樣仍不能解決任務分配不勻的問題,且引入線程池會增長線程數量。尤爲是會形成框架錯誤判斷任務已結束,致使重複處理,所以本方案不合理。

改造方案2:將任務拆分爲兩個子任務,文件解析和入分庫。

子任務1:在原execute()中對文件解析後不直接入分庫,而是取1000條合成1條記錄存入本地單庫的中間表,解析文件耗時較短且記錄數較少能夠較快完成,且時間不都可以忽略。

子任務2:對中間表記錄按照自增主鍵ID分片,selectTasks()中取得記錄,而後拆分紅原始單條記錄返回,在execute()中對單條記錄進行入庫處理。

 

改造方案2的線程數較少,且任務分配會比較均勻,同時避免了單線程處理一個大任務等待時間過長的問題。

17.Zookeeper存儲的數據:機器、策略定義、任務定義、任務分片(包含當前屬於哪一個機器處理)

18.在zookeeper中每臺機器都可保存不一樣的線程數等配置,說明不一樣機器可使用不一樣的線程數等配置,但不建議使用不一樣配置。

19.在多任務模式下,executeNumber不要設置的太大,較小一些能夠減小等待最後一個活躍線程的時間,而且若是fetchDataNumber<線程數*executeNumber,則會有線程沒法分得任務。任務分配在本進程中進行,並不會請求zookeeper,所以設置的較小一些效率更高。

20.當須要重啓應用時,要在控制檯上先把機器所有中止,等待線程組消失,不然直接重啓應用時會出現新的機器實例,舊的機器實例未能釋放分片,致使新的機器獲取不到任務分片沒法執行,控制檯上會顯示新、舊線程組均爲紅色。

21.使用同一zookeeper目錄的多臺機器中,先啓動的機器通常爲leader,負責分片的分配。

22.控制檯顯示某線程組紅色異常,長時間未取數時,多是取任務的selectTasks()運行異常,或者每次取的任務數量過大,致使長時間未會處理完,能夠適當調小eachFetchDataNum。

也有多是由於在SLEEP模式下任務處理時間過長。

23.分片按線程組進行分配,同一機器中有多個線程組時,該機器分得多個分片,也會均勻分配給線程組,每一個線程組各自獨立取任務調度,不會同時取任務。

24.當加入新機器時,會請求得到分片。框架10秒掃描一次,若是發現機器數量有變化,且佔用分片較多的機器完成任務則會自動從新分配分片。

25.若是每次從數據庫裏取待處理記錄生成任務時,若是總記錄數較多,即便取到的有效記錄數較少,則掃描整張表花費時間較長,除了創建必要的索引,也應該減小無數據時掃描頻次,即下降sleepTimeInterval,也可在selectTasks()中在取到記錄後檢查數量,若是較少則sleep一段時間再返回任務,也應加大sleepTimeNoData。

26.若是任務處理結束後還要合併結果再進入下一輪處理,則最慢的機器會減慢總體速度,所以要儘可能保證任務分片均勻分給不一樣機器,分片數量要能被機器數量整除,也能被最大線程組數量assignNum整除,這樣每臺機器處理的任務數量大體相同。

27.在Zookeeper鏈接配置中保存時提示:

錯誤信息:Zookeeper connecting ......localhost:2181

同時沒法進入其餘頁面,可能因爲採用不一樣的用戶名密碼配置過同一目錄造zookeeper數據異常,能夠在zookeeper中手動刪除目錄數據,或者更換新目錄後重啓應用。

在zookeeper中刪除目錄方法:

[root@192.168.1.5]$ ./zkClient.sh

[zk: localhost:2181(CONNECTED) 0] addauth digest admin:password

[zk: localhost:2181(CONNECTED) 1] rmr /app-schedule/demo

 

在控制檯沒法修改目錄的帳戶密碼,可在zookeeper客戶端中刪除目錄後重建目錄及帳戶密碼。

 

28.每位用戶登陸控制檯後打開的配置信息均保存在bin目錄下的pamirsScheduleConfig.properties,所以在同一Tomcat下操做不一樣的TBSchedule目錄時會衝突,已修改TBSchedule的代碼解決了這一問題。

29.selectTasks()方法從數據庫中取得記錄時,能夠在select語句中對某字段進行mod取餘,這樣只獲取本線程組所分配的分片。通常有多個分庫時,同時也會採用mycat,主鍵ID沒法採用自增,經常使用雪花算法來生成不重複的ID,但對這種ID取模通常不容易均勻,所以可增長建立時間戳字段來用於取模,通常各機器取得的任務數較爲均勻。

30.若是使用zookeeper集羣,則在tbschedule.properties中配置schedule.zookeeper.address時,格式以下:

IP1:Port1,IP2:Port2,IP3:Port3

31.TBSchedule沒法實現任務均衡的轉移,即當一臺機器處理任務較多,其餘機器較閒時,不會轉到其餘機器。

32.若是使用數據庫鏈接池,則單個機器中的線程數量不要比鏈接池數量大太多,或者不高於,以防出現線程獲取不到數據庫鏈接的狀況出現。

33.Sleep模式在實現邏輯上相對簡單清晰,但存在一個大任務處理時間長,致使其它線程不工做的狀況。
在NotSleep模式下,減小了線程休眠的時間,避免大任務阻塞的狀況,但爲了不數據被重複處理,增長了CPU在數據比較上的開銷。
同時要求業務接口實現對象的比較接口。
若是對任務處理不容許停頓的狀況下建議用NotSleep模式,其它狀況建議用Sleep模式。
34.主機編號最小的爲Leader,若是是Leader第一次啓動則會清除全部垃圾數據。
35.若是任務是輪詢類型,可將permitRunStartTime、permitRunEndTime均設置爲空,將一直運行,可設置sleepTimeNoData、sleepTimeInterval來sleep。

若是要設置在必定時間作內輪詢,則能夠同時設置permitRunStartTime、permitRunEndTime,在這一時間段內會執行selectTasks()及execute()。

在到達結束時間時,會將任務池清空,並設置中止運行標誌,此時將沒法再啓動新的線程運行execute(),所以若是selectTasks()運行時間略長於permitRunEndTime-permitRunStartTime,則execute()可能會永遠都沒法被執行到。

例如:permitRunStartTime設置爲:0/10 * * * * ?

    permitRunEndTime設置爲:5/10 * * * * ?

而selectTasks()執行時間爲6秒,則在第6秒時execute()沒有機會被執行。

所以對於輪詢任務,最好將permitRunStartTime、permitRunEndTime均設置爲空.

將permitRunEndTime設置爲-1與爲空做用一致。
36.若是任務是定時任務,則能夠只設置permitRunStartTime,而將permitRunEndTime設置爲空或-1,這樣在selectTasks()取得任務爲空時會sleep(),直到下一個開始時間時纔會執行。

例如:permitRunStartTime設置爲:0/10 * * * * ?

    permitRunEndTime設置爲:-1

則在每10秒的第0秒開始執行selectTasks()取任務,若是取到任務則會交給其餘線程執行execute(),若是未取到則會sleep(),直到下一個開始時間時纔會執行。

若是隻但願同一時間僅有一個線程處理任務,則能夠只設置一個分片,並採用SLEEP模式,numOfSingleServer、assignNum均設置爲1。

37.每一個心跳週期都會向zookeeper更新心跳信息,若是超過judgeDeadInterval(假定服務死亡間隔)未更新過,則清除zookeeper上的任務信息及Server信息。每一個心跳週期也會從新分配分片。
也會清除zookeeper中分配給已消失機器上的任務信息。
38.若是有定時任務執行出現故障,或者因重啓錯過了執行時間,若是要在下一次時間點前再次執行,則能夠在控制檯上臨時增長任務類型、策略,來臨時定時執行一次,月日也加上防止忘記刪除任務致使屢次重複執行。執行完成後再刪除該任務類型、策略。
39.有時應用啓動後日志顯示正常,但不執行任務,有多是zookeeper中數據出現錯誤,可刪除該目錄,重啓應用便可。
40.在控制檯上點擊機器的中止按鈕時,會將zookeeper中該機器的運行狀態設置爲false,並清除本機器的任務池中未被處理的任務。在每臺機器進程中每2秒刷新一次運行狀態,當檢測到false,則在任務執行完畢後再也不取任務處理。
41.SystemTBScheduleManagerFactory也可取消,改用@Bean註解,例如:
ScheduleJobConfiguration.java:

 

package  com.jfbank.schedule.monitor.alarm.tbs;
  
import  java.util.HashMap;
import  java.util.Map;
  
import  org.springframework.beans.factory.annotation.Value;
import  org.springframework.context.annotation.Bean;
import  org.springframework.context.annotation.Configuration;
  
import  com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
  
@Configuration 
public  class  ScheduleJobConfiguration{ 
  
     @Bean (initMethod =  "init"
     public  TBScheduleManagerFactory tbScheduleManagerFactory( 
             @Value ( "${schedule.zookeeper.address}" )String zkConnectString,  
             @Value ( "${schedule.root.catalog}" )String rootPath, 
             @Value ( "${schedule.timeout}" )String zkSessionTimeout, 
             @Value ( "${schedule.username}" )String userName, 
             @Value ( "${schedule.password}" )String password, 
             @Value ( "${schedule.isCheckParentPath}" )String isCheckParentPath) { 
         TBScheduleManagerFactory tbScheduleManagerFactory =  new  TBScheduleManagerFactory(); 
         Map<String, String> zkConfig =  new  HashMap<String, String>(); 
         zkConfig.put( "zkConnectString" , zkConnectString); 
         zkConfig.put( "rootPath" , rootPath); 
         zkConfig.put( "zkSessionTimeout" , zkSessionTimeout); 
         zkConfig.put( "userName" , userName); 
         zkConfig.put( "password" , password); 
         zkConfig.put( "isCheckParentPath" , isCheckParentPath); 
         tbScheduleManagerFactory.setZkConfig(zkConfig); 
         return  tbScheduleManagerFactory; 
     }
}
相關文章
相關標籤/搜索