詳解應對平臺高併發的分佈式調度框架TBSchedule

  tbschedule是一款很是優秀的高性能分佈式調度框架,很是高興能分享給你們。這篇文章是我結合多年tbschedule使用經驗和研讀三遍源碼的基礎上完成的,期間和阿里空玄有過很多技術交流,很是感謝空玄給予的大力支持。我寫這篇文章的目的一是出於對tbschedule的一種熱愛,二是如今是一個資源共享、技術共享的時代,但願把它展示給你們(送人玫瑰,手留餘香),能給你們的工做帶來幫助。 

    1、tbschedule初識 

        時下互聯網和電商領域,各個平臺都存在大數據、高併發的特色,對數據處理的要求愈來愈高,既要保證高效性,又要保證安全性、準確性。tbschedule的使命就是將調度做業從業務系統中分離出來,下降或者是消除和業務系統的耦合度,進行高效異步任務處理。其實在互聯網和電商領域tbschedule的使用很是普遍,目前被應用於阿里巴巴、淘寶、支付寶、京東、聚美、汽車之家、國美等不少互聯網企業的流程調度系統。 
        在深刻了解tbschedule以前咱們先從內部和外部形態對它有個初步認識,如圖1.一、圖1.2。 
 

 
        從tbschedule的內部形態來講,與他有關的關鍵詞包括批量任務、動態擴展、多主機、多線程、併發、分片……,這些詞看起來很是的高大上,都是時下互聯網技術比較流行的詞彙。從tbschedule的外部架構來看,一目瞭然,宿主在調度應用中與zookeeper進行通訊。一個框架結構是不是優秀的,從美感的角度就能夠看出來,一個好的架構必定是隱藏了內部複雜的原理,外部視覺上美好的,讓用戶使用起來簡單易懂。  

    2、tbschedule原理 
        爲何TBSchedule值得推廣呢? 

       傳統的調度框架spring task、quartz也是能夠進行集羣調度做業的,一個節點掛了能夠將任務漂移給其餘節點執行從而避免單點故障,可是不支持分佈式做業,一旦達到單機處理極限也會存在問題。 
       elastic-job支持分佈式,是一個很好的調度框架,可是開源時間較短,尚未經歷大範圍市場考驗。 
       Beanstalkd基於C語言開發,使用範圍較小,沒法引入到php、java系統平臺。 

        tbschedule到底有多強大呢?我對tbschedule的優點特色進行了以下總結: 
        一、支持集羣、分佈式 
        二、靈活的任務分片 
        三、動態的服務擴容和資源回收 
        四、任務監控支持 
        tbschedule支持cluster,能夠宿主在多臺服務器多個線程組並行進行任務調度,或者說能夠將一個大的任務拆成多個小任務分配到不一樣的服務器。tbschedule的分佈式機制是經過靈活的sharding方式實現的,好比能夠按全部數據的ID按10取模分片(分片規則如圖2.1)、按月份分片等等,根據不一樣的需求,不一樣的場景由客戶端配置分片規則。咱們知道spring task、quarz也是能夠進行集羣調度做業的,一個節點掛了能夠將任務漂移給其餘節點執行從而避免單點故障,可是不支持分佈式做業,一旦達到單機處理極限也會存在問題,這個就是tbschedule的優點。而後就是tbschedule的宿主服務器能夠進行動態擴容和資源回收,這個特色主要是由於它後端依賴的zookeeper,這裏的zookeeper對於tbschedule來講是一個nosql,用於存儲數據,它的數據結構相似文件系統的目錄結構,它的節點有臨時節點、持久節點之分。調度引擎上線後,隨着業務量數據量的增多,當前cluster可能不能知足目前的處理需求,那麼就須要增長服務器數量,一個新的服務器上線後會在zk中建立一個表明當前服務器的一個惟一性路徑(臨時節點),而且新上線的服務器會和zk有長鏈接,當通訊斷開後,節點會自動摘除。tbschedule會定時掃描當前服務器的數量,從新分配任務。tbschedule不只提供了服務端的高性能調度服務,還提供了一個scheduleConsole war隨着宿主應用的部署直接部署到服務器,能夠經過web的方式對調度的任務、策略進行監控,實時更新。 

 

        是否是已經對tbschedule稍微了有些好感呢?咱們接着往下看。 
        tbschedule提供了兩個核心組件ScheduleServer、TbscheduleManagerFactory和兩類核心接口IScheduleTaskDeal、IScheduleTaskDealSingle、IScheduleTaskDealMuti,這兩部分是客戶端研發的關鍵部分,是使用tbschedule必需要了解的。 
        ScheduleServer即任務處理器,的主要做用是任務和策略的管理、任務採集和執行,由一組工做線程組成,這組工做線程是基於隊列實現的,進行任務抓取和任務處理(two mode)。每一個任務處理器和zookeeper有一個心跳通訊鏈接,用於檢測server的狀態和進行任務動態分配,舉個例子,好比3服務器的worker集羣執行出票消息生成任務,這對這個任務類型每臺服務器能夠配置一個ScheduleSever(即一個線程組),也能夠配置兩個線程組,至關於6臺服務器在執行這個任務。當某臺服務器宕機或者其餘緣由與zk通訊斷開時,他的任務將被其餘服務器接管。ScheduleServer參數定義如圖2.2 

 

        在這些參數中taskItems是一個很是重要的屬性,是客戶單能夠自由發揮的地方,是任務分片的基礎,好比咱們處理一個任務能夠根據ID按10取模,那麼任務項就是0-9,3臺服務器分別拿到四、 三、 3個任務項,服務器的上下線都會對任務項進行從新分配。任務項是進行任務分配的最小單位。一個任務項只能由一個ScheduleServer來進行處理,但一個Server能夠處理任意數量的任務項。這就是剛纔咱們說的分片特性。 
        調度服務器TbscheduleManagerFactory的主要工做zookeeper鏈接參數配置和zookeeper的初始化、調度管理。 
        兩類核心接口是須要被咱們定義的目標任務實現的,根據本身的須要進行任務採集(重寫selectTasks方法)和任務執行(重寫execute方法),這類接口是客戶端研發自由發揮的地方。 
        接下來咱們深刻了解下tbschedule,看看它的內部是如何實現的。下面流程圖是我花了不少心血經過一週時間畫出來的,基本是清晰的展示了tbschedule內部的執行流程以及每一個步驟zookeer節點路徑和數據的變化。由於圖中的註釋已經描述的很詳細了,每一個節點右側是zk的信息(數據結構見圖2.3),這裏就再也不作過多的文字描述了,有任何建議或者不明白的地方能夠找我交流。 








 

        Tbschedule還有個強大之處是它提供了兩種處理器模式模式: 
        一、SLEEP模式 
        當某一個線程任務處理完畢,從任務池中取不到任務的時候,檢查其它線程是否處於活動狀態。若是是,則本身休眠;若是其它線程都已經由於沒有任務進入休眠,當前線程是最後一個活動線程的時候,就調用業務接口,獲取須要處理的任務,放入任務池中,同時喚醒其它休眠線程開始工做。 
        二、NOTSLEEP模式 
        當一個線程任務處理完畢,從任務池中取不到任務的時候,當即調用業務接口獲取須要處理的任務,放入任務池中。 
        SLEEP模式內部邏輯相對較簡單,若是遇到大任務須要處理較長時間,可能會形成其餘線程被動阻塞的狀況。但其實生產環境通常都是小而快的任務,即便出現阻塞的狀況ScheduleConsole也會及時的監控到。NOTSLEEP模式減小了線程休眠的時間,避免了因大任務形成阻塞的狀況,但爲了不數據被重複處理,增長了CPU在數據比較上的開銷。TBSchedule默認是SLEEP模式。 

        到目前爲止我相信你們對tbschedule有了一個深入的瞭解,心中的疑霧逐漸散開了。理論是實踐的基礎,實踐纔是最終的目的,下一節咱們將結合理論知識進行tbschedule實戰. 

    3、tbschedule實戰 

        在項目中使用tbschedule須要依賴zookeeper、tbschedule 
        zookeeper依賴: 

   
