深刻理解分佈式調度框架TBSchedule及源碼分析

簡介

  因爲最近工做比較忙,前先後後花了兩個月的時間把TBSchedule的源碼翻了個底朝天。關於TBSchedule的使用,網上也有不少參考資料,這裏不作過多的闡述。本文着重介紹TBSchedule的運行機制,架構設計以及優化建議。經過學習別人的經驗,來提升本身的技術能力,感覺阿里人的智慧,也向阿里空玄,阿里玄難爲開源貢獻致敬。java

zookeeper依賴

  TBSchedule依賴於ZK存儲調度數據,在使用中充當着nosql的角色,zk的watch機制只用於zk重連,提升可靠性。下圖是zk與tbschedule的部署圖。算法

  TBSchedule有不少特性,包括批量任務,多主機,多線程,動態擴展,實時或定時任務,分片,併發,不重複執行。在介紹這些特性以前,先來了解一下整個zk目錄結構,有助於理解整個調度過程。下圖是zk調度數據結構圖。其中()內表示zk目錄保存的數據。sql

 

TBSchedule原理

1)TBSchedule在zookeeper初始化完成以後初始化數據,其中建立basetasktype,stractegy,factory目錄。調用registerManagerFactory,在factory目錄下建立瞬時有序節點,節點名稱(IP+$+HostName+$+UUID+$Sequence),而後根據ip是否在ip管理範圍內,在strategy目錄下添加或刪除對應的(IP+$+HostName+$+UUID+$Sequence)瞬時目錄節點。最後啓動默認的refresh()操做。數組

2)TBSchedule在每2s中zk正常狀況下執行一次refresh操做,該操做若是查詢zk管理信息異常則中止全部調度任務後從新註冊管理器工廠,若是管理器start狀態=false,則中止全部調度任務。具體實如今TBScheduleManagerFactory的reRegisterManagerFactory()中。具體代碼以下:緩存

public void reRegisterManagerFactory() throws Exception {
        // 根據UUID,在/factory目錄下查找對應目錄,並在/strategy目錄下更具IP數組,
        //肯定可管理的strtegyName下建立(IP+$+HostName+$+UUID+$Sequence)目錄
        // 返回不可管理的調度策略類型名稱,並中止對應的調度處理器
        List<String> stopList = this.getScheduleStrategyManager()
                .registerManagerFactory(this);
        for (String strategyName : stopList) {
            this.stopServer(strategyName);  //中止對應的調度處理器
        }
        //根據策略從新分配調度任務機器的任務數,並在zk上更新對應的ScheduleStrategyRunntime中的AssignNum
        this.assignScheduleServer(); 
        //注意,一個strategyName下只有惟一表示當前調度服務器的節點(IP+$+HostName+$+UUID+$Sequence)
        //同時一個strategyName對應該調度服務器多個IStrategyTask任務管理器,一個taskItem對應一個任務管理器
        //多則刪停,少則加起
        this.reRunScheduleServer(); 
    }

這邊再介紹下tbschedule的任務分配策略,列如當前有4臺機器(A,B,C,D),共10個任務(0,1..9)。首先將10個任務均等分,每一個服務器能夠分配到2個任務,最後剩餘兩個任務將給A,B服務器得到,具體算法以下:安全

/**
     * 分配任務數量
     * @param serverNum 總的服務器數量
     * @param taskItemNum 任務項數量
     * @param maxNumOfOneServer 每一個server最大任務項數目
     * @param maxNum 總的任務數量
     * @return
     */
    public static int[] assignTaskNumber(int serverNum,int taskItemNum,int maxNumOfOneServer){
        int[] taskNums = new int[serverNum];
        int numOfSingle = taskItemNum / serverNum;
        int otherNum = taskItemNum % serverNum;
        //20150323 刪除, 任務分片保證分配到全部的線程組數上。 開始
//        if (maxNumOfOneServer >0 && numOfSingle >= maxNumOfOneServer) {
//            numOfSingle = maxNumOfOneServer;
//            otherNum = 0;
//        }
        //20150323 刪除, 任務分片保證分配到全部的線程組數上。 結束
        for (int i = 0; i < taskNums.length; i++) {
            if (i < otherNum) {
                taskNums[i] = numOfSingle + 1;
            } else {
                taskNums[i] = numOfSingle;
            }
        }
        return taskNums;
    }

