基於Flask-APScheduler實現添加動態定時任務


閱讀目錄

1、apSheduler

2、Flask-APScheduler

3、動態定時任務

4、uwsgi部署注意事項

1、apSheduler

第一部份內容限於apSheduler3.0如下版本,以上版本可移步至 FastAPI+apSheduler動態定時任務html

1. 引子(Introduction)

Advanced Python Scheduler (APScheduler) 是一個輕量級但功能強大的進程內任務調度器,容許您調度函數(或任何其餘python可調用文件)在您選擇的時間執行。前端

2. 特性(Features)

  1. 沒有(硬)外部依賴性
  2. api線程安全
  3. 支持CPython、Jython、PyPy
  4. 可配置的調度機制(觸發器):
    1. 相似cron調度
    2. 單次運行延遲調度(如UNIX「at」命令)
    3. 基於時間間隔(以指定的時間間隔運行)
  5. 支持多種存儲空間
    1. RAM
    2. 基於文件的簡單數據庫
    3. SQLAlchem
    4. MongoDB
    5. Redis

3. 使用(Usage)

3.1 安裝

  • pip install apscheduler

3.2 啓動調度程序

from apscheduler.scheduler import Scheduler

sched = Scheduler()
sched.start()

3.3 調度job

3.3.1 簡單日期調度job

在指定時間執行一次job。這是至關於UNIX「at」命令的進程內命令python

from datetime import date
from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

# Define the function that is to be executed
def my_job(text):
    print text

# The job will be executed on November 6th, 2009
exec_date = date(2009, 11, 6)

# 添加一個job
job = sched.add_date_job(my_job, exec_date, ['text'])

更具體地安排時間linux

from datetime import datetime

# The job will be executed on November 6th, 2009 at 16:30:05
job = sched.add_date_job(my_job, datetime(2009, 11, 6, 16, 30, 5), ['text'])

甚至能夠將日期指定爲字符串文本sql

job = sched.add_date_job(my_job, '2009-11-06 16:30:05', ['text'])

# 支持微秒級別
job = sched.add_date_job(my_job, '2009-11-06 16:30:05.720400', ['text'])
3.3.2 基於時間間隔的調度job

job的執行在給定延遲後開始,或者在start_date(若是指定)開始,start_date參數能夠做爲date/datetime對象或字符串文本給出。數據庫

from datetime import datetime

from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

def job_function():
    print "Hello World"

# Schedule job_function to be called every two hours
sched.add_interval_job(job_function, hours=2)

# The same as before, but start after a certain time point
sched.add_interval_job(job_function, hours=2, start_date='2010-10-10 09:30')

裝飾語法flask

from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

# Schedule job_function to be called every two hours
@sched.interval_schedule(hours=2)
def job_function():
    print "Hello World"

若是須要取消對裝飾功能的job,能夠這樣作後端

scheduler.unschedule_job(job_function.job)
3.3.3 cron調度job

與crontab表達式不一樣,您能夠省略不須要的字段。大於最低有效明肯定義字段的字段默認爲,而較小的字段默認爲其最小值,除了默認爲。例如,若是僅指定day=1,minute=20,則做業將在每一年每個月的第一天以每小時20分鐘的速度執行。下面的代碼示例應該進一步說明這種行爲。
省略字段默認爲*
api

from apscheduler.scheduler import Scheduler

# Start the scheduler
sched = Scheduler()
sched.start()

def job_function():
    print "Hello World"

# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_cron_job(job_function, month='6-8,11-12', day='3rd fri', hour='0-3')

# Schedule a backup to run once from Monday to Friday at 5:30 (am)
sched.add_cron_job(job_function, day_of_week='mon-fri', hour=5, minute=30)

裝飾語法安全

@sched.cron_schedule(day='last sun')
def some_decorated_task():
    print "I am printed at 00:00:00 on the last Sunday of every month!"

