### Table of Contents
#### 1. APScheduler
#### 2. Practical Applications
APScheduler
### 1. Installation
```
pip install apscheduler
```
### 2. Core Components
- triggers
- job stores
- executors
- schedulers (a minimal example combining all four follows this list)
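As a minimal sketch of how the four pieces fit together (the `heartbeat` function is just for illustration): an interval trigger fires a job held in the default in-memory job store, run on the default thread-pool executor, all driven by a blocking scheduler.

```python
from datetime import datetime

from apscheduler.schedulers.blocking import BlockingScheduler


def heartbeat():
    print(f'heartbeat - {datetime.now()}')


scheduler = BlockingScheduler()                      # scheduler
scheduler.add_job(heartbeat, 'interval', seconds=5)  # interval trigger; default job store and executor
scheduler.start()                                    # blocks the process; Ctrl+C to stop
```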
### 3. Choosing the Right Scheduler, Job Store, Executor, and Trigger
#### 3.1 Schedulers
- BlockingScheduler: use when the scheduler is the only thing running in your process
- BackgroundScheduler: use when you are not using any of the frameworks below and want the scheduler to run in the background
- AsyncIOScheduler: use when your application is built on asyncio
- GeventScheduler: use when your application is built on gevent
- TornadoScheduler: use when building a Tornado application
- TwistedScheduler: use when building a Twisted application
- QtScheduler: use when building a Qt application
#### 3.2 Job Stores
- Persistent job storage: configure the storage connection via SQLAlchemyJobStore (see the persistence sketch below)
- Non-persistent storage: jobs are recreated on restart; the default is the in-memory MemoryJobStore
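With a persistent store, a job survives restarts, so blindly re-adding it at startup would duplicate it. A small sketch of the usual pattern, assuming the `default` store is the SQLAlchemyJobStore used later in section 4 and `my_task` is a placeholder function:

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore


def my_task():
    print('running')


scheduler = BackgroundScheduler(
    jobstores={'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}
)
# A stable id plus replace_existing=True keeps a restart from
# inserting a second copy of the same job into the store.
scheduler.add_job(my_task, 'interval', minutes=1, id='my_task',
                  replace_existing=True)
scheduler.start()
```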
#### 3.3 Executors
- processpoolexecutor: an optional process pool for CPU-bound workloads; it can be used alongside the thread pool (see the routing sketch below)
- threadpoolexecutor: the default thread pool
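A sketch of routing a CPU-bound job to the process pool while keeping the thread pool as the default (`crunch` is an illustrative placeholder):

```python
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


def crunch():
    # CPU-bound work; must be a top-level function so it can be
    # pickled and sent to the process pool.
    sum(i * i for i in range(10_000_000))


scheduler = BackgroundScheduler(executors={
    'default': ThreadPoolExecutor(max_workers=20),
    'processpool': ProcessPoolExecutor(max_workers=5),
})
# Route the CPU-bound job to the process pool by executor name.
scheduler.add_job(crunch, 'interval', minutes=5, executor='processpool')
scheduler.start()
```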
#### 3.4 Triggers
- date: run the job once at a specific point in time
- interval: run the job at a fixed time interval
- cron: run the job periodically at specific times, like Linux crontab (all three are sketched below)
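One job per trigger type, assuming nothing beyond a plain BackgroundScheduler:

```python
from datetime import datetime

from apscheduler.schedulers.background import BackgroundScheduler


def job():
    print(f'fired at {datetime.now()}')


scheduler = BackgroundScheduler()
scheduler.add_job(job, 'date', run_date=datetime(2030, 1, 1, 9, 0))  # once, at a point in time
scheduler.add_job(job, 'interval', hours=2)                          # every two hours
scheduler.add_job(job, 'cron', day_of_week='mon-fri', hour=9)        # crontab-style: weekdays at 09:00
scheduler.start()
```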
### 4. Configuring the Scheduler
```python
from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor

jobstores = {
    # Multiple job stores can be configured
    'mongo': {'type': 'mongodb'},
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore takes a storage URL
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},  # at most 20 worker threads
    'processpool': ProcessPoolExecutor(max_workers=5)       # at most 5 worker processes
}
job_defaults = {
    'coalesce': False,   # do not merge runs when a job was delayed or failed to fire
    'max_instances': 3   # default maximum number of concurrently running instances per job
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors,
                    job_defaults=job_defaults, timezone=utc)  # use UTC as the scheduler's timezone
```
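The same setup can also be expressed as a flat options dict passed straight to the scheduler constructor, which is the second configuration style shown in the official user guide:

```python
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler({
    'apscheduler.jobstores.default': {'type': 'sqlalchemy', 'url': 'sqlite:///jobs.sqlite'},
    'apscheduler.executors.default': {'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
                                      'max_workers': '20'},
    'apscheduler.executors.processpool': {'type': 'processpool', 'max_workers': '5'},
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'UTC',
})
```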
### 5. Job CRUD Operations
```python
import os
import time

from apscheduler.schedulers.background import BackgroundScheduler


def print_time(name):
    print(f'{name} - {time.ctime()}')

def add_job(job_id, func, args, seconds):
    """Add a job"""
    print(f"Adding job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)

def remove_job(job_id):
    """Remove a job"""
    scheduler.remove_job(job_id)
    print(f"Removed job - {job_id}")

def pause_job(job_id):
    """Pause a job"""
    scheduler.pause_job(job_id)
    print(f"Paused job - {job_id}")

def resume_job(job_id):
    """Resume a job"""
    scheduler.resume_job(job_id)
    print(f"Resumed job - {job_id}")

def get_jobs():
    """Get all jobs, including paused ones"""
    res = scheduler.get_jobs()
    print(f"All jobs - {res}")

def print_jobs():
    print("Detailed job info:")
    scheduler.print_jobs()

def start():
    """Start the scheduler"""
    scheduler.start()

def shutdown():
    """Shut down the scheduler"""
    scheduler.shutdown()


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    start()
    print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))
    add_job('job_A', func=print_time, args=("A", ), seconds=1)
    add_job('job_B', func=print_time, args=("B", ), seconds=2)
    time.sleep(6)
    pause_job('job_A')
    get_jobs()
    time.sleep(6)
    print_jobs()
    resume_job('job_A')
    time.sleep(6)
    remove_job('job_A')
    time.sleep(6)
    try:
        shutdown()
    except RuntimeError:
        pass
```
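The demo covers add, remove, pause, resume, and query, but not the "update" part of CRUD; APScheduler handles that with modify_job() for job attributes and reschedule_job() for the trigger. A brief sketch against the scheduler above:

```python
# Change the job's call arguments without touching its schedule.
scheduler.modify_job('job_A', args=("A-renamed", ))

# Replace the trigger: from now on, run every 10 seconds.
scheduler.reschedule_job('job_A', trigger='interval', seconds=10)
```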
### 6. Scheduler Events

Event listeners can be attached to the scheduler. Scheduler events are fired in certain situations, and may carry additional information with details about that particular event. By passing an appropriate mask argument to add_listener(), or by OR-ing different constants together, you can listen to only particular types of events. The listener callable is called with one argument: the event object.
```python
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR


def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')


scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
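Job events carry more than the exception; they also expose fields such as job_id and scheduled_run_time. As a sketch, a second listener that watches for skipped runs via EVENT_JOB_MISSED (also from apscheduler.events):

```python
from apscheduler.events import EVENT_JOB_MISSED


def missed_listener(event):
    # Fired when a scheduled run time was missed, e.g. because the
    # executor was saturated or the process was down at the time.
    print(f'job {event.job_id} missed its run at {event.scheduled_run_time}')


scheduler.add_listener(missed_listener, EVENT_JOB_MISSED)
```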
### 7. Configuring Logging
```python
import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
```
Practical Applications
### 1. Dynamically Adding Scheduled Jobs with FastAPI
```python
import asyncio
import datetime

import uvicorn
from fastapi import FastAPI, Body
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger

app = FastAPI(title='fast-api')
scheduler = None


@app.on_event('startup')
def init_scheduler():
    """Initialize the scheduler"""
    jobstores = {
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore takes a storage URL
    }
    executors = {
        'default': {'type': 'threadpool', 'max_workers': 20},  # at most 20 worker threads
        'processpool': ProcessPoolExecutor(max_workers=5)       # at most 5 worker processes
    }
    global scheduler
    scheduler = AsyncIOScheduler()
    scheduler.configure(jobstores=jobstores, executors=executors)
    # Add a coroutine job; the results were rather disappointing...
    scheduler.add_job(tick, 'interval', seconds=3)
    print("Starting scheduler...")
    scheduler.start()


def print_time(name):
    print(f'{name} - {datetime.datetime.now()}')


async def tick():
    print('Tick! The time is: %s' % datetime.datetime.now())
    await asyncio.sleep(1)


@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
    """Add a job"""
    scheduler.add_job(id=job_id, func=print_time, args=(job_id, ),
                      trigger=CronTrigger.from_crontab(cron))
    return {"msg": "success!"}


@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
    """Remove a job"""
    scheduler.remove_job(job_id)
    return {"msg": "success!"}


if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5566)
```
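With the service running, a quick smoke test over HTTP (hypothetical job id and cron expression, using the third-party requests package):

```python
import requests

# Register a job that fires every minute (standard 5-field crontab syntax).
resp = requests.post('http://127.0.0.1:5566/add-job',
                     json={'job_id': 'job_A', 'cron': '*/1 * * * *'})
print(resp.json())  # {'msg': 'success!'}

# And remove it again.
resp = requests.post('http://127.0.0.1:5566/remove-job',
                     json={'job_id': 'job_A'})
print(resp.json())
```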
#### 1.1 Points to Consider
- Do scheduled jobs have to run in the same process as the backend service?
- When backend code is deployed frequently, how do we avoid disrupting running scheduled jobs?
- If we extract the scheduled jobs into a standalone service, how would we build it?
- In a multi-process deployment, APScheduler loads each job multiple times, so jobs run repeatedly. How do we solve that? (one common workaround is sketched below)
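For the last question, one common workaround is to take an exclusive file lock at startup so that only the first worker process actually starts the scheduler. A sketch, assuming a POSIX system and a multi-worker deployment (e.g. gunicorn):

```python
import fcntl

from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()


def start_scheduler_once(lock_path='/tmp/scheduler.lock'):
    """Start the scheduler only in the worker that wins the file lock."""
    lock_file = open(lock_path, 'w')
    try:
        # Non-blocking exclusive lock: exactly one process can hold it.
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except BlockingIOError:
        return None  # another worker already owns the scheduler
    scheduler.start()
    return lock_file  # keep the handle alive so the lock is held for the process lifetime
```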
### 2. Registering Scheduled Jobs as a Service with RPyC
For the considerations above, the official project also provides a demo that solves this with RPyC.
#### 2.1 RPC adds the job and the executed function lives in the RPC process; the server only connects to add it, then closes
- See the official demo
#### 2.2 RPC adds the job; the executed function lives on the server
##### server.py
```python
import datetime

import uvicorn
from fastapi import FastAPI, Body
import rpyc

app = FastAPI(title='fast-api')
conn = None
bgsrv = None
mon = None


@app.on_event('startup')
def init_scheduler():
    """Initialize the RPyC connection"""
    global conn, bgsrv, mon
    conn = rpyc.connect("localhost", 12345)
    # create a bg thread to process incoming events
    bgsrv = rpyc.BgServingThread(conn)
    mon = conn.root.Monitor(print_time)


def print_time(name):
    print(f'{name} - {datetime.datetime.now()}')


def from_crontab(cron):
    values = cron.split(' ')
    return {
        'minute': values[0],
        'hour': values[1],
        'day': values[2],
        'month': values[3],
        'day_of_week': values[4],
    }


@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
    """Add a job"""
    mon.add_job(id=job_id, args=(job_id, ), trigger='cron', **from_crontab(cron))
    return {"msg": "success!"}


@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
    """Remove a job"""
    mon.remove_job(job_id)
    return {"msg": "success!"}


if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5566)
```
##### rpc.py
```python
import rpyc
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler


class SchedulerService(rpyc.Service):

    class exposed_Monitor(object):  # exposing names is not limited to methods :)

        def __init__(self, callback):
            # `callback` is the callback function from server.py. If you want to
            # register several different event functions, it is best to pass them
            # all in and initialize them here in __init__.
            # The callback must be wrapped with rpyc.async_ so it is invoked
            # asynchronously.
            self.callback = rpyc.async_(callback)

        def exposed_add_job(self, *args, **kwargs):
            print("Adding job:", args, kwargs)
            return scheduler.add_job(self.callback, *args, **kwargs)

        def exposed_pause_job(self, job_id, jobstore=None):
            return scheduler.pause_job(job_id, jobstore)

        def exposed_resume_job(self, job_id, jobstore=None):
            return scheduler.resume_job(job_id, jobstore)

        def exposed_remove_job(self, job_id, jobstore=None):
            scheduler.remove_job(job_id, jobstore)


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.start()
    protocol_config = {'allow_public_attrs': True}
    server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()
```
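To try the demo, start rpc.py first so the RPyC service is listening on port 12345, then start server.py. Every time a trigger fires, the scheduler in the RPC process calls back into the server process through the rpyc.async_-wrapped callback.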
#### 2.3 Summary
With approach 2.1 (RPC adds the job and the executed function lives in the RPC process; the server only connects to add it, then closes), the business code has to be written a second time on the RPC side; whether that is acceptable depends on where the project stands... With approach 2.2 (RPC adds the job, the executed function lives on the server), the drawback is that the connection to the RPC service cannot be dropped (for instance when a periodically executed job is needed): a long-lived connection has to be maintained...