Java代碼   收藏代碼
  1. <dependency>          
  2. <groupId>org.apache.zookeeper</groupId  
  3. <artifactId>zookeeper</artifactId>        
  4. <version>3.4.6</version>      
  5. </dependency>  
  6.       


       tbschedule依賴: 

  
Java代碼   收藏代碼
  1. <dependency>  
  2. <groupId>com.taobao.pamirs.schedule</groupId  
  3. <artifactId>tbschedule</artifactId>  
  4. <version>3.3.3.2</version>  
  5. </dependency>  
  6.      


        tbschedule有三種引入方式: 
        一、經過ScheduleConsole引入 
        Tbschedule隨着宿主調度應用部署到服務器後,能夠經過web瀏覽器的方式訪問其提供監控平臺。 
        第一步,初始化zookeeper 



        第二步,建立調度策略 



        第三步,建立調度任務 



        第四步,監控調度任務 



        二、經過原生java引入 

    
Java代碼   收藏代碼
  1.        // 初始化Spring  
  2.        ApplicationContext ctx = new FileSystemXmlApplicationContext(  
  3.                "spring-config.xml");  
  4.   
  5.        // 初始化調度工廠  
  6.        TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();  
  7.   
  8.        Properties p = new Properties();  
  9.        p.put("zkConnectString""127.0.0.1:2181");  
  10.        p.put("rootPath""/taobao-schedule/train_worker");  
  11.        p.put("zkSessionTimeout""60000");    
  12.        p.put("userName""train_dev");  
  13.        p.put("password"" train_dev ");  
  14.        p.put("isCheckParentPath""true");  
  15.   
  16.        scheduleManagerFactory.setApplicationContext(ctx);  
  17.          
  18.        scheduleManagerFactory.init(p);   
  19.   
  20.       // 建立任務調度任務的基本信息  
  21.        String baseTaskTypeName = "DemoTask";  
  22. ScheduleTaskType baseTaskType = new ScheduleTaskType();  
  23.        baseTaskType.setBaseTaskType(baseTaskTypeName);  
  24.        baseTaskType.setDealBeanName("demoTaskBean");  
  25.        baseTaskType.setHeartBeatRate(10000);  
  26.        baseTaskType.setJudgeDeadInterval(100000);  
  27.        baseTaskType.setTaskParameter("AREA=BJ,YEAR>30");  baseTaskType.setTaskItems(ScheduleTaskType.splitTaskItem(  
  28.             "0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4}," +  
  29.             "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8}," +  
  30.             "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"));  
  31. baseTaskType.setFetchDataNumber(500);  
  32. baseTaskType.setThreadNumber(5);  
  33. this.scheduleManagerFactory.getScheduleDataManager()  
  34.     .createBaseTaskType(baseTaskType);  
  35. log.info("建立調度任務成功:" + baseTaskType.toString());  
  36.   
  37. // 建立任務的調度策略  
  38. String taskName = baseTaskTypeName;  
  39. String strategyName =taskName +"-Strategy";  
  40. try {  
  41.     this.scheduleManagerFactory.getScheduleStrategyManager()  
  42.             .deleteMachineStrategy(strategyName,true);  
  43. catch (Exception e) {  
  44.     e.printStackTrace();  
  45. }  
  46. ScheduleStrategy strategy = new ScheduleStrategy();  
  47. strategy.setStrategyName(strategyName);  
  48. strategy.setKind(ScheduleStrategy.Kind.Schedule);  
  49. strategy.setTaskName(taskName);  
  50. strategy.setTaskParameter("china");  
  51.       
  52. strategy.setNumOfSingleServer(1);  
  53. strategy.setAssignNum(10);  
  54. strategy.setIPList("127.0.0.1".split(","));  
  55. this.scheduleManagerFactory.getScheduleStrategyManager()  
  56.         .createScheduleStrategy(strategy);  
  57. log.info("建立調度策略成功:" + strategy.toString());  


        三、經過spring容器引入 

Java代碼   收藏代碼
  1. <!-- 初始化zookeeper -->    
  2. <bean id="scheduleManagerFactory"  
  3.             class="com.xx.TBScheduleManagerFactory" init-method="init">  
  4. <property name="zkConfig">  
  5. <map>  
  6.     <entry key="zkConnectString" value="127.0.0.1:2181" />  
  7.     <entry key="rootPath" value="/taobao-schedule/train_worker" />  
  8.     <entry key="zkSessionTimeout" value="60000" />  
  9.     <entry key="userName" value="train_dev" />  
  10.     <entry key="password" value="train_dev" />  
  11.     <entry key="isCheckParentPath" value="true" />  
  12. </map>  
  13. </property>     
  14. </bean>  
  15. <!-- 配置調度策略 凌晨1點到3點執行 -->  
  16. <bean id="abstractDemoScheduleTask" class="com.xx.core.tbschedule.InitMyScheduleTask" abstract="true">  
  17. <property name="scheduleTaskType.heartBeatRate" value="10000" />  
  18. <property name="scheduleTaskType.judgeDeadInterval" value="100000" />  
  19. <property name="scheduleTaskType.permitRunStartTime" value="0 0 1 * * ?"/>   
  20. <property name="scheduleTaskType.permitRunEndTime" value="0 0 3 * * ?"/>    
  21. <property name="scheduleTaskType.taskParameter" value="AREA=BJ,YEAR>30" />  
  22. <property name="scheduleTaskType.sleepTimeNoData" value="60000"/>  
  23. <property name="scheduleTaskType.sleepTimeInterval" value="60000"/>  
  24. <property name="scheduleTaskType.fetchDataNumber" value="500" />  
  25. <property name="scheduleTaskType.executeNumber" value="1" />  
  26. <property name="scheduleTaskType.threadNumber" value="5" />  
  27. <property name="scheduleTaskType.taskItems">   
  28. <list>  
  29.         <value>0:{TYPE=A,KIND=1}</value>  
  30.         <value>1:{TYPE=A,KIND=2}</value>  
  31.         <value>2:{TYPE=A,KIND=3}</value>  
  32.         <value>3:{TYPE=A,KIND=4}</value>  
  33.         <value>4:{TYPE=A,KIND=5}</value>  
  34.         <value>5:{TYPE=A,KIND=6}</value>  
  35.         <value>6:{TYPE=A,KIND=7}</value>  
  36.         <value>7:{TYPE=A,KIND=8}</value>  
  37.         <value>8:{TYPE=A,KIND=9}</value>  
  38.         <value>9:{TYPE=A,KIND=10}</value>  
  39.     </list>  
  40. </property>  
  41. <property name="scheduleStrategy.kind" value="Schedule" />  
  42. <property name="scheduleStrategy.numOfSingleServer" value="1" />  
  43. <property name="scheduleStrategy.assignNum" value="10" />   
  44.     <property name="scheduleStrategy.iPList">  
  45.         <list>  
  46.             <value>127.0.0.1</value>  
  47.         </list>  
  48.     </property>  
  49.     </bean>          
  50. <!-- 配置調度任務 -->  
  51. <bean id="demoTask" class="com.xx.worker.task.DemoTask" parent="abstractDemoScheduleTask">  
  52. <property name="scheduleTaskType.baseTaskType" value="demoTask" />  
  53. <property name="scheduleTaskType.dealBeanName" value="demoTaskBean" />  
  54. <property name="scheduleStrategy.strategyName" value="demoTaskBean-Strategy" />  
  55. <property name="scheduleStrategy.taskName" value="demoTaskBean" />  
  56. </bean>     


    調度任務具體實現 DemoTask.java 

   
Java代碼   收藏代碼
  1. /** 
  2. * DemoTask任務類 
  3. */  
  4. ublic class DemoTask  mplements  
  5.     IScheduleTaskDealSingle,TScheduleTaskDeal {  
  6.   
  7. /** 
  8.  * 數據採集 
  9.  * @param taskItemNum--分配的任務項 taskItemList--總任務項  
  10.  *        eachFetchDataNum--採集任務數量 
  11.  */  
  12. @Override  
  13. public List<DemoTask> selectTasks(String taskParameter,  
  14.         String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList,  
  15.         int eachFetchDataNum) throws Exception {  
  16.     List<DemoTask> taskList = new LinkedList<DemoTask>();  
  17.     //客戶端根據條件進行數據採集start  
  18.       
  19.     //客戶端根據條件進行數據採集end  
  20.     return rt;  
  21. }  
  22. **  
  23.  * 數據處理  
  24.  */  
  25. @Override  
  26. public boolean execute(DemoTask task, String ownSign)  
  27.         throws Exception {  
  28.         //客戶端pop任務進行處理start  
  29.       
  30.     //客戶端pop任務進行處理end  
  31.     return true;  
  32. }  
   

        其實咱們看對於tbscchedule客戶端的使用很是簡單,初始化zk、配置調度策略、調度任務,對調度任務進行實現,就這幾個步驟。如今能夠慶祝下了,你又掌握了一個優秀開源框架的設計思想和使用方式。 

    4、tbschedule挑戰 

        任何事物都是沒有最好只有更好,tbschedule也同樣,雖然它如今已經很完美了,咱們不能放棄對更完美的追求。阿里團隊能夠在下面幾個方面進行優化。 
        一、ScheduleConsole監控頁面優化,目前ScheduleConsole監控頁面過於簡單,需完善UI設計,提升用戶體驗。 
        二、Zookeeper集羣自動切換,避免zk服務的集羣點多故障 
        三、原生zookeeper操做替換爲curator,Curator對ZooKeeper進行了一次包裝,對原生ZooKeeper的操做作了大量優化,Client和Server之間的鏈接可能出現的問題處理等等,能夠進一步提升TBSchedule的高可用。 
        四、幫助文檔較少,網上的資料基本是千篇一概,但願有更多的愛好者加入進來。 

   至此,咱們已經完成了對tbschedule的所有介紹,儘快使用起來吧! 
   個人博客:http://mycolababy.iteye.com/ 
相關文章
相關標籤/搜索