xxl-job任務定時觸發流程

xxl-job任務觸發流程html

xxl-job老版本是依賴quartz的定時任務觸發,在v2.1.0版本開始 移除quartz依賴:一方面是爲了精簡系統下降冗餘依賴,另外一方面是爲了提供系統的可控度與穩定性。(本文 相應代碼版本 2.2.0-SNAPSHOT)java

如下是本文的目錄大綱:算法

一.任務觸發執行整體流程spring

  二.任務定時觸發流程sql

  三.關於這麼設計的感悟數據庫

請尊重做者勞動成果,轉載請標明原文連接:數據結構

http://www.javashuo.com/article/p-eyasciub-nq.html 分佈式

 

一 任務觸發執行整體流程

先來看下任務觸發和執行的 完整的任務觸發執行整體流程圖 如下:ide

 上圖所示左上角的 第一步:任務觸發方式 主要有如下幾種類型:1 根據設置的時間自動觸發JobScheduleHelper,2 頁面點擊操做按鈕執行觸發,3 父子任務觸發,4失敗重試觸發。性能

 本文重點講解 第一步:任務觸發 的第一種 1 根據設置的時間自動觸發,即上圖 紅色框內標示的部分,具體見JobScheduleHelper這個類。

 

二 任務定時觸發流程

 詳細的JobScheduleHelperCron定時觸發 這個階段流程圖以下:

具體見JobScheduleHelper這個類結合上面流程圖來分析,在工程spring啓動的時候 觸發了JobScheduleHelper類的start()方法,完整代碼以下

  1     public void start(){
  2 
  3         // schedule thread
  4         scheduleThread = new Thread(new Runnable() {
  5             @Override
  6             public void run() {
  7 
  8                 try {
  9                     TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
 10                 } catch (InterruptedException e) {
 11                     if (!scheduleThreadToStop) {
 12                         logger.error(e.getMessage(), e);
 13                     }
 14                 }
 15                 logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
 16 
 17                 // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
 18                 int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
 19 
 20                 while (!scheduleThreadToStop) {
 21 
 22                     // Scan Job
 23                     long start = System.currentTimeMillis();
 24 
 25                     Connection conn = null;
 26                     Boolean connAutoCommit = null;
 27                     PreparedStatement preparedStatement = null;
 28 
 29                     boolean preReadSuc = true;
 30                     try {
 31 
 32                         conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
 33                         connAutoCommit = conn.getAutoCommit();
 34                         conn.setAutoCommit(false);
 35 
 36                         preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
 37                         preparedStatement.execute();
 38 
 39                         // tx start
 40 
 41                         // 一、pre read
 42                         long nowTime = System.currentTimeMillis();
 43                         List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
 44                         if (scheduleList!=null && scheduleList.size()>0) {
 45                             // 二、push time-ring
 46                             for (XxlJobInfo jobInfo: scheduleList) {
 47 
 48                                 // time-ring jump
 49                                 if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
 50                                     // 2.一、trigger-expire > 5s:pass && make next-trigger-time
 51                                     logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
 52 
 53                                     // fresh next
 54                                     refreshNextValidTime(jobInfo, new Date());
 55 
 56                                 } else if (nowTime > jobInfo.getTriggerNextTime()) {
 57                                     // 2.二、trigger-expire < 5s:direct-trigger && make next-trigger-time
 58 
 59                                     // 一、trigger
 60                                     JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
 61                                     logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
 62 
 63                                     // 二、fresh next
 64                                     refreshNextValidTime(jobInfo, new Date());
 65 
 66                                     // next-trigger-time in 5s, pre-read again
 67                                     if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
 68 
 69                                         // 一、make ring second
 70                                         int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
 71 
 72                                         // 二、push time ring
 73                                         pushTimeRing(ringSecond, jobInfo.getId());
 74 
 75                                         // 三、fresh next
 76                                         refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
 77 
 78                                     }
 79 
 80                                 } else {
 81                                     // 2.三、trigger-pre-read:time-ring trigger && make next-trigger-time
 82 
 83                                     // 一、make ring second
 84                                     int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
 85 
 86                                     // 二、push time ring
 87                                     pushTimeRing(ringSecond, jobInfo.getId());
 88 
 89                                     // 三、fresh next
 90                                     refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
 91 
 92                                 }
 93 
 94                             }
 95 
 96                             // 三、update trigger info
 97                             for (XxlJobInfo jobInfo: scheduleList) {
 98                                 XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
 99                             }
100 
101                         } else {
102                             preReadSuc = false;
103                         }
104 
105                         // tx stop
106 
107 
108                     } catch (Exception e) {
109                         if (!scheduleThreadToStop) {
110                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
111                         }
112                     } finally {
113 
114                         // commit
115                         if (conn != null) {
116                             try {
117                                 conn.commit();
118                             } catch (SQLException e) {
119                                 if (!scheduleThreadToStop) {
120                                     logger.error(e.getMessage(), e);
121                                 }
122                             }
123                             try {
124                                 conn.setAutoCommit(connAutoCommit);
125                             } catch (SQLException e) {
126                                 if (!scheduleThreadToStop) {
127                                     logger.error(e.getMessage(), e);
128                                 }
129                             }
130                             try {
131                                 conn.close();
132                             } catch (SQLException e) {
133                                 if (!scheduleThreadToStop) {
134                                     logger.error(e.getMessage(), e);
135                                 }
136                             }
137                         }
138 
139                         // close PreparedStatement
140                         if (null != preparedStatement) {
141                             try {
142                                 preparedStatement.close();
143                             } catch (SQLException e) {
144                                 if (!scheduleThreadToStop) {
145                                     logger.error(e.getMessage(), e);
146                                 }
147                             }
148                         }
149                     }
150                     long cost = System.currentTimeMillis()-start;
151 
152 
153                     // Wait seconds, align second
154                     if (cost < 1000) {  // scan-overtime, not wait
155                         try {
156                             // pre-read period: success > scan each second; fail > skip this period;
157                             TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
158                         } catch (InterruptedException e) {
159                             if (!scheduleThreadToStop) {
160                                 logger.error(e.getMessage(), e);
161                             }
162                         }
163                     }
164 
165                 }
166 
167                 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
168             }
169         });
170         scheduleThread.setDaemon(true);
171         scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
172         scheduleThread.start();
173 
174 
175         // ring thread
176         ringThread = new Thread(new Runnable() {
177             @Override
178             public void run() {
179 
180                 // align second
181                 try {
182                     TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
183                 } catch (InterruptedException e) {
184                     if (!ringThreadToStop) {
185                         logger.error(e.getMessage(), e);
186                     }
187                 }
188 
189                 while (!ringThreadToStop) {
190 
191                     try {
192                         // second data
193                         List<Integer> ringItemData = new ArrayList<>();
194                         int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免處理耗時太長,跨過刻度,向前校驗一個刻度;
195                         for (int i = 0; i < 2; i++) {
196                             List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
197                             if (tmpData != null) {
198                                 ringItemData.addAll(tmpData);
199                             }
200                         }
201 
202                         // ring trigger
203                         logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
204                         if (ringItemData.size() > 0) {
205                             // do trigger
206                             for (int jobId: ringItemData) {
207                                 // do trigger
208                                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
209                             }
210                             // clear
211                             ringItemData.clear();
212                         }
213                     } catch (Exception e) {
214                         if (!ringThreadToStop) {
215                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
216                         }
217                     }
218 
219                     // next second, align second
220                     try {
221                         TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
222                     } catch (InterruptedException e) {
223                         if (!ringThreadToStop) {
224                             logger.error(e.getMessage(), e);
225                         }
226                     }
227                 }
228                 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
229             }
230         });
231         ringThread.setDaemon(true);
232         ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
233         ringThread.start();
234     }
View Code

