celery是一個基於Python的分佈式調度系統,文檔在這 ,最近有個需求,想要動態的添加任務而不用重啓celery服務,找了一圈沒找到什麼好辦法(也有多是文檔沒看仔細),因此只能本身實現囉python
爲celery動態添加任務,首先我想到的是傳遞一個函數進去,讓某個特定任務去執行這個傳遞過去的函數,就像這樣app
@app.task def execute(func, *args, **kwargs): return func(*args, **kwargs)
很惋惜,會出現這樣的錯誤async
kombu.exceptions.EncodeError: Object of type 'function' is not JSON serializable
換一種序列化方式分佈式
@app.task(serializer='pickle') def execute(func, *args, **kwargs): return func(*args, **kwargs)
結果又出現一大串錯誤信息函數
ERROR/MainProcess] Pool callback raised exception: ContentDisallowed('Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)',) Traceback (most recent call last): File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ return obj.__dict__[self.__name__] KeyError: 'chord' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__ return obj.__dict__[self.__name__] KeyError: '_payload'
換一種思路測試
func = import_string(func)
不知道這樣是否能夠,結果測試: Noui
哎,流年不利.code
最後一直測試,一直測試,終於找到了一種辦法,直接上代碼orm
from importlib import import_module, reload app.conf.CELERY_IMPORTS = ['task', 'task.all_task'] def import_string(import_name): import_name = str(import_name).replace(':', '.') modules = import_name.split('.') mod = import_module(modules[0]) for comp in modules[1:]: if not hasattr(mod, comp): reload(mod) mod = getattr(mod, comp) return mod @app.task def execute(func, *args, **kwargs): func = import_string(func) return func(*args, **kwargs)
項目結構是這樣的隊列
├── celery_app.py
├── config.py
├── task
│ ├── all_task.py
│ ├── __init__.py
注意: 任務必須大於等於兩層目錄
之後每次添加任務均可以先添加到all_task.py裏,調用時不用再重啓celery服務
# task/all_task.py def ee(c, d): return c, d, '你好' # example from celery_app import execute execute.delay('task.all_task.ee', 2, 444)
ok,另外發現celery也支持任務定時調用,就像這樣
execute.apply_async(args=['task.all_task.aa'], eta=datetime(2017, 7, 9, 8, 12, 0))
簡單實現一個任務重複調用的功能
@app.task def interval(func, seconds, args=(), task_id=None): next_run_time = current_time() + timedelta(seconds=seconds) kwargs = dict(args=(func, seconds, args), eta=next_run_time) if task_id is not None: kwargs.update(task_id=task_id) interval.apply_async(**kwargs) func = import_string(func) return func(*args)
大概意思就是先計算下次運行的時間,而後把任務添加到celery隊列裏,這裏有個task_id有些問題,由於假設添加了每隔3s執行一個任務,
它的task_id默認會使用uuid生成,若是想要再移除這個任務就不太方便,自定task_id可能會好一些,另外也許須要判斷task_id是否存在
AsyncResult(task_id).state
ok,再獻上一個好用的函數
from inspect import getmembers, isfunction def get_tasks(module='task'): return [{ 'name': 'task:{}'.format(f[1].__name__), 'doc': f[1].__doc__, } for f in getmembers(import_module(module), isfunction)]
就這樣.