FastAPI+apSheduler動態定時任務


###閱讀目錄linux

####1、apSheduler ####2、實戰應用git

apSheduler

###1.安裝github

pip install apschedulersql

###2.基礎組件mongodb

  • triggers 觸發器
  • job stores job存儲
  • executors 執行器
  • schedulers 調度器

###3.選擇合適的調度器,存儲器,執行器,觸發器 ####3.1調度器(schedulers)後端

  • BlockingScheduler: 進程中只有調度器
  • BackgroundScheduler: 非如下框架,且但願運行在後臺
  • AsyncIOScheduler: 應用程序使用asyncio
  • GeventScheduler: 應用程序使用gevent
  • TornadoScheduler: 構建Tornado
  • TwistedScheduler: 構建Twisted應用
  • QtScheduler: 構建Qt應用

####3.2存儲器(job stores)api

  • 持久化存儲job,經過SQLAlchemyJobStore設置存儲連接
  • 非持久化存儲job,重啓時從新建立job,默認MemoryJobStore內存存儲

####3.3執行器(executors)併發

  • processpoolexecutor,CUP密集型業務,可選進程池,也能夠同線程池同時使用
  • threadpoolexecutor,默人線程池

####3.4觸發器(triggers )app

  • date: 設置日期,針對某個時間點運行一次job
  • interval: 固定時間間隔運行job
  • cron: 相似linux-crontab,某個時間點按期運行job

###4.配置調度器框架

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    # 能夠配置多個存儲
    'mongo': {'type': 'mongodb'}, 
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore指定存儲連接
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},     # 最大工做線程數20
    'processpool': ProcessPoolExecutor(max_workers=5)         # 最大工做進程數爲5
}
job_defaults = {
    'coalesce': False,   # 關閉新job的合併,當job延誤或者異常緣由未執行時
    'max_instances': 3   # 併發運行新job默認最大實例多少
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc做爲調度程序的時區

###5.調度器的增刪改查

import os
import time
from apscheduler.schedulers.background import BackgroundScheduler

def print_time(name):
    print(f'{name} - {time.ctime()}')

def add_job(job_id, func, args, seconds):
    """添加job"""
    print(f"添加job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)

def remove_job(job_id):
    """移除job"""
    scheduler.remove_job(job_id)
    print(f"移除job - {job_id}")

def pause_job(job_id):
    """中止job"""
    scheduler.pause_job(job_id)
    print(f"中止job - {job_id}")

def resume_job(job_id):
    """恢復job"""
    scheduler.resume_job(job_id)
    print(f"恢復job - {job_id}")

def get_jobs():
    """獲取全部job信息,包括已中止的"""
    res = scheduler.get_jobs()
    print(f"全部job - {res}")

def print_jobs():
    print(f"詳細job信息")
    scheduler.print_jobs()

def start():
    """啓動調度器"""
    scheduler.start()

def shutdown():
    """關閉調度器"""
    scheduler.shutdown()

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    start()
    print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))
    add_job('job_A', func=print_time, args=("A", ), seconds=1)
    add_job('job_B', func=print_time, args=("B", ), seconds=2)
    time.sleep(6)
    pause_job('job_A')
    get_jobs()
    time.sleep(6)
    print_jobs()
    resume_job('job_A')
    time.sleep(6)
    remove_job('job_A')
    time.sleep(6)
    try:
        shutdown()
    except RuntimeError:
        pass

###6.調度事件 能夠將事件偵聽器附加到調度程序。調度程序事件在某些狀況下會被觸發,而且可能會在其中攜帶有關該特定事件詳細信息的附加信息。經過給add_listener()提供適當的掩碼參數或者將不一樣的常量放在一塊兒,能夠只監聽特定類型的事件。用一個參數調用偵聽器callable,即event對象。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

###7.配置日誌

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

實戰應用

###1.fastapi動態添加定時任務

import asyncio
import datetime
import uvicorn
from fastapi import FastAPI, Body
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger

app = FastAPI(title='fast-api')

scheduler = None


