quartz集羣模式任務觸發分析(三)

源碼回顧java

quartz線程模型sql

quartz集羣模式任務觸發分析(二)數據庫


JobStoreSupport

任務的存儲類,這裏麪包含了上面提到的兩個比較核心的方法緩存

acquireNextTriggers

public List<OperableTrigger> acquireNextTriggers(final long noLaterThan, final int maxCount, final long timeWindow)
    throws JobPersistenceException 
{

    String lockName;
    // 從這個地方能夠看到maxCount大於1 的時候纔會使用悲觀鎖, isAcquireTriggersWithinLock默認爲false
    if(isAcquireTriggersWithinLock() || maxCount > 1) {
        lockName = LOCK_TRIGGER_ACCESS;
    } else {
        lockName = null;
    }
    return executeInNonManagedTXLock(lockName,
            new TransactionCallback<List<OperableTrigger>>() {
                public List<OperableTrigger> execute(Connection conn) throws JobPersistenceException {
                    // 重點看這個方法
                    // executeInNonManagedTXLock 裏面最終主要的就是執行這個方法。
                    return acquireNextTrigger(conn, noLaterThan, maxCount, timeWindow);
                }
            },
            new TransactionValidator<List<OperableTrigger>>() {
                 // 省略代碼。。
            });
}


protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
    throws JobPersistenceException 
{
    if (timeWindow < 0) {
      throw new IllegalArgumentException();
    }

    List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
    Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
    // 最多重試三次
    final int MAX_DO_LOOP_RETRY = 3;
    int currentLoopCount = 0;
    do {
        // 進入do while循環
        currentLoopCount ++;
        try {
            //經過時間,獲取nextFireTime<noLaterThan的trigger
            List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

            // No trigger is ready to fire yet.
            if (keys == null || keys.size() == 0)
                return acquiredTriggers;
            // 設置截止時間
            long batchEnd = noLaterThan;

            for(TriggerKey triggerKey: keys) {
                // If our trigger is no longer available, try a new one.
                // 判斷 trigger是否存在
                OperableTrigger nextTrigger = retrieveTrigger(conn, triggerKey);
                if(nextTrigger == null) {
                    continue// next trigger
                }

                JobKey jobKey = nextTrigger.getJobKey();
                // 判斷trigger對應的jobDetail是否存在
                JobDetail job;
                try {
                    job = retrieveJob(conn, jobKey);
                } catch (JobPersistenceException jpe) {
                    try {
                        getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
                        getDelegate().updateTriggerState(conn, triggerKey, STATE_ERROR);
                    } catch (SQLException sqle) {
                        getLog().error("Unable to set trigger state to ERROR.", sqle);
                    }
                    continue;
                }
                // 是否容許併發執行, JobBean上面含@DisallowConcurrentExecution這個註解的,表示不容許併發執行
                if (job.isConcurrentExectionDisallowed()) {
                    // 進入這裏,表示不容許併發執行
                    if (acquiredJobKeysForNoConcurrentExec.contains(jobKey)) {
                        continue// next trigger
                    } else {
                        acquiredJobKeysForNoConcurrentExec.add(jobKey);
                    }
                }
                // 若是該任務的下次執行時間大於截止時間,那麼跳過
                if (nextTrigger.getNextFireTime().getTime() > batchEnd) {
                  break;
                }

                // 更新這個trigger的狀態爲ACQUIRED ,表示正在準備出發。
                int rowsUpdated = getDelegate().updateTriggerStateFromOtherState(conn, triggerKey, STATE_ACQUIRED, STATE_WAITING);
                if (rowsUpdated <= 0) {
                    continue// next trigger
                }
                nextTrigger.setFireInstanceId(getFiredTriggerRecordId());
                // 插入出發記錄
                getDelegate().insertFiredTrigger(conn, nextTrigger, STATE_ACQUIRED, null);

                if(acquiredTriggers.isEmpty()) {
                    batchEnd = Math.max(nextTrigger.getNextFireTime().getTime(), System.currentTimeMillis()) + timeWindow;
                }
                // 加入返回trigger
                acquiredTriggers.add(nextTrigger);
            }
            if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                continue;
            }
            break;
        } catch (Exception e) {
            throw new JobPersistenceException(
                      "Couldn't acquire next trigger: " + e.getMessage(), e);
        }
    } while (true);

    // Return the acquired trigger list
    return acquiredTriggers;
}

上面看到的是觸發器的獲取詳細實現,若是每次獲取的maxCount大於1 ,那麼就會使用悲觀鎖,防止任務在集羣狀態下被重複獲取,默認maxCount=1 , 這也就致使了,在默認的集羣模式下,若是不作這個配置,在併發狀態下,就會有出現任務被重複獲取,會產生任務被重複觸發的狀況。微信

triggersFired

在主線程裏面調用以下:併發