3)接下來根據每一個strategyName下得到的任務數,來建立對應任務調度管理器數。具體實如今reRunScheduleServer()方法中,循環建立IStrategyTask,根據調度類型Schedule,Java,bean實例化不一樣的任務管理器TBScheduleManagerStatic,也包括自定義管理器,只要繼承IStrategyTask接口就能夠了。列如自定義管理器,須要配置taskname爲java全類名或者bean的名稱。服務器

package com.taobao.pamirs.schedule.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.taobao.pamirs.schedule.strategy.IStrategyTask;
/**
 * 自定義任務管理器,調度類型爲Java,Bean
 * @author Administrator
 *
 */
public class JavaTaskDemo implements IStrategyTask,Runnable {
    protected static transient Logger log = LoggerFactory.getLogger(JavaTaskDemo.class);


    private String parameter;
    private boolean stop = false;
    public void initialTaskParameter(String strategyName,String taskParameter) {
        parameter = taskParameter;
        new Thread(this).start();
    }

    @Override
    public void stop(String strategyName) throws Exception {
        this.stop = true;
    }

    @Override
    public void run() {
        while(stop == false){
            log.error("執行任務:"  + this.parameter);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}
View Code

對於經常使用的Schedule調度類型,使用的是TBScheduleManagerStatic管理器。網絡

4)任務調度分配器TBScheduleManager,可以使得任務分片被不重複,不遺漏的快速處理。該功能也是TBSchedule的核心實現,一個JVM能夠包含不一樣taskType的多個任務調度分配器。也就是說能夠有相同任務taskType的多個任務管理器,也能夠存在不一樣的tasktype的任務管理器。每一個任務管理器包含一個任務處理器IScheduleProcessor,IScheduleProcessor其實是個Runnnable對象,根據任務類型的線程數來初始化調度線程。任務處理器分爲SLEEP和NotSleep模式。數據結構

下面是建立TBScheduleManager的操做。多線程

TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
        this.factory = aFactory;
        this.currentSerialNumber = serialNumber();
        this.scheduleCenter = aScheduleCenter;
        this.taskTypeInfo = this.scheduleCenter.loadTaskTypeBaseInfo(baseTaskType);
        log.info("create TBScheduleManager for taskType:"+baseTaskType);
        //清除已通過期1天的TASK,OWN_SIGN的組合。超過一天沒有活動server的視爲過時
        this.scheduleCenter.clearExpireTaskTypeRunningInfo(baseTaskType,ScheduleUtil.getLocalIP() + "清除過時OWN_SIGN信息",this.taskTypeInfo.getExpireOwnSignInterval());
        
        Object dealBean = aFactory.getBean(this.taskTypeInfo.getDealBeanName());
        if (dealBean == null) {
            throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 不存在");
        }
        if (dealBean instanceof IScheduleTaskDeal == false) {
            throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 沒有實現 IScheduleTaskDeal接口");
        }
        this.taskDealBean = (IScheduleTaskDeal)dealBean;

        if(this.taskTypeInfo.getJudgeDeadInterval() < this.taskTypeInfo.getHeartBeatRate() * 5){
            throw new Exception("數據配置存在問題,死亡的時間間隔,至少要大於心跳線程的5倍。當前配置數據:JudgeDeadInterval = "
                    + this.taskTypeInfo.getJudgeDeadInterval() 
                    + ",HeartBeatRate = " + this.taskTypeInfo.getHeartBeatRate());
        }
        //生成ScheduleServer信息。
        this.currenScheduleServer = ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber());
        //設置ScheduleServer的ManagerFactoryUUID
        this.currenScheduleServer.setManagerFactoryUUID(this.factory.getUuid());
        //在/server下注冊ScheduleServer信息,實際上能夠當作在server目錄下的每個子節點表示一個任務調度管理器
        scheduleCenter.registerScheduleServer(this.currenScheduleServer);
        this.mBeanName = "pamirs:name=" + "schedule.ServerMananger." +this.currenScheduleServer.getUuid();
        this.heartBeatTimer = new Timer(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-HeartBeat");
        this.heartBeatTimer.schedule(new HeartBeatTimerTask(this),
                new java.util.Date(System.currentTimeMillis() + 500),
                this.taskTypeInfo.getHeartBeatRate());
        initial();
    } 

 5)上面有兩個重要的操做,一個是心跳調度器,主要職責是更新/server目錄下對應的調度管理器心跳信息,清除過時的scheduleServer,若是是leader則進行任務項的分配。