任務定時觸發,流程以下:

1 分佈式鎖

爲了保證分佈式一致性先上悲觀鎖:使用select  xx  for update來實現

1  conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
2   connAutoCommit = conn.getAutoCommit();
3   conn.setAutoCommit(false);
4   preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
5   preparedStatement.execute();

2 輪詢db,找出trigger_next_time(下次觸發時間)在距now 5秒內的任務

 1 List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
 2 詳細sql以下:
 3 trigger_status表明觸發狀態處於啓動的任務 trigger_next_time表明 任務下次 執行觸發的時間
 4 <select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
 5    SELECT *
 6    FROM xxl_job_info AS t
 7    WHERE t.trigger_status = 1
 8       and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
 9    ORDER BY id ASC
10    LIMIT #{pagesize}
11 </select>

3 觸發算法

拿到了距now 5秒內的任務列表數據:scheduleList,分三種狀況處理:for循環遍歷scheduleList集合

(1)對到達now時間後的任務:(超出now 5秒外):直接跳過不執行; 重置trigger_next_time;

(2)對到達now時間後的任務:(超出now 5秒內):線程執行觸發邏輯; 若任務下一次觸發時間是在5秒內, 則放到時間輪內(Map<Integer, List<Integer>> 秒數(1-60) => 任務id列表);

        再 重置trigger_next_time

(3)對未到達now時間的任務:直接放到時間輪內;重置trigger_next_time 。

分別對應下面 這個數軸 的 三個階段

具體參見下面代碼:

