Quartz集羣實戰與原理分析

1、簡介java

Quartz是一個優秀的調度框架,徹底基於Java實現。具備如下幾大特色:mysql

(1)強大的調度功能,例如支持豐富多樣的調度方法,能夠知足各類常規及特殊需求;git

(2)靈活的應用方式,例如支持任務和調度的多種組合方式,支持調度數據的多種存儲方式;github

(3)分佈式和集羣能力,Terracotta收購後在原來功能基礎上做了進一步提高。算法


核心概念sql

  1. Job 表示一個工做,要執行的具體內容。此接口中只有一個方法,以下:數據庫

    void execute(JobExecutionContext context) 
  2. JobDetail 表示一個具體的可執行的調度程序,Job 是這個可執行程調度程序所要執行的內容,另外 JobDetail 還包含了這個任務調度的方案和策略。
    服務器

  3. Trigger 觸發器,指定什麼時候觸發任務。 
    架構

  4. Scheduler 表明一個調度容器,一個調度容器中能夠註冊多個 JobDetail 和 Trigger。當 Trigger 與 JobDetail 組合,就能夠被 Scheduler 容器調度了。 併發

666.png


Quartz線程視圖

在Quartz中,有兩類線程,Scheduler調度線程和任務執行線程,其中任務執行線程一般使用一個線程池維護一組線程。


777.png

Scheduler調度線程主要有兩個:執行常規調度的線程,和執行misfiredtrigger的線程。常規調度線程輪詢存儲的全部trigger,若是有須要觸發的trigger,即到達了下一次觸發的時間,則從任務執行線程池獲取一個空閒線程,執行與該trigger關聯的任務。Misfire線程是掃描全部的trigger,查看是否有misfiredtrigger,若是有的話根據misfire的策略分別處理(fire now OR wait for the next fire)。


Quartz Job數據存儲

Quartz中的trigger和job須要存儲下來才能被使用。Quartz中有兩種存儲方式:RAMJobStore,JobStoreSupport,其中RAMJobStore是將trigger和job存儲在內存中,而JobStoreSupport是基於jdbc將trigger和job存儲到數據庫中。RAMJobStore的存取速度很是快,可是因爲其在系統被中止後全部的數據都會丟失,因此在集羣應用中,必須使用JobStoreSupport


2、Quartz集羣架構

一個Quartz集羣中的每一個節點是一個獨立的Quartz應用,它又管理着其餘的節點。這就意味着你必須對每一個節點分別啓動或中止。Quartz集羣中,獨立的Quartz節點並不與另外一其的節點或是管理節點通訊,而是經過相同的數據庫表來感知到另外一Quartz應用的。

20150519231230428.png

數據庫準備

由於Quzrtz集羣依賴於數據庫,因此必須先建立數據庫表,數據表示官方提供的,我用的是quartz2.3.0版本,有11張表,以下:

9999.png

表信息介紹

qrtz_blob_triggers : 以Blob 類型存儲的觸發器。

qrtz_calendars存儲Quartz的Calendar信息

qrtz_cron_triggers存儲CronTrigger,包括Cron表達式和時區信息

qrtz_fired_triggers存儲與已觸發的Trigger相關的狀態信息,以及相聯Job的執行信息

qrtz_job_details存儲每個已配置的Job的詳細信息

qrtz_locks存儲程序的悲觀鎖的信息

qrtz_paused_trigger_grps存儲已暫停的Trigger組的信息

qrtz_scheduler_state存儲少許的有關Scheduler的狀態信息,和別的Scheduler實例

qrtz_simple_triggers存儲簡單的Trigger,包括重複次數、間隔、以及已觸的次數

qrtz_simprop_triggers   存儲CalendarIntervalTrigger和DailyTimeIntervalTrigger兩種類型的觸發器

qrtz_triggers存儲已配置的Trigger的信息 


qrtz_locks就是Quartz集羣實現同步機制的行鎖表,包括如下幾個鎖:CALENDAR_ACCESS 、JOB_ACCESS、MISFIRE_ACCESS 、STATE_ACCESS 、TRIGGER_ACCESS


從故障實例中恢復Job

當一個Sheduler實例在執行某個Job時失敗了,有可能由另外一正常工做的Scheduler實例接過這個Job從新運行。要實現這種行爲,配置給JobDetail對象的Job可恢復屬性必須設置爲true(job.setRequestsRecovery(true))。若是可恢復屬性被設置爲false(默認爲false),當某個Scheduler在運行該job失敗時,它將不會從新運行;而是由另外一個Scheduler實例在下一次觸發時間觸發。Scheduler實例出現故障後多快能被偵測到取決於每一個Scheduler的檢入間隔(即2.3中提到的org.quartz.jobStore.clusterCheckinInterval)。


測試項目及配置

項目代碼下載:https://github.com/feixiameiruhua/my-quartz-cluster.git 

555.png


quartz.properties文件

# 固定前綴org.quartz
# 主要分爲scheduler、threadPool、jobStore、plugin等部分
#
#
#調度器實例編號自動生成
org.quartz.scheduler.instanceId = AUTO