class HeartBeatTimerTask extends java.util.TimerTask {
    private static transient Logger log = LoggerFactory
            .getLogger(HeartBeatTimerTask.class);
    TBScheduleManager manager;

    public HeartBeatTimerTask(TBScheduleManager aManager) {
        manager = aManager;
    }

    public void run() {
        try {
            Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
            manager.refreshScheduleServerInfo();
        } catch (Exception ex) {
            log.error(ex.getMessage(), ex);
        }
    }
}
/**
     * 若是發現本次更新的時間若是已經超過了,服務器死亡的心跳週期,則不能在向服務器更新信息。
     * 而應該看成新的服務器,進行從新註冊。
     * @throws Exception 
     */
    public void refreshScheduleServerInfo() throws Exception {
      try{
          //在/server下更新任務調度服務器的心跳時間,調度信息
        rewriteScheduleInfo();
        //若是任務信息沒有初始化成功,不作任務相關的處理,未完成init()
        if(this.isRuntimeInfoInitial == false){
            return;
        }
        
        //從新分配任務,leader從新檢查可用調度管理器,並修改taskItem下的current_server,req_server.
        this.assignScheduleTask();
        
        //判斷是否須要從新加載任務隊列,避免任務處理進程沒必要要的檢查和等待
        //思路:每一次修改了taskitem的任務分配以後,會在/taskitem下保存leader信息,及默認版本號-1
        //比較保存的上一次任務加載的版本號是否  <   當前的版本號
        boolean tmpBoolean = this.isNeedReLoadTaskItemList();
        if(tmpBoolean != this.isNeedReloadTaskItem){
            //只要不相同,就設置須要從新裝載,由於在心跳異常的時候,作了清理隊列的事情,恢復後須要從新裝載。
            synchronized (NeedReloadTaskItemLock) {
                this.isNeedReloadTaskItem = true;
            }
            rewriteScheduleInfo();
        }
        
        if(this.isPauseSchedule  == true || this.processor != null && processor.isSleeping() == true){
            //若是服務已經暫停了,則須要從新定時更新 cur_server 和 req_server
            //若是服務沒有暫停,必定不能調用的
            //調度服務策略若是已經失效,會拋出異常
            //加載任務list<taskDefine>
               this.getCurrentScheduleTaskItemListNow();
          }
        }catch(Throwable e){
            //清除內存中全部的已經取得的數據和任務隊列,避免心跳線程失敗時候致使的數據重複
            this.clearMemoInfo();
            if(e instanceof Exception){
                throw (Exception)e;
            }else{
               throw new Exception(e.getMessage(),e);
            }
        }
    }    

其中的this.assignScheduleTask();實現了任務調度管理器的變化而相應的修改/taskItem下curr_server和req_server的調度變化。核心思想:rewriteScheduleInfo()中沒有相應的調度服務器,則在/server下注冊。而後獲取有效的全部調度服務器,遍歷全部任務項,若是發現該任務項的curr_server表示的manager不存在,則設置null。而後對全部的任務分片從新分配調度服務器,具體算法以下:

