APScheduler 3.0.1淺析

簡介

APScheduler是一個小巧而強大的Python類庫,經過它你能夠實現相似Unix系統cronjob相似的定時任務系統。使用之餘,閱讀一下源碼,一方面有助於更好的使用它,另外一方面,我的認爲aps的架構設計質量很高,閱讀它對於提高軟件開發的sense頗有幫助。python

組成

APScheduler整個系統能夠說由這五個概念組成:react

  • scheduler:控制器,能夠看作整個系統的driver,外部世界經過它來實現任務(Job)的增刪改查管理。根據IO模式的不一樣,aps提供了多種scheduler實現。
  • job:描述一個任務自己。
  • jobstore:任務持久化倉庫。aps提供了內存、redis、mongodb、sqlalchemy幾種store
  • executor:執行任務的模塊。根據不一樣的IO模型有多種executor選擇。
  • trigger:描述一個任務什麼時候被觸發,有按日期、按時間間隔、按cronjob描述式三種觸發方式

這樣的劃分充分發揮了軟件設計中抽象的威力,咱們下面對每一個模塊進行描述redis

scheduler

BaseScheduler類是全部scheduler的抽象基類,它的初始化代碼是這樣的:sql

     def __init__(self, gconfig={}, **options):
         super(BaseScheduler, self).__init__()
         self._executors = {}
         self._executors_lock = self._create_lock()
         self._jobstores = {}
         self._jobstores_lock = self._create_lock()
         self._listeners = []
         self._listeners_lock = self._create_lock()
         self._pending_jobs = []
         self.configure(gconfig, **options)

能夠看到一個scheduler維護了本身的executor和jobstore表,經過configure方法進行初始化。在configure中,scheduler讀取傳入的配置,對executors和jobstores進行初始化,一個典型的配置是這樣的:mongodb

 APS_SCHEDULER_CONFIG = {
     'jobstores': {
         'default': {'type': 'sqlalchemy', 'url': 'postgres://127.0.0.1:5432/optimus'},
     },
     'executors': {
         'default': {'type': 'processpool', 'max_workers': 10}
     },
     'job_defaults': {
         'coalesce': True,
         'max_instances': 5,
         'misfire_grace_time': 30
     },
     'timezone': 'Asia/Shanghai'
 }

若是咱們把APS_SCHEDULER_CONFIG做爲options傳入給一個scheduler,會產生什麼結果呢?首先,咱們添加了一個默認(名叫default)的jobstore,它的具體實現類型是sqlalchemy,數據庫鏈接url是指向一個本地postgresql數據庫,也就是說添加到這個scheduler的job會默認使用這個jobstore進行存儲。其次,咱們添加了一個默認的executor,他是一個多進程實現,也就是說每一個job在運行時,是經過一個進程池來做爲worker實際執行的,這個進程池最大size是10。job_defaults參數定義了一些特殊行爲:數據庫

  • coalesce:當因爲某種緣由致使某個job積攢了好幾回沒有實際運行(好比說系統掛了5分鐘後恢復,有一個任務是每分鐘跑一次的,按道理說這5分鐘內原本是「計劃」運行5次的,但實際沒有執行),若是coalesce爲True,下次這個job被submit給executor時,只會執行1次,也就是最後此次,若是爲False,那麼會執行5次(不必定,由於還有其餘條件,看後面misfire_grace_time的解釋)
  • max_instance: 就是說同一個job同一時間最多有幾個實例再跑,好比一個耗時10分鐘的job,被指定每分鐘運行1次,若是咱們max_instance值爲5,那麼在第6~10分鐘上,新的運行實例不會被執行,由於已經有5個實例在跑了
  • misfire_grace_time:設想和上述coalesce相似的場景,若是一個job原本14:00有一次執行,可是因爲某種緣由沒有被調度上,如今14:01了,這個14:00的運行實例被提交時,會檢查它預訂運行的時間和當下時間的差值(這裏是1分鐘),大於咱們設置的30秒限制,那麼這個運行實例不會被執行。

這裏還須要指出的一點是,爲何scheduler的配置能夠寫成這種json形式,而scheduler會正確地找到對應的實現類進行初始化?這裏運用了兩個技巧:json

entry point

用python egg的機制把各個組件註冊了成了entry point,以下所示架構

 [apscheduler.executors]
 asyncio = apscheduler.executors.asyncio:AsyncIOExecutor
 debug = apscheduler.executors.debug:DebugExecutor
 gevent = apscheduler.executors.gevent:GeventExecutor
 processpool = apscheduler.executors.pool:ProcessPoolExecutor
 threadpool = apscheduler.executors.pool:ThreadPoolExecutor
 twisted = apscheduler.executors.twisted:TwistedExecutor

 [apscheduler.jobstores]
 memory = apscheduler.jobstores.memory:MemoryJobStore
 mongodb = apscheduler.jobstores.mongodb:MongoDBJobStore
 redis = apscheduler.jobstores.redis:RedisJobStore
 sqlalchemy = apscheduler.jobstores.sqlalchemy:SQLAlchemyJobStore

 [apscheduler.triggers]
 cron = apscheduler.triggers.cron:CronTrigger
 date = apscheduler.triggers.date:DateTrigger
 interval = apscheduler.triggers.interval:IntervalTrigger

這樣,在scheduler模塊中就能夠用entry point的名稱反查出對應組件async

     _trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers'))
     _trigger_classes = {}
     _executor_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.executors'))
     _executor_classes = {}
     _jobstore_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.jobstores'))
     _jobstore_classes = {}
     _stopped = True