#調度器實例名稱
org.quartz.scheduler.instanceName = DefaultQuartzScheduler
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false

# 實例化ThreadPool時,使用的線程類爲SimpleThreadPool
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool

# threadCount和threadPriority將以setter的形式注入ThreadPool實例
# 併發個數
org.quartz.threadPool.threadCount = 5
# 優先級
org.quartz.threadPool.threadPriority = 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

org.quartz.jobStore.misfireThreshold = 5000

# 默認存儲在內存中
#org.quartz.jobStore.class = org.quartz.simpl.RAMJobStore

#持久化
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX

#開啓分佈式部署
org.quartz.jobStore.isClustered = true

org.quartz.jobStore.tablePrefix = QRTZ_

org.quartz.jobStore.dataSource = qzDS

org.quartz.dataSource.qzDS.driver = com.mysql.jdbc.Driver

org.quartz.dataSource.qzDS.URL = jdbc:mysql://localhost:3306/quartz_test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true

org.quartz.dataSource.qzDS.user = root

org.quartz.dataSource.qzDS.password = 123456

org.quartz.dataSource.qzDS.maxConnections = 10

配置文件說明

org.quartz.jobStore.isClustered = true

在集羣中的每個實例都必須有一個惟一的"instance id" ("org.quartz.scheduler.instanceId" 屬性), 默認爲AUTO就能夠。還要有相同的"scheduler instance name" ("org.quartz.scheduler.instanceName"),也就是說集羣中的每個實例都必須使用相同的quartz.properties 配置文件。


調度任務代碼

package com.fwmagic.quartz.schedule;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.ResourceBundle;

public class SchedulerExecJob implements Job {

    private static Logger logger = LoggerFactory.getLogger(SchedulerExecJob.class);

    @Override
    public void execute(JobExecutionContext context)
            throws JobExecutionException {
        String jobName = context.getJobDetail().getKey().getName();
        switch (jobName) {
            /*每5s執行一次*/
            case "quartz_test1":
                System.err.println(getAddress()+" "+getDate()+"====>quartz_test1<====");
                break;
            /*每5s執行一次*/
            case "quartz_test2":
                System.err.println(getAddress()+" "+getDate()+"====>quartz_test2<====");
                break;
            /*每5s執行一次*/
            case "quartz_test3":
                System.err.println(getAddress()+" "+getDate()+"====>quartz_test3<====");
                break;
            default:
                System.err.println(getAddress()+" "+getDate()+"====>other task<====");
                break;
        }
    }

    public static String getDate(){
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
    }

    public static String getAddress(){
        return "http://localhost:"+ResourceBundle.getBundle("application").getString("server.port");
    }
}

333.png


2、集羣任務測試

項目中有三個任務,我用不一樣端口(三個服務同時啓動,端口不同,分別爲:9099(ServerA),,9098(ServerB),9097(ServerC))在本機啓動同一個項目,效果圖以下:

當開啓ServerA時,三個任務都在9099端口執行,控制檯信息爲:

9099.png


開啓ServerB後,有部分任務分配過來:

9098.png

開啓ServerC,集羣所有啓動的狀況下,會有任務交錯執行或者任務在同一臺機器上執行的效果:

9096.png


注意事項

Quartz實際並不關心你是在相同仍是不一樣的機器上運行節點。當集羣放置在不一樣的機器上時,稱之爲水平集羣。節點跑在同一臺機器上時,稱之爲垂直集羣。對於垂直集羣,存在着單點故障的問題。這對高可用性的應用來講是沒法接受的,由於一旦機器崩潰了,全部的節點也就被終止了。對於水平集羣,存在着時間同步問題。

  節點用時間戳來通知其餘實例它本身的最後檢入時間。假如節點的時鐘被設置爲未來的時間,那麼運行中的Scheduler將再也意識不到那個結點已經宕掉了。另外一方面,若是某個節點的時鐘被設置爲過去的時間,也許另外一節點就會認定那個節點已宕掉並試圖接過它的Job重運行。最簡單的同步計算機時鐘的方式是使用某一個Internet時間服務器(Internet Time Server ITS)。


節點爭搶Job問題

由於Quartz使用了一個隨機的負載均衡算法, Job以隨機的方式由不一樣的實例執行。Quartz官網上提到當前,還不存在一個方法來指派(釘住) 一個 Job 到集羣中特定的節點。


3、集羣源碼分析


Quartz如何保證多個節點的應用只進行一次調度(即某一時刻的調度任務只由其中一臺服務器執行)?,正如Quartz集羣架構上的那副圖,

Quartz的集羣是在同一個數據庫下, 由數據庫的數據來肯定調度任務是否正在執行, 正在執行則其餘服務器就不能去執行該行調度數據。 這個跟不少項目是用Zookeeper作集羣不同, 這些項目是靠Zookeeper選舉出來的的服務器去執行, 能夠理解爲Quartz靠數據庫選舉一個服務器來執行。

