APScheduler是Python中知名的定時任務框架,能夠很方面的知足定時執行或週期性執行程序任務等需求,相似於Linux上的crontab,但比crontab要更增強大,該框架不只能夠添加、刪除定時任務,還提供多種持久化任務的功能。python
APScheduler弱分佈式的框架,由於每一個任務對象都存儲在當前節點中,只能經過人肉的形式實現分佈式,如利用Redis來作。redis
第一次接觸APScheduler會發它有不少概念,我當年第一次接觸時就是由於概念太多,直接用crontab多舒服,但如今公司項目不少都基於APScheduler實現,因此來簡單扒一扒的它的源碼。mongodb
用最簡單的語言提一下APScheduler中的關鍵概念。後端
APScheduler提供多個Scheduler,不一樣Scheduler適用於不一樣的情景,目前我最多見的就是BackgroundScheduler後臺調度器,該調度器適合要求在後臺運行程序的調度。app
還有多種其餘調度器:框架
BlockingScheduler:適合於只在進程中運行單個任務的狀況,一般在調度器是你惟一要運行的東西時使用。async
AsyncIOScheduler:適合於使用 asyncio 框架的狀況分佈式
GeventScheduler: 適合於使用 gevent 框架的狀況模塊化
TornadoScheduler: 適合於使用 Tornado 框架的應用函數
TwistedScheduler: 適合使用 Twisted 框架的應用
QtScheduler: 適合使用 QT 的狀況
本文只剖析 BackgroundScheduler 相關的邏輯,先簡單看看官方example,而後以此爲入口逐層剖析。
官方example代碼以下。
from datetime import datetime import time import os from apscheduler.schedulers.background import BackgroundScheduler def tick(): print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BackgroundScheduler() scheduler.add_job(tick, 'interval', seconds=3) # 添加一個任務,3秒後運行 scheduler.start() print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: # 這是在這裏模擬應用程序活動(使主線程保持活動狀態)。 while True: time.sleep(2) except (KeyboardInterrupt, SystemExit): # 關閉調度器 scheduler.shutdown()
上述代碼很是簡單,先經過BackgroundScheduler方法實例化一個調度器,而後調用add_job方法,將須要執行的任務添加到JobStores中,默認就是存到內存中,更具體點,就是存到一個dict中,最後經過start方法啓動調度器,APScheduler就會每隔3秒,觸發名爲interval的觸發器,從而讓調度器調度默認的執行器執行tick方法中的邏輯。
當程序所有執行完後,調用shutdown方法關閉調度器。
BackgroundScheduler實際上是基於線程形式構成的,而線程就有守護線程的概念,若是啓動了守護線程模式,調度器不必定要關閉。
先看一下BackgroundScheduler類的源碼。
# apscheduler/schedulers/background.py class BackgroundScheduler(BlockingScheduler): _thread = None def _configure(self, config): self._daemon = asbool(config.pop('daemon', True)) super()._configure(config) def start(self, *args, **kwargs): # 建立事件通知 # 多個線程能夠等待某個事件的發生,在事件發生後,全部的線程都會被激活。 self._event = Event() BaseScheduler.start(self, *args, **kwargs) self._thread = Thread(target=self._main_loop, name='APScheduler') # 設置爲守護線程,Python主線程運行完後,直接結束不會理會守護線程的狀況, # 若是是非守護線程,Python主線程會在運行完後,等待其餘非守護線程運行完後,再結束 self._thread.daemon = self._daemon # daemon 是否爲守護線程 self._thread.start() # 啓動線程 def shutdown(self, *args, **kwargs): super().shutdown(*args, **kwargs) self._thread.join() del self._thread
上述代碼中,給出了詳細的註釋,簡單解釋一下。
_configure方法主要用於參數設置,這裏定義了self._daemon 這個參數,而後經過super方法調用父類的_configure方法。
start方法就是其啓動方法,邏輯也很是簡單,建立了線程事件Event,線程事件是一種線程同步機制,你扒開看其源碼,會發現線程事件是基於條件鎖來實現的,線程事件提供了set()、wait()、clear()這3個主要方法。
建立了線程事件後,調用了其父類的start()方法,該方法纔是真正的啓動方法,暫時放放,啓動完後,經過Thread方法建立一個線程,線程的目標函數爲self._main_loop
,它是調度器的主訓練,調度器不關閉,就會一直執行主循環中的邏輯,從而實現APScheduler各類功能,是很是重要方法,一樣,暫時放放。建立完後,啓動線程就ok了。
線程建立完後,定義線程的daemon,若是daemon爲True,則表示當前線程爲守護線程,反之爲非守護線程。
簡單提一下,若是線程爲守護線程,那麼Python主線程邏輯執行完後,會直接退出,不會理會守護線程,若是爲非守護線程,Python主線程執行完後,要等其餘全部非守護線程都執行完纔會退出。
shutdown方法先調用父類的shutdown方法,而後調用join方法,最後將線程對象直接del刪除。
BackgroundScheduler類的代碼看完了,回看一開始的example代碼,經過BackgroundScheduler實例化調度器後,接着調用的是add_job方法,向add_job方法中添加了3個參數,分別是想要定時執行的tick方法,觸發器trigger的名稱,叫interval,而這個觸發器的參數爲seconds=3。
是否能夠將觸發器trigger的名稱改爲任意字符呢?這是不能夠的,APScheduler在這裏其實使用了Python中的entry point技巧,若是你通過過作個Python包並將其打包上傳到PYPI的過程,你對entry point應該有印象。其實entry point不止可能永遠打包,還能夠用於模塊化插件體系結構,這個內容較多,放到後面再聊。
簡單而言,add_job()方法要傳入相應觸發器名稱,interval會對應到apscheduler.triggers.interval.IntervalTrigger類上,seconds參數就是該類的參數。
add_job方法源碼以下。
# apscheduler/schedulers/base.py/BaseScheduler def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', executor='default', replace_existing=False, **trigger_args): job_kwargs = { 'trigger': self._create_trigger(trigger, trigger_args), 'executor': executor, 'func': func, 'args': tuple(args) if args is not None else (), 'kwargs': dict(kwargs) if kwargs is not None else {}, 'id': id, 'name': name, 'misfire_grace_time': misfire_grace_time, 'coalesce': coalesce, 'max_instances': max_instances, 'next_run_time': next_run_time } # 過濾 job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if value is not undefined) # 實例化具體的任務對象 job = Job(self, **job_kwargs) # Don't really add jobs to job stores before the scheduler is up and running with self._jobstores_lock: if self.state == STATE_STOPPED: self._pending_jobs.append((job, jobstore, replace_existing)) self._logger.info('Adding job tentatively -- it will be properly scheduled when ' 'the scheduler starts') else: self._real_add_job(job, jobstore, replace_existing) return job
add_job方法代碼很少,一開始,建立了job_kwargs字典,其中含有觸發器、執行器等,簡單理一理。
接着作了一個過濾,而後將參數傳入Job類,完成任務對象的實例化。
隨後的邏輯比較簡單,先判斷是否能夠拿到self._jobstores_lock鎖,它實際上是一個可重入鎖,Python中,可重入鎖的實現基於普通互斥鎖,只是多了一個變量用於計數,每加一次鎖,該變量加一,每解一次鎖該變量減一,只有在該變量爲0時,才真正去釋放互斥鎖。
獲取到鎖後,先判斷當前調度器的狀態,若是是STATE_STOPPED(中止狀態)則將任務添加到_pending_jobs
待定列表中,若是不是中止狀態,則調用_real_add_job
方法,隨後返回job對象。
其實_real_add_job
方法纔是真正的將任務對象job添加到指定存儲後端的方法。
當任務對象添加到指定存儲後端後(默認直接存到內存中),調度器就會去取來執行。
回到example代碼中,執行完調度器的add_job方法後,緊接着便執行調度器的start方法。
考慮字數,本文就先到這裏,後面會繼續剖析APScheduler。
若是文章對你有所幫助,點擊「在看」支持二兩,下篇文章見。