####1、apSheduler ####2、Flask-APScheduler ####3、動態定時任務 ####4、uwsgi部署注意事項前端
<font color=red>第一部份內容限於apSheduler3.0如下版本,以上版本可移步至 FastAPI+apSheduler動態定時任務</font>python
1. 引子(Introduction)
Advanced Python Scheduler (APScheduler) 是一個輕量級但功能強大的進程內任務調度器,容許您調度函數(或任何其餘python可調用文件)在您選擇的時間執行。linux
2. 特性(Features)
- 沒有(硬)外部依賴性
- api線程安全
- 支持CPython、Jython、PyPy
- 可配置的調度機制(觸發器):
- 相似cron調度
- 單次運行延遲調度(如UNIX「at」命令)
- 基於時間間隔(以指定的時間間隔運行)
- 支持多種存儲空間
- 基於文件的簡單數據庫
- SQLAlchem
- MongoDB
- Redis
3. 使用(Usage)
3.1 安裝
- pip install apscheduler
3.2 啓動調度程序
from apscheduler.scheduler import Scheduler sched = Scheduler() sched.start()
3.3 調度job
3.3.1 簡單日期調度job
from datetime import date from apscheduler.scheduler import Scheduler # Start the scheduler sched = Scheduler() sched.start() # Define the function that is to be executed def my_job(text): print text # The job will be executed on November 6th, 2009 exec_date = date(2009, 11, 6) # 添加一個job job = sched.add_date_job(my_job, exec_date, ['text'])
from datetime import datetime # The job will be executed on November 6th, 2009 at 16:30:05 job = sched.add_date_job(my_job, datetime(2009, 11, 6, 16, 30, 5), ['text'])
job = sched.add_date_job(my_job, '2009-11-06 16:30:05', ['text']) # 支持微秒級別 job = sched.add_date_job(my_job, '2009-11-06 16:30:05.720400', ['text'])
3.3.2 基於時間間隔的調度job
from datetime import datetime from apscheduler.scheduler import Scheduler # Start the scheduler sched = Scheduler() sched.start() def job_function(): print "Hello World" # Schedule job_function to be called every two hours sched.add_interval_job(job_function, hours=2) # The same as before, but start after a certain time point sched.add_interval_job(job_function, hours=2, start_date='2010-10-10 09:30')
from apscheduler.scheduler import Scheduler # Start the scheduler sched = Scheduler() sched.start() # Schedule job_function to be called every two hours @sched.interval_schedule(hours=2) def job_function(): print "Hello World"
3.3.3 cron調度job
與crontab表達式不一樣,您能夠省略不須要的字段。大於最低有效明肯定義字段的字段默認爲*,而較小的字段默認爲其最小值,除了默認爲*。例如,若是僅指定day=1,minute=20,則做業將在每一年每個月的第一天以每小時20分鐘的速度執行。下面的代碼示例應該進一步說明這種行爲。 省略字段默認爲*
from apscheduler.scheduler import Scheduler # Start the scheduler sched = Scheduler() sched.start() def job_function(): print "Hello World" # Schedules job_function to be run on the third Friday # of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00 sched.add_cron_job(job_function, month='6-8,11-12', day='3rd fri', hour='0-3') # Schedule a backup to run once from Monday to Friday at 5:30 (am) sched.add_cron_job(job_function, day_of_week='mon-fri', hour=5, minute=30)
@sched.cron_schedule(day='last sun') def some_decorated_task(): print "I am printed at 00:00:00 on the last Sunday of every month!"
3.3.4 使用自定義觸發器調度
from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), x) scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=('定時任務',), trigger='cron', second='*/5') scheduler.add_job(func=aps_test, args=('一次性任務',), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) scheduler.add_job(func=aps_test, args=('循環任務',), trigger='interval', seconds=3) scheduler.start()
3.4 關閉調度器
sched.shutdown() # 默認狀況下,調度程序關閉其線程池,並等待直到全部當前正在執行的job完成。爲了更快地退出,能夠: sched.shutdown(wait=False) # 這仍然會關閉線程池,但不會等待任何正在運行的任務完成。此外,若是您給調度程序一個要在其餘地方管理的線程池,您可能但願徹底跳過線程池關閉: sched.shutdown(shutdown_threadpool=False) # 自動關閉調度程序的一個巧妙方法是爲此使用atexit掛鉤: import atexit sched = Scheduler(daemon=True) atexit.register(lambda: sched.shutdown(wait=False)) # Proceed with starting the actual application
3.5 Job stores
若是沒有指定stores存儲位置,則將轉到默認job存儲 -> ramjobstore不提供持久化保存 其它存儲stores:
ShelveJobStore SQLAlchemyJobStore MongoDBJobStore RedisJobStore
config = {'apscheduler.jobstores.file.class': 'apscheduler.jobstores.shelve_store:ShelveJobStore', 'apscheduler.jobstores.file.path': '/tmp/dbfile'} sched = Scheduler(config)
3.6 獲取調度器列表
1. 引子(Introduction)
- Flask-APScheduler 是Flask框架的一個擴展庫,增長了Flask對apScheduler的支持
2. 特性(Features)
- 根據Flask配置加載調度器配置
- 根據Flask配置加載調度器job
- 容許指定調度程序將運行的主機名
- 提供REST API來管理調度job
- 爲REST API提供認證
3. 安裝(Installation)
pip install Flask-APScheduler
4. 使用(Usage)
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from flask import Flask from flask_apscheduler import APScheduler class Config(object): # 配置執行job JOBS = [ { 'id': 'job1', 'func': 'advanced:job1', 'args': (1, 2), 'trigger': 'interval', 'seconds': 10 } ] # 存儲位置 SCHEDULER_JOBSTORES = { 'default': SQLAlchemyJobStore(url='sqlite://') } # 線程池配置 SCHEDULER_EXECUTORS = { 'default': {'type': 'threadpool', 'max_workers': 20} } SCHEDULER_JOB_DEFAULTS = { 'coalesce': False, 'max_instances': 3 } # 調度器開關 SCHEDULER_API_ENABLED = True def job1(a, b): print(str(a) + ' ' + str(b)) if __name__ == '__main__': app = Flask(__name__) app.config.from_object(Config()) scheduler = APScheduler() # 註冊app scheduler.init_app(app) scheduler.start() app.run()
- Flask + flask_apscheduler實現一個相似Jenkins的定時任務的功能,前端設置crontab,後端能夠建立,修改,暫停,移除,恢復一個執行任務
文件目錄 |--app |----config.py 配置文件 |----run_tasks.py 開啓任務 |----tasks.py 任務job |----apSheduler.py 提供接口函數 |----extensions.py flask擴展 |----__init__.py 初始化文件 |----views.py 業務代碼 |--manage.py 項目啓動文件
1. config.py配置flask_apscheduler
class Config(object): # 開關 SCHEDULER_API_ENABLED = True # 持久化配置 SCHEDULER_JOBSTORES = { 'default': SQLAlchemyJobStore(url='sqlite:///flask_context.db') } SCHEDULER_EXECUTORS = { 'default': {'type': 'threadpool', 'max_workers': 20} }
2. init.py建立app
from app.config import Config from app.extensions import scheduler # 建立app def create_app(config=None, app_name=None, blueprints=None): app = Flask(app_name, static_folder='thanos/static', template_folder='thanos/resource/report') # 導入flask配置 -> 這裏根據本身的項目導入配置就好哇 # config = Config.get_config_from_host(app.name) app.config.from_object(config) # 初始化調度器配置 configure_scheduler(app) def configure_scheduler(app): """Configure Scheduler""" scheduler.init_app(app) scheduler.start() # 加載任務,選擇了第一次請求flask後端時加載,能夠選擇別的方式... @app.before_first_request def load_tasks(): # 開啓任務 from app import run_tasks
3. extensions.py實例化scheduler
from flask_apscheduler import APScheduler scheduler = APScheduler()
4. apSheduler.py提供調度器接口
"""此文件能夠根據具體業務複雜化選擇寫或者直接調用原apscheduler接口""" from flask import current_app # from .extensions import scheduler 直接導入單例對象操做也行 class APScheduler(object): """調度器控制方法""" def add_job(self, jobid, func, args, **kwargs): """ 添加任務 :param args: 元祖 -> (1,2) :param jobstore: 存儲位置 :param trigger: data -> run_date datetime表達式 cron -> second/minute/day_of_week interval -> seconds 延遲時間 next_run_time -> datetime.datetime.now() + datetime.timedelta(seconds=12)) :return: """ job_def = dict(kwargs) job_def['id'] = jobid job_def['func'] = func job_def['args'] = args job_def = self.fix_job_def(job_def) self.remove_job(jobid) # 刪除原job current_app.apscheduler.scheduler.add_job(**job_def) def remove_job(self, jobid, jobstore=None): """刪除任務""" current_app.apscheduler.remove_job(jobid, jobstore=jobstore) def resume_job(self, jobid, jobstore=None): """恢復任務""" current_app.apscheduler.resume_job(jobid, jobstore=jobstore) def pause_job(self, jobid, jobstore=None): """恢復任務""" current_app.apscheduler.pause_job(jobid, jobstore=jobstore) def fix_job_def(self, job_def): """維修job工程""" if job_def.get('trigger') == 'date': job_def['run_date'] = job_def.get('run_date') or None elif job_def.get('trigger') == 'cron': job_def['hour'] = job_def.get('hour') or "*" job_def['minute'] = job_def.get('minute') or "*" job_def['week'] = job_def.get('week') or "*" job_def['day'] = job_def.get('day') or "*" job_def['month'] = job_def.get('month') or "*" elif job_def.get('trigger') == 'interval': job_def['seconds'] = job_def.get('seconds') or "*" else: if job_def.get("andTri"): job_def['trigger'] = AndTrigger([job_def.pop("andTri", None), ]) # job_def['next_run_time'] = job_def.get('next_run_time') or None return job_def
5. views.py 實現調度器接口
from app.apSheduler import APScheduler # croniter庫解析Linux cron格式的計劃 # 以添加爲例子 暫停 刪除 恢復能夠根據業務場景本身寫接口 def add_crontab_task(self, params): """添加一個crontab任務""" try: self.crontab = params.get("crontab") self.id = params.get("id") self.task_id = params.get("task_id") except Exception as e: return False, str(e) # 記錄數據庫 res = addSql() # 更新任務信息 APScheduler().add_job(jobid=self.id, func=task_func, args=(self.task_id,), andTri=CronTrigger.from_crontab(self.crontab)) if res is False: return False, "數據庫操做異常" return True, croniter(self.crontab, datetime.now()).get_next(datetime) def get_next_execute_time(self, params): """獲取下一次執行時間""" try: self.crontab = params.get("crontab") except Exception as e: return False, str(e) return True, str(croniter(self.crontab, datetime.now()).get_next(datetime))
6. tasks.py 任務job
def task_func(task_id): """業務邏輯""" # 發郵件、寫詩、畫畫 -> 愛幹啥幹啥
7. run_tasks.py 開啓任務調度大門
from .task import task_func from apscheduler.triggers.cron import CronTrigger # 能夠很友好的支持添加一個crontab表達式 def run_task(): # 查詢數據庫的crontab信息 -> 定時任務信息 res = fetall("select * from crontab_table") # 遍歷添加任務 shche = APScheduler() for rs in res: shche.add_job(jobid=rs.get(id), func=task_func, args=(rs.get(task_id)), andTri=CronTrigger.from_crontab(rs.get(crontab))) # 最重要的 run_task() # 這樣當__init__.py建立app時加載這個文件,就會執行添加歷史任務啦!
8. manage.py 啓動項目
from app import create_app app = create_app() app.run()
1. 常見問題及解決方案
####1.1 線上部署uWSGI+APScheduler執行定時任務卡死 #####1.1.1問題分析: APScheduler運行環境須要爲多線程,uwsgi默認是one thread ,one process,須要在配置文件裏面加上一條 enable-thread = true,也就是容許程序內部啓動多線程。 #####1.1.2解決方案:
# uwsgi.ini文件追加如下配置 enable-threads = true preload=True #用--preload啓動uWSGI,確保scheduler只在loader的時候建立一次 lazy-apps=true
####1.2 定時任務屢次執行的問題 #####1.2.1問題分析: 1.本地緣由,錯過了上次執行時間,下次會屢次執行 2.線上部署的,如uWSGI部署,配置了processes>1致使加載了多此apscheduler(apscheduler當前沒有任何進程間同步和信令方案)
#####1.2.3解決方案: 1. 本地屢次執行能夠在Flask啓動方法中加use_reloader=False
app.run(host="", port=8888, use_reloader=False)
2.線上linux能夠借鑑下面的方法,網上借鑑的 在__init__.py文件中修改中configure_scheduler(),用全局鎖確保scheduler只運行一次, 代碼以下:
import atexit import fcntl # 只能用於linux from .extensions import scheduler def configure_scheduler(app): """Configure Scheduler""" f = open("scheduler.lock", "wb") try: fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB) scheduler.init_app(app) scheduler.start() # 加載任務 @app.before_first_request def load_tasks(): from thanos import run_tasks except: pass def unlock(): fcntl.flock(f, fcntl.LOCK_UN) f.close() atexit.register(unlock) init函數爲flask項目初始化所調用,這裏爲scheduler模塊的初始化部分。首先打開(或建立)一個scheduler.lock文件,並加上非阻塞互斥鎖。成功後建立scheduler並啓動。若是加文件鎖失敗,說明scheduler已經建立,就略過建立scheduler的部分。 最後註冊一個退出事件,若是這個flask項目退出,則解鎖並關閉scheduler.lock文件的鎖。
3.官網推薦rpyc/grpc解決 能夠查看<font> FastAPI+apSheduler動態定時任務</font>