Notes on Quartz scheduling and performance

This article looks at how Quartz's QuartzSchedulerThread schedules triggers, and at Quartz's performance issues.

SchedulerFactoryBean

spring-context-support-4.3.7.RELEASE-sources.jar!/org/springframework/scheduling/quartz/SchedulerFactoryBean.java

@Override
    public void afterPropertiesSet() throws Exception {
        if (this.dataSource == null && this.nonTransactionalDataSource != null) {
            this.dataSource = this.nonTransactionalDataSource;
        }

        if (this.applicationContext != null && this.resourceLoader == null) {
            this.resourceLoader = this.applicationContext;
        }

        // Create SchedulerFactory instance...
        SchedulerFactory schedulerFactory = BeanUtils.instantiateClass(this.schedulerFactoryClass);
        initSchedulerFactory(schedulerFactory);

        if (this.resourceLoader != null) {
            // Make given ResourceLoader available for SchedulerFactory configuration.
            configTimeResourceLoaderHolder.set(this.resourceLoader);
        }
        if (this.taskExecutor != null) {
            // Make given TaskExecutor available for SchedulerFactory configuration.
            configTimeTaskExecutorHolder.set(this.taskExecutor);
        }
        if (this.dataSource != null) {
            // Make given DataSource available for SchedulerFactory configuration.
            configTimeDataSourceHolder.set(this.dataSource);
        }
        if (this.nonTransactionalDataSource != null) {
            // Make given non-transactional DataSource available for SchedulerFactory configuration.
            configTimeNonTransactionalDataSourceHolder.set(this.nonTransactionalDataSource);
        }

        // Get Scheduler instance from SchedulerFactory.
        try {
            this.scheduler = createScheduler(schedulerFactory, this.schedulerName);
            populateSchedulerContext();

            if (!this.jobFactorySet && !(this.scheduler instanceof RemoteScheduler)) {
                // Use AdaptableJobFactory as default for a local Scheduler, unless when
                // explicitly given a null value through the "jobFactory" bean property.
                this.jobFactory = new AdaptableJobFactory();
            }
            if (this.jobFactory != null) {
                if (this.jobFactory instanceof SchedulerContextAware) {
                    ((SchedulerContextAware) this.jobFactory).setSchedulerContext(this.scheduler.getContext());
                }
                this.scheduler.setJobFactory(this.jobFactory);
            }
        }

        finally {
            if (this.resourceLoader != null) {
                configTimeResourceLoaderHolder.remove();
            }
            if (this.taskExecutor != null) {
                configTimeTaskExecutorHolder.remove();
            }
            if (this.dataSource != null) {
                configTimeDataSourceHolder.remove();
            }
            if (this.nonTransactionalDataSource != null) {
                configTimeNonTransactionalDataSourceHolder.remove();
            }
        }

        registerListeners();
        registerJobsAndTriggers();
    }

Here, during afterPropertiesSet, createScheduler(schedulerFactory, this.schedulerName) is called.

SchedulerFactoryBean.createScheduler

protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
            throws SchedulerException {

        // Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
        Thread currentThread = Thread.currentThread();
        ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
        boolean overrideClassLoader = (this.resourceLoader != null &&
                !this.resourceLoader.getClassLoader().equals(threadContextClassLoader));
        if (overrideClassLoader) {
            currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
        }
        try {
            SchedulerRepository repository = SchedulerRepository.getInstance();
            synchronized (repository) {
                Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
                Scheduler newScheduler = schedulerFactory.getScheduler();
                if (newScheduler == existingScheduler) {
                    throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
                            "in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
                }
                if (!this.exposeSchedulerInRepository) {
                    // Need to remove it in this case, since Quartz shares the Scheduler instance by default!
                    SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
                }
                return newScheduler;
            }
        }
        finally {
            if (overrideClassLoader) {
                // Reset original thread context ClassLoader.
                currentThread.setContextClassLoader(threadContextClassLoader);
            }
        }
    }

Here schedulerFactory.getScheduler() is called to create the scheduler.

quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java

public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();

        return sched;
    }

This calls instantiate() to do the initialization, which in turn creates a QuartzScheduler:

qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

QuartzScheduler

quartz-2.3.0-sources.jar!/org/quartz/core/QuartzScheduler.java

public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);

        getLog().info("Quartz Scheduler v." + getVersion() + " created.");
    }

The constructor in turn creates the QuartzSchedulerThread and hands it to the ThreadExecutor.

QuartzSchedulerThread

This is the core scheduling class.
quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerThread.java

public class QuartzSchedulerThread extends Thread {
    //......
    /**
     * <p>
     * The main processing loop of the <code>QuartzSchedulerThread</code>.
     * </p>
     */
    @Override
    public void run() {
        int acquiresFailed = 0;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }

                        // reset failure counter when paused, so that we don't
                        // wait again after unpausing
                        acquiresFailed = 0;
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                // wait a bit, if reading from job store is consistently
                // failing (e.g. DB is down or restarting)..
                if (acquiresFailed > 1) {
                    try {
                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                        Thread.sleep(delay);
                    } catch (Exception ignore) {
                    }
                }
                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
                    try {
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
                        acquiresFailed = 0;
                        if (log.isDebugEnabled())
                            log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
                    } catch (JobPersistenceException jpe) {
                        if (acquiresFailed == 0) {
                            qs.notifySchedulerListenersError(
                                "An error occurred while scanning for the next triggers to fire.",
                                jpe);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    } catch (RuntimeException e) {
                        if (acquiresFailed == 0) {
                            getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                                    +e.getMessage(), e);
                        }
                        if (acquiresFailed < Integer.MAX_VALUE)
                            acquiresFailed++;
                        continue;
                    }

                    if (triggers != null && !triggers.isEmpty()) {

                        now = System.currentTimeMillis();
                        long triggerTime = triggers.get(0).getNextFireTime().getTime();
                        long timeUntilTrigger = triggerTime - now;
                        while(timeUntilTrigger > 2) {
                            synchronized (sigLock) {
                                if (halted.get()) {
                                    break;
                                }
                                if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
                                    try {
                                        // we could have blocked a long while
                                        // on 'synchronize', so we must recompute
                                        now = System.currentTimeMillis();
                                        timeUntilTrigger = triggerTime - now;
                                        if(timeUntilTrigger >= 1)
                                            sigLock.wait(timeUntilTrigger);
                                    } catch (InterruptedException ignore) {
                                    }
                                }
                            }
                            if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
                                break;
                            }
                            now = System.currentTimeMillis();
                            timeUntilTrigger = triggerTime - now;
                        }

                        // this happens if releaseIfScheduleChangedSignificantly decided to release triggers
                        if(triggers.isEmpty())
                            continue;

                        // set triggers to 'executing'
                        List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();

                        boolean goAhead = true;
                        synchronized(sigLock) {
                            goAhead = !halted.get();
                        }
                        if(goAhead) {
                            try {
                                List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
                                if(res != null)
                                    bndles = res;
                            } catch (SchedulerException se) {
                                qs.notifySchedulerListenersError(
                                        "An error occurred while firing triggers '"
                                                + triggers + "'", se);
                                //QTZ-179 : a problem occurred interacting with the triggers from the db
                                //we release them and loop again
                                for (int i = 0; i < triggers.size(); i++) {
                                    qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                }
                                continue;
                            }

                        }

                        for (int i = 0; i < bndles.size(); i++) {
                            TriggerFiredResult result =  bndles.get(i);
                            TriggerFiredBundle bndle =  result.getTriggerFiredBundle();
                            Exception exception = result.getException();

                            if (exception instanceof RuntimeException) {
                                getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            // it's possible to get 'null' if the triggers was paused,
                            // blocked, or other similar occurrences that prevent it being
                            // fired at this time...  or if the scheduler was shutdown (halted)
                            if (bndle == null) {
                                qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
                                continue;
                            }

                            JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

                        }

                        continue; // while (!halted)
                    }
                } else { // if(availThreadCount > 0)
                    // should never happen, if threadPool.blockForAvailableThreads() follows contract
                    continue; // while (!halted)
                }

                long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

            } catch(RuntimeException re) {
                getLog().error("Runtime error occurred in main trigger firing loop.", re);
            }
        } // while (!halted)

        // drop references to scheduler stuff to aid garbage collection...
        qs = null;
        qsRsrcs = null;
    }
}

There are several key points here.

blockForAvailableThreads

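This is the call qsRsrcs.getThreadPool().blockForAvailableThreads(). If every thread in the pool is busy, the call blocks, so trigger acquisition is delayed and scheduling accuracy suffers.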
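To make the effect concrete, here is a minimal sketch of a pool whose caller blocks until a worker is free. TinyPool and all of its methods are hypothetical names written for this illustration; this is not Quartz's thread pool implementation, only the same idea: the scheduling thread waits on a monitor while every worker is busy, so a trigger that becomes due during that wait fires late.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a worker pool; NOT Quartz's own thread pool.
class TinyPool {
    private final Object lock = new Object();
    private int available;

    TinyPool(int size) {
        this.available = size;
    }

    // Blocks the caller until at least one worker slot is free,
    // then returns the number of free slots.
    int blockForAvailableThreads() throws InterruptedException {
        synchronized (lock) {
            while (available < 1) {
                lock.wait(500L);
            }
            return available;
        }
    }

    // Claims a slot, runs the task on a new thread, and frees the slot afterwards.
    void runInThread(Runnable task) throws InterruptedException {
        synchronized (lock) {
            while (available < 1) {
                lock.wait(500L);
            }
            available--;
        }
        new Thread(() -> {
            try {
                task.run();
            } finally {
                synchronized (lock) {
                    available++;
                    lock.notifyAll();
                }
            }
        }).start();
    }

    // Returns how long (ms) the caller was blocked while the only worker ran a job of jobMillis.
    static long measureBlockedMillis(long jobMillis) {
        try {
            TinyPool pool = new TinyPool(1);
            CountDownLatch started = new CountDownLatch(1);
            pool.runInThread(() -> {
                started.countDown();
                try {
                    Thread.sleep(jobMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            long begin = System.nanoTime();
            pool.blockForAvailableThreads();
            return (System.nanoTime() - begin) / 1_000_000L;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("scheduler thread blocked for about "
                + measureBlockedMillis(300L) + " ms");
    }
}
```

With a single worker occupied by a 300 ms job, the caller stays blocked for roughly the remainder of that job, which is the delay a due trigger would see.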

acquireNextTriggers

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

timeUntilContinue

When no triggers were acquired, the thread waits here before the next round:

long now = System.currentTimeMillis();
                long waitTime = now + getRandomizedIdleWaitTime();
                long timeUntilContinue = waitTime - now;
                synchronized(sigLock) {
                    try {
                      if(!halted.get()) {
                        // QTZ-336 A job might have been completed in the mean time and we might have
                        // missed the scheduled changed signal by not waiting for the notify() yet
                        // Check that before waiting for too long in case this very job needs to be
                        // scheduled very soon
                        if (!isScheduleChanged()) {
                          sigLock.wait(timeUntilContinue);
                        }
                      }
                    } catch (InterruptedException ignore) {
                    }
                }

getRandomizedIdleWaitTime is as follows:

private static long DEFAULT_IDLE_WAIT_TIME = 30L * 1000L;
    private long idleWaitTime = DEFAULT_IDLE_WAIT_TIME;
    private long getRandomizedIdleWaitTime() {
        return idleWaitTime - random.nextInt(idleWaitVariablness);
    }

idleWaitTime defaults to 30 seconds and can be configured through org.quartz.scheduler.idleWaitTime.
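The randomization can be tried out in isolation. The sketch below reproduces only the arithmetic of getRandomizedIdleWaitTime; the class name IdleWaitSketch is hypothetical, and the variableness value of 7 seconds is an assumption chosen for illustration, since the excerpt above does not show how idleWaitVariablness is initialised.

```java
import java.util.Random;

// Sketch of the randomized idle wait. The variableness value is an assumption,
// not something shown in the excerpt above.
class IdleWaitSketch {
    static final long IDLE_WAIT_TIME = 30L * 1000L;      // default from the snippet above
    static final int IDLE_WAIT_VARIABLENESS = 7 * 1000;  // assumed value, for illustration only

    static long randomizedIdleWaitTime(Random random) {
        // nextInt(bound) returns a value in [0, bound), so the result lies in
        // (IDLE_WAIT_TIME - bound, IDLE_WAIT_TIME].
        return IDLE_WAIT_TIME - random.nextInt(IDLE_WAIT_VARIABLENESS);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(randomizedIdleWaitTime(random) + " ms");
        }
    }
}
```

The idle wait is therefore never longer than idleWaitTime, only shorter by a random amount. A plausible reading is that the jitter keeps several scheduler instances from polling in lockstep, though the source shown here does not state the motivation.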

computeDelayForRepeatedErrors

// wait a bit, if reading from job store is consistently
                // failing (e.g. DB is down or restarting)..
                if (acquiresFailed > 1) {
                    try {
                        long delay = computeDelayForRepeatedErrors(qsRsrcs.getJobStore(), acquiresFailed);
                        Thread.sleep(delay);
                    } catch (Exception ignore) {
                    }
                }

This avoids polling the job store in a tight loop while the database is down:

private static final long MIN_DELAY = 20;
    private static final long MAX_DELAY = 600000;

    private static long computeDelayForRepeatedErrors(JobStore jobStore, int acquiresFailed) {
        long delay;
        try {
            delay = jobStore.getAcquireRetryDelay(acquiresFailed);
        } catch (Exception ignored) {
            // we're trying to be useful in case of error states, not cause
            // additional errors..
            delay = 100;
        }

        // sanity check per getAcquireRetryDelay specification
        if (delay < MIN_DELAY)
            delay = MIN_DELAY;
        if (delay > MAX_DELAY)
            delay = MAX_DELAY;

        return delay;
    }
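A small worked example may help show how the gate (acquiresFailed > 1) and the clamp interact. RetryDelaySketch, requestedDelay and delayBeforeAttempt are hypothetical names, and the "one second per failure" policy stands in for whatever a job store's getAcquireRetryDelay returns; only MIN_DELAY, MAX_DELAY and the clamping logic come from the code above.

```java
// Sketch of the retry back-off around acquireNextTriggers.
// requestedDelay() is a made-up policy standing in for JobStore.getAcquireRetryDelay().
class RetryDelaySketch {
    static final long MIN_DELAY = 20;
    static final long MAX_DELAY = 600000;

    // Hypothetical job-store policy: ask for one second per consecutive failure.
    static long requestedDelay(int acquiresFailed) {
        return 1000L * acquiresFailed;
    }

    // Same sanity check as computeDelayForRepeatedErrors: keep the delay in [MIN_DELAY, MAX_DELAY].
    static long clamp(long delay) {
        if (delay < MIN_DELAY) {
            return MIN_DELAY;
        }
        if (delay > MAX_DELAY) {
            return MAX_DELAY;
        }
        return delay;
    }

    // Mirrors the gate in run(): the loop only sleeps once more than one acquisition has failed.
    static long delayBeforeAttempt(int acquiresFailed) {
        if (acquiresFailed > 1) {
            return clamp(requestedDelay(acquiresFailed));
        }
        return 0L;
    }

    public static void main(String[] args) {
        int[] failures = {0, 1, 2, 10, 1000};
        for (int f : failures) {
            System.out.println("acquiresFailed=" + f + " -> sleep " + delayBeforeAttempt(f) + " ms");
        }
    }
}
```

Under this made-up policy the first failure is retried immediately, later failures back off, and the sleep is capped at ten minutes no matter how long the outage lasts.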

connectionProvider.class

For example:

org.quartz.jobStore.dataSource=myDataSourceName
org.quartz.dataSource.myDataSourceName.connectionProvider.class=org.quartz.utils.HikariCpPoolingConnectionProvider
org.quartz.dataSource.myDataSourceName.driver=org.postgresql.Driver
org.quartz.dataSource.myDataSourceName.URL=jdbc:postgresql://192.168.99.100:5432/postgres
org.quartz.dataSource.myDataSourceName.user:postgres
org.quartz.dataSource.myDataSourceName.password:postgres
org.quartz.dataSource.myDataSourceName.maxConnection:30
org.quartz.dataSource.myDataSourceName.validationQuery: select 1

With this configuration, startup appears to fail with:

Caused by: org.quartz.SchedulerException: ConnectionProvider class 'org.quartz.utils.HikariCpPoolingConnectionProvider' could not be instantiated.
    at org.quartz.impl.StdSchedulerFactory.instantiate(StdSchedulerFactory.java:936) ~[quartz-2.3.0.jar:na]
    at org.quartz.impl.StdSchedulerFactory.getScheduler(StdSchedulerFactory.java:1559) ~[quartz-2.3.0.jar:na]

quartz-2.3.0-sources.jar!/org/quartz/impl/StdSchedulerFactory.java

ConnectionProvider cp = null;
                try {
                    cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class '" + cpClass
                            + "' could not be instantiated.", e);
                    throw initException;
                }

However, I configure the connection pool directly in Spring (see the post on integrating Quartz 2.3.0 with Spring Boot), so I did not pursue this further.

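Performance issues

Batch parameters

With the JDBC job store, Quartz polls the database periodically, which is relatively expensive. One mitigation is to acquire triggers in batches where possible. Two parameters control this:

## time window, default 0
org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow=1
## default is 1; can be set equal to the thread pool's core size
org.quartz.scheduler.batchTriggerAcquisitionMaxCount=5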
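If these settings live in a .properties file, it is worth keeping any comment on its own line. java.util.Properties only treats # as a comment marker at the start of a line, so text after the value becomes part of the value. The sketch below demonstrates that with plain JDK classes; BatchPropsSketch is a hypothetical name, and how Quartz then parses the resulting string is not covered here.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Shows how java.util.Properties reads a value with a trailing "##" remark
// versus a remark placed on its own line.
class BatchPropsSketch {
    static final String KEY = "org.quartz.scheduler.batchTriggerAcquisitionMaxCount";

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // Remark on the same line as the value.
        String inline = KEY + "=5 ## default is 1\n";
        // Remark on its own line, which Properties skips as a comment.
        String ownLine = "## default is 1\n" + KEY + "=5\n";

        System.out.println("inline:   [" + load(inline).getProperty(KEY) + "]");
        System.out.println("own line: [" + load(ownLine).getProperty(KEY) + "]");
    }
}
```

Only the second form yields a value that is a bare number.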

quartz-2.3.0-sources.jar!/org/quartz/core/QuartzSchedulerResources.java

batchTriggerAcquisitionMaxCount corresponds to maxBatchSize in QuartzSchedulerResources.
batchTriggerAcquisitionFireAheadTimeWindow corresponds to batchTimeWindow in QuartzSchedulerResources.

triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

The value actually passed as maxCount is the smaller of availThreadCount and maxBatchSize.
It ultimately takes effect in selectTriggerToAcquire.
quartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/JobStoreSupport.java

protected List<OperableTrigger> acquireNextTrigger(Connection conn, long noLaterThan, int maxCount, long timeWindow)
        throws JobPersistenceException {
        if (timeWindow < 0) {
          throw new IllegalArgumentException();
        }

        List<OperableTrigger> acquiredTriggers = new ArrayList<OperableTrigger>();
        Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
        final int MAX_DO_LOOP_RETRY = 3;
        int currentLoopCount = 0;
        do {
            currentLoopCount ++;
            try {
                List<TriggerKey> keys = getDelegate().selectTriggerToAcquire(conn, noLaterThan + timeWindow, getMisfireTime(), maxCount);

                // No trigger is ready to fire yet.
                if (keys == null || keys.size() == 0)
                    return acquiredTriggers;

                long batchEnd = noLaterThan;

                // ...

                // if we didn't end up with any trigger to fire from that first
                // batch, try again for another batch. We allow with a max retry count.
                if(acquiredTriggers.size() == 0 && currentLoopCount < MAX_DO_LOOP_RETRY) {
                    continue;
                }

                // We are done with the while loop.
                break;
            } catch (Exception e) {
                throw new JobPersistenceException(
                          "Couldn't acquire next trigger: " + e.getMessage(), e);
            }
        } while (true);

        // Return the acquired trigger list
        return acquiredTriggers;
    }

SQL example

quartz-2.3.0-sources.jar!/org/quartz/impl/jdbcjobstore/StdJDBCDelegate.java

public List<TriggerKey> selectTriggerToAcquire(Connection conn, long noLaterThan, long noEarlierThan, int maxCount)
        throws SQLException {
        PreparedStatement ps = null;
        ResultSet rs = null;
        List<TriggerKey> nextTriggers = new LinkedList<TriggerKey>();
        try {
            ps = conn.prepareStatement(rtp(SELECT_NEXT_TRIGGER_TO_ACQUIRE));

            // Set max rows to retrieve
            if (maxCount < 1)
                maxCount = 1; // we want at least one trigger back.
            ps.setMaxRows(maxCount);

            // Try to give jdbc driver a hint to hopefully not pull over more than the few rows we actually need.
            // Note: in some jdbc drivers, such as MySQL, you must set maxRows before fetchSize, or you get exception!
            ps.setFetchSize(maxCount);

            ps.setString(1, STATE_WAITING);
            ps.setBigDecimal(2, new BigDecimal(String.valueOf(noLaterThan)));
            ps.setBigDecimal(3, new BigDecimal(String.valueOf(noEarlierThan)));
            rs = ps.executeQuery();

            while (rs.next() && nextTriggers.size() <= maxCount) {
                nextTriggers.add(triggerKey(
                        rs.getString(COL_TRIGGER_NAME),
                        rs.getString(COL_TRIGGER_GROUP)));
            }

            return nextTriggers;
        } finally {
            closeResultSet(rs);
            closeStatement(ps);
        }      
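    }

Here maxCount is applied through ps.setMaxRows(maxCount).
The caller passes noLaterThan = noLaterThan + timeWindow and noEarlierThan = getMisfireTime(); both bound NEXT_FIRE_TIME (the lower bound only applies when MISFIRE_INSTR is not -1, as the query below shows):
NEXT_FIRE_TIME <= noLaterThan && NEXT_FIRE_TIME >= noEarlierThan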
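The way these bounds are derived can be written out as plain arithmetic. The sketch below is an illustration only: AcquireWindowSketch and its method names are hypothetical, and the formula used for the misfire time (now minus a misfire threshold) is an assumption, since getMisfireTime() is not shown in this article. The candidate check mirrors the WHERE clause of the example query in this article.

```java
// Sketch of the acquisition window; not Quartz code.
class AcquireWindowSketch {
    // maxCount handed to the job store: limited by free worker threads and by the batch size.
    static int maxCount(int availThreadCount, int maxBatchSize) {
        return Math.min(availThreadCount, maxBatchSize);
    }

    // Upper bound on NEXT_FIRE_TIME: (now + idleWaitTime) from the scheduler thread,
    // plus timeWindow added inside the job store.
    static long noLaterThan(long now, long idleWaitTime, long timeWindow) {
        return now + idleWaitTime + timeWindow;
    }

    // ASSUMPTION: the misfire time is "now minus the misfire threshold", floored at zero.
    static long noEarlierThan(long now, long misfireThreshold) {
        return Math.max(0L, now - misfireThreshold);
    }

    // Mirrors: NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR NEXT_FIRE_TIME >= ?)
    static boolean isCandidate(long nextFireTime, int misfireInstr,
                               long noLaterThan, long noEarlierThan) {
        return nextFireTime <= noLaterThan
                && (misfireInstr == -1 || nextFireTime >= noEarlierThan);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long upper = noLaterThan(now, 30_000L, 1L);
        long lower = noEarlierThan(now, 60_000L);

        System.out.println("maxCount = " + maxCount(10, 5));
        System.out.println("due in 10s, selected: " + isCandidate(now + 10_000L, 0, upper, lower));
        System.out.println("due in 5min, selected: " + isCandidate(now + 300_000L, 0, upper, lower));
        System.out.println("2min overdue, instr 0, selected: " + isCandidate(now - 120_000L, 0, upper, lower));
        System.out.println("2min overdue, instr -1, selected: " + isCandidate(now - 120_000L, -1, upper, lower));
    }
}
```

Raising the batch size or the time window widens what a single poll can return, which is why the two batch parameters reduce the number of round trips to the database.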

Example query SQL

SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM QRTZ_TRIGGERS WHERE SCHED_NAME = 'schedulerFactoryBean' AND TRIGGER_STATE = ? AND NEXT_FIRE_TIME <= ? AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= ?)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC

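Summary

Because triggers are acquired by periodic polling, using the JDBC job store can make this polling a significant cost and puts potential pressure on the database. There may also be problems when the database goes down, even though Quartz backs off with a delay between retries. Configure a connection pool where possible and, if needed, tune the batch parameters.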
