Quartz.Net Cluster-DatabaseSupport-Priority

正如標題所示,文章主要是圍繞Quartz.Net作業調度框架話題展開的,內容出自博主學習官方Examples的學習心得與體會,文中不免會有錯誤之處,還請指出,不吝指教。

一:觸發的優先級

在往一個作業添加多個觸發器之後,如果同一個作業的多個觸發器在某個時間點同時觸發,那麼會先執行哪個觸發器的輪詢呢?

觸發的順序由觸發器說了算,通過設置觸發器的Priority來解決先觸發誰的問題。

可以通過以下代碼設置觸發的優先級(僅供參考):

         // Simple trigger for `job`: starts at `startTime`, repeats once at a 5-second interval.
         // The priority value decides which trigger goes first when several are due at the
         // same instant; a larger value means a higher priority.
         ITrigger trigger1 = TriggerBuilder.Create()
                .WithIdentity("Priority1Trigger5SecondRepeat")
                .ForJob(job)
                .WithPriority(1)
                .StartAt(startTime)
                .WithSimpleSchedule(schedule => schedule
                    .WithIntervalInSeconds(5)
                    .WithRepeatCount(1))
                .Build();

設置的數值越大,相應的觸發優先級就越高。

二:服務端與客戶端

Quartz.Net可以在客戶端定義輪詢作業,在服務端進行調度。

首先定義服務端,如下:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Quartz.Impl;
using Common.Logging;
using System.Collections.Specialized;

namespace Quartz.Examples
{
    /// <summary>
    /// Server half of the remoting example: hosts a scheduler and exports it
    /// so that a separate client process can schedule jobs on it.
    /// </summary>
    /// <author>Bill Kratzer</author>
    /// <author>Marko Lahma (.NET)</author>
    public class RemoteServerExample : IExample
    {
        /// <summary>
        /// Display name of the example; this is the runtime type name.
        /// </summary>
        public string Name
        {
            get
            {
                string typeName = GetType().Name;
                return typeName;
            }
        }

        /// <summary>
        /// Starts a scheduler exported on tcp port 555 under the bind name
        /// "QuartzScheduler", keeps it alive for five minutes, then shuts it
        /// down and logs how many jobs were executed.
        /// </summary>
        /// <author>James House, Bill Kratzer</author>
        public virtual void Run()
        {
            var log = LogManager.GetLogger(typeof(RemoteServerExample));

            var properties = new NameValueCollection
            {
                { "quartz.scheduler.instanceName", "RemoteServer" },

                // thread pool settings
                { "quartz.threadPool.type", "Quartz.Simpl.SimpleThreadPool, Quartz" },
                { "quartz.threadPool.threadCount", "5" },
                { "quartz.threadPool.threadPriority", "Normal" },

                // remoting exporter settings
                { "quartz.scheduler.exporter.type", "Quartz.Simpl.RemotingSchedulerExporter, Quartz" },
                { "quartz.scheduler.exporter.port", "555" },
                { "quartz.scheduler.exporter.bindName", "QuartzScheduler" },
                { "quartz.scheduler.exporter.channelType", "tcp" },
                { "quartz.scheduler.exporter.channelName", "httpQuartz" },

                // reject non-local requests
                { "quartz.scheduler.exporter.rejectRemoteRequests", "true" }
            };

            IScheduler scheduler = new StdSchedulerFactory(properties).GetScheduler();

            log.Info("------- Initialization Complete -----------");
            log.Info("------- Not scheduling any Jobs - relying on a remote client to schedule jobs --");
            log.Info("------- Starting Scheduler ----------------");

            scheduler.Start();

            log.Info("------- Started Scheduler -----------------");
            log.Info("------- Waiting 5 minutes... ------------");

            // Keep the process alive so remotely scheduled jobs get a chance to run.
            TimeSpan runWindow = TimeSpan.FromMinutes(5);
            try
            {
                Thread.Sleep(runWindow);
            }
            catch (ThreadInterruptedException)
            {
                // An interrupt just ends the wait early; continue with shutdown.
            }

            log.Info("------- Shutting Down ---------------------");
            scheduler.Shutdown(true);
            log.Info("------- Shutdown Complete -----------------");

            SchedulerMetaData metaData = scheduler.GetMetaData();
            int executedJobs = metaData.NumberOfJobsExecuted;
            log.Info("Executed " + executedJobs + " jobs.");
        }

    }
}
Server

通過綁定的路徑、端口以及協議,等待客戶端的接入。

客戶端:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Quartz.Impl;
using System.Collections.Specialized;
using Common.Logging;

