Python定時任務框架:APScheduler源碼剖析(一)

前言

APScheduler是Python中知名的定時任務框架,能夠很方面的知足定時執行或週期性執行程序任務等需求,相似於Linux上的crontab,但比crontab要更增強大,該框架不只能夠添加、刪除定時任務,還提供多種持久化任務的功能。python

APScheduler弱分佈式的框架,由於每一個任務對象都存儲在當前節點中,只能經過人肉的形式實現分佈式,如利用Redis來作。redis

第一次接觸APScheduler會發它有不少概念,我當年第一次接觸時就是由於概念太多,直接用crontab多舒服,但如今公司項目不少都基於APScheduler實現,因此來簡單扒一扒的它的源碼。mongodb

前置概念

用最簡單的語言提一下APScheduler中的關鍵概念。後端

  • Job:任務對象,就是你要執行的任務
  • JobStores:任務存儲方式,默認是存儲在內存中,還能夠支持redis、mongodb等
  • Executors:執行器,就是執行任務的東西
  • Trigger:觸發器,到達某個條件觸發相應的調用邏輯
  • Scheduler:調度器,將上面幾個部分鏈接起來的東西

APScheduler提供多個Scheduler,不一樣Scheduler適用於不一樣的情景,目前我最多見的就是BackgroundScheduler後臺調度器,該調度器適合要求在後臺運行程序的調度。app

還有多種其餘調度器:框架

BlockingScheduler:適合於只在進程中運行單個任務的狀況,一般在調度器是你惟一要運行的東西時使用。async

AsyncIOScheduler:適合於使用 asyncio 框架的狀況分佈式

GeventScheduler: 適合於使用 gevent 框架的狀況模塊化

TornadoScheduler: 適合於使用 Tornado 框架的應用函數

TwistedScheduler: 適合使用 Twisted 框架的應用

QtScheduler: 適合使用 QT 的狀況

本文只剖析 BackgroundScheduler 相關的邏輯,先簡單看看官方example,而後以此爲入口逐層剖析。

剖析BackgroundScheduler

官方example代碼以下。

from datetime import datetime
import time
import os
from apscheduler.schedulers.background import BackgroundScheduler

def tick():
    print('Tick! The time is: %s' % datetime.now())

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.add_job(tick, 'interval', seconds=3) # 添加一個任務,3秒後運行
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    try:
        # 這是在這裏模擬應用程序活動(使主線程保持活動狀態)。
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        # 關閉調度器
        scheduler.shutdown()

上述代碼很是簡單,先經過BackgroundScheduler方法實例化一個調度器,而後調用add_job方法,將須要執行的任務添加到JobStores中,默認就是存到內存中,更具體點,就是存到一個dict中,最後經過start方法啓動調度器,APScheduler就會每隔3秒,觸發名爲interval的觸發器,從而讓調度器調度默認的執行器執行tick方法中的邏輯。

當程序所有執行完後,調用shutdown方法關閉調度器。

BackgroundScheduler實際上是基於線程形式構成的,而線程就有守護線程的概念,若是啓動了守護線程模式,調度器不必定要關閉。

先看一下BackgroundScheduler類的源碼。

# apscheduler/schedulers/background.py

class BackgroundScheduler(BlockingScheduler):

    _thread = None

    def _configure(self, config):
        self._daemon = asbool(config.pop('daemon', True))
        super()._configure(config)

    def start(self, *args, **kwargs):
        # 建立事件通知
        # 多個線程能夠等待某個事件的發生,在事件發生後,全部的線程都會被激活。
        self._event = Event() 
        BaseScheduler.start(self, *args, **kwargs)
        self._thread = Thread(target=self._main_loop, name='APScheduler')
        # 設置爲守護線程,Python主線程運行完後,直接結束不會理會守護線程的狀況,
        # 若是是非守護線程,Python主線程會在運行完後,等待其餘非守護線程運行完後,再結束
        self._thread.daemon = self._daemon # daemon 是否爲守護線程
        self._thread.start() # 啓動線程

    def shutdown(self, *args, **kwargs):
        super().shutdown(*args, **kwargs)
        self._thread.join()
        del self._thread

上述代碼中,給出了詳細的註釋,簡單解釋一下。

_configure方法主要用於參數設置,這裏定義了self._daemon 這個參數,而後經過super方法調用父類的_configure方法。

start方法就是其啓動方法,邏輯也很是簡單,建立了線程事件Event,線程事件是一種線程同步機制,你扒開看其源碼,會發現線程事件是基於條件鎖來實現的,線程事件提供了set()、wait()、clear()這3個主要方法。

  • set()方法會將事件標誌狀態設置爲true。
  • clear()方法將事件標誌狀態設置爲false
  • wait()方法會阻塞線程,直到事件標誌狀態爲true。