若是須要取消對裝飾功能的job,能夠這樣作

scheduler.unschedule_job(job_function.job)
3.3.4 使用自定義觸發器調度

以上事例基於內置觸發器調度job,若是須要使用自定義觸發器調度須要使用add_job()方法

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
  
def aps_test(x):
    print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x)    
scheduler = BlockingScheduler()
scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5')
scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12))
scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3)

scheduler.start()

3.4 關閉調度器

sched.shutdown()

# 默認狀況下,調度程序關閉其線程池,並等待直到全部當前正在執行的job完成。爲了更快地退出,能夠:
sched.shutdown(wait=False)

# 這仍然會關閉線程池,但不會等待任何正在運行的任務完成。此外,若是您給調度程序一個要在其餘地方管理的線程池,您可能但願徹底跳過線程池關閉:
sched.shutdown(shutdown_threadpool=False)

# 自動關閉調度程序的一個巧妙方法是爲此使用atexit掛鉤:
import atexit
sched = Scheduler(daemon=True)
atexit.register(lambda: sched.shutdown(wait=False))
# Proceed with starting the actual application

3.5 Job stores

若是沒有指定stores存儲位置,則將轉到默認job存儲 -> ramjobstore不提供持久化保存
其它存儲stores:

ShelveJobStore
SQLAlchemyJobStore
MongoDBJobStore
RedisJobStore

經過配置選項或add_jobstore()方法添加做業存儲。所以,如下是相等的:

config = {'apscheduler.jobstores.file.class': 'apscheduler.jobstores.shelve_store:ShelveJobStore',
      'apscheduler.jobstores.file.path': '/tmp/dbfile'}
sched = Scheduler(config)

3.6 獲取調度器列表

sched.print_jobs()

2、Flask-APScheduler

1. 引子(Introduction)

  • Flask-APScheduler 是Flask框架的一個擴展庫,增長了Flask對apScheduler的支持

2. 特性(Features)

  1. 根據Flask配置加載調度器配置
  2. 根據Flask配置加載調度器job
  3. 容許指定調度程序將運行的主機名
  4. 提供REST API來管理調度job
  5. 爲REST API提供認證

3. 安裝(Installation)

pip install Flask-APScheduler

4. 使用(Usage)

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from flask import Flask
from flask_apscheduler import APScheduler

class Config(object):
    # 配置執行job
    JOBS = [
        {
            'id': 'job1',
            'func': 'advanced:job1',
            'args': (1, 2),
            'trigger': 'interval',
            'seconds': 10
        }
    ]
    # 存儲位置
    SCHEDULER_JOBSTORES = {
        'default': SQLAlchemyJobStore(url='sqlite://')
    }
    # 線程池配置
    SCHEDULER_EXECUTORS = {
        'default': {'type': 'threadpool', 'max_workers': 20}
    }

    SCHEDULER_JOB_DEFAULTS = {
        'coalesce': False,
        'max_instances': 3
    }
    # 調度器開關
    SCHEDULER_API_ENABLED = True


def job1(a, b):
    print(str(a) + ' ' + str(b))
    
if __name__ == '__main__':
    app = Flask(__name__)
    app.config.from_object(Config())
    
    scheduler = APScheduler()
    # 註冊app
    scheduler.init_app(app)
    scheduler.start()

    app.run()

3、動態定時任務

  • Flask + flask_apscheduler實現一個相似Jenkins的定時任務的功能,前端設置crontab,後端能夠建立,修改,暫停,移除,恢復一個執行任務
文件目錄
    |--app
    |----config.py      配置文件
    |----run_tasks.py   開啓任務
    |----tasks.py       任務job
    |----apSheduler.py  提供接口函數
    |----extensions.py  flask擴展
    |----__init__.py    初始化文件
    |----views.py       業務代碼
    |--manage.py        項目啓動文件

1. config.py配置flask_apscheduler