public void assignTaskItem(String taskType, String currentUuid,int maxNumOfOneServer,
            List<String> taskServerList) throws Exception {
         if(this.isLeader(currentUuid,taskServerList)==false){
             if(log.isDebugEnabled()){
               log.debug(currentUuid +":不是負責任務分配的Leader,直接返回");
             }
             return;
         }
         if(log.isDebugEnabled()){
               log.debug(currentUuid +":開始從新分配任務......");
         }        
         if(taskServerList.size()<=0){
             //在服務器動態調整的時候,可能出現服務器列表爲空的清空
             return;
         }
         String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(taskType);
         String zkPath = this.PATH_BaseTaskType + "/" + baseTaskType + "/" + taskType + "/" + this.PATH_TaskItem;
         List<String> children = this.getZooKeeper().getChildren(zkPath, false);
//         Collections.sort(children);
//         20150323 有些任務分片,業務方實際上是用數字的字符串排序的。優先以數字進行排序,不然以字符串排序
         Collections.sort(children,new Comparator<String>(){
             public int compare(String u1, String u2) {
                    if(StringUtils.isNumeric(u1) && StringUtils.isNumeric(u2)){
                        int iU1= Integer.parseInt(u1);
                        int iU2= Integer.parseInt(u2);
                        /*if(iU1==iU2){
                            return 0 ;
                        }else if(iU1>iU2){
                            return 1 ;
                        }else{
                            return -1;
                        }*/
                        return iU1-iU2;
                    }else{
                        return u1.compareTo(u2);
                    }
                }
            });
         int unModifyCount =0;
         int[] taskNums = ScheduleUtil.assignTaskNumber(taskServerList.size(), children.size(), maxNumOfOneServer);
         int point =0;
         int count = 0;
         String NO_SERVER_DEAL = "沒有分配到服務器"; 
         for(int i=0;i <children.size();i++){
            String name = children.get(i);
            if(point <taskServerList.size() && i >= count + taskNums[point]){
                count = count + taskNums[point];
                point = point + 1;
            }
            String serverName = NO_SERVER_DEAL;
            if(point < taskServerList.size() ){
                serverName = taskServerList.get(point);
            }
            byte[] curServerValue = this.getZooKeeper().getData(zkPath + "/" + name + "/cur_server",false,null);
            byte[] reqServerValue = this.getZooKeeper().getData(zkPath + "/" + name + "/req_server",false,null);
            
            if(curServerValue == null || new String(curServerValue).equals(NO_SERVER_DEAL)){
                //對沒有分配的任務分片,添加調度服務器
                this.getZooKeeper().setData(zkPath + "/" + name + "/cur_server",serverName.getBytes(),-1);
                this.getZooKeeper().setData(zkPath + "/" + name + "/req_server",null,-1);
            }else if(new String(curServerValue).equals(serverName)==true && reqServerValue == null ){
                //不須要作任何事情   當前執行的調度器正好和從新分配的調度器一致
                unModifyCount = unModifyCount + 1;
            }else{
                //調度服務器請求轉換
                this.getZooKeeper().setData(zkPath + "/" + name + "/req_server",serverName.getBytes(),-1);
            }
         }    
         
         if(unModifyCount < children.size()){ //設置須要全部的服務器從新裝載任務
             log.info("設置須要全部的服務器從新裝載任務:updateReloadTaskItemFlag......"+taskType+ "  ,currentUuid "+currentUuid );
             //設置/server[v.2][reload=true]
             this.updateReloadTaskItemFlag(taskType);
         }
         if(log.isDebugEnabled()){
             StringBuffer buffer = new StringBuffer();
             for(ScheduleTaskItem taskItem: this.loadAllTaskItem(taskType)){
                buffer.append("\n").append(taskItem.toString());
             }
             log.debug(buffer.toString());
         }
    }
View Code

 

在第4點附上的源碼最後有個initial();操做,首先啓動一個獨立的線程,判斷isRuntimeInfoInitial標誌位判斷是否已經初始化數據,若是沒有則leader調度器執行initialRunningInfo(),刪除/TaskItem目錄,根據ScheduleTaskType,獲取到的任務項數組,建立任務項節點,同時在/taskItem下設置leader數據。initial()源碼以下:

