1、APScheduler簡介:html
Python的一個定時任務框架,知足用戶定時執行或者週期性執行任務的需求,提供了基於日期date、固定的時間間隔interval、以及相似於Linux上的定時任務crontab類型的定時任務。而且該框架不只能夠添加、刪除定時任務,還能夠將任務存儲到數據庫中,實現任務的持久化。python
Python的第三方庫,用來提供Python的後臺程序。包含四個組件,分別是:react
triggers:任務觸發器組件,提供任務觸發方式redis
job stores: 任務商店組件,提供任務保存方式sql
executors:任務調度組件,提供任務調度方式mongodb
schedulers: 任務調度組件,提供任務工做方式數據庫
2、APScheduler安裝框架
1)利用pip安裝(推薦)async
# pip install apscheduler
2 基於源碼:https://pypi.python.org/pypi/APScheduler/tornado
# python setup.py install
3、基本概念
一、 APScheduler有四種組件及相關說明
1) triggers(觸發器):觸發器包含調度邏輯,每個做業有它本身的觸發器,用於決定接下來哪個做業會運行,除了他們本身初始化配置外,觸發器徹底是無狀態的。
2 )job stores(做業存儲):用來存儲被調度的做業,默認的做業存儲器是簡單地把做業任務保存在內存中,其餘做業存儲器能夠將任務保存到各類數據庫中,支持MongoDB、Redis、SQLAlchemy存儲方式。當對做業任務進行持久化存儲的時候,做業的數據將被序列化,從新讀取做業時在反序列化。
3)executors(執行器):執行器用來執行定時任務,只是將須要執行的任務放在新的線程或者線程池中運行。看成業任務完成時,執行器就會通知調度器。對於執行器,默認狀況下選擇ThreadPoolExecutor就能夠了,可是若是涉及到一下特殊任務如比較消耗CPU的任務則能夠選擇ProcessPoolExecutor,固然根據實際需求能夠同時使用兩種執行器
4)schedulers(調度器):調度器是將其餘部分聯繫在一塊兒,通常在應用程序中只有一個調度器,應用開發者不會直接操做觸發器、任務存儲以及執行器。相反調度器提供了處理的接口。經過調度器完成任務的存儲以及執行器的配置操做,如能夠添加、移除、修改任務做業。
APScheduler提供了多種調度器,能夠根據具體需求來選擇合適的調度器,經常使用的調度器有:
BlockingScheduler:適合於只在進程中運行單個任務的狀況,一般在調度器是你惟一要運行的東西時使用
BackgroundScheduler:適合於要求任務在程序後臺運行的狀況,當但願調度器在應用後臺執行時使用。
AsyncIOScheduler:適合於使用asyncio框架的狀況
GeventScheduler:適合於使用gevent框架的狀況
TornadoScheduler:適合於使用Tornado框架的應用
TwistedScheduler:適合使用Twisted框架的應用
QtScheduler:適合使用QT的狀況
二、配置調度器
APScheduler提供了許多不一樣的方式來配置調度器,你可使用一個配置字典或者做爲參數關鍵字的方式傳入。你也能夠先建立調度器。在配置和添加做業,這樣能夠在不一樣的環境中獲得更大的靈活性。
三、簡單的實例
from apscheduler.schedulers.blocking import BlockingScheduler import time #實例化一個調度器 scheduler = BlockingScheduler() def job1(): # 添加任務並設置觸發方式爲3s一次 scheduler.add_job(job1, 'interval', seconds=3) #開始運行調度器 scheduler.start() |
4、各組件功能
一、trigger組件
trigger提供任務的觸發方式,共三種方式:
date:只在某個時間點執行一次run_date(datetime|str)
scheduler.add_job(my_job, 'date', run_date=date(2017, 9, 8), args=[]) scheduler.add_job(my_job, 'date', run_date=datetime(2017, 9, 8, 21, 30, 5), args=[]) scheduler.add_job(my_job, 'date', run_date='2019-6-12 21:30:05', args=[]) # The 'date' trigger and datetime.now() as run_date are implicit sched.add_job(my_job, args=[[]) |
interval:每隔一段時間執行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0,
start_date=None, end_date=None, timezone=None
scheduler.add_job(my_job, 'interval', hours=2) scheduler.add_job(my_job, 'interval', hours=2, start_date='2017-9-8 21:30:00', end_date='2019-06-12 21:30:00) @scheduler.scheduled_job('interval', id='my_job_id', hours=2) def my_job(): print("Hello World") |
cron:使用Linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
sched.add_job(my_job, 'cron', hour=3, minute=30)
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2017-10- 30')
@sched.scheduled_job('cron', id='my_job_id', day='last sun') def some_decorated_task():
print("I am printed at 00:00:00 on the last Sunday of every month!")
二、scheduler組件
scheduler組件提供執行的方式,在不一樣的運行環境中選擇合適的方式
BlockingScheduler:進程中只運行調度器時的方式
from apscheduler.schedulers.blocking import BlockingScheduler import time scheduler = BlockingScheduler() def job1(): print "%s: 執行任務" % time.asctime() scheduler.add_job(job1, 'interval', seconds=3) scheduler.start() |
BackgroundScheduler:不想使用任何框架時的方式
from apscheduler.schedulers.background import BackgroundScheduler import time scheduler = BackgroundScheduler() def job1(): print "%s:執行任務 " % time.asctime() scheduler.add_job(job1, 'interval', seconds=3) scheduler.start() while True: pass |
AsyncIOScheduler: asyncio module的方式( Python3)
from apscheduler.schedulers.asyncio import AsyncIOSchedulertry:import asyncioexcept ImportError:import trollius as asyncio ......# while True pass
try:asyncio.get_event_loop().run_forever()except (KeyboardInterrupt, SystemExit):pass
GeventScheduler: gevent方式
from apscheduler.schedulers.gevent import GeventScheduler ... ... g = scheduler.start() # while True:pass try: g.join() except (KeyboardInterrupt, SystemExit): pass |
TornadoScheduler: Tornado方式
from tornado.ioloop import IOLoop from apscheduler.schedulers.tornado import TornadoScheduler ... ... # while True:pass try: IOLoop.instance().start() except (KeyboardInterrupt, SystemExit): pass |
TwistedScheduler: Twisted方式
from twisted.internet import reactor from apscheduler.schedulers.twisted import TwistedScheduler ... ... # while True:pass try: reactor.run() except (KeyboardInterrupt, SystemExit): pass |
QtScheduler: Qt方式
三、executors組件
executors組件提供任務的調度方式
base
debug
gevent
pool(max_workers=10)
twisted
四、jobstore組件
jobstore提供任務的各類持久化方式
base
memory
mongodb
scheduler.add_jobstore('mongodb', collection='example_jobs')
redis
scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
rethinkdb
scheduler.add_jobstore('rethinkdb', database='apscheduler_example')
sqlalchemy
scheduler.add_jobstore('sqlalchemy', url=url)
zookeeper
scheduler.add_jobstore('zookeeper', path='/example_jobs')
5、任務操做
一、添加任務add_job(如上)
若是使用了任務的存儲,開啓時最好添加replace_existing=True,不然每次開啓時都會建立任務的副本,開啓後任務不會立刻啓動,可修改triger參數
二、刪除任務remove_job
#根據任務實例刪除
job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove() # 根據任務id刪除 scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id') |
三、任務的暫停pause_job和繼續resume_job
job = scheduler.add_job(myfunc, 'interval', minutes=2)#根據任務實例job.pause()job.resume()# 根據任務id暫停
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')scheduler.pause_job('my_job_id')
四、任務的修飾modify和重設reschedule_job
修飾:job.modify(max_instances=6, name='Alternate name')
重設:scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
五、調度器操做
開啓:scheduler.start()
關閉:scheduler.shotdown(wait=True | False)
暫停:scheduler.pause()
繼續:scheduler.resume()
監聽:http://apscheduler.readthedocs.io/en/v3.3.0/modules/events.html#module-apscheduler.events
def my_listener(event): if event.exception: print('The job crashed :(') else: print('The job worked :)') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) |
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
} executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5) }
job_defaults = { 'coalesce': False, 'max_instances': 3
} scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_default