建立了線程事件後,調用了其父類的start()方法,該方法纔是真正的啓動方法,暫時放放,啓動完後,經過Thread方法建立一個線程,線程的目標函數爲self._main_loop,它是調度器的主訓練,調度器不關閉,就會一直執行主循環中的邏輯,從而實現APScheduler各類功能,是很是重要方法,一樣,暫時放放。建立完後,啓動線程就ok了。

線程建立完後,定義線程的daemon,若是daemon爲True,則表示當前線程爲守護線程,反之爲非守護線程。

簡單提一下,若是線程爲守護線程,那麼Python主線程邏輯執行完後,會直接退出,不會理會守護線程,若是爲非守護線程,Python主線程執行完後,要等其餘全部非守護線程都執行完纔會退出。

shutdown方法先調用父類的shutdown方法,而後調用join方法,最後將線程對象直接del刪除。

BackgroundScheduler類的代碼看完了,回看一開始的example代碼,經過BackgroundScheduler實例化調度器後,接着調用的是add_job方法,向add_job方法中添加了3個參數,分別是想要定時執行的tick方法,觸發器trigger的名稱,叫interval,而這個觸發器的參數爲seconds=3。

是否能夠將觸發器trigger的名稱改爲任意字符呢?這是不能夠的,APScheduler在這裏其實使用了Python中的entry point技巧,若是你通過過作個Python包並將其打包上傳到PYPI的過程,你對entry point應該有印象。其實entry point不止可能永遠打包,還能夠用於模塊化插件體系結構,這個內容較多,放到後面再聊。

簡單而言,add_job()方法要傳入相應觸發器名稱,interval會對應到apscheduler.triggers.interval.IntervalTrigger類上,seconds參數就是該類的參數。

剖析add_job方法

add_job方法源碼以下。

# apscheduler/schedulers/base.py/BaseScheduler

    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, **trigger_args):
        job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time
        }
        # 過濾
        job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if
                          value is not undefined)
        # 實例化具體的任務對象
        job = Job(self, **job_kwargs)

        # Don't really add jobs to job stores before the scheduler is up and running
        with self._jobstores_lock:
            if self.state == STATE_STOPPED:
                self._pending_jobs.append((job, jobstore, replace_existing))
                self._logger.info('Adding job tentatively -- it will be properly scheduled when '
                                  'the scheduler starts')
            else:
                self._real_add_job(job, jobstore, replace_existing)

        return job

add_job方法代碼很少,一開始,建立了job_kwargs字典,其中含有觸發器、執行器等,簡單理一理。

  • trigger觸發器,經過self._create_trigger()方法建立,該方法須要兩個參數,代碼中的trigger其實就是interval字符串,trigger_args則爲對應的參數。
  • exectuor執行器目前爲default,這個後面再聊。
  • func回調方法,就是咱們本身真正但願被執行的邏輯,觸發器會觸發調度器,調度器會調用執行器去執行的具體邏輯。
  • misfire_grace_time:其註釋解釋爲「指定運行時間後幾秒仍運行該任務運行」,閱讀相關文檔才能夠理解,好比一個任務,本來12:00運行,但12:00因爲某些緣由沒有被調度,如今12:30分了,此時調度時會判斷當前時間與預調度時間的差值,若是misfire_grace_time設置爲20,則不會調度執行這個此前調度失敗的任務,若是misfire_grace_time設置爲60,則會調度。
  • coalesce:若是某個任務由於某些緣由沒有實際運行,從而形成了任務堆積,好比堆積了10個相同的人,coalesce爲True,則只執行最後一層,若是coalesce爲False,則嘗試連續執行10次。
  • max_instances:經過任務同一時間最多能夠有幾個實例在運行
  • next_run_time:任務下次運行時間

接着作了一個過濾,而後將參數傳入Job類,完成任務對象的實例化。

隨後的邏輯比較簡單,先判斷是否能夠拿到self._jobstores_lock鎖,它實際上是一個可重入鎖,Python中,可重入鎖的實現基於普通互斥鎖,只是多了一個變量用於計數,每加一次鎖,該變量加一,每解一次鎖該變量減一,只有在該變量爲0時,才真正去釋放互斥鎖。

獲取到鎖後,先判斷當前調度器的狀態,若是是STATE_STOPPED(中止狀態)則將任務添加到_pending_jobs待定列表中,若是不是中止狀態,則調用_real_add_job方法,隨後返回job對象。

其實_real_add_job方法纔是真正的將任務對象job添加到指定存儲後端的方法。

當任務對象添加到指定存儲後端後(默認直接存到內存中),調度器就會去取來執行。

回到example代碼中,執行完調度器的add_job方法後,緊接着便執行調度器的start方法。

結尾

考慮字數,本文就先到這裏,後面會繼續剖析APScheduler。

若是文章對你有所幫助,點擊「在看」支持二兩,下篇文章見。

WX20191016-193450@2x.png

相關文章
相關標籤/搜索