public void initial() throws Exception{
        new Thread(this.currenScheduleServer.getTaskType()  +"-" + this.currentSerialNumber +"-StartProcess"){
            @SuppressWarnings("static-access")
            public void run(){
                try{
                   log.info("開始獲取調度任務隊列...... of " + currenScheduleServer.getUuid());
                   //併發啓動調度管理器,直至leader初始化任務項完成
                   while (isRuntimeInfoInitial == false) {
                       if(isStopSchedule == true){
                          log.debug("外部命令終止調度,退出調度隊列獲取:" + currenScheduleServer.getUuid());
                          return;
                      }
                       //log.error("isRuntimeInfoInitial = " + isRuntimeInfoInitial);
                       try{
                      initialRunningInfo();
                      //在/taskitem下的數據判斷是否爲leader的數據
                      isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss(
                                        currenScheduleServer.getBaseTaskType(),
                                        currenScheduleServer.getOwnSign());
                       }catch(Throwable e){
                           //忽略初始化的異常
                           log.error(e.getMessage(),e);
                       }
                      if(isRuntimeInfoInitial == false){
                          Thread.currentThread().sleep(1000);
                      }
                    }
                   int count =0;
                   lastReloadTaskItemListTime = scheduleCenter.getSystemTime();
                   //此處會給currentTaskItemList添加元素,直至加載到任務
                   while(getCurrentScheduleTaskItemListNow().size() <= 0){
                          if(isStopSchedule == true){
                              log.debug("外部命令終止調度,退出調度隊列獲取:" + currenScheduleServer.getUuid());
                              return;
                          }
                          Thread.currentThread().sleep(1000);
                          count = count + 1;
                         // log.error("嘗試獲取調度隊列,第" + count + "次 ") ;
                   }
                   String tmpStr ="TaskItemDefine:";
                   for(int i=0;i< currentTaskItemList.size();i++){
                       if(i>0){
                           tmpStr = tmpStr +",";                           
                       }
                       tmpStr = tmpStr + currentTaskItemList.get(i);
                   }
                   log.info("獲取到任務處理隊列,開始調度:" + tmpStr +"  of  "+ currenScheduleServer.getUuid());
                   
                    //任務總量
                    taskItemCount = scheduleCenter.loadAllTaskItem(currenScheduleServer.getTaskType()).size();
                    //只有在已經獲取到任務處理隊列後纔開始啓動任務處理器                   
                   computerStart();
                }catch(Exception e){
                    log.error(e.getMessage(),e);
                    String str = e.getMessage();
                    if(str.length() > 300){
                        str = str.substring(0,300);
                    }
                    startErrorInfo = "啓動處理異常:" + str;
                }
            }
        }.start();
    }
View Code

最後的computerStart()方法是實現週期執行的關鍵,TBSchedule基於cronExpression表達式實現週期性調度,執行類型分爲兩種TYPE_PAUSE,TYPE_RESUME。並更新setNextRunStartTime和setNextRunEndTime。

    /**
     * 開始的時候,計算第一次執行時間
     * @throws Exception
     */
    public void computerStart() throws Exception{
        //只有當存在可執行隊列後再開始啓動隊列
       
        boolean isRunNow = false;
        if(this.taskTypeInfo.getPermitRunStartTime() == null){
            isRunNow = true;
        }else{
            String tmpStr = this.taskTypeInfo.getPermitRunStartTime();
            if(tmpStr.toLowerCase().startsWith("startrun:")){
                isRunNow = true;
                tmpStr = tmpStr.substring("startrun:".length());
            }
            CronExpression cexpStart = new CronExpression(tmpStr);
            Date current = new Date( this.scheduleCenter.getSystemTime());
            Date firstStartTime = cexpStart.getNextValidTimeAfter(current);
            this.heartBeatTimer.schedule(
                    new PauseOrResumeScheduleTask(this,this.heartBeatTimer,
                            PauseOrResumeScheduleTask.TYPE_RESUME,tmpStr), 
                            firstStartTime);
            this.currenScheduleServer.setNextRunStartTime(ScheduleUtil.transferDataToString(firstStartTime));    
            if( this.taskTypeInfo.getPermitRunEndTime() == null
               || this.taskTypeInfo.getPermitRunEndTime().equals("-1")){
                this.currenScheduleServer.setNextRunEndTime("當不能獲取到數據的時候pause");                
            }else{
                try {
                    String tmpEndStr = this.taskTypeInfo.getPermitRunEndTime();
                    CronExpression cexpEnd = new CronExpression(tmpEndStr);
                    Date firstEndTime = cexpEnd.getNextValidTimeAfter(firstStartTime);
                    Date nowEndTime = cexpEnd.getNextValidTimeAfter(current);
                    if(!nowEndTime.equals(firstEndTime) && current.before(nowEndTime)){
                        isRunNow = true;
                        firstEndTime = nowEndTime;
                    }
                    this.heartBeatTimer.schedule(
                            new PauseOrResumeScheduleTask(this,this.heartBeatTimer,
                                    PauseOrResumeScheduleTask.TYPE_PAUSE,tmpEndStr), 
                                    firstEndTime);
                    this.currenScheduleServer.setNextRunEndTime(ScheduleUtil.transferDataToString(firstEndTime));
                } catch (Exception e) {
                    log.error("計算第一次執行時間出現異常:" + currenScheduleServer.getUuid(), e);
                    throw new Exception("計算第一次執行時間出現異常:" + currenScheduleServer.getUuid(), e);
                }
            }
        }
        //若是沒有getPermitRunStartTime,則跳過timer調度,當即執行
        if(isRunNow == true){
            this.resume("開啓服務當即啓動");
        }
        this.rewriteScheduleInfo();
        
    }
