APScheduler是一個小巧而強大的Python類庫,經過它你能夠實現相似Unix系統cronjob相似的定時任務系統。使用之餘,閱讀一下源碼,一方面有助於更好的使用它,另外一方面,我的認爲aps的架構設計質量很高,閱讀它對於提高軟件開發的sense頗有幫助。python
APScheduler整個系統能夠說由這五個概念組成:react
這樣的劃分充分發揮了軟件設計中抽象的威力,咱們下面對每一個模塊進行描述redis
BaseScheduler類是全部scheduler的抽象基類,它的初始化代碼是這樣的:sql
def __init__(self, gconfig={}, **options): super(BaseScheduler, self).__init__() self._executors = {} self._executors_lock = self._create_lock() self._jobstores = {} self._jobstores_lock = self._create_lock() self._listeners = [] self._listeners_lock = self._create_lock() self._pending_jobs = [] self.configure(gconfig, **options)
能夠看到一個scheduler維護了本身的executor和jobstore表,經過configure方法進行初始化。在configure中,scheduler讀取傳入的配置,對executors和jobstores進行初始化,一個典型的配置是這樣的:mongodb
APS_SCHEDULER_CONFIG = { 'jobstores': { 'default': {'type': 'sqlalchemy', 'url': 'postgres://127.0.0.1:5432/optimus'}, }, 'executors': { 'default': {'type': 'processpool', 'max_workers': 10} }, 'job_defaults': { 'coalesce': True, 'max_instances': 5, 'misfire_grace_time': 30 }, 'timezone': 'Asia/Shanghai' }
若是咱們把APS_SCHEDULER_CONFIG做爲options傳入給一個scheduler,會產生什麼結果呢?首先,咱們添加了一個默認(名叫default)的jobstore,它的具體實現類型是sqlalchemy,數據庫鏈接url是指向一個本地postgresql數據庫,也就是說添加到這個scheduler的job會默認使用這個jobstore進行存儲。其次,咱們添加了一個默認的executor,他是一個多進程實現,也就是說每一個job在運行時,是經過一個進程池來做爲worker實際執行的,這個進程池最大size是10。job_defaults參數定義了一些特殊行爲:數據庫
這裏還須要指出的一點是,爲何scheduler的配置能夠寫成這種json形式,而scheduler會正確地找到對應的實現類進行初始化?這裏運用了兩個技巧:json
用python egg的機制把各個組件註冊了成了entry point,以下所示架構
[apscheduler.executors] asyncio = apscheduler.executors.asyncio:AsyncIOExecutor debug = apscheduler.executors.debug:DebugExecutor gevent = apscheduler.executors.gevent:GeventExecutor processpool = apscheduler.executors.pool:ProcessPoolExecutor threadpool = apscheduler.executors.pool:ThreadPoolExecutor twisted = apscheduler.executors.twisted:TwistedExecutor [apscheduler.jobstores] memory = apscheduler.jobstores.memory:MemoryJobStore mongodb = apscheduler.jobstores.mongodb:MongoDBJobStore redis = apscheduler.jobstores.redis:RedisJobStore sqlalchemy = apscheduler.jobstores.sqlalchemy:SQLAlchemyJobStore [apscheduler.triggers] cron = apscheduler.triggers.cron:CronTrigger date = apscheduler.triggers.date:DateTrigger interval = apscheduler.triggers.interval:IntervalTrigger
這樣,在scheduler模塊中就能夠用entry point的名稱反查出對應組件async
_trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers')) _trigger_classes = {} _executor_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.executors')) _executor_classes = {} _jobstore_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.jobstores')) _jobstore_classes = {} _stopped = True
從而實現了一個便利的插件機制函數
另外經過一個加載函數完成"apscheduler.executors.pool:ThreadPoolExecutor"字符串到ThreadPoolExecutor類對象的查詢
def ref_to_obj(ref): """ Returns the object pointed to by ``ref``. :type ref: str """ if not isinstance(ref, six.string_types): raise TypeError('References must be strings') if ':' not in ref: raise ValueError('Invalid reference') modulename, rest = ref.split(':', 1) try: obj = __import__(modulename) except ImportError: raise LookupError('Error resolving reference %s: could not import module' % ref) try: for name in modulename.split('.')[1:] + rest.split('.'): obj = getattr(obj, name) return obj except Exception: raise LookupError('Error resolving reference %s: error looking up object' % ref)
scheduler的主循環(main_loop),其實就是反覆檢查是否是有到時須要執行的任務,完成一次檢查的函數是_process_jobs, 這個函數作這麼幾件事:
那麼在這個_process_jobs的邏輯,何時調用合適呢?若是不間斷地調用,而實際上沒有要執行的job,是一種浪費。每次掉用_process_jobs後,其實能夠預先判斷一下,下一次要執行的job(離如今最近的)還要多長時間,做爲返回值告訴main_loop, 這時主循環就能夠去睡一覺,等大約這麼長時間後再喚醒,執行下一次_process_jobs。這裏喚醒的機制就會有IO模型的區別了
scheduler因爲IO模型的不一樣,能夠有多種實現,如
jobstore提供給scheduler一個序列化jobs的統一抽象,提供對scheduler中job的增刪改查接口,根據存儲backend的不一樣,分如下幾種
除了MemoryJobStore外,其餘幾種都使用pickle作序列化工具,因此這裏要指出一點,若是你不是在用內存作jobstore,那麼必須確保你提供給job的可執行函數必須是能夠被全局訪問的,也就是能夠經過ref_to_obj反查出來的,不然沒法序列化。
使用數據庫作jobstore,就會發現,其實建立了一張有三個域的的jobs表,分別是id, next_run_time, job_state,其中job_state是job對象pickle序列化後的二進制,而id和next_run_time則是支持job的兩類查詢(按id和按最近運行時間)
aps把任務最終的執行機制也抽象了出來,能夠根據IO模型選配,不須要講太多,最經常使用的是threadpool和processpoll兩種(來自concurrent.futures的線程/進程池)。
不一樣類型的executor實現本身的_do_submit_job,完成一次實際的任務實例執行。以線程/進程池實現爲例
def _do_submit_job(self, job, run_times): def callback(f): exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else (f.exception(), getattr(f.exception(), '__traceback__', None))) if exc: self._run_job_error(job.id, exc, tb) else: self._run_job_success(job.id, f.result()) f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) f.add_done_callback(callback)
trigger是抽象出了「一個job是什麼時候被觸發」這個策略,每種trigger實現本身的get_next_fire_time函數
@abstractmethod def get_next_fire_time(self, previous_fire_time, now): """ Returns the next datetime to fire on, If no such datetime can be calculated, returns ``None``. :param datetime.datetime previous_fire_time: the previous time the trigger was fired :param datetime.datetime now: current datetime """
aps提供的trigger包括:
簡要介紹了apscheduler類庫的組成,強調抽象概念的理解