高性能千萬級定時任務管理服務forsun擴展開發之整合celery

forsun是一個高性能高精度定時服務,能夠輕鬆管理千萬級定時任務。
項目地址: https://github.com/snower/forsunpython

forsun內置支持 shell、http、redis、thrift、beanstalk、mysql 六種到時觸發回調執行器,可是不少時候本身的項目需求千奇百怪,單一的內置執行器並不能很好的在本身的項目中整合,因此forsun也支持經過擴展Extension開發的方式將本身編寫的觸發執行器Action註冊進去。mysql

那麼咱們就來輕鬆愉快實現一個整合celery的擴展吧。git

示例(實現一個celery執行器擴展)

添加代碼celery_extension.py

# -*- coding: utf-8 -*-

import json
import logging
from concurrent.futures import ThreadPoolExecutor
from celery import Celery
from forsun.extension import Extension
from forsun.action.action import Action

app = Celery('hello', broker='amqp://guest@localhost//')
executor = ThreadPoolExecutor()


@app.task
def hello():
    return 'hello world'


@app.task
def add(x, y):
    return x + y


class CeleryAction(Action):
    METHODS = {
        "hello": hello,
        "add": add,
    }

    async def execute(self, *args, **kwargs):
        method = self.params.get("method", '')
        args = json.loads(self.params.get('args', '[]'))
        kwargs = json.loads(self.params.get('kwargs', '{}'))
        if method not in self.METHODS:
            logging.info("celery action execute unknow method %s", method)
            return
        executor.submit(self.METHODS[method], *tuple(args), **kwargs)
        logging.info("celery action execute %s", method)


class CeleryExtension(Extension):
    name = "celery"

    def register(self):
        self.register_action("celery", CeleryAction)

能夠看出實現一個擴展很是簡單,定義一個擴展類CeleryExtension繼承自forsun.extension.Extension,添加一個執行器CeleryAction繼承自forsun.extension.Action,起個名字,在擴展類register函數中註冊執行器CeleryAction,搞定。github

當建立的定時任務到期觸發時會自動調用CeleryAction的execute函數,其中當前Action實例的ts屬性保存着任務觸發時間,params即爲建立定時的params參數,提取參數解析繼續完成便可。redis

須要注意的是由於整個forsun服務使用tornado異步IO實現,因此Action的execute會使用異步調用,若是你須要作同步阻塞調用時,推薦將須要執行的方法放到ThreadPoolExecutor去執行,這樣性能會更好哦。sql

添加啓動參數加載擴展

forsund --bind=0.0.0.0 --port=6458 --http=0.0.0.0:8001 --log=/var/log/forsun.log --log-level=INFO --driver=mem --driver-mem-store-file=/var/lib/fousun/forsun.session --extension-path=./ --extension=celery_extension.CeleryExtension

如若使用conf文件配置時,那麼也在conf文件中添加擴展加載參數便可。shell

# 擴展配置
[extension]

# 擴展目錄
path=./
# 載入擴展,已;分隔
extensions=celery_extension.CeleryExtension

這時候查看日誌輸出,你會發現擴展已經成功加載了。編程

2020-03-20 14:09:20,650 1022 INFO register extension path ./
2020-03-20 14:09:20,762 1022 INFO load extension celery_extension.CeleryExtension <class 'celery_extension.CeleryExtension'>
2020-03-20 14:09:20,762 1022 INFO action register celery <class 'celery_extension.CeleryAction'>

再經過info名稱查看下當前狀態信息,能夠發如今支持的actions列表裏已經有celery支持了,很是棒,如今愉快的開始本身的項目之旅吧。json

forsun info

python_version: 3.6.9 (default, Nov  7 2019, 10:44:02)  [GCC 8.3.0]
forsun_version: 0.1.3
start_time:     2020-03-20 14:31:27.538081+08:00
cpu_user:       0.18
cpu_system:     0.1
mem_rss:        28.06M
mem_vms:        122.46M
current_time:   2020-03-20 14:31:38+08:00

stores: mem;redis
current_store:  mem
actions:        shell;http;redis;thrift;celery
bind_port:      0.0.0.0:6458
http_bind_port:
extensions:     celery_extension.CeleryExtension

Http請求測試一下

curl -X PUT -H 'Content-Type: application/json' -d '{"key": "test", "seconds": 5, "minute": 0, "hour": 0, "day": 0, "month": 0, "count": 1, "action": "celery", "params": {"method": "hello"}}' http://127.0.0.1:8001/v1/plan

{"errcode": 0, "errmsg": "", "data": {"key": "test", "second": 5, "minute": 0, "hour": 0, "day": 0, "month": 0, "week": -1, "status": 0, "count": 0, "is_time_out": true, "next_time": 1584657610, "current_count": 0, "last_timeout": 0, "created_time": 1584657605.0, "action": "celery", "params": {"method": "hello"}}}

等待5秒,你就會看到celery成功調用。bash

最後

forsun除了能夠經過Extension添加自定義執行器Action外,固然也能夠經過Extension自定義實現持久化存儲,這個後面咱有時間再介紹。

在實際工程中咱們有很是多相似訂單支付超時、配送超時之類的量大而又須要可編程控制的定時調度需求,一個方便管理而又高性能精準的定時任務調度服務顯然能夠大大節省咱們的時間,遊戲那麼好玩爲啥不能多一點呢。

相關文章
相關標籤/搜索