詳解應對平臺高併發的分佈式調度框架TBSchedule
tbschedule是一款很是優秀的高性能分佈式調度框架,很是高興能分享給你們。這篇文章是我結合多年tbschedule使用經驗和研讀三遍源碼的基礎上完成的,期間和阿里空玄有過很多技術交流,很是感謝空玄給予的大力支持。我寫這篇文章的目的一是出於對tbschedule的一種熱愛,二是如今是一個資源共享、技術共享的時代,但願把它展示給你們(送人玫瑰,手留餘香),能給你們的工做帶來幫助。
1、tbschedule初識
時下互聯網和電商領域,各個平臺都存在大數據、高併發的特色,對數據處理的要求愈來愈高,既要保證高效性,又要保證安全性、準確性。tbschedule的使命就是將調度做業從業務系統中分離出來,下降或者是消除和業務系統的耦合度,進行高效異步任務處理。其實在互聯網和電商領域tbschedule的使用很是普遍,目前被應用於阿里巴巴、淘寶、支付寶、京東、聚美、汽車之家、國美等不少互聯網企業的流程調度系統。
在深刻了解tbschedule以前咱們先從內部和外部形態對它有個初步認識,如圖1.一、圖1.2。
從tbschedule的內部形態來講,與他有關的關鍵詞包括批量任務、動態擴展、多主機、多線程、併發、分片……,這些詞看起來很是的高大上,都是時下互聯網技術比較流行的詞彙。從tbschedule的外部架構來看,一目瞭然,宿主在調度應用中與zookeeper進行通訊。一個框架結構是不是優秀的,從美感的角度就能夠看出來,一個好的架構必定是隱藏了內部複雜的原理,外部視覺上美好的,讓用戶使用起來簡單易懂。
2、tbschedule原理
爲何TBSchedule值得推廣呢?
傳統的調度框架spring task、quartz也是能夠進行集羣調度做業的,一個節點掛了能夠將任務漂移給其餘節點執行從而避免單點故障,可是不支持分佈式做業,一旦達到單機處理極限也會存在問題。
elastic-job支持分佈式,是一個很好的調度框架,可是開源時間較短,尚未經歷大範圍市場考驗。
Beanstalkd基於C語言開發,使用範圍較小,沒法引入到php、java系統平臺。
tbschedule到底有多強大呢?我對tbschedule的優點特色進行了以下總結:
一、支持集羣、分佈式
二、靈活的任務分片
三、動態的服務擴容和資源回收
四、任務監控支持
tbschedule支持cluster,能夠宿主在多臺服務器多個線程組並行進行任務調度,或者說能夠將一個大的任務拆成多個小任務分配到不一樣的服務器。tbschedule的分佈式機制是經過靈活的sharding方式實現的,好比能夠按全部數據的ID按10取模分片(分片規則如圖2.1)、按月份分片等等,根據不一樣的需求,不一樣的場景由客戶端配置分片規則。咱們知道spring task、quarz也是能夠進行集羣調度做業的,一個節點掛了能夠將任務漂移給其餘節點執行從而避免單點故障,可是不支持分佈式做業,一旦達到單機處理極限也會存在問題,這個就是tbschedule的優點。而後就是tbschedule的宿主服務器能夠進行動態擴容和資源回收,這個特色主要是由於它後端依賴的zookeeper,這裏的zookeeper對於tbschedule來講是一個nosql,用於存儲數據,它的數據結構相似文件系統的目錄結構,它的節點有臨時節點、持久節點之分。調度引擎上線後,隨着業務量數據量的增多,當前cluster可能不能知足目前的處理需求,那麼就須要增長服務器數量,一個新的服務器上線後會在zk中建立一個表明當前服務器的一個惟一性路徑(臨時節點),而且新上線的服務器會和zk有長鏈接,當通訊斷開後,節點會自動摘除。tbschedule會定時掃描當前服務器的數量,從新分配任務。tbschedule不只提供了服務端的高性能調度服務,還提供了一個scheduleConsole war隨着宿主應用的部署直接部署到服務器,能夠經過web的方式對調度的任務、策略進行監控,實時更新。
是否是已經對tbschedule稍微了有些好感呢?咱們接着往下看。
tbschedule提供了兩個核心組件ScheduleServer、TbscheduleManagerFactory和兩類核心接口IScheduleTaskDeal、IScheduleTaskDealSingle、IScheduleTaskDealMuti,這兩部分是客戶端研發的關鍵部分,是使用tbschedule必需要了解的。
ScheduleServer即任務處理器,的主要做用是任務和策略的管理、任務採集和執行,由一組工做線程組成,這組工做線程是基於隊列實現的,進行任務抓取和任務處理(two mode)。每一個任務處理器和zookeeper有一個心跳通訊鏈接,用於檢測server的狀態和進行任務動態分配,舉個例子,好比3服務器的worker集羣執行出票消息生成任務,這對這個任務類型每臺服務器能夠配置一個ScheduleSever(即一個線程組),也能夠配置兩個線程組,至關於6臺服務器在執行這個任務。當某臺服務器宕機或者其餘緣由與zk通訊斷開時,他的任務將被其餘服務器接管。ScheduleServer參數定義如圖2.2
在這些參數中taskItems是一個很是重要的屬性,是客戶單能夠自由發揮的地方,是任務分片的基礎,好比咱們處理一個任務能夠根據ID按10取模,那麼任務項就是0-9,3臺服務器分別拿到四、 三、 3個任務項,服務器的上下線都會對任務項進行從新分配。任務項是進行任務分配的最小單位。一個任務項只能由一個ScheduleServer來進行處理,但一個Server能夠處理任意數量的任務項。這就是剛纔咱們說的分片特性。
調度服務器TbscheduleManagerFactory的主要工做zookeeper鏈接參數配置和zookeeper的初始化、調度管理。
兩類核心接口是須要被咱們定義的目標任務實現的,根據本身的須要進行任務採集(重寫selectTasks方法)和任務執行(重寫execute方法),這類接口是客戶端研發自由發揮的地方。
接下來咱們深刻了解下tbschedule,看看它的內部是如何實現的。下面流程圖是我花了不少心血經過一週時間畫出來的,基本是清晰的展示了tbschedule內部的執行流程以及每一個步驟zookeer節點路徑和數據的變化。由於圖中的註釋已經描述的很詳細了,每一個節點右側是zk的信息(數據結構見圖2.3),這裏就再也不作過多的文字描述了,有任何建議或者不明白的地方能夠找我交流。
Tbschedule還有個強大之處是它提供了兩種處理器模式模式:
一、SLEEP模式
當某一個線程任務處理完畢,從任務池中取不到任務的時候,檢查其它線程是否處於活動狀態。若是是,則本身休眠;若是其它線程都已經由於沒有任務進入休眠,當前線程是最後一個活動線程的時候,就調用業務接口,獲取須要處理的任務,放入任務池中,同時喚醒其它休眠線程開始工做。
二、NOTSLEEP模式
當一個線程任務處理完畢,從任務池中取不到任務的時候,當即調用業務接口獲取須要處理的任務,放入任務池中。
SLEEP模式內部邏輯相對較簡單,若是遇到大任務須要處理較長時間,可能會形成其餘線程被動阻塞的狀況。但其實生產環境通常都是小而快的任務,即便出現阻塞的狀況ScheduleConsole也會及時的監控到。NOTSLEEP模式減小了線程休眠的時間,避免了因大任務形成阻塞的狀況,但爲了不數據被重複處理,增長了CPU在數據比較上的開銷。TBSchedule默認是SLEEP模式。
到目前爲止我相信你們對tbschedule有了一個深入的瞭解,心中的疑霧逐漸散開了。理論是實踐的基礎,實踐纔是最終的目的,下一節咱們將結合理論知識進行tbschedule實戰.
3、tbschedule實戰
在項目中使用tbschedule須要依賴zookeeper、tbschedule
zookeeper依賴:
Java代碼
- <dependency>
- <groupId>org.apache.zookeeper</groupId
- <artifactId>zookeeper</artifactId>
- <version>3.4.6</version>
- </dependency>
-
tbschedule依賴:
Java代碼
- <dependency>
- <groupId>com.taobao.pamirs.schedule</groupId
- <artifactId>tbschedule</artifactId>
- <version>3.3.3.2</version>
- </dependency>
-
tbschedule有三種引入方式:
一、經過ScheduleConsole引入
Tbschedule隨着宿主調度應用部署到服務器後,能夠經過web瀏覽器的方式訪問其提供監控平臺。
第一步,初始化zookeeper
第二步,建立調度策略
第三步,建立調度任務
第四步,監控調度任務
二、經過原生java引入
Java代碼
- // 初始化Spring
- ApplicationContext ctx = new FileSystemXmlApplicationContext(
- "spring-config.xml");
-
- // 初始化調度工廠
- TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();
-
- Properties p = new Properties();
- p.put("zkConnectString", "127.0.0.1:2181");
- p.put("rootPath", "/taobao-schedule/train_worker");
- p.put("zkSessionTimeout", "60000");
- p.put("userName", "train_dev");
- p.put("password", " train_dev ");
- p.put("isCheckParentPath", "true");
-
- scheduleManagerFactory.setApplicationContext(ctx);
-
- scheduleManagerFactory.init(p);
-
- // 建立任務調度任務的基本信息
- String baseTaskTypeName = "DemoTask";
- ScheduleTaskType baseTaskType = new ScheduleTaskType();
- baseTaskType.setBaseTaskType(baseTaskTypeName);
- baseTaskType.setDealBeanName("demoTaskBean");
- baseTaskType.setHeartBeatRate(10000);
- baseTaskType.setJudgeDeadInterval(100000);
- baseTaskType.setTaskParameter("AREA=BJ,YEAR>30"); baseTaskType.setTaskItems(ScheduleTaskType.splitTaskItem(
- "0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4}," +
- "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8}," +
- "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"));
- baseTaskType.setFetchDataNumber(500);
- baseTaskType.setThreadNumber(5);
- this.scheduleManagerFactory.getScheduleDataManager()
- .createBaseTaskType(baseTaskType);
- log.info("建立調度任務成功:" + baseTaskType.toString());
-
- // 建立任務的調度策略
- String taskName = baseTaskTypeName;
- String strategyName =taskName +"-Strategy";
- try {
- this.scheduleManagerFactory.getScheduleStrategyManager()
- .deleteMachineStrategy(strategyName,true);
- } catch (Exception e) {
- e.printStackTrace();
- }
- ScheduleStrategy strategy = new ScheduleStrategy();
- strategy.setStrategyName(strategyName);
- strategy.setKind(ScheduleStrategy.Kind.Schedule);
- strategy.setTaskName(taskName);
- strategy.setTaskParameter("china");
-
- strategy.setNumOfSingleServer(1);
- strategy.setAssignNum(10);
- strategy.setIPList("127.0.0.1".split(","));
- this.scheduleManagerFactory.getScheduleStrategyManager()
- .createScheduleStrategy(strategy);
- log.info("建立調度策略成功:" + strategy.toString());
三、經過spring容器引入
Java代碼
- <!-- 初始化zookeeper -->
- <bean id="scheduleManagerFactory"
- class="com.xx.TBScheduleManagerFactory" init-method="init">
- <property name="zkConfig">
- <map>
- <entry key="zkConnectString" value="127.0.0.1:2181" />
- <entry key="rootPath" value="/taobao-schedule/train_worker" />
- <entry key="zkSessionTimeout" value="60000" />
- <entry key="userName" value="train_dev" />
- <entry key="password" value="train_dev" />
- <entry key="isCheckParentPath" value="true" />
- </map>
- </property>
- </bean>
- <!-- 配置調度策略 凌晨1點到3點執行 -->
- <bean id="abstractDemoScheduleTask" class="com.xx.core.tbschedule.InitMyScheduleTask" abstract="true">
- <property name="scheduleTaskType.heartBeatRate" value="10000" />
- <property name="scheduleTaskType.judgeDeadInterval" value="100000" />
- <property name="scheduleTaskType.permitRunStartTime" value="0 0 1 * * ?"/>
- <property name="scheduleTaskType.permitRunEndTime" value="0 0 3 * * ?"/>
- <property name="scheduleTaskType.taskParameter" value="AREA=BJ,YEAR>30" />
- <property name="scheduleTaskType.sleepTimeNoData" value="60000"/>
- <property name="scheduleTaskType.sleepTimeInterval" value="60000"/>
- <property name="scheduleTaskType.fetchDataNumber" value="500" />
- <property name="scheduleTaskType.executeNumber" value="1" />
- <property name="scheduleTaskType.threadNumber" value="5" />
- <property name="scheduleTaskType.taskItems">
- <list>
- <value>0:{TYPE=A,KIND=1}</value>
- <value>1:{TYPE=A,KIND=2}</value>
- <value>2:{TYPE=A,KIND=3}</value>
- <value>3:{TYPE=A,KIND=4}</value>
- <value>4:{TYPE=A,KIND=5}</value>
- <value>5:{TYPE=A,KIND=6}</value>
- <value>6:{TYPE=A,KIND=7}</value>
- <value>7:{TYPE=A,KIND=8}</value>
- <value>8:{TYPE=A,KIND=9}</value>
- <value>9:{TYPE=A,KIND=10}</value>
- </list>
- </property>
- <property name="scheduleStrategy.kind" value="Schedule" />
- <property name="scheduleStrategy.numOfSingleServer" value="1" />
- <property name="scheduleStrategy.assignNum" value="10" />
- <property name="scheduleStrategy.iPList">
- <list>
- <value>127.0.0.1</value>
- </list>
- </property>
- </bean>
- <!-- 配置調度任務 -->
- <bean id="demoTask" class="com.xx.worker.task.DemoTask" parent="abstractDemoScheduleTask">
- <property name="scheduleTaskType.baseTaskType" value="demoTask" />
- <property name="scheduleTaskType.dealBeanName" value="demoTaskBean" />
- <property name="scheduleStrategy.strategyName" value="demoTaskBean-Strategy" />
- <property name="scheduleStrategy.taskName" value="demoTaskBean" />
- </bean>
調度任務具體實現 DemoTask.java
Java代碼
- /**
- * DemoTask任務類
- */
- ublic class DemoTask mplements
- IScheduleTaskDealSingle,TScheduleTaskDeal {
-
- /**
- * 數據採集
- * @param taskItemNum--分配的任務項 taskItemList--總任務項
- * eachFetchDataNum--採集任務數量
- */
- @Override
- public List<DemoTask> selectTasks(String taskParameter,
- String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList,
- int eachFetchDataNum) throws Exception {
- List<DemoTask> taskList = new LinkedList<DemoTask>();
- //客戶端根據條件進行數據採集start
-
- //客戶端根據條件進行數據採集end
- return rt;
- }
- **
- * 數據處理
- */
- @Override
- public boolean execute(DemoTask task, String ownSign)
- throws Exception {
- //客戶端pop任務進行處理start
-
- //客戶端pop任務進行處理end
- return true;
- }
其實咱們看對於tbscchedule客戶端的使用很是簡單,初始化zk、配置調度策略、調度任務,對調度任務進行實現,就這幾個步驟。如今能夠慶祝下了,你又掌握了一個優秀開源框架的設計思想和使用方式。
4、tbschedule挑戰
任何事物都是沒有最好只有更好,tbschedule也同樣,雖然它如今已經很完美了,咱們不能放棄對更完美的追求。阿里團隊能夠在下面幾個方面進行優化。
一、ScheduleConsole監控頁面優化,目前ScheduleConsole監控頁面過於簡單,需完善UI設計,提升用戶體驗。
二、Zookeeper集羣自動切換,避免zk服務的集羣點多故障
三、原生zookeeper操做替換爲curator,Curator對ZooKeeper進行了一次包裝,對原生ZooKeeper的操做作了大量優化,Client和Server之間的鏈接可能出現的問題處理等等,能夠進一步提升TBSchedule的高可用。
四、幫助文檔較少,網上的資料基本是千篇一概,但願有更多的愛好者加入進來。
至此,咱們已經完成了對tbschedule的所有介紹,儘快使用起來吧!
個人博客:http://mycolababy.iteye.com/
歡迎關注本站公眾號,獲取更多信息