forsun是一個高性能高精度定時服務,能夠輕鬆管理千萬級定時任務。
項目地址: https://github.com/snower/forsunpython
forsun內置支持 shell、http、redis、thrift、beanstalk、mysql 六種到時觸發回調執行器,可是不少時候本身的項目需求千奇百怪,單一的內置執行器並不能很好的在本身的項目中整合,因此forsun也支持經過擴展Extension開發的方式將本身編寫的觸發執行器Action註冊進去。mysql
那麼咱們就來輕鬆愉快實現一個整合celery的擴展吧。git
# -*- coding: utf-8 -*- import json import logging from concurrent.futures import ThreadPoolExecutor from celery import Celery from forsun.extension import Extension from forsun.action.action import Action app = Celery('hello', broker='amqp://guest@localhost//') executor = ThreadPoolExecutor() @app.task def hello(): return 'hello world' @app.task def add(x, y): return x + y class CeleryAction(Action): METHODS = { "hello": hello, "add": add, } async def execute(self, *args, **kwargs): method = self.params.get("method", '') args = json.loads(self.params.get('args', '[]')) kwargs = json.loads(self.params.get('kwargs', '{}')) if method not in self.METHODS: logging.info("celery action execute unknow method %s", method) return executor.submit(self.METHODS[method], *tuple(args), **kwargs) logging.info("celery action execute %s", method) class CeleryExtension(Extension): name = "celery" def register(self): self.register_action("celery", CeleryAction)
能夠看出實現一個擴展很是簡單,定義一個擴展類CeleryExtension繼承自forsun.extension.Extension,添加一個執行器CeleryAction繼承自forsun.extension.Action,起個名字,在擴展類register函數中註冊執行器CeleryAction,搞定。github
當建立的定時任務到期觸發時會自動調用CeleryAction的execute函數,其中當前Action實例的ts屬性保存着任務觸發時間,params即爲建立定時的params參數,提取參數解析繼續完成便可。redis
須要注意的是由於整個forsun服務使用tornado異步IO實現,因此Action的execute會使用異步調用,若是你須要作同步阻塞調用時,推薦將須要執行的方法放到ThreadPoolExecutor去執行,這樣性能會更好哦。sql
forsund --bind=0.0.0.0 --port=6458 --http=0.0.0.0:8001 --log=/var/log/forsun.log --log-level=INFO --driver=mem --driver-mem-store-file=/var/lib/fousun/forsun.session --extension-path=./ --extension=celery_extension.CeleryExtension
如若使用conf文件配置時,那麼也在conf文件中添加擴展加載參數便可。shell
# 擴展配置 [extension] # 擴展目錄 path=./ # 載入擴展,已;分隔 extensions=celery_extension.CeleryExtension
這時候查看日誌輸出,你會發現擴展已經成功加載了。編程
2020-03-20 14:09:20,650 1022 INFO register extension path ./ 2020-03-20 14:09:20,762 1022 INFO load extension celery_extension.CeleryExtension <class 'celery_extension.CeleryExtension'> 2020-03-20 14:09:20,762 1022 INFO action register celery <class 'celery_extension.CeleryAction'>
再經過info名稱查看下當前狀態信息,能夠發如今支持的actions列表裏已經有celery支持了,很是棒,如今愉快的開始本身的項目之旅吧。json
forsun info python_version: 3.6.9 (default, Nov 7 2019, 10:44:02) [GCC 8.3.0] forsun_version: 0.1.3 start_time: 2020-03-20 14:31:27.538081+08:00 cpu_user: 0.18 cpu_system: 0.1 mem_rss: 28.06M mem_vms: 122.46M current_time: 2020-03-20 14:31:38+08:00 stores: mem;redis current_store: mem actions: shell;http;redis;thrift;celery bind_port: 0.0.0.0:6458 http_bind_port: extensions: celery_extension.CeleryExtension
curl -X PUT -H 'Content-Type: application/json' -d '{"key": "test", "seconds": 5, "minute": 0, "hour": 0, "day": 0, "month": 0, "count": 1, "action": "celery", "params": {"method": "hello"}}' http://127.0.0.1:8001/v1/plan {"errcode": 0, "errmsg": "", "data": {"key": "test", "second": 5, "minute": 0, "hour": 0, "day": 0, "month": 0, "week": -1, "status": 0, "count": 0, "is_time_out": true, "next_time": 1584657610, "current_count": 0, "last_timeout": 0, "created_time": 1584657605.0, "action": "celery", "params": {"method": "hello"}}}
等待5秒,你就會看到celery成功調用。bash
forsun除了能夠經過Extension添加自定義執行器Action外,固然也能夠經過Extension自定義實現持久化存儲,這個後面咱有時間再介紹。
在實際工程中咱們有很是多相似訂單支付超時、配送超時之類的量大而又須要可編程控制的定時調度需求,一個方便管理而又高性能精準的定時任務調度服務顯然能夠大大節省咱們的時間,遊戲那麼好玩爲啥不能多一點呢。