Flask 擴展: 使用 Flask-Apscheduler 集成定時任務

Flask-Apscheduler 是集成了 Apscheduler 的擴展, 給 Flask 程序提供了 定時任務 的支持, 使用此擴展能夠很方便的添加定時任務, 下面咱們看一下具體的使用方法.html

安裝

pip install Flask-APScheduler
複製代碼

示例

from flask import Flask
from flask_apscheduler import APScheduler

app = Flask(__name__)
scheduler = APScheduler(app=app)

@scheduler.task(trigger='interval', id='test_job', seconds=1)
def test_job():
    print('hello world')

if __name__ == '__main__':
    scheduler.start()
    app.run()
複製代碼

以上代碼將實現 每隔一秒 執行一次 test_job 任務, id 表示當前任務的 惟一標識. 全部被 task 裝飾器裝飾的函數將被視爲 定時任務, 定時調度邏輯將經過裝飾器參數設置. 經過以上代碼能夠看到, 集成一個定時任務是十分方便的.python

觸發器

APScheduler 提供了一個 trigger 的概念, 也就是任務調度的觸發方式, 可選的值爲:git

  • interval: 間隔任務, 即在固定的時間間隔循環執行
  • cron: 指定在某一天的固定時間點執行
  • date: 在指定時間點執行, 僅執行一次

選擇了 trigger 後, 將根據 關鍵字參數 設置具體的調度時間, 每個 trigger 對應的參數都不同, 下面將一一介紹.github

interval

interval 對應 apscheduler.triggers.interval.IntervalTrigger 類, 下面是它的初始化方法:redis

IntervalTrigger(weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None, timezone=None, jitter=None)
複製代碼

其中的 初始化參數 是須要在 task 裝飾器中傳入的 關鍵字參數. 其中 run_date 表示任務開始執行時間, 若是爲 None 則從任務生成時開始算起, end_date 是結束時間, 到達這個時間點後, 任務將中止. 如下是每個參數的含義:sql

  • weeks(int): 須要等待的週數
  • days(int): 須要等待的天數
  • hours(int): 須要等待的小時數
  • minutes(int): 須要等待的分鐘數
  • seconds(int): 須要等待的秒數
  • start_data(datetime|str): 任務開始時間
  • end_date(datetime|str): 任務結束時間
  • timezone(datetime.tzinfo|str): 時區
  • jitter(int|None): 延遲窗口, 設置此值後, 真正的間隔時間將在 ±jitter 間浮動

cron

cron 對應 apscheduler.triggers.cron.CronTrigger 類, 下面是它的初始化方法:mongodb

CronTrigger(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None, jitter=None)
複製代碼

最後四個參數與 interval 一致, 再也不解釋, 如下爲每一個參數的含義:數據庫

  • year(int|str): YYYY 格式的年
  • month(int|str): 1-12
  • day(int|str): 1-31
  • week(int|str): 1-53
  • day_of_week(int|str): 一週的某一天, 能夠爲 數字 或者 名字
    • 數字爲 0-6,
    • 名字爲 mon,tue,wed,thu,fri,sat,sun
  • hour(int|str): 0-23 小時
  • minute(int|str): 0-59 分鐘
  • second(int|str): 0-59

cronApscheduler 中最強大的 trigger, 它的使用方式相似於 *nix 下的 crontab, 因此以上參數的值可使用 cron 表達式 設置, 如下爲可用的表達式, 表格中 任意字段含義 將以 進行解釋, 使用時表示具體使用的 參數字段:flask

表達式 參數字段 含義
* 任意 任意秒, 即每秒執行
*/a 任意 當前秒數爲 a 的倍數時執行
a-b 任意 當前秒數在 ab 之間時執行
a-b/c 任意 當前秒數在 ab 之間且是 c 的倍數時執行
xth y day 這個月第 x, 例如 3rd mon 表示 第三個週一
last x day 這個月最後一個
last day 這個月最後的一天
x,y,z 任意 以上表達式能夠寫多個, 逗號分隔