Quartz最主要的一個類QuartzSchedulerThread職責是觸發任務, 是一個不斷運行的Quartz主線程, 仍是從這裏入手瞭解集羣原理。

QuartzSchedulerThread繼承自Thread,實現了run方法,在run方法中調用了以下幾個重要的方法,都進行了加鎖的操做:

一、qsRsrcs.getJobStore().acquireNextTriggers【查找即將觸發的Trigger】

二、sigLock.wait(timeUntilTrigger)【等待執行】

三、qsRsrcs.getJobStore().triggersFired(triggers)【執行】

四、qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i)) 【釋放Trigger】


以acquireNextTriggers爲例,能夠看到:

suo.png

protected static final String LOCK_TRIGGER_ACCESS = "TRIGGER_ACCESS";//數據庫鎖名字

將鎖名傳入核心的加鎖方法(executeInNonManagedTXLock)中:

protected <T> T executeInNonManagedTXLock(
        String lockName, 
        TransactionCallback<T> txCallback, final TransactionValidator<T> txValidator) throws JobPersistenceException {
    boolean transOwner = false;
    Connection conn = null;
    try {
        if (lockName != null) {
            // If we aren't using db locks, then delay getting DB connection 
            // until after acquiring the lock since it isn't needed.
            if (getLockHandler().requiresConnection()) {
                conn = getNonManagedTXConnection();
            }
            //獲取鎖
            transOwner = getLockHandler().obtainLock(conn, lockName);
        }
        
        if (conn == null) {
            conn = getNonManagedTXConnection();
        }
        
        //回調執行
        final T result = txCallback.execute(conn);
        try {
            commitConnection(conn);
        } catch (JobPersistenceException e) {
            rollbackConnection(conn);
            if (txValidator == null || !retryExecuteInNonManagedTXLock(lockName, new TransactionCallback<Boolean>() {
                @Override
                public Boolean execute(Connection conn) throws JobPersistenceException {
                    return txValidator.validate(conn, result);
                }
            })) {
                throw e;
            }
        }

        Long sigTime = clearAndGetSignalSchedulingChangeOnTxCompletion();
        if(sigTime != null && sigTime >= 0) {
            signalSchedulingChangeImmediately(sigTime);
        }
        
        return result;
    } catch (JobPersistenceException e) {
        rollbackConnection(conn);
        throw e;
    } catch (RuntimeException e) {
        rollbackConnection(conn);
        throw new JobPersistenceException("Unexpected runtime exception: "
                + e.getMessage(), e);
    } finally {
        try {
            //釋放鎖
            releaseLock(lockName, transOwner);
        } finally {
            cleanupConnection(conn);
        }
    }
}


經過這行代碼查找鎖是怎麼來的

transOwner = getLockHandler().obtainLock(conn, lockName);

在JobStoreSupport的initialize方法中:

public void initialize(ClassLoadHelper loadHelper,
        SchedulerSignaler signaler) throws SchedulerConfigException {

    if (dsName == null) { 
        throw new SchedulerConfigException("DataSource name not set."); 
    }

    // If the user hasn't specified an explicit lock handler, then 
    // choose one based on CMT/Clustered/UseDBLocks.
    if (getLockHandler() == null) {
        
        // If the user hasn't specified an explicit lock handler, 
        // then we *must* use DB locks with clustering
        if (isClustered()) {
            setUseDBLocks(true);
        }
          //……
          // 在初始化方法裏面賦值了
           setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
        } else {
            getLog().info(
                "Using thread monitor-based data access locking (synchronization).");
            setLockHandler(new SimpleSemaphore());
        }
    }

}

在new StdRowLockSemaphore構造方法中

public StdRowLockSemaphore(String tablePrefix, String schedName, String selectWithLockSQL) {
    super(tablePrefix, schedName, selectWithLockSQL != null ? selectWithLockSQL : SELECT_FOR_LOCK, INSERT_LOCK);
}

能夠發現有兩個鎖名稱:

public static final String SELECT_FOR_LOCK = "SELECT * FROM "
        + TABLE_PREFIX_SUBST + TABLE_LOCKS + " WHERE " + COL_SCHEDULER_NAME + " = " + SCHED_NAME_SUBST
        + " AND " + COL_LOCK_NAME + " = ? FOR UPDATE";

public static final String INSERT_LOCK = "INSERT INTO "
    + TABLE_PREFIX_SUBST + TABLE_LOCKS + "(" + COL_SCHEDULER_NAME + ", " + COL_LOCK_NAME + ") VALUES (" 
    + SCHED_NAME_SUBST + ", ?)";

數據庫的qrtz_locks中存放兩個鎖的記錄

ss.png


能夠看出採用了Quartz集羣採用了悲觀鎖的方式對triggers表進行行加鎖, 以保證任務同步的正確性。

當線程使用上述的SQL對錶中的數據執行操做時,數據庫對該行進行行加鎖; 於此同時, 另外一個線程對該行數據執行操做前須要獲取鎖, 而此時已被佔用, 那麼這個線程就只能等待, 直到該行鎖被釋放。

相關文章
相關標籤/搜索