View Code

從上面的代碼中,咱們注意到了這個調度使用同一個timer對象,每次調度執行後在timer添加新的調度task。若是是PAUSE類型調度,則執行manager.pause("到達終止時間,pause調度"),若是是RESUME,則執行manager.resume("到達開始時間,resume調度");,並計算下次調度時間,從新添加到調度隊列。具體實現以下:

class PauseOrResumeScheduleTask extends java.util.TimerTask {
    private static transient Logger log = LoggerFactory
            .getLogger(HeartBeatTimerTask.class);
    public static int TYPE_PAUSE  = 1;
    public static int TYPE_RESUME = 2;    
    TBScheduleManager manager;
    Timer timer;
    int type;
    String cronTabExpress;
    public PauseOrResumeScheduleTask(TBScheduleManager aManager,Timer aTimer,int aType,String aCronTabExpress) {
        this.manager = aManager;
        this.timer = aTimer;
        this.type = aType;
        this.cronTabExpress = aCronTabExpress;
    }
    public void run() {
        try {
            Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
            this.cancel();//取消調度任務
            Date current = new Date(System.currentTimeMillis());
            CronExpression cexp = new CronExpression(this.cronTabExpress);
            Date nextTime = cexp.getNextValidTimeAfter(current);
            if(this.type == TYPE_PAUSE){
                manager.pause("到達終止時間,pause調度");
                this.manager.getScheduleServer().setNextRunEndTime(ScheduleUtil.transferDataToString(nextTime));
            }else{
                manager.resume("到達開始時間,resume調度");
                this.manager.getScheduleServer().setNextRunStartTime(ScheduleUtil.transferDataToString(nextTime));
            }
            this.timer.schedule(new PauseOrResumeScheduleTask(this.manager,this.timer,this.type,this.cronTabExpress) , nextTime);
        } catch (Throwable ex) {
            log.error(ex.getMessage(), ex);
        }
    }
}
View Code

resume即在可執行時間區間恢復調度,根據SchduleTaskType配置的處理器類型模式Sleep或者NotSleep來初始化處理器。默認使用TBScheduleProcessorSleep處理器。

/**
     * 處在了可執行的時間區間,恢復運行
     * @throws Exception 
     */
    public void resume(String message) throws Exception{
        if (this.isPauseSchedule == true) {
            if(log.isDebugEnabled()){
                log.debug("恢復調度:" + this.currenScheduleServer.getUuid());
            }
            this.isPauseSchedule = false;
            this.pauseMessage = message;
            if (this.taskDealBean != null) {
                if (this.taskTypeInfo.getProcessorType() != null &&
                    this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){
                    this.taskTypeInfo.setProcessorType("NOTSLEEP");
                    this.processor = new TBScheduleProcessorNotSleep(this,
                            taskDealBean,this.statisticsInfo);
                }else{
                    this.processor = new TBScheduleProcessorSleep(this,
                            taskDealBean,this.statisticsInfo);
                    this.taskTypeInfo.setProcessorType("SLEEP");
                }
            }
            rewriteScheduleInfo();//更新心跳信息
        }
    }    
6)多線程執行,TBScheduleProcessorSleep是一個Runnable對象,多個調度線程共享以下變量:
    final  LockObject   m_lockObject = new LockObject();
//緩存線程對象
    List<Thread> threadList =  new CopyOnWriteArrayList<Thread>();
    /**
     * 任務管理器
     */
    protected TBScheduleManager scheduleManager;
    /**
     * 任務類型
     */
    ScheduleTaskType taskTypeInfo;
    
    /**
     * 任務處理的接口類
     */
    protected IScheduleTaskDeal<T> taskDealBean;
        
    /**
     * 當前任務隊列的版本號
     */
    protected long taskListVersion = 0;
    final Object lockVersionObject = new Object();
    final Object lockRunningList = new Object();
        //任務隊列
    protected List<T> taskList = new CopyOnWriteArrayList<T>();

    /**
     * 是否能夠批處理
     */
    boolean isMutilTask = false;
    
    /**
     * 是否已經得到終止調度信號
     */
    boolean isStopSchedule = false;// 用戶中止隊列調度
    boolean isSleeping = false;

 在初始化執行處理器,會啓動ThreadNumber個線程數,