在設置參數時,沒有設置的參數將設置默認值. 默認參數規則以一個例子解釋. 若是定義 day=1, minute=20 兩個字段, 則這兩個字段中 含義 最小的爲 minute, 而後比 minute 小的字段設爲 最小值, 比它大的字段設置爲 *, 可是 weekday_of_week 一直設置爲 *. 因此比 minute 小的字段 second 設置爲 0, 其餘設置爲 *. 以上例子對應的結果爲:api

year='*', month='*', day=1, week='*', day_of_week='*', hour='*', minute=20, second=0
複製代碼

date

date 比較簡單, 只有一個 run_date 參數, 用於設置執行時間, 只執行一次.

使用 API 管理

flask-apscheduler 提供了使用 api 管理任務的功能, 可是默認是關閉的, 能夠在 flask 配置中設置 app.config['SCHEDULER_API_ENABLED'] = True 打開, 打開後在 app 實例所在目錄執行 flask routes | grep scheduler 便可查看. flask routesflask 提供的命令行交互程序, 只在 1.* 版本有效, 若是使用的是舊版本的, 可使用 app.url_map.iter_rules() 查看路由信息, 全部路由信息以下:

scheduler.add_job                POST     /scheduler/jobs
scheduler.delete_job             DELETE   /scheduler/jobs/<job_id>
scheduler.get_job                GET      /scheduler/jobs/<job_id>
scheduler.get_jobs               GET      /scheduler/jobs
scheduler.get_scheduler_info     GET      /scheduler
scheduler.pause_job              POST     /scheduler/jobs/<job_id>/pause
scheduler.resume_job             POST     /scheduler/jobs/<job_id>/resume
scheduler.run_job                POST     /scheduler/jobs/<job_id>/run
scheduler.update_job             PATCH    /scheduler/jobs/<job_id>
複製代碼

這些 API 分別調用了 apscheduler 提供的方法, 有了這些 API 咱們就能夠很方便的對任務進行管理.

持久化存儲

上節提到咱們能夠 API 對任務進行管理, 即 增刪改查暫停重啓. 可是默認狀況下, 全部 任務 都存儲在內存中, 一旦項目重啓, 全部已修改的任務都將失效, 不會被保存下來, 因此就須要能夠保存數據的地方可讓任務持久化保存, 也就是 數據庫, apscheduler 提供瞭如下數據庫做爲存儲介質:

  • mongodb
  • redis
  • 全部被 sqlalchemy 支持的數據庫
  • rethinkdb
  • zookeeper

flask-apscheduler 將直接從 flask 配置讀取 SCHEDULER_JOBSTORES 的值, 值的結構以下:

SCHEDULER_JOBSTORES = {
    'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db'),
    'redis': RedisJobStore()
}
複製代碼

存儲介質能夠配置多個, 在使用 task 裝飾器設置任務時傳入 jobstore='default' 或者 jobstore='redis' 便可. 當使用了 存儲 後, 定時任務一啓動就從 數據庫 獲取任務列表, 而後根據每一個任務的描述進行任務調度. 使用 task 裝飾器添加 任務 時, 默認會覆蓋 數據庫 中已存在的任務, 若是不想被覆蓋則須要手動調用 add_job 方法並設置 replace_existing 參數爲 False, 以後再添加已存在的任務會拋異常.

我不知道使用持久化存儲的正確姿式是什麼樣的, 根據個人理解, 在添加一系列任務而且進行了修改後, 下次再次啓動項目, 應該仍是保持當時的任務狀態, 而使用 task 裝飾器添加的任務將默認覆蓋已有的任務, 因此只能使用 add_job 手動添加任務, 而且不容許覆蓋已存在任務, 若是遇到重複拋出異常, 捕捉便可. 這樣就可讓任務狀態一直保持.

執行器

apscheduler 裏有一個 executor 的概念, 用來執行全部的任務, 他們能夠是 線程池 或者 進程池. flask-apscheduler 經過 SCHEDULER_EXECUTORS 配置來設置執行器, 如下爲示例:

from flask import Flask
from flask_apscheduler import APScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

app = Flask(__name__)
app.config['SCHEDULER_EXECUTORS'] = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}

scheduler = APScheduler()
scheduler.init_app(app)
複製代碼

在添加任務的時候使用 executor 參數選擇便可.

調度器

調度器對任務中涉及到的 jobstore, trigger, exector 以及 增刪改查 進行調度, 使全部任務平穩運行. 通常一個應用只存在一個調度器. 下面是提供的調度器:

  • BlockingScheduler: 阻塞運行, 僅在應用爲獨立的定時任務程序時使用
  • BackgroundScheduler: 後臺運行, flask-apscheduler 默認爲此項
  • AsyncIOScheduler: 在應用使用了 asyncio 模塊時使用
  • GeventScheduler: 在應用使用了 gevent 模塊時使用
  • TornadoScheduler: 在應用使用了 tornado 模塊時使用
  • TwistedScheduler: 在應用使用了 twisted 模塊時使用
  • QtScheduler: 在構建 Qt 應用時使用

若是選擇了以上的任何一個 Scheduler, 將會默認選擇合適的 executor, 不然默認的 executorThreadPoolExecutor.

日誌及事件監聽

日誌

apscheduler 提供了名爲 apscheduler日誌器, 能夠對其配置定製日誌輸出, 如下代碼僅供參考:

logger = logging.getLogger('apscheduler')
logger.setLevel(logging.DEBUG)

logger_handler = logging.FileHandler('scheduler.log', mode='a', encoding='utf-8')
logger_handler.setLevel(logging.DEBUG)
logger_handler.setFormatter(logging.Formatter(fmt='%(asctime)s %(levelname)s: %(message)s',
                                              datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(logger_handler)
複製代碼

以上代碼設置日誌等級爲 DEBUG, 將日誌輸出到 scheduler.log 文件.

事件監聽

若是任務在執行過程當中拋出了異常, 會將異常堆棧寫入日誌. 可是但願在出錯時能夠作一些其餘的邏輯, 不如出錯後進行郵件提醒等功能就須要添加事件監聽. 如下代碼爲示例:

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')
        
scheduler.add_listener(my_listener, EVENT_JOB_ERROR)
複製代碼

以上代碼將添加 錯誤事件 監聽, 當任務執行出錯時, 將會執行 my_listener 函數. 具體事件從 apscheduler.events 導入.

常見問題

經過以上這些, 對於簡單的 定時任務 已經能夠很方便的集成了. 其實 flask-apscheduler 我也是初次使用, 大多數都是從過閱讀 文檔 後根據本身理解翻譯的, 下面是我使用過程當中遇到過的問題.

任務運行屢次

我在使用裝飾器註冊一個任務後, 使用 gunicorn 服務器部署應用後, 每一個任務會同時運行屢次, 排查才知道使用 gunicorn 時採用了多個 worker, 每一個 worker 都會在內存中維護一個獨立的 應用實例, 而後每一個任務在每一個應用實例中都存在一份. 因此定時任務應該在應用初始化後初始化, 這時 鉤子函數 就起做用了. 使用 app.before_first_request 鉤子函數, 就能夠在第一個請求開始以前將全部任務激活了.

須要使用應用上下文

咱們通常須要使用 flask 實例的相關信息時會使用 current_app 代理進行獲取. 因爲 flask 使用了 本地線程 技術來封裝對象, 因此 current_app 僅指向當前線程下的 flask 實例, 可是 定時任務 是另開一個全新的 線程, 這個線程是脫離了應用上下文的, 若是想要使用, 就須要使用 _get_current_object() 獲取真實實例後做爲參數傳遞到新的線程內, 而後使用 app.app_context() 獲取應用上下文.

flask-apscheduler 中初始化 scheduler 時會將 flask 實例掛載到 scheduler 上, 須要使用到應用上下文時, 只須要使用 scheduler.app.app_context() 便可.

參考

flask-apscheduler

apscheduler

相關文章
相關標籤/搜索