class Config(object):
    # 開關
    SCHEDULER_API_ENABLED = True
    # 持久化配置
    SCHEDULER_JOBSTORES = {
            'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db')
        }
    SCHEDULER_EXECUTORS = {
        'default': {'type': 'threadpool', 'max_workers': 20}
    }

2. init.py建立app

from app.config import Config
from app.extensions import scheduler

# 建立app
def create_app(config=None, app_name=None, blueprints=None):
    app = Flask(app_name, static_folder='thanos/static',
        template_folder='thanos/resource/report')
    # 導入flask配置 -> 這裏根據本身的項目導入配置就好哇
    # config = Config.get_config_from_host(app.name)
    app.config.from_object(config)
    # 初始化調度器配置    
    configure_scheduler(app)
        
def configure_scheduler(app):
    """Configure Scheduler"""
    scheduler.init_app(app)
    scheduler.start()
    # 加載任務,選擇了第一次請求flask後端時加載,能夠選擇別的方式...
    @app.before_first_request   
    def load_tasks():
        # 開啓任務
        from app import run_tasks

3. extensions.py實例化scheduler

from flask_apscheduler import APScheduler
scheduler = APScheduler()

4. apSheduler.py提供調度器接口

"""此文件能夠根據具體業務複雜化選擇寫或者直接調用原apscheduler接口"""
from flask import current_app
# from .extensions import scheduler  直接導入單例對象操做也行

class APScheduler(object):
"""調度器控制方法""" 
    def add_job(self, jobid, func, args, **kwargs):
        """
        添加任務
        :param args:  元祖 -> (1,2)
        :param jobstore:  存儲位置
        :param trigger:
                        data ->  run_date   datetime表達式
                        cron ->  second/minute/day_of_week
                        interval ->  seconds 延遲時間
                        next_run_time ->  datetime.datetime.now() + datetime.timedelta(seconds=12))
        :return:
        """
        job_def = dict(kwargs)
        job_def['id'] = jobid
        job_def['func'] = func
        job_def['args'] = args
        job_def = self.fix_job_def(job_def)
        self.remove_job(jobid)  # 刪除原job
        current_app.apscheduler.scheduler.add_job(**job_def)

    def remove_job(self, jobid, jobstore=None):
        """刪除任務"""
        current_app.apscheduler.remove_job(jobid, jobstore=jobstore)

    def resume_job(self, jobid, jobstore=None):
        """恢復任務"""
        current_app.apscheduler.resume_job(jobid, jobstore=jobstore)

    def pause_job(self, jobid, jobstore=None):
        """恢復任務"""
        current_app.apscheduler.pause_job(jobid, jobstore=jobstore)

    def fix_job_def(self, job_def):
        """維修job工程"""
        if job_def.get('trigger') == 'date':
            job_def['run_date'] = job_def.get('run_date') or None
        elif job_def.get('trigger') == 'cron':
            job_def['hour'] = job_def.get('hour') or "*"
            job_def['minute'] = job_def.get('minute') or "*"
            job_def['week'] = job_def.get('week') or "*"
            job_def['day'] = job_def.get('day') or "*"
            job_def['month'] = job_def.get('month') or "*"
        elif job_def.get('trigger') == 'interval':
            job_def['seconds'] = job_def.get('seconds') or "*"
        else:
            if job_def.get("andTri"):
                job_def['trigger'] = AndTrigger([job_def.pop("andTri", None), ])
            # job_def['next_run_time'] = job_def.get('next_run_time') or None
        return job_def

5. views.py 實現調度器接口

from app.apSheduler import APScheduler

# croniter庫解析Linux cron格式的計劃