從而實現了一個便利的插件機制函數

ref_to_obj

另外經過一個加載函數完成"apscheduler.executors.pool:ThreadPoolExecutor"字符串到ThreadPoolExecutor類對象的查詢

 def ref_to_obj(ref):
     """
     Returns the object pointed to by ``ref``.

     :type ref: str
     """

     if not isinstance(ref, six.string_types):
         raise TypeError('References must be strings')
     if ':' not in ref:
         raise ValueError('Invalid reference')

     modulename, rest = ref.split(':', 1)
     try:
         obj = __import__(modulename)
     except ImportError:
         raise LookupError('Error resolving reference %s: could not import module' % ref)

     try:
         for name in modulename.split('.')[1:] + rest.split('.'):
             obj = getattr(obj, name)
         return obj
     except Exception:
         raise LookupError('Error resolving reference %s: error looking up object' % ref)

 

scheduler的主循環(main_loop),其實就是反覆檢查是否是有到時須要執行的任務,完成一次檢查的函數是_process_jobs, 這個函數作這麼幾件事:

  1. 詢問本身的每個jobstore,有沒有到期須要執行的任務(jobstore.get_due_jobs())
  2. 若是有,計算這些job中每一個job須要運行的時間點(run_times = job._get_run_times(now))若是run_times有多個,這種狀況咱們上面討論過,有coalesce檢查
  3. 提交給executor排期運行(executor.submit_job(job, run_times))

那麼在這個_process_jobs的邏輯,何時調用合適呢?若是不間斷地調用,而實際上沒有要執行的job,是一種浪費。每次掉用_process_jobs後,其實能夠預先判斷一下,下一次要執行的job(離如今最近的)還要多長時間,做爲返回值告訴main_loop, 這時主循環就能夠去睡一覺,等大約這麼長時間後再喚醒,執行下一次_process_jobs。這裏喚醒的機制就會有IO模型的區別了

scheduler因爲IO模型的不一樣,能夠有多種實現,如

  • BlockingScheduler:main_loop就在當前進程的主線程內運行,因此調用start函數後會阻塞當前線程。經過一個threading.Event條件變量對象完成scheduler的定時喚醒。
  • BackgroundScheduler:和BlockingScheduler基本同樣,除了main_loop放在了單獨線程裏,因此調用start後主線程不會阻塞
  • AsyncIOScheduler:使用asyncio做爲IO模型的scheduler,和AsyncIOExecutor配合使用,用asynio中event_loop的call_later完成定時喚醒
  • GeventScheduler:和BlockingScheduler基本同樣,使用gevent做爲IO模型,和GeventExecutor配合使用
  • QtScheduler:使用QTimer完成定時喚醒
  • TornadoScheduler:使用tornado的IO模型,用ioloop.add_timeout完成定時喚醒
  • TwistedScheduler:配合TwistedExecutor,用reactor.callLater完成定時喚醒

JobStore

jobstore提供給scheduler一個序列化jobs的統一抽象,提供對scheduler中job的增刪改查接口,根據存儲backend的不一樣,分如下幾種

  • MemoryJobStore:沒有序列化,jobs就存在內存裏,增刪改查也都是在內存中操做
  • SQLAlchemyJobStore:全部sqlalchemy支持的數據庫均可以作爲backend,增刪改查操做轉化爲對應backend的sql語句
  • MongoDBJobStore:用mongodb做backend
  • RedisJobStore: 用redis做backend

除了MemoryJobStore外,其餘幾種都使用pickle作序列化工具,因此這裏要指出一點,若是你不是在用內存作jobstore,那麼必須確保你提供給job的可執行函數必須是能夠被全局訪問的,也就是能夠經過ref_to_obj反查出來的,不然沒法序列化。

使用數據庫作jobstore,就會發現,其實建立了一張有三個域的的jobs表,分別是id, next_run_time, job_state,其中job_state是job對象pickle序列化後的二進制,而id和next_run_time則是支持job的兩類查詢(按id和按最近運行時間)

 

Executor

aps把任務最終的執行機制也抽象了出來,能夠根據IO模型選配,不須要講太多,最經常使用的是threadpool和processpoll兩種(來自concurrent.futures的線程/進程池)。

不一樣類型的executor實現本身的_do_submit_job,完成一次實際的任務實例執行。以線程/進程池實現爲例

     def _do_submit_job(self, job, run_times):
         def callback(f):
             exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
                        (f.exception(), getattr(f.exception(), '__traceback__', None)))
             if exc:
                 self._run_job_error(job.id, exc, tb)
             else:
                 self._run_job_success(job.id, f.result())

         f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
         f.add_done_callback(callback)

Trigger

trigger是抽象出了「一個job是什麼時候被觸發」這個策略,每種trigger實現本身的get_next_fire_time函數

     @abstractmethod
     def get_next_fire_time(self, previous_fire_time, now):
         """
         Returns the next datetime to fire on, If no such datetime can be calculated, returns ``None``.

         :param datetime.datetime previous_fire_time: the previous time the trigger was fired
         :param datetime.datetime now: current datetime
         """

aps提供的trigger包括:

  • date:一次性指定日期
  • interval:在某個時間範圍內間隔多長時間執行一次
  • cron:和unix crontab格式兼容,最爲強大

總結

簡要介紹了apscheduler類庫的組成,強調抽象概念的理解

相關文章
相關標籤/搜索