for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) {
this.startThread(i);
}

下面具體看一下線程的run()操做。一個執行線程的職責主要是執行自定義的IScheduleTaskDealSingle,而IScheduleTaskDealMulti能夠實現批量處理,實現區別也是大同小異。其核心思想:

對開始執行的線程計數+1,在沒有中止調度的前提下即resume狀態下,執行客戶自定義ScheduleTask的execute()方法,並完成執行統計。當任務隊列中的全部任務Item都執行完成,隊列爲空時,若是正在執行任務的線程數不是最後一個線程,則等待。反之,則加載任務,有數據喚醒全部等待線程繼續執行,沒數據線程sleep SleepTimeNoData時間,並繼續加載任務數據。

public void run(){
          try {
            long startTime =0;
            while(true){
              this.m_lockObject.addThread();
              Object executeTask;
              while (true) {
                if(this.isStopSchedule == true){//中止隊列調度
                  this.m_lockObject.realseThread();
                  this.m_lockObject.notifyOtherThread();//通知全部的休眠線程
                  synchronized (this.threadList) {            
                      this.threadList.remove(Thread.currentThread());
                      if(this.threadList.size()==0){
                            this.scheduleManager.unRegisterScheduleServer();
                      }
                  }
                  return;
                }
                
                //加載調度任務
                if(this.isMutilTask == false){
                  executeTask = this.getScheduleTaskId();
                }else{
                  executeTask = this.getScheduleTaskIdMulti();
                }
                
                if(executeTask == null){
                  break;
                }
                
                try {//運行相關的程序
                  startTime =scheduleManager.scheduleCenter.getSystemTime();
                  if (this.isMutilTask == false) {
                        if (((IScheduleTaskDealSingle) this.taskDealBean).execute(executeTask,scheduleManager.getScheduleServer().getOwnSign()) == true) {
                            addSuccessNum(1, scheduleManager.scheduleCenter.getSystemTime()
                                    - startTime,
                                    "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
                        } else {
                            addFailNum(1, scheduleManager.scheduleCenter.getSystemTime()
                                    - startTime,
                                    "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
                        }
                    } else {
                        if (((IScheduleTaskDealMulti) this.taskDealBean)
                                .execute((Object[]) executeTask,scheduleManager.getScheduleServer().getOwnSign()) == true) {
                            addSuccessNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime()
                                    - startTime,
                                    "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
                        } else {
                            addFailNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime()
                                    - startTime,
                                    "com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");
                        }
                    } 
                }catch (Throwable ex) {
                    if (this.isMutilTask == false) {
                        addFailNum(1,scheduleManager.scheduleCenter.getSystemTime()- startTime,
                                "TBScheduleProcessor.run");
                    } else {
                        addFailNum(((Object[]) executeTask).length, scheduleManager.scheduleCenter.getSystemTime()
                                - startTime,
                                "TBScheduleProcessor.run");
                    }
                    logger.warn("Task :" + executeTask + " 處理失敗", ex);                
                }
              }
              //當前隊列中全部的任務都已經完成了。
                if(logger.isTraceEnabled()){
                   logger.trace(Thread.currentThread().getName() +":當前運行線程數量:" +this.m_lockObject.count());
                }
                if (this.m_lockObject.realseThreadButNotLast() == false) {
                    int size = 0;
                    Thread.currentThread().sleep(100);
                    startTime =scheduleManager.scheduleCenter.getSystemTime();
                    // 裝載數據
                    size = this.loadScheduleData();
                    if (size > 0) {
                        this.m_lockObject.notifyOtherThread();
                    } else {
                        //判斷當沒有數據的是否,是否須要退出調度
                        if (this.isStopSchedule == false && this.scheduleManager.isContinueWhenData()== true ){                         
                            if(logger.isTraceEnabled()){
                                   logger.trace("沒有裝載到數據,start sleep");
                            }
                            this.isSleeping = true;
                            Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData());
                            this.isSleeping = false;
                            
                            if(logger.isTraceEnabled()){
                                   logger.trace("Sleep end");
                            }
                        }else{
                            //沒有數據,退出調度,喚醒全部沉睡線程
                            this.m_lockObject.notifyOtherThread();
                        }
                    }
                    this.m_lockObject.realseThread();
                } else {// 將當前線程放置到等待隊列中。直到有線程裝載到了新的任務數據
                    if(logger.isTraceEnabled()){
                           logger.trace("不是最後一個線程,sleep");
                    }
                    this.m_lockObject.waitCurrentThread();
                }
            }
          }
          catch (Throwable e) {
              logger.error(e.getMessage(), e);
          }
        }
