以前,咱們研究了執行1次的任務,可是實際生產環境中,咱們可能須要不少個定時調度的需求,java
因此,接下來,咱們來研究定時調度的原理!web
========================================================================ajax
點開這個按鈕後,出現新的界面以下:session
填寫本身的數值後,提交Schedule按鈕後,觀察URLapp
對應的URL===>webapp
.POST /schedule HTTP/1.1
看來須要去查閱一個新的URL Servletide
root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");函數
========================================================================================
this
jdb azkaban.webapp.AzkabanWebServer -conf /root/azkb/azkaban_3.
0
.0_debug/conf
spa
stop in azkaban.webapp.servlet.ScheduleServlet.doPost
stop in azkaban.webapp.servlet.ScheduleServlet.ajaxScheduleFlow
stop in azkaban.scheduler.ScheduleManager.scheduleFlow
run
========================================================================================
最後執行的是ExecutorServlet的下面的方法
@Override
protected void handlePost(HttpServletRequest req, HttpServletResponse resp,
Session session) throws ServletException, IOException {
//看到這裏了
if (hasParam(req, "ajax")) {
handleAJAXAction(req, resp, session);
}
}
通過一番參數解析後,後面會有這樣的一段代碼
Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready",
firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(),
firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);
logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId
+ ")" + "].");
這個卻是須要仔細研究下~
========================================================================================
正如單次運行是放入一個queue中同樣,對於循環執行的任務,放入一個線程的隊列裏
public class TriggerManager extends EventHandler implements
裏面有一個線程
private final TriggerScannerThread runnerThread;
任務是如何插入到這個線程的呢?
緣由就在於,以前有個函數
public void insertTrigger(Trigger t) throws TriggerManagerException {
synchronized (syncObj) {
try {
triggerLoader.addTrigger(t);
} catch (TriggerLoaderException e) {
throw new TriggerManagerException(e);
}
runnerThread.addTrigger(t);//看這裏
triggerIdMap.put(t.getTriggerId(), t);
}
}
public void addTrigger(Trigger t) {
synchronized (syncObj) {
t.updateNextCheckTime();
triggers.add(t);
}
}
private class TriggerScannerThread extends Thread {
private BlockingQueue<Trigger> triggers;
這樣,一個調度任務就插入到了一個BlockingQueue,
固然,若是有興趣的話,還能夠看這個變量的初始化部分:
public TriggerScannerThread(long scannerInterval) {
triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());
justFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();
this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");
this.scannerInterval = scannerInterval;
}
原來是一個PriorityBlockingQueue,這樣排序都不用本身作了,很方便!
------------------
先看看這個線程什麼上下文初始化的?
下文再說!