1   下面對應代碼(1)對到達now時間後的任務(超出now 5秒外):直接跳過不執行; 重置trigger_next_time
2 if (nowTime > jobInfo.getTriggerNextTime() + 5000) {
3 // 2.一、trigger-expire > 5s:pass && make next-trigger-time
4 logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
5 // fresh next
6 refreshNextValidTime(jobInfo, new Date());
7 }

 

 1 下面對應代碼(2)對到達now時間後的任務(超出now 5秒內):線程執行觸發邏輯; 若任務下一次觸發時間是在5秒內,
 2 
 3 則放到時間輪內(Map<Integer, List<Integer>> 秒數(1-60) => 任務id列表);重置trigger_next_time
 4 
 5 else if (nowTime > jobInfo.getTriggerNextTime()) {
 6         // 一、trigger
 7        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
 8         // 二、fresh next
 9         refreshNextValidTime(jobInfo, new Date());
10         // next-trigger-time in 5s, pre-read again
11         if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
12         // 一、make ring second
13         int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
14         // 二、push time ring
15         pushTimeRing(ringSecond, jobInfo.getId());
16          // 三、fresh next
17         refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
18         }
19 } 

 

 1 下面對應代碼(3)對未到達now時間的任務:直接放到時間輪內;重置trigger_next_time
 2 
 3 else {
 4     // 一、make ring second
 5     int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
 6     // 二、push time ring
 7     pushTimeRing(ringSecond, jobInfo.getId());
 8     // 三、fresh next
 9     refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
10 }

上面的refreshNextValidTime方法是 更新任務的 trigger_next_time 下次觸發時間,xxl_job_info表是記錄定時任務的db表,裏面有個trigger_next_time(Long)字段,表示下一次觸發的時間點任務時間被修改 

每一次任務觸發後,能夠根據cronb表達式計算下一次觸發時間戳:Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date())),更新trigger_next_time字段。

4 時間輪觸發

接下來說時間輪,時間輪數據結構: Map<Integer, List<Integer>>  key是秒數(1-60)  value是任務id列表,具體結構以下圖 :

時間輪的執行代碼以下:

 1 public void run() {
 2 
 3                 // align second
 4                 try {
 5                     TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
 6                 } catch (InterruptedException e) {
 7                     if (!ringThreadToStop) {
 8                         logger.error(e.getMessage(), e);
 9                     }
10                 }
11 
12                 while (!ringThreadToStop) {
13 
14                     try {
15                         // second data
16                         List<Integer> ringItemData = new ArrayList<>();
17                         int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免處理耗時太長,跨過刻度,向前校驗一個刻度;
18                         for (int i = 0; i < 2; i++) {
19                             List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
20                             if (tmpData != null) {
21                                 ringItemData.addAll(tmpData);
22                             }
23                         }
24 
25                         // ring trigger
26                         logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
27                         if (ringItemData.size() > 0) {
28                             // do trigger
29                             for (int jobId: ringItemData) {
30                                 // do trigger
31                                 JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
32                             }
33                             // clear
34                             ringItemData.clear();
35                         }
36                     } catch (Exception e) {
37                         if (!ringThreadToStop) {
38                             logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
39                         }
40                     }
41 
42                     // next second, align second
43                     try {
44                         TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
45                     } catch (InterruptedException e) {
46                         if (!ringThreadToStop) {
47                             logger.error(e.getMessage(), e);
48                         }
49                     }
50                 }
51                 logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
52             }

時間輪數據結構: Map<Integer, List<Integer>>  key是hash計算觸發時間得到的秒數(1-60),value是任務id列表

入輪:掃描任務觸發時 (1)本次任務處理完成,但下一次觸發時間是在5秒內(2)本次任務未達到觸發時間                     

出輪:獲取當前時間秒數,從時間輪內移出當前秒數前2個秒數的任務id列表, 依次進行觸發任務;(避免處理耗時太長,跨過刻度,多向前校驗一秒)

增長時間輪的目的是:任務過多可能會延遲,爲了保障觸發時間儘量和 任務設置的觸發時間儘可能一致,把即將要觸發的任務提早放到時間輪裏,每秒來觸發時間輪相應節點的任務

 

三 關於這麼設計的感悟:看似簡單的一個任務觸發爲何要搞這麼複雜呢?

個人答案是:  由於 出於「性能」 和「時效性」這兩點 綜合來考慮,即「中庸之道」。

就拿每次 「從DB查出 近期 即將要到觸發時間任務」 這個場景 來看:

1  若是但願「性能」更好,那確定每次多查出些數據,但這樣就不可避免的形成 由於任務過多,同一批查出來的位置靠後的某些任務 觸發就可能會延遲,好比實際觸發比設定觸發的時間晚幾秒。

2 若是但願「時效性」更好,那確定每次少查出些數據,好比每次只查出來一條或者幾條,實際觸發時間和設定的觸發時間 基本同樣,但這樣形成了頻繁查詢數據庫,性能降低。

故 通「時間輪」達到既「性能」比較好而且每次查出相對儘可能多 的數據(目前是取5s內觸發的任務),又時間輪來保障「時效性」:實際觸發時間和設定的觸發時間 儘可能同樣。這就是設計這麼複雜的緣由。

相關文章
相關標籤/搜索