@app.on_event('startup')
def init_scheduler():
    """初始化"""
    jobstores = {
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore指定存儲連接
    }
    executors = {
        'default': {'type': 'threadpool', 'max_workers': 20},  # 最大工做線程數20
        'processpool': ProcessPoolExecutor(max_workers=5)  # 最大工做進程數爲5
    }
    global scheduler
    scheduler = AsyncIOScheduler()
    scheduler.configure(jobstores=jobstores, executors=executors)
    # 添加一個coroutine執行,結果很不理想...
    scheduler.add_job(tick, 'interval', seconds=3)
    print("啓動調度器...")

    scheduler.start()


def print_time(name):
    print(f'{name} - {datetime.datetime.now()}')


async def tick():
    print('Tick! The time is: %s' % datetime.datetime.now())
    await asyncio.sleep(1)


@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
    """添加job"""
    scheduler.add_job(id=job_id, func=print_time, args=(job_id, ), trigger=CronTrigger.from_crontab(cron))
    return {"msg": "success!"}


@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
    """移除job"""
    scheduler.remove_job(job_id)
    return {"msg": "success!"}


if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5566)

####1.1思考

  • 在項目中定時任務要和後端運行須要運行在一個進程中嗎?
  • 後端頻繁發佈代碼,怎麼避免影響定時任務執行呢?
  • 把定時任務抽離搭建服務,如何去作呢?
  • apscheduler在多進程中會屢次加載job,致使job重複執行,怎麼解決呢?

###2.rpyc實現定時任務服務註冊

針對上面的三個思考,官方也給出了基於rpyc解決的demo

####2.1rpc負責添加job,執行函數在rpc,serve只作添加調用後關閉

  • 見官方demo

####2.2rpc負責添加job,執行函數在server #####server.py

import datetime
import uvicorn
from fastapi import FastAPI, Body
import rpyc

app = FastAPI(title='fast-api')

conn = None
bgsrv = None
mon = None

@app.on_event('startup')
def init_scheduler():
    """初始化"""
    global conn,bgsrv,mon
    conn = rpyc.connect("localhost", 12345)
    # create a bg thread to process incoming events
    bgsrv = rpyc.BgServingThread(conn)
    mon = conn.root.Monitor(print_time)

def print_time(name):
    print(f'{name} - {datetime.datetime.now()}')

def from_crontab(cron):
    values = cron.split(' ')
    return {
        'minute': values[0],
        'hour': values[1],
        'day': values[2],
        'month': values[3],
        'day_of_week': values[4],
    }


@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
    """添加job"""
    mon.add_job(id=job_id, args=(job_id, ), trigger='cron',  **from_crontab(cron))
    return {"msg": "success!"}

@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
    """移除job"""
    mon.remove_job(job_id)
    return {"msg": "success!"}

if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5566)
rpc.py
import rpyc
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler

class SchedulerService(rpyc.Service):

    class exposed_Monitor(object):   # exposing names is not limited to methods :)
        def __init__(self, callback):
            # callback方法是server.py的回調方法,假如想添加不一樣事件函數,建議所有傳進來在__init__函數初始化全部
            # 這裏須要用rpyc.async_異步加載回調函數
            self.callback = rpyc.async_(callback)

        def exposed_add_job(self, *args, **kwargs):
            print("添加任務:", args, kwargs)
            return scheduler.add_job(self.callback, *args, **kwargs)

        def exposed_pause_job(self, job_id, jobstore=None):
            return scheduler.pause_job(job_id, jobstore)

        def exposed_resume_job(self, job_id, jobstore=None):
            return scheduler.resume_job(job_id, jobstore)

        def exposed_remove_job(self, job_id, jobstore=None):
            scheduler.remove_job(job_id, jobstore)


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.start()
    protocol_config = {'allow_public_attrs': True}
    server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()

####2.3總結

rpc負責添加job,執行函數在rpc,serve只作添加調用後關閉,這種方式會致使業務代碼須要在rpc在寫一遍,看項目進展是在什麼狀態lou... rpc負責添加job,執行函數在server,這種方式的弊端就是和rpc的連接不能斷開(加入須要一個按期執行的任務),只能保持一個長鏈接狀態...

相關文章
相關標籤/搜索