List<TriggerFiredResult> res =qsRsrcs.getJobStore().triggersFired(triggers);
public List<TriggerFiredResult> triggersFired(final List<OperableTrigger> triggers) throws JobPersistenceException {
    // 直接傳入鎖名,使用悲觀鎖
    return executeInNonManagedTXLock(LOCK_TRIGGER_ACCESS,
            new TransactionCallback<List<TriggerFiredResult>>() {
                public List<TriggerFiredResult> execute(Connection conn) throws JobPersistenceException {
                    List<TriggerFiredResult> results = new ArrayList<TriggerFiredResult>();

                    TriggerFiredResult result;
                    for (OperableTrigger trigger : triggers) {
                        try {
                          // 單個任務來慢慢搞
                          TriggerFiredBundle bundle = triggerFired(conn, trigger);
                          result = new TriggerFiredResult(bundle);
                        } catch (JobPersistenceException jpe) {
                            result = new TriggerFiredResult(jpe);
                        } catch(RuntimeException re) {
                            result = new TriggerFiredResult(re);
                        }
                        results.add(result);
                    }

                    return results;
                }
            },
            new TransactionValidator<List<TriggerFiredResult>>() {
                // 省略代碼。。
            });
}


protected TriggerFiredBundle triggerFired(Connection conn,
        OperableTrigger trigger)

    throws JobPersistenceException 
{
    JobDetail job;
    Calendar cal = null;

    try { // if trigger was deleted, state will be STATE_DELETED
        // 驗證trigger的狀態,若是不是等於ACQUIRED的,則直接return null
        String state = getDelegate().selectTriggerState(conn,
                trigger.getKey());
        if (!state.equals(STATE_ACQUIRED)) {
            return null;
        }
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't select trigger state: "
                + e.getMessage(), e);
    }

    try {
        // 獲取這個trigger的任務詳情。
        job = retrieveJob(conn, trigger.getJobKey());
        if (job == null) { return null; }
    } catch (JobPersistenceException jpe) {
        try {
            getLog().error("Error retrieving job, setting trigger state to ERROR.", jpe);
            getDelegate().updateTriggerState(conn, trigger.getKey(),
                    STATE_ERROR);
        } catch (SQLException sqle) {
            getLog().error("Unable to set trigger state to ERROR.", sqle);
        }
        throw jpe;
    }

    if (trigger.getCalendarName() != null) {
        // 這裏主要是對非集羣模式下作一些緩存處理
        cal = retrieveCalendar(conn, trigger.getCalendarName());
        if (cal == null) { return null; }
    }

    try {
        // 更新觸發記錄的狀態爲EXECUTING
        getDelegate().updateFiredTrigger(conn, trigger, STATE_EXECUTING, job);
    } catch (SQLException e) {
        throw new JobPersistenceException("Couldn't insert fired trigger: "
                + e.getMessage(), e);
    }

    Date prevFireTime = trigger.getPreviousFireTime();
    // 計算下一次的trigger的執行時間
    trigger.triggered(cal);

    String state = STATE_WAITING;
    boolean force = true;
    //若是任務是不容許併發執行的,那麼須要將任務的狀態修改成BLOCK,阻塞
    if (job.isConcurrentExectionDisallowed()) {
        state = STATE_BLOCKED;
        force = false;
        try {
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_WAITING);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_BLOCKED, STATE_ACQUIRED);
            getDelegate().updateTriggerStatesForJobFromOtherState(conn, job.getKey(),
                    STATE_PAUSED_BLOCKED, STATE_PAUSED);
        } catch (SQLException e) {
            throw new JobPersistenceException(
                    "Couldn't update states of blocked triggers: "
                            + e.getMessage(), e);
        }
    }

    if (trigger.getNextFireTime() == null) {
        // 下次執行時間爲空,也就是說沒有下次了,直接修改trigger的狀態爲完成
        state = STATE_COMPLETE;
        force = true;
    }
    // 修改trigger的撞他信息
    storeTrigger(conn, trigger, job, true, state, force, false);

    job.getJobDataMap().clearDirtyFlag();
    // 返回任務的執行信息
    return new TriggerFiredBundle(job, trigger, cal, trigger.getKey().getGroup()
            .equals(Scheduler.DEFAULT_RECOVERY_GROUP), new Date(), trigger
            .getPreviousFireTime(), prevFireTime, trigger.getNextFireTime());

該方法作了如下工做:app

1.獲取trigger當前狀態異步

2.經過trigger中的JobKey讀取trigger包含的Job信息分佈式

3.將trigger更新至觸發狀態oop

4.更新數據庫中trigger的信息,包括更改狀態至STATE_COMPLETE,及計算下一次觸發時間.

5.返回trigger觸發結果的數據傳輸類TriggerFiredBundle

從該方法返回後,trigger的執行過程已基本完畢.回到執行quratz操做規範的executeInNonManagedTXLock方法,將數據庫鎖釋放.

trigger觸發操做完成

總結:
簡單地說,quartz的分佈式調度策略是以數據庫爲邊界資源的一種異步策略.各個調度器都遵照一個基於數據庫鎖的操做規則保證了操做的惟一性.

同時多個節點的異步運行保證了服務的可靠.但這種策略有本身的侷限性,集羣特性對於高cpu使用率的任務效果很好,可是對於大量的短任務,

各個節點都會搶佔數據庫鎖,這樣就出現大量的線程等待資源.這種狀況隨着節點的增長會愈來愈嚴重.


本文分享自微信公衆號 - sharedCode(sharedCode)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索