首先學習一個開源項目,必定要先學習該開源項目的使用方法。該項目的使用方法本文再也不詳述。請參考博文: java
http://pinsir.iteye.com/blog/882275 mysql
http://pinsir.iteye.com/blog/882518 git
http://pinsir.iteye.com/blog/882525 sql
經過學習開源項目的使用方法,來掌握開源項目實現的功能特性,瞭解開源項目中引入的不少概念和術語的含義,這對於後面閱讀分析源碼有很是大的幫助。 數據庫
該項目的源碼網上已經很難找到了,給你們分享一下。http://git.oschina.net/ywbrj042/taobao-pamirs-schedule-2.0.3.6 安全
本項目相對很是簡單,對於閱讀源碼來講也很是簡單,它只有一個工程構成,並且工程中也只有一個包構成。若是是相對複雜的項目通常每每由多個工程組成,並且每一個工程裏也會有很是多的包,那麼咱們閱讀源碼的時候就必定要先了解各工程的職責和實現的功能,瞭解各工程之間的關係。 服務器
再選擇具體的工程進入閱讀分析,進入到具體工程後,則同樣要了解工程內各包的職責和實現的功能,還包括包之間的關係。 多線程
掌握了這些內容以後,後期咱們閱讀源碼的時候纔不會迷茫,咱們腦中就掌握了很好的導航結構,什麼功能和代碼要去何處查找就很是容易了。 運維
一個好的開源項目必定會有一個設計合理的類結構,咱們能夠找到相關的類結構圖輔助分析源代碼。由於沒有找到本項目的類設計圖,我根據源碼畫出了本項目的核心類設計圖。圖中只畫出了關鍵的類屬性和方法,去除了不少細節。 分佈式
這是淘寶調度的核心組件,從圖中能夠看出,它聚合了全部的其它類協同對外提供分佈式定時調度服務。它的職責有這些:
1.啓動及中止定時調度器。
2.暫停及恢復定時調度器。
3.定時向集中的數據配置中心更新當前調度服務器的心跳狀態。
4.向數據配置中心獲取全部服務器的狀態來從新計算任務的分配。這麼作的目標是避免集中任務調度中心的單點問題。
5.在每一個批次數據處理完畢後,檢查是否有其它處理服務器申請本身把持的任務隊列,若是有,則釋放給相關處理服務器。
6.若是當前服務器在處理當前任務的時候超時,須要清除當前隊列,並釋放已經把持的任務。並向控制主動中心報警。
7.定時調度處理器處理任務。啓動定時器,根據定時表達式配置,當達到可執行實現,則建立IScheduleProcessor對象並執行任務。
調度管理器工廠。該工廠負責建立TBScheduleManager對象,並負責組裝其聚合的其它一些對象。該類的實現是依賴Spring容器環境的,所以使用該類建立
TBScheduleManager必須是在Spring容器環境下的應用。它從Spring容器中得到任務執行器對象。
調度處理器。該處理器的職責是負責執行任務處理,它會調用任務執行器獲取任務,啓動多線程來處理這些任務。該接口包含的職責有。
1.判斷組件是否睡眠。
2.判斷分配的數據是否處理完。
3.清除已經分配的數據。
4.中止任務調度。
我的聚的該接口設計的很是不合理,它的核心職責沒有體如今它的接口定義上,而是在具體的實現中執行了這些操做,它在對象構造的時候就啓動了一個定時任務及多個線程來處理任務。
1.接收一個unix格式的cron表達式。
2.計算下一個符合要求的時間。
任務執行器。該接口的職責有這些:
1.獲取任務列表。
2.處理任務。處理任務有兩種方式,單個任務處理及多個任務批量處理。
這個接口是提供給業務開發方根據本身的業務實現的。業務查詢本身的業務任務隊列,將查詢到的任務隊列中的任務調用處理任務方法處理。
IScheduleConfigCenterClient
這是調度器配置中心接口,提供了對於各類調度配置信息的存取操做。默認提供了基於數據庫的配置中心實現,目前支持mysql和oralce數據庫的實現。
該組件的職責有這些:
1.基本任務信息的獲取、建立、刪除等操做。
2.任務運行期信息的清除、查詢等操做。
3.服務的任務隊列信息查詢、釋放。
4.任務隊列的查詢、清除等操做,任務隊列數量查詢操做。
5.服務器信息的註冊、查詢、清除過時和註銷等操做。
6.從新分配任務隊列操做。
7.刷新服務器心跳信息。
其它的類都是該接口的實現類,項目自帶的是基於數據庫的實現類及輔助類。
IScheduleAlert
調度器報警組件。讀運行期間的異常狀況進行監控報警,目前提供的功能有:
1.超過10個心跳週期尚未獲取到調度隊列報警。
2.超過10個心跳週期,尚未進行從新裝載操做報警。
/** * 任務類型 */ private String baseTaskType; /** * 向配置中心更新心跳信息的頻率 */ private long heartBeatRate = 5*1000;//1分鐘 /** * 判斷一個服務器死亡的週期。爲了安全,至少是心跳週期的兩倍以上 */ private long judgeDeadInterval = 1*60*1000;//2分鐘 /** * 當沒有數據的時候,休眠的時間 * */ private int sleepTimeNoData = 500; /** * 在每次數據處理晚後休眠的時間 */ private int sleepTimeInterval = 0; /** * 任務隊列數量 */ private int taskQueueNumber = -1; /** * 每次獲取數據的數量 */ private int fetchDataNumber = 500; /** * 在批處理的時候,每次處理的數據量 */ private int executeNumber =1; private int threadNumber = 5; /** * 調度器類型 */ private String processorType="NOTSLEEP" ; /** * 容許執行的開始時間 */ private String permitRunStartTime; /** * 容許執行的開始時間 */ private String permitRunEndTime; /** * 清除過時環境信息的時間間隔,以天爲單位 */ private double expireOwnSignInterval = 1; /** * 處理任務的BeanName */ private String dealBeanName; /** * 版本號 */ private long version;
private long id; /** * 任務類型:原始任務類型+"-"+ownSign */ private String taskType; /** * 原始任務類型 */ private String baseTaskType; /** * 環境 */ private String ownSign; /** * 最後一次任務分配的時間 */ private Timestamp lastAssignTime; /** * 最後一次執行任務分配的服務器 */ private String lastAssignUUID; private Timestamp gmtCreate; private Timestamp gmtModified;
/* * 全局惟一編號 */ private String uuid; private long id; /** * 任務類型 */ private String taskType; /** * 原始任務類型 */ private String baseTaskType; private String ownSign; /** * 機器IP地址 */ private String ip; /** * 機器名稱 */ private String hostName; /** * 調度服務器遠程控制端口 */ int managerPort; String jmxUrl; /** * 數據處理線程數量 */ private int threadNum; /** * 服務開始時間 */ private Timestamp registerTime; /** * 最後一次心跳通知時間 */ private Timestamp heartBeatTime; /** * 處理描述信息,例如讀取的任務數量,處理成功的任務數量,處理失敗的數量,處理耗時 * FetchDataCount=4430,FetcheDataNum=438570,DealDataSucess=438570,DealDataFail=0,DealSpendTime=651066 */ private String dealInfoDesc; private String nextRunStartTime; private String nextRunEndTime; /** * 配置中心的當前時間 */ private Timestamp centerServerTime; /** * 數據版本號 */ private long version;
任務隊列信息。調度管理器啓動後會申請分配對應的任務隊列,之後該服務器就會只處理它負責的這些隊列的任務的處理。
/** * 處理任務類型 */ private String taskType; /** * 原始任務類型 */ private String baseTaskType; /** * 隊列的環境標識 */ private String ownSign; /** * 任務隊列ID */ private String taskQueueId; /** * 持有當前任務隊列的任務處理器 */ private String currentScheduleServer; /** * 正在申請此任務隊列的任務處理器 */ private String requestScheduleServer; /** * 數據版本號 */ private long version;