APscheduler是執行定時任務的python庫,其做用能夠代替Linux系統下的crontab,github上有該庫的例子。html
該模塊由4個基本組件組成:python
其中triggers定義了定時任務的類別、觸發條件以及具體要執行的任務名。
job stores能夠將任務儲存起來,有些須要重複執行的任務,或有數據生成的任務,有將數據儲存的要求,能夠經過數據庫或文件的形式將數據儲存起來以便下次使用。
executors負責具體的任務執行。
schedulers是最高級的組件,負責其它的管理和調度工做,該模塊有幾種可選的調度器可供選擇,使用方法都是同樣的:linux
BlockingScheduler: use when the scheduler is the only thing running in your process BackgroundScheduler: use then you’re not using any of the frameworks below, and want the scheduler to run in the background inside your application AsyncIOScheduler: use if your application uses the asyncio module GeventScheduler: use if your application uses gevent TornadoScheduler: use if you’re building a Tornado application TwistedScheduler: use if you’re building a Twisted application QtScheduler: use if you’re building a Qt application
定時任務的執行內容放在一個函數中使用,而調度的類型和時間等設置則在建立這個job的時候進行定義,在這個模塊中,一個具體的任務是一個JOB實例,所以能夠經過修改這個實例的屬性對任務進行從新設置,而每個job實例都是依賴於一個scheduler,因此也能夠經過sheduler對sheduler進行從新設定。git
參考源碼中scheduler.add_job()的參數說明
'cron'跟linux下crontab同樣的定時類型寫法,
'interval'使用時間間隔的寫法
...github
from apscheduler.schedulers.background import BackgroundScheduler # 建立scheduler scheduler = BackgroundScheduler() # 建立job # 方法1 job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 方法2 @scheduler.scheduled_job def myfunc(): # do something pass # 啓動 scheduler.start() # 添加job scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id') # 刪除job scheduler.remove_job('my_job_id') # 暫停job scheduler.pause_job() # 恢復job scheduler.resume_job() # 查看jobs scheduler.get_jobs() scheduler.print_jobs() # 修改job scheduler.modify_job(max_instances=6, name='Alternate name') # 從新設置定時任務類型 scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5') # 關閉 scheduler.shutdown(wait=False)
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { 'mongo': MongoDBJobStore(), 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': ThreadPoolExecutor(20), 'processpool': ProcessPoolExecutor(5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
from apscheduler.schedulers.background import BackgroundScheduler # The "apscheduler." prefix is hard coded scheduler = BackgroundScheduler({ 'apscheduler.jobstores.mongo': { 'type': 'mongodb' }, 'apscheduler.jobstores.default': { 'type': 'sqlalchemy', 'url': 'sqlite:///jobs.sqlite' }, 'apscheduler.executors.default': { 'class': 'apscheduler.executors.pool:ThreadPoolExecutor', 'max_workers': '20' }, 'apscheduler.executors.processpool': { 'type': 'processpool', 'max_workers': '5' }, 'apscheduler.job_defaults.coalesce': 'false', 'apscheduler.job_defaults.max_instances': '3', 'apscheduler.timezone': 'UTC', })
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ProcessPoolExecutor jobstores = { 'mongo': {'type': 'mongodb'}, 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') } executors = { 'default': {'type': 'threadpool', 'max_workers': 20}, 'processpool': ProcessPoolExecutor(max_workers=5) } job_defaults = { 'coalesce': False, 'max_instances': 3 } scheduler = BackgroundScheduler() # .. do something else here, maybe add jobs etc. scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
flask下使用該模塊能夠使用flask擴展模塊flask-apshedulerweb
from flask import Flask from flask_apscheduler import APScheduler class Config(object): JOBS = [ { 'id': 'job1', 'func': 'jobs:job1', 'args': (1, 2), 'trigger': 'interval', 'seconds': 10 } ] SCHEDULER_API_ENABLED = True def job1(a, b): print(str(a) + ' ' + str(b)) if __name__ == '__main__': app = Flask(__name__) app.config.from_object(Config()) scheduler = APScheduler() # it is also possible to enable the API directly # scheduler.api_enabled = True scheduler.init_app(app) scheduler.start() app.run()
問題在使用gunicorn對flask應用進行控制的時候若是設置了gunicorn的--worker參數大於1時,會出現一個定時任務執行屢次的問題,此時要給gunicorn提供一個額外的--preload參數,這樣,flask的app在run以前的全部操做只會執行一次,所以定時任務就只會執行一次。
env/bin/gunicorn module_containing_app:app -b 0.0.0.0:8080 --workers 3 --preloadsql
問題與上面的問題相似,調試模式下只需給app.run()添加一個參數use_reloader便可。
app.run(debug=True, use_reloader=False)mongodb