Flask-Apscheduler
是集成了 Apscheduler
的擴展, 給 Flask
程序提供了 定時任務
的支持, 使用此擴展能夠很方便的添加定時任務, 下面咱們看一下具體的使用方法.html
pip install Flask-APScheduler
複製代碼
from flask import Flask
from flask_apscheduler import APScheduler
app = Flask(__name__)
scheduler = APScheduler(app=app)
@scheduler.task(trigger='interval', id='test_job', seconds=1)
def test_job():
print('hello world')
if __name__ == '__main__':
scheduler.start()
app.run()
複製代碼
以上代碼將實現 每隔一秒
執行一次 test_job
任務, id
表示當前任務的 惟一標識
. 全部被 task
裝飾器裝飾的函數將被視爲 定時任務
, 定時調度邏輯將經過裝飾器參數設置. 經過以上代碼能夠看到, 集成一個定時任務是十分方便的.python
APScheduler
提供了一個 trigger
的概念, 也就是任務調度的觸發方式, 可選的值爲:git
interval
: 間隔任務, 即在固定的時間間隔循環執行cron
: 指定在某一天的固定時間點執行date
: 在指定時間點執行, 僅執行一次選擇了 trigger
後, 將根據 關鍵字參數
設置具體的調度時間, 每個 trigger
對應的參數都不同, 下面將一一介紹.github
interval
對應 apscheduler.triggers.interval.IntervalTrigger
類, 下面是它的初始化方法:redis
IntervalTrigger(weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None, timezone=None, jitter=None)
複製代碼
其中的 初始化參數
是須要在 task
裝飾器中傳入的 關鍵字參數
. 其中 run_date
表示任務開始執行時間, 若是爲 None
則從任務生成時開始算起, end_date
是結束時間, 到達這個時間點後, 任務將中止. 如下是每個參數的含義:sql
weeks(int)
: 須要等待的週數days(int)
: 須要等待的天數hours(int)
: 須要等待的小時數minutes(int)
: 須要等待的分鐘數seconds(int)
: 須要等待的秒數start_data(datetime|str)
: 任務開始時間end_date(datetime|str)
: 任務結束時間timezone(datetime.tzinfo|str)
: 時區jitter(int|None)
: 延遲窗口, 設置此值後, 真正的間隔時間將在 ±jitter
間浮動cron
對應 apscheduler.triggers.cron.CronTrigger
類, 下面是它的初始化方法:mongodb
CronTrigger(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None, jitter=None)
複製代碼
最後四個參數與 interval
一致, 再也不解釋, 如下爲每一個參數的含義:數據庫
year(int|str)
: YYYY
格式的年month(int|str)
: 1-12
月day(int|str)
: 1-31
天week(int|str)
: 1-53
周day_of_week(int|str)
: 一週的某一天, 能夠爲 數字
或者 名字
0-6
,mon,tue,wed,thu,fri,sat,sun
hour(int|str)
: 0-23
小時minute(int|str)
: 0-59
分鐘second(int|str)
: 0-59
秒cron
是 Apscheduler
中最強大的 trigger
, 它的使用方式相似於 *nix
下的 crontab
, 因此以上參數的值可使用 cron 表達式
設置, 如下爲可用的表達式, 表格中 任意字段
的 含義
將以 秒
進行解釋, 使用時表示具體使用的 參數字段
:flask
表達式 | 參數字段 | 含義 |
---|---|---|
* |
任意 | 任意秒, 即每秒執行 |
*/a |
任意 | 當前秒數爲 a 的倍數時執行 |
a-b |
任意 | 當前秒數在 a 和 b 之間時執行 |
a-b/c |
任意 | 當前秒數在 a 和 b 之間且是 c 的倍數時執行 |
xth y |
day | 這個月第 x 個 周 , 例如 3rd mon 表示 第三個週一 |
last x |
day | 這個月最後一個 周 |
last |
day | 這個月最後的一天 |
x,y,z |
任意 | 以上表達式能夠寫多個, 逗號分隔 |
在設置參數時,沒有設置的參數將設置默認值. 默認參數規則以一個例子解釋. 若是定義 day=1, minute=20
兩個字段, 則這兩個字段中 含義
最小的爲 minute
, 而後比 minute
小的字段設爲 最小值
, 比它大的字段設置爲 *
, 可是 week
和 day_of_week
一直設置爲 *
. 因此比 minute
小的字段 second
設置爲 0
, 其餘設置爲 *
. 以上例子對應的結果爲:api
year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
複製代碼
date
比較簡單, 只有一個 run_date
參數, 用於設置執行時間, 只執行一次.
API
管理flask-apscheduler
提供了使用 api
管理任務的功能, 可是默認是關閉的, 能夠在 flask
配置中設置 app.config['SCHEDULER_API_ENABLED'] = True
打開, 打開後在 app
實例所在目錄執行 flask routes | grep scheduler
便可查看. flask routes
是 flask
提供的命令行交互程序, 只在 1.*
版本有效, 若是使用的是舊版本的, 可使用 app.url_map.iter_rules()
查看路由信息, 全部路由信息以下:
scheduler.add_job POST /scheduler/jobs
scheduler.delete_job DELETE /scheduler/jobs/<job_id>
scheduler.get_job GET /scheduler/jobs/<job_id>
scheduler.get_jobs GET /scheduler/jobs
scheduler.get_scheduler_info GET /scheduler
scheduler.pause_job POST /scheduler/jobs/<job_id>/pause
scheduler.resume_job POST /scheduler/jobs/<job_id>/resume
scheduler.run_job POST /scheduler/jobs/<job_id>/run
scheduler.update_job PATCH /scheduler/jobs/<job_id>
複製代碼
這些 API
分別調用了 apscheduler
提供的方法, 有了這些 API
咱們就能夠很方便的對任務進行管理.
上節提到咱們能夠 API
對任務進行管理, 即 增刪改查
和 暫停重啓
. 可是默認狀況下, 全部 任務
都存儲在內存中, 一旦項目重啓, 全部已修改的任務都將失效, 不會被保存下來, 因此就須要能夠保存數據的地方可讓任務持久化保存, 也就是 數據庫
, apscheduler
提供瞭如下數據庫做爲存儲介質:
mongodb
redis
sqlalchemy
支持的數據庫rethinkdb
zookeeper
flask-apscheduler
將直接從 flask
配置讀取 SCHEDULER_JOBSTORES
的值, 值的結構以下:
SCHEDULER_JOBSTORES = {
'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db'),
'redis': RedisJobStore()
}
複製代碼
存儲介質能夠配置多個, 在使用 task
裝飾器設置任務時傳入 jobstore='default'
或者 jobstore='redis'
便可. 當使用了 存儲
後, 定時任務一啓動就從 數據庫
獲取任務列表, 而後根據每一個任務的描述進行任務調度. 使用 task
裝飾器添加 任務
時, 默認會覆蓋 數據庫
中已存在的任務, 若是不想被覆蓋則須要手動調用 add_job
方法並設置 replace_existing
參數爲 False
, 以後再添加已存在的任務會拋異常.
我不知道使用持久化存儲的正確姿式是什麼樣的, 根據個人理解, 在添加一系列任務而且進行了修改後, 下次再次啓動項目, 應該仍是保持當時的任務狀態, 而使用
task
裝飾器添加的任務將默認覆蓋已有的任務, 因此只能使用add_job
手動添加任務, 而且不容許覆蓋已存在任務, 若是遇到重複拋出異常, 捕捉便可. 這樣就可讓任務狀態一直保持.
apscheduler
裏有一個 executor
的概念, 用來執行全部的任務, 他們能夠是 線程池
或者 進程池
. flask-apscheduler
經過 SCHEDULER_EXECUTORS
配置來設置執行器, 如下爲示例:
from flask import Flask
from flask_apscheduler import APScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
app = Flask(__name__)
app.config['SCHEDULER_EXECUTORS'] = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
scheduler = APScheduler()
scheduler.init_app(app)
複製代碼
在添加任務的時候使用 executor
參數選擇便可.
調度器對任務中涉及到的 jobstore
, trigger
, exector
以及 增刪改查
進行調度, 使全部任務平穩運行. 通常一個應用只存在一個調度器. 下面是提供的調度器:
BlockingScheduler
: 阻塞運行, 僅在應用爲獨立的定時任務程序時使用BackgroundScheduler
: 後臺運行, flask-apscheduler
默認爲此項AsyncIOScheduler
: 在應用使用了 asyncio
模塊時使用GeventScheduler
: 在應用使用了 gevent
模塊時使用TornadoScheduler
: 在應用使用了 tornado
模塊時使用TwistedScheduler
: 在應用使用了 twisted
模塊時使用QtScheduler
: 在構建 Qt
應用時使用若是選擇了以上的任何一個 Scheduler
, 將會默認選擇合適的 executor
, 不然默認的 executor
爲 ThreadPoolExecutor
.
apscheduler
提供了名爲 apscheduler
的 日誌器
, 能夠對其配置定製日誌輸出, 如下代碼僅供參考:
logger = logging.getLogger('apscheduler')
logger.setLevel(logging.DEBUG)
logger_handler = logging.FileHandler('scheduler.log', mode='a', encoding='utf-8')
logger_handler.setLevel(logging.DEBUG)
logger_handler.setFormatter(logging.Formatter(fmt='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(logger_handler)
複製代碼
以上代碼設置日誌等級爲 DEBUG
, 將日誌輸出到 scheduler.log
文件.
若是任務在執行過程當中拋出了異常, 會將異常堆棧寫入日誌. 可是但願在出錯時能夠作一些其餘的邏輯, 不如出錯後進行郵件提醒等功能就須要添加事件監聽. 如下代碼爲示例:
def my_listener(event):
if event.exception:
print('The job crashed :(')
else:
print('The job worked :)')
scheduler.add_listener(my_listener, EVENT_JOB_ERROR)
複製代碼
以上代碼將添加 錯誤事件
監聽, 當任務執行出錯時, 將會執行 my_listener
函數. 具體事件從 apscheduler.events
導入.
經過以上這些, 對於簡單的 定時任務
已經能夠很方便的集成了. 其實 flask-apscheduler
我也是初次使用, 大多數都是從過閱讀 文檔 後根據本身理解翻譯的, 下面是我使用過程當中遇到過的問題.
我在使用裝飾器註冊一個任務後, 使用 gunicorn
服務器部署應用後, 每一個任務會同時運行屢次, 排查才知道使用 gunicorn
時採用了多個 worker
, 每一個 worker
都會在內存中維護一個獨立的 應用實例
, 而後每一個任務在每一個應用實例中都存在一份. 因此定時任務應該在應用初始化後初始化, 這時 鉤子函數
就起做用了. 使用 app.before_first_request
鉤子函數, 就能夠在第一個請求開始以前將全部任務激活了.
咱們通常須要使用 flask
實例的相關信息時會使用 current_app
代理進行獲取. 因爲 flask
使用了 本地線程
技術來封裝對象, 因此 current_app
僅指向當前線程下的 flask
實例, 可是 定時任務
是另開一個全新的 線程
, 這個線程是脫離了應用上下文的, 若是想要使用, 就須要使用 _get_current_object()
獲取真實實例後做爲參數傳遞到新的線程內, 而後使用 app.app_context()
獲取應用上下文.
flask-apscheduler
中初始化 scheduler
時會將 flask
實例掛載到 scheduler
上, 須要使用到應用上下文時, 只須要使用 scheduler.app.app_context()
便可.