namespace Quartz.Examples
{
    /// <summary>
    /// Client half of the remoting example: connects to a scheduler hosted by
    /// another process and schedules a job on it.
    ///
    /// Usage order:
    /// 1. Start the server first so that it is listening.
    /// 2. Then start this client, which sends the job to the server for scheduling.
    /// </summary>
    /// <author>James House</author>
    /// <author>Bill Kratzer</author>
    /// <author>Marko Lahma (.NET)</author>
    public class RemoteClientExample : IExample
    {
        /// <summary>
        /// Obtains a proxy to the scheduler at tcp://127.0.0.1:555/QuartzScheduler
        /// and schedules the job "remotelyAddedJob" with a cron trigger on it.
        /// </summary>
        public virtual void Run()
        {
            ILog log = LogManager.GetLogger(typeof(RemoteClientExample));

            NameValueCollection properties = new NameValueCollection();
            properties["quartz.scheduler.instanceName"] = "RemoteClient";

            // set thread pool info
            properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";
            properties["quartz.threadPool.threadCount"] = "5";
            properties["quartz.threadPool.threadPriority"] = "Normal";

            // act as a proxy to the remote scheduler; the address must match the
            // channel type, port and bind name configured on the server side
            properties["quartz.scheduler.proxy"] = "true";
            properties["quartz.scheduler.proxy.address"] = "tcp://127.0.0.1:555/QuartzScheduler";

            // First we must get a reference to a scheduler
            ISchedulerFactory sf = new StdSchedulerFactory(properties);
            IScheduler sched = sf.GetScheduler();

            // define the job and ask it to run
            IJobDetail job = JobBuilder.Create<SimpleJob>()
                .WithIdentity("remotelyAddedJob", "default")
                .Build();

            // payload handed to the job through its data map
            JobDataMap map = job.JobDataMap;
            map.Put("msg", "Your remotely added job has executed!");

            // NOTE(review): the leading "/5" in the seconds field presumably means
            // "every 5 seconds" - confirm against the Quartz cron expression rules.
            ITrigger trigger = TriggerBuilder.Create()
                .WithIdentity("remotelyAddedTrigger", "default")
                .ForJob(job.Key)
                .WithCronSchedule("/5 * * ? * *")
                .Build();

            // schedule the job
            sched.ScheduleJob(job, trigger);

            log.Info("Remote job scheduled.");
        }

        /// <summary>
        /// Display name of the example; this is the runtime type name, matching
        /// what RemoteServerExample reports (previously this returned null).
        /// </summary>
        public string Name
        {
            get { return GetType().Name; }
        }
    }
}
Client

按照Server配置信息指定的路徑、端口、協議等連接到服務端,在服務端進行調度實例的調度操作。

注意:先開啓Server端,再啓動客戶端。

 

三:數據庫支持

在調度作業很多之後,我們需要把這些調度數據管理起來,日積月累之後通過人工的方式明顯不夠明智,因此,由數據庫來保存這些調度數據是更好的選擇。

官方提供的各種數據庫腳本下載地址:

https://github.com/quartznet/quartznet/tree/master/database/tables

在代碼中的操作需要留意幾個地方:

            properties["quartz.jobStore.tablePrefix"] = "QRTZ_"; // prefix of the Quartz table names
            properties["quartz.jobStore.clustered"] = "false"; // whether clustering is enabled
            // properties["quartz.jobStore.lockHandler.type"] = "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz";

            // use the SQL Server driver delegate
            properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz";
            // the settings below all refer to the data source named "quartz";
            // replace that name (and the connection string) to match your own database
            properties["quartz.jobStore.dataSource"] = "quartz"; // data source name used by the job store
            properties["quartz.dataSource.quartz.connectionString"] = "Server=(local);Database=quartz;Trusted_Connection=True;"; // connection string of the database
            properties["quartz.dataSource.quartz.provider"] = "SqlServer-20"; // database provider
            properties["quartz.dataSource.quartz.maxConnections"] = "5"; // maximum number of connections

 

假如你的數據庫名稱叫做「YouSelfDb」,那麼配置信息應該寫成這樣:

            // The data source name appears both as the value of "quartz.jobStore.dataSource"
            // and as a segment of every "quartz.dataSource.<name>.*" key, so all four lines
            // must use the same name.
            properties["quartz.jobStore.dataSource"] = "YouSelfDb"; // data source name; put your own here
            properties["quartz.dataSource.YouSelfDb.connectionString"] = "Server=(local);Database=YouSelfDb;Trusted_Connection=True;"; // connection string pointing at the YouSelfDb database
            properties["quartz.dataSource.YouSelfDb.provider"] = "SqlServer-20"; // database provider
            properties["quartz.dataSource.YouSelfDb.maxConnections"] = "5"; // maximum number of connections

 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Quartz.Impl;