View Code

 

TBSchedule思考與挑戰

1)Zookeeper節點遍歷優化

列如存在上圖目錄節點,原有的查找節點數有些問題,不能徹底刪除目錄。我這邊提供的思想是遞歸查找目錄樹。最終結果爲A-B-E-C-F-D,刪除節點的時候從最後一個節點刪除,不會出現子目錄存在而直接刪除父節點的操做。代碼以下:

    /**
     * 使用遞歸遍歷全部結點
     * 
     * @param zk
     * @param path
     * @param dealList
     * @throws Exception
     * @throws InterruptedException
     */
    private static void getTree(ZooKeeper zk, String path, List<String> dealList)
            throws Exception, InterruptedException {
        //添加父目錄
        dealList.add(path);
        List<String> children = zk.getChildren(path, false);
        if (path.charAt(path.length() - 1) != '/') {
            path = path + "/";
        }
        //添加子目錄
        for (int i = 0; i < children.size(); i++) {
            getTree(zk, path + children.get(i), dealList);
        }

    }

2)線程優化

經過上面TBSchedule的源碼分析,咱們知道一個任務調度處理器,會建立一個timer根據cron表達式執行resume和pause操做。每一次resume都會建立TBScheduleProcessorSleep,而後初始化多個線程。

當該timer進行N次調度resume的時候,也就是系統會建立N*threadNum個線程,執行pause操做,則這些線程將會銷燬。個人建議是每個任務調度處理器,都指定1個線程數的cacheThreadPool線程池。可能會有人說,爲什麼不指定一個ThreadNum數的fixedThreadPool。由於當timer執行屢次resume的時候,若是上一次的resume尚未完成,線程池中沒有空閒的線程來執行新的task,會形成線程依賴而下一調度的超時或者失敗。指定cacheThreadPool,根據ThreadNum值向線程池submit   ThreadNum個runnable對象。

3)鎖優化

在任務執行器TBScheduleProcessorSleep中,t經過加載任務item (List<TaskItemDefine> taskItems),執行taskDealBean.selectTasks方法。獲取到的數據存放在CopyOnWriteArrayList中。這裏簡單的介紹下寫時拷貝容器CopyOnWriteArrayList,其對併發讀不會加鎖,而對併發寫同步,當有一個寫請求,首先獲取鎖 ,而後複製當前容器內數據,進行增刪改,最後替換掉原有的數組引用,從而達到現場安全的目的,實際上該容器很是適合讀多寫少的場景。而目前的場景並無讀get的操做。獲取容器元素調用remove()方法,一樣獲取鎖。

既然讀已經同步,那麼在獲取任務的時候,就不須要加synchronized關鍵字了。原有代碼以下:

public synchronized Object getScheduleTaskId() { //能夠去除synchronized
if (this.taskList.size() > 0) return this.taskList.remove(0); // 按正序處理 return null; }

4)設計優化

整個TBSchedule的調度,默認兩秒內會執行refresh()操做,中止全部的任務調度器而後從新建立新的任務調度器。這樣的好處可使得某一個調度節點宕機,或者網絡緣由致使心跳失敗,再或者在控制檯修改了調度策略配置信息。能夠動態的生效。可是若是可以基於ZK的watch機制,對系統的消耗會更小。因爲在factory目錄下建立的都是瞬時節點,若是某一個server宕機。zk會watch到相應的事件。一樣,在ScheduleTaskType下的數據發生改變,zk一樣能夠watch到相應的事件。若是發現出現了上述幾種狀況,那麼TBSchedule能夠執行refresh()操做了。

 

總結

  TBSchedule的使用場景仍是很是普遍,如定時數據同步,日誌上報等等。不一樣於quartz的搶佔式任務調度,TBSchedule更側重於任務多分片並行處理,基於分佈式集羣提升任務處理能力。知其然且知其因此然有助於更好的使用框架,並解決實際問題。

 

 

 

更多資料:http://geek.csdn.net/news/detail/65738

源碼:http://code.taobao.org/p/tbschedule/src/

 

 

 

相關文章
相關標籤/搜索