# 以添加爲例子 暫停 刪除 恢復能夠根據業務場景本身寫接口
def add_crontab_task(self, params):
    """添加一個crontab任務"""
    try:
        self.crontab = params.get("crontab")
        self.id = params.get("id")
        self.task_id = params.get("task_id")
    except Exception as e:
        return False, str(e)
    # 記錄數據庫
    res = addSql()
    # 更新任務信息
    APScheduler().add_job(jobid=self.id, func=task_func,
                          args=(self.task_id,), andTri=CronTrigger.from_crontab(self.crontab))
    if res is False:
        return False, "數據庫操做異常"
    return True, croniter(self.crontab, datetime.now()).get_next(datetime)

def get_next_execute_time(self, params):
    """獲取下一次執行時間"""
    try:
        self.crontab = params.get("crontab")
    except Exception as e:
        return False, str(e)
    return True, str(croniter(self.crontab, datetime.now()).get_next(datetime))

6. tasks.py 任務job

def task_func(task_id):
    """業務邏輯"""
    # 發郵件、寫詩、畫畫 -> 愛幹啥幹啥

7. run_tasks.py 開啓任務調度大門

from .task import task_func
from apscheduler.triggers.cron import CronTrigger  # 能夠很友好的支持添加一個crontab表達式

def run_task():
    # 查詢數據庫的crontab信息 -> 定時任務信息
    res = fetall("select * from crontab_table")
    # 遍歷添加任務
    shche = APScheduler()
    for rs in res:
        shche.add_job(jobid=rs.get(id), func=task_func,
                  args=(rs.get(task_id)), andTri=CronTrigger.from_crontab(rs.get(crontab)))

# 最重要的       
run_task()   # 這樣當__init__.py建立app時加載這個文件,就會執行添加歷史任務啦!

8. manage.py 啓動項目

from app import create_app
app = create_app()
app.run()

4、uwsgi部署注意事項

1. 常見問題及解決方案

1.1 線上部署uWSGI+APScheduler執行定時任務卡死

1.1.1問題分析:

APScheduler運行環境須要爲多線程,uwsgi默認是one thread ,one process,須要在配置文件裏面加上一條 enable-thread = true,也就是容許程序內部啓動多線程。

1.1.2解決方案:
# uwsgi.ini文件追加如下配置
enable-threads = true
preload=True  #用--preload啓動uWSGI,確保scheduler只在loader的時候建立一次
lazy-apps=true

1.2 定時任務屢次執行的問題

1.2.1問題分析:

1.本地緣由,錯過了上次執行時間,下次會屢次執行
2.線上部署的,如uWSGI部署,配置了processes>1致使加載了多此apscheduler(apscheduler當前沒有任何進程間同步和信令方案)

1.2.3解決方案:

1. 本地屢次執行能夠在Flask啓動方法中加use_reloader=False

app.run(host="0.0.0.0", port=8888, use_reloader=False)

2.線上linux能夠借鑑下面的方法,網上借鑑的
在__init__.py文件中修改中configure_scheduler(),用全局鎖確保scheduler只運行一次, 代碼以下:

import atexit
import fcntl  # 只能用於linux
from .extensions import scheduler

def configure_scheduler(app):
    """Configure Scheduler"""
    f = open("scheduler.lock", "wb")
    try:
        fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
        scheduler.init_app(app)
        scheduler.start()
        # 加載任務
        @app.before_first_request
        def load_tasks():
            from thanos import run_tasks
    except:
        pass
    def unlock():
        fcntl.flock(f, fcntl.LOCK_UN)
        f.close()
    atexit.register(unlock)

init函數爲flask項目初始化所調用,這裏爲scheduler模塊的初始化部分。首先打開(或建立)一個scheduler.lock文件,並加上非阻塞互斥鎖。成功後建立scheduler並啓動。若是加文件鎖失敗,說明scheduler已經建立,就略過建立scheduler的部分。
最後註冊一個退出事件,若是這個flask項目退出,則解鎖並關閉scheduler.lock文件的鎖。

3.官網推薦rpyc/grpc解決
能夠查看 FastAPI+apSheduler動態定時任務

相關文章
相關標籤/搜索