Azkaban的定時調度任務分析33:定時執行上文

以前,咱們研究了執行1次的任務,可是實際生產環境中,咱們可能須要不少個定時調度的需求,java

因此,接下來,咱們來研究定時調度的原理!web

========================================================================ajax

點開這個按鈕後,出現新的界面以下:session

填寫本身的數值後,提交Schedule按鈕後,觀察URLapp

對應的URL===>webapp

.POST /schedule HTTP/1.1

看來須要去查閱一個新的URL Servletide

root.addServlet(new ServletHolder(new ScheduleServlet()), "/schedule");函數

========================================================================================
this

jdb azkaban.webapp.AzkabanWebServer -conf  /root/azkb/azkaban_3.0.0_debug/confspa

stop in azkaban.webapp.servlet.ScheduleServlet.doPost

stop in  azkaban.webapp.servlet.ScheduleServlet.ajaxScheduleFlow

stop in azkaban.scheduler.ScheduleManager.scheduleFlow

run

========================================================================================

最後執行的是ExecutorServlet的下面的方法

 @Override

  protected void handlePost(HttpServletRequest req, HttpServletResponse resp,

      Session session) throws ServletException, IOException {

  //看到這裏了

    if (hasParam(req, "ajax")) {

      handleAJAXAction(req, resp, session);

    }

  }

通過一番參數解析後,後面會有這樣的一段代碼

Schedule schedule = scheduleManager.scheduleFlow(-1, projectId, projectName, flowName, "ready",

firstSchedTime.getMillis(), firstSchedTime.getZone(), thePeriod, DateTime.now().getMillis(),

firstSchedTime.getMillis(), firstSchedTime.getMillis(), user.getUserId(), flowOptions, slaOptions);

logger.info("User '" + user.getUserId() + "' has scheduled " + "[" + projectName + flowName + " (" + projectId

+ ")" + "].");

這個卻是須要仔細研究下~

========================================================================================

正如單次運行是放入一個queue中同樣,對於循環執行的任務,放入一個線程的隊列裏

public class TriggerManager extends EventHandler implements

裏面有一個線程

  private final TriggerScannerThread runnerThread;

任務是如何插入到這個線程的呢?

 

 

 

緣由就在於,以前有個函數

public void insertTrigger(Trigger t) throws TriggerManagerException {

    synchronized (syncObj) {

      try {

        triggerLoader.addTrigger(t);

      } catch (TriggerLoaderException e) {

        throw new TriggerManagerException(e);

      }

      runnerThread.addTrigger(t);//看這裏

      triggerIdMap.put(t.getTriggerId(), t);

    }

  }

public void addTrigger(Trigger t) {

      synchronized (syncObj) {

        t.updateNextCheckTime();

        triggers.add(t);

      }

}

 private class TriggerScannerThread extends Thread {

    private BlockingQueue<Trigger> triggers;

 

這樣,一個調度任務就插入到了一個BlockingQueue,

 

固然,若是有興趣的話,還能夠看這個變量的初始化部分:

public TriggerScannerThread(long scannerInterval) {

      triggers = new PriorityBlockingQueue<Trigger>(1, new TriggerComparator());

      justFinishedFlows = new ConcurrentHashMap<Integer, ExecutableFlow>();

      this.setName("TriggerRunnerManager-Trigger-Scanner-Thread");

      this.scannerInterval = scannerInterval;

    }

原來是一個PriorityBlockingQueue,這樣排序都不用本身作了,很方便!

------------------

先看看這個線程什麼上下文初始化的?

下文再說!

相關文章
相關標籤/搜索