Java中任務調度通常用Quartz,Python中的任務調度工具也有很多:Celery,RQ,APScheduler等。
Celery:很是強大的分佈式任務調度框架
RQ:基於Redis的做業隊列工具
APScheduler:一款強大的任務調度工具html
RQ參考Celery,聽說要比Celery輕量級(Really?)
APScheduler感受更像Quartz。
本人小小的建議是通常項目用APScheduler,由於不用像Celery那樣再單獨啓動worker、beat進程,並且API也很簡潔。
對於大點分佈式項目用Celerygit
官網:http://apscheduler.readthedoc...
API:http://apscheduler.readthedoc...
當前版本:3.3.0
安裝:$ pip install apscheduler
例子:https://github.com/agronholm/...github
Advanced Python Scheduler (APScheduler) 一款強大的任務調度工具.
內置了三種調度模式:sql
Cron風格mongodb
間隔性(Interval-based)執行併發
僅在某個時間執行一次app
做業存儲支持如下幾種方式:
Memory
SQLAlchemy (any RDBMS supported by SQLAlchemy works)
MongoDB
Redis
RethinkDB
ZooKeeper
除了Memory方式不須要序列化以外(一個例外是使用ProcessPoolExecutor),其他都須要做業函數參數可序列化。框架
支持與如下框架集成:
asyncio (PEP 3156)
gevent
Tornado
Twisted
Qt (using either PyQt or PySide)async
四大組件:
triggers
job stores
executors
schedulers分佈式
內置如下幾種調度器實現:
BlockingScheduler:
BackgroundScheduler: 默認採用ThreadPoolExecutor池(默認10),能夠配置ProcessPoolExecutor,或同時使用
AsyncIOScheduler: 使用asyncio模塊
GeventScheduler: 使用gevent
TornadoScheduler: use if you’re building a Tornado application
TwistedScheduler: use if you’re building a Twisted application
QtScheduler: use if you’re building a Qt application
基類:BaseScheduler,能夠經過此類查詢相關配置選項。
方式1、
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
方式2、三:略。請看官網
經過調用start()方法來啓動
scheduler.start() scheduler.shutdown() scheduler.shutdown(wait=False)
內置如下三種觸發器實現:
date: 對已僅執行一次的情緒,指定某個之間點。請參考這裏
interval: 指定時間間隔(fixed intervals)週期性執行。請參考這裏
cron: 使用cron風格表達式週期性執行。請參考這裏
添加jobs:
調用scheduler.add_job()方法,會返回apscheduler.job.Job
實例(可用於job修改、移除等)
使用裝飾器scheduled_job()
做業存儲注意事項:
除了Memory方式不須要序列化以外(一個例外是使用ProcessPoolExecutor),其他都須要做業函數參數可序列化。
若是須要存儲做業,並且每次啓動時你的應用都會從新添加一遍做業,那麼請在添加job時指定一個惟一的ID
,以及指定replace_existing=True
,不然每次啓動應用都會添加一次job的副本。
若是須要當即啓動該任務,請在添加job時指定trigger參數。
移除job:
調用scheduler.remove_job()放到,參數爲 job’s ID and job store alias
調用job實例的remove()方法 on the Job instance you got from add_job()
注意:若是任務已經調度完畢,而且以後也不會再被執行的狀況下,會被自動移除。
job = scheduler.add_job(myfunc, 'interval', minutes=2) job.remove() scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') scheduler.remove_job('my_job_id')
暫停和恢復job:
apscheduler.job.Job.pause() apscheduler.schedulers.base.BaseScheduler.pause_job() apscheduler.job.Job.resume() apscheduler.schedulers.base.BaseScheduler.resume_job()
獲取jobs列表
apscheduler.get_jobs()
修改job:
能夠經過apscheduler.job.Job.modify() or apscheduler.modify_job()
修改除了id以外
的job屬性。
job.modify(max_instances=6, name='Alternate name')
若是你想修改job的調度器,你可使用apscheduler.job.Job.reschedule() or reschedule_job()
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
默認狀況下同一個job,只容許一個job實例運行。這在某個job在下次運行時間到達以後仍未執行完畢時,能達到控制的目的。你也能夠該變這一行爲,在你調用add_job時能夠傳遞max_instances
=5來運行同時運行同一個job的5個job實例。
一個job可能因爲某些狀況錯過執行時間,好比上一點提到的,或者是線程池或進程池用光了,或者是當要調度job時,忽然down機了等。
這時能夠經過設置job的misfire_grace_time選項來指示以後嘗試執行的次數。
固然若是這不符合你的指望,你能夠合併全部錯過期間的job到一個job來執行,經過設定job的coalesce=True
。
能夠監聽調度、任務執行狀況相關的事件。
def my_listener(event): if event.exception: print('The job crashed :(') else: print('The job worked :)') scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
支持的事件列表:
http://apscheduler.readthedoc...
有木有很是強大,又很是易於理解的感受。Good Job!