APScheduler是Python中知名的定時任務框架,能夠很方面的知足定時執行或週期性執行程序任務等需求,相似於Linux上的crontab,但比crontab要更增強大,該框架不只能夠添加、刪除定時任務,還提供多種持久化任務的功能。python
APScheduler弱分佈式的框架,由於每一個任務對象都存儲在當前節點中,只能經過人肉的形式實現分佈式,如利用Redis來作。redis
第一次接觸APScheduler會發它有不少概念,我當年第一次接觸時就是由於概念太多,直接用crontab多舒服,但如今公司項目不少都基於APScheduler實現,因此來簡單扒一扒的它的源碼。mongodb
用最簡單的語言提一下APScheduler中的關鍵概念。後端
APScheduler提供多個Scheduler,不一樣Scheduler適用於不一樣的情景,目前我最多見的就是BackgroundScheduler後臺調度器,該調度器適合要求在後臺運行程序的調度。app
還有多種其餘調度器:框架
BlockingScheduler:適合於只在進程中運行單個任務的狀況,一般在調度器是你惟一要運行的東西時使用。async
AsyncIOScheduler:適合於使用 asyncio 框架的狀況分佈式
GeventScheduler: 適合於使用 gevent 框架的狀況模塊化
TornadoScheduler: 適合於使用 Tornado 框架的應用函數
TwistedScheduler: 適合使用 Twisted 框架的應用
QtScheduler: 適合使用 QT 的狀況
本文只剖析 BackgroundScheduler 相關的邏輯,先簡單看看官方example,而後以此爲入口逐層剖析。
官方example代碼以下。
from datetime import datetime
import time
import os
from apscheduler.schedulers.background import BackgroundScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BackgroundScheduler()
scheduler.add_job(tick, 'interval', seconds=3) # 添加一個任務,3秒後運行
scheduler.start()
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
# 這是在這裏模擬應用程序活動(使主線程保持活動狀態)。
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
# 關閉調度器
scheduler.shutdown()
複製代碼
上述代碼很是簡單,先經過BackgroundScheduler方法實例化一個調度器,而後調用add_job方法,將須要執行的任務添加到JobStores中,默認就是存到內存中,更具體點,就是存到一個dict中,最後經過start方法啓動調度器,APScheduler就會每隔3秒,觸發名爲interval的觸發器,從而讓調度器調度默認的執行器執行tick方法中的邏輯。
當程序所有執行完後,調用shutdown方法關閉調度器。
BackgroundScheduler實際上是基於線程形式構成的,而線程就有守護線程的概念,若是啓動了守護線程模式,調度器不必定要關閉。
先看一下BackgroundScheduler類的源碼。
# apscheduler/schedulers/background.py
class BackgroundScheduler(BlockingScheduler):
_thread = None
def _configure(self, config):
self._daemon = asbool(config.pop('daemon', True))
super()._configure(config)
def start(self, *args, **kwargs):
# 建立事件通知
# 多個線程能夠等待某個事件的發生,在事件發生後,全部的線程都會被激活。
self._event = Event()
BaseScheduler.start(self, *args, **kwargs)
self._thread = Thread(target=self._main_loop, name='APScheduler')
# 設置爲守護線程,Python主線程運行完後,直接結束不會理會守護線程的狀況,
# 若是是非守護線程,Python主線程會在運行完後,等待其餘非守護線程運行完後,再結束
self._thread.daemon = self._daemon # daemon 是否爲守護線程
self._thread.start() # 啓動線程
def shutdown(self, *args, **kwargs):
super().shutdown(*args, **kwargs)
self._thread.join()
del self._thread
複製代碼
上述代碼中,給出了詳細的註釋,簡單解釋一下。
_configure方法主要用於參數設置,這裏定義了self._daemon 這個參數,而後經過super方法調用父類的_configure方法。
start方法就是其啓動方法,邏輯也很是簡單,建立了線程事件Event,線程事件是一種線程同步機制,你扒開看其源碼,會發現線程事件是基於條件鎖來實現的,線程事件提供了set()、wait()、clear()這3個主要方法。
建立了線程事件後,調用了其父類的start()方法,該方法纔是真正的啓動方法,暫時放放,啓動完後,經過Thread方法建立一個線程,線程的目標函數爲self._main_loop
,它是調度器的主訓練,調度器不關閉,就會一直執行主循環中的邏輯,從而實現APScheduler各類功能,是很是重要方法,一樣,暫時放放。建立完後,啓動線程就ok了。
線程建立完後,定義線程的daemon,若是daemon爲True,則表示當前線程爲守護線程,反之爲非守護線程。
簡單提一下,若是線程爲守護線程,那麼Python主線程邏輯執行完後,會直接退出,不會理會守護線程,若是爲非守護線程,Python主線程執行完後,要等其餘全部非守護線程都執行完纔會退出。
shutdown方法先調用父類的shutdown方法,而後調用join方法,最後將線程對象直接del刪除。
BackgroundScheduler類的代碼看完了,回看一開始的example代碼,經過BackgroundScheduler實例化調度器後,接着調用的是add_job方法,向add_job方法中添加了3個參數,分別是想要定時執行的tick方法,觸發器trigger的名稱,叫interval,而這個觸發器的參數爲seconds=3。
是否能夠將觸發器trigger的名稱改爲任意字符呢?這是不能夠的,APScheduler在這裏其實使用了Python中的entry point技巧,若是你通過過作個Python包並將其打包上傳到PYPI的過程,你對entry point應該有印象。其實entry point不止可能永遠打包,還能夠用於模塊化插件體系結構,這個內容較多,放到後面再聊。
簡單而言,add_job()方法要傳入相應觸發器名稱,interval會對應到apscheduler.triggers.interval.IntervalTrigger類上,seconds參數就是該類的參數。
add_job方法源碼以下。
# apscheduler/schedulers/base.py/BaseScheduler
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', executor='default', replace_existing=False, **trigger_args):
job_kwargs = {
'trigger': self._create_trigger(trigger, trigger_args),
'executor': executor,
'func': func,
'args': tuple(args) if args is not None else (),
'kwargs': dict(kwargs) if kwargs is not None else {},
'id': id,
'name': name,
'misfire_grace_time': misfire_grace_time,
'coalesce': coalesce,
'max_instances': max_instances,
'next_run_time': next_run_time
}
# 過濾
job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if
value is not undefined)
# 實例化具體的任務對象
job = Job(self, **job_kwargs)
# Don't really add jobs to job stores before the scheduler is up and running
with self._jobstores_lock:
if self.state == STATE_STOPPED:
self._pending_jobs.append((job, jobstore, replace_existing))
self._logger.info('Adding job tentatively -- it will be properly scheduled when '
'the scheduler starts')
else:
self._real_add_job(job, jobstore, replace_existing)
return job
複製代碼
add_job方法代碼很少,一開始,建立了job_kwargs字典,其中含有觸發器、執行器等,簡單理一理。
接着作了一個過濾,而後將參數傳入Job類,完成任務對象的實例化。
隨後的邏輯比較簡單,先判斷是否能夠拿到self._jobstores_lock鎖,它實際上是一個可重入鎖,Python中,可重入鎖的實現基於普通互斥鎖,只是多了一個變量用於計數,每加一次鎖,該變量加一,每解一次鎖該變量減一,只有在該變量爲0時,才真正去釋放互斥鎖。
獲取到鎖後,先判斷當前調度器的狀態,若是是STATE_STOPPED(中止狀態)則將任務添加到_pending_jobs
待定列表中,若是不是中止狀態,則調用_real_add_job
方法,隨後返回job對象。
其實_real_add_job
方法纔是真正的將任務對象job添加到指定存儲後端的方法。
當任務對象添加到指定存儲後端後(默認直接存到內存中),調度器就會去取來執行。
回到example代碼中,執行完調度器的add_job方法後,緊接着便執行調度器的start方法。
考慮字數,本文就先到這裏,後面會繼續剖析APScheduler。
若是文章對你有所幫助,點擊「在看」支持二兩,下篇文章見。