using System.Collections.Specialized;
using Common.Logging;

namespace Quartz.Examples
{
    /// <summary>
    /// Example that stores jobs and triggers in a SQL Server database through
    /// the ADO job store and schedules several recoverable jobs on it.
    /// </summary>
    public class ClusterExample : IExample
    {
        private static readonly ILog log = LogManager.GetLogger(typeof(ClusterExample));

        /// <summary>
        /// Configures a database-backed scheduler, optionally clears and
        /// (re)schedules the example jobs, runs for one hour and shuts down.
        /// </summary>
        /// <param name="inClearJobs">When true, all existing jobs/triggers are removed via <c>sched.Clear()</c> first.</param>
        /// <param name="inScheduleJobs">When true, the five example jobs are scheduled.</param>
        public virtual void Run(bool inClearJobs, bool inScheduleJobs)
        {
            NameValueCollection properties = new NameValueCollection();

            properties["quartz.scheduler.instanceName"] = "TestScheduler";
            properties["quartz.scheduler.instanceId"] = "instance_one";
            properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";
            properties["quartz.threadPool.threadCount"] = "5";
            properties["quartz.threadPool.threadPriority"] = "Normal";
            properties["quartz.jobStore.misfireThreshold"] = "60000";
            properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
            properties["quartz.jobStore.useProperties"] = "false";

            properties["quartz.jobStore.tablePrefix"] = "QRTZ_";
            properties["quartz.jobStore.clustered"] = "false";
            // if running SQLite we need this
            // properties["quartz.jobStore.lockHandler.type"] = "Quartz.Impl.AdoJobStore.UpdateLockRowSemaphore, Quartz";
            properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz";
            // the settings below all refer to the data source named "quartz";
            // adjust the name and connection string to match your own database
            properties["quartz.jobStore.dataSource"] = "quartz";
            properties["quartz.dataSource.quartz.connectionString"] = "Server=(local);Database=quartz;Trusted_Connection=True;";
            properties["quartz.dataSource.quartz.provider"] = "SqlServer-20";
            properties["quartz.dataSource.quartz.maxConnections"] = "5"; // maximum number of connections

            // First we must get a reference to a scheduler
            ISchedulerFactory sf = new StdSchedulerFactory(properties);
            IScheduler sched = sf.GetScheduler();

            if (inClearJobs)
            {
                log.Warn("***** Deleting existing jobs/triggers *****");
                sched.Clear();
            }

            log.Info("------- Initialization Complete -----------");

            if (inScheduleJobs)
            {
                log.Info("------- Scheduling Jobs ------------------");

                // Jobs and triggers are put in a group named after the scheduler
                // instance, just to distinguish (in logging) what was scheduled from where.
                string schedId = sched.SchedulerInstanceId;

                int count = 1;

                // job 1: every 5 seconds, starting 1 second from now
                IJobDetail job = JobBuilder.Create<SimpleRecoveryJob>()
                    .WithIdentity("job_" + count, schedId)
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                ISimpleTrigger trigger = (ISimpleTrigger)TriggerBuilder.Create()
                                                              .WithIdentity("triger_" + count, schedId)
                                                              .StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
                                                              .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(5)))
                                                              .Build();

                log.InfoFormat("{0} will run at: {1} and repeat: {2} times, every {3} seconds", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval.TotalSeconds);
                // Fix: the first job was built and logged but never handed to the scheduler.
                sched.ScheduleJob(job, trigger);

                count++;

                // job 2: every 5 seconds, starting 2 seconds from now
                job = JobBuilder.Create<SimpleRecoveryJob>()
                    .WithIdentity("job_" + count, schedId)
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                trigger = (ISimpleTrigger)TriggerBuilder.Create()
                                               .WithIdentity("triger_" + count, schedId)
                                               .StartAt(DateBuilder.FutureDate(2, IntervalUnit.Second))
                                               .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(5)))
                                               .Build();

                log.Info(string.Format("{0} will run at: {1} and repeat: {2} times, every {3} seconds", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval.TotalSeconds));
                sched.ScheduleJob(job, trigger);

                count++;

                // job 3: SimpleRecoveryStatefulJob, every 3 seconds, starting 1 second from now
                job = JobBuilder.Create<SimpleRecoveryStatefulJob>()
                    .WithIdentity("job_" + count, schedId)
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                trigger = (ISimpleTrigger)TriggerBuilder.Create()
                                               .WithIdentity("triger_" + count, schedId)
                                               .StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
                                               .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(3)))
                                               .Build();

                log.Info(string.Format("{0} will run at: {1} and repeat: {2} times, every {3} seconds", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval.TotalSeconds));
                sched.ScheduleJob(job, trigger);

                count++;

                // job 4: every 4 seconds, starting 1 second from now
                job = JobBuilder.Create<SimpleRecoveryJob>()
                    .WithIdentity("job_" + count, schedId)
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                trigger = (ISimpleTrigger)TriggerBuilder.Create()
                                               .WithIdentity("triger_" + count, schedId)
                                               .StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
                                               .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromSeconds(4)))
                                               .Build();

                log.Info(string.Format("{0} will run at: {1} & repeat: {2}/{3}", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval));
                sched.ScheduleJob(job, trigger);

                count++;

                // job 5: every 4.5 seconds, starting 1 second from now
                job = JobBuilder.Create<SimpleRecoveryJob>()
                    .WithIdentity("job_" + count, schedId)
                    .RequestRecovery() // ask scheduler to re-execute this job if it was in progress when the scheduler went down...
                    .Build();

                trigger = (ISimpleTrigger)TriggerBuilder.Create()
                                               .WithIdentity("triger_" + count, schedId)
                                               .StartAt(DateBuilder.FutureDate(1, IntervalUnit.Second))
                                               .WithSimpleSchedule(x => x.WithRepeatCount(20).WithInterval(TimeSpan.FromMilliseconds(4500)))
                                               .Build();

                log.Info(string.Format("{0} will run at: {1} & repeat: {2}/{3}", job.Key, trigger.GetNextFireTimeUtc(), trigger.RepeatCount, trigger.RepeatInterval));
                sched.ScheduleJob(job, trigger);
            }

            // jobs don't start firing until start() has been called...
            log.Info("------- Starting Scheduler ---------------");
            sched.Start();
            log.Info("------- Started Scheduler ----------------");

            log.Info("------- Waiting for one hour... ----------");

            Thread.Sleep(TimeSpan.FromHours(1));

            log.Info("------- Shutting Down --------------------");
            sched.Shutdown();
            log.Info("------- Shutdown Complete ----------------");
        }

        /// <summary>
        /// Display name of the example; this is the runtime type name, in line
        /// with the other examples (previously this threw NotImplementedException).
        /// </summary>
        public string Name
        {
            get { return GetType().Name; }
        }

        /// <summary>
        /// Parameterless entry point: runs the example with both clearing and
        /// scheduling of jobs enabled.
        /// </summary>
        public void Run()
        {
            bool clearJobs = true;
            bool scheduleJobs = true;
            /* TODO
            for (int i = 0; i < args.Length; i++)
            {
                if (args[i].ToUpper().Equals("clearJobs".ToUpper()))
                {
                    clearJobs = true;
                }
                else if (args[i].ToUpper().Equals("dontScheduleJobs".ToUpper()))
                {
                    scheduleJobs = false;
                }
            }
            */
            ClusterExample example = new ClusterExample();
            example.Run(clearJobs, scheduleJobs);
        }
    }
}
DbSupport

打開你的數據庫YouSelfDb,這個庫下面有相應的表存儲相應的調度數據,比如:

QRTZ_TRIGGERS表將會保存具體的作業調度信息,包括作業,觸發器等信息。

QRTZ_JOB_DETAILS表保存作業數據。

QRTZ_SIMPLE_TRIGGERS表保存SimpleTrigger類的觸發器數據。

QRTZ_CALENDARS表保存日曆(Calendar)數據。

QRTZ_CRON_TRIGGERS表保存Cron表達式的trigger觸發器數據。

等等。。。。。(不一一列舉了,你們自己看吧)

 

四,集羣Cluster

在我看來所謂的集羣就是一堆機器一起工作。

一堆作業一起調度,但作業與作業之間不會相互干擾,一個作業的停止運行不會影響到另一個作業的正常工作。

通過properties["quartz.jobStore.clustered"] = "true";啓動集羣(集羣中每個節點的quartz.scheduler.instanceId必須互不相同,可以設爲"AUTO"由Quartz自動生成)。

而集羣的代碼例子和上面的DbSupport是一樣的,這裏不再重複貼出。

相關文章
相關標籤/搜索