Celery
http://docs.celeryproject.org/en/latest/index.html
Celery - Distributed Task Queue
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
It’s a task queue with a focus on real-time processing, while also supporting task scheduling.
Celery has a large and diverse community of users and contributors; you should come join us on IRC or our mailing list.
Celery is Open Source and licensed under the BSD License.
Demo: installing and running:
https://github.com/fanqingsong/celery_runningapp
# install dependencies
pipenv install
# run the task worker process
pipenv run celery -A tasks worker --loglevel=info -P eventlet
# run the producer
pipenv run python taskscaller.py
Task client:
Task sender:
Error handling
https://blog.csdn.net/qq_30242609/article/details/79047660
Running tasks raises the error "Celery ValueError: not enough values to unpack (expected 3, got 0)".
Add one more option when starting the worker, as follows:
celery -A <mymodule> worker -l info -P eventlet
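The wording of this error is the standard Python message for tuple unpacking that receives too few items. A minimal sketch, independent of Celery, that reproduces a message of the same shape; `unpack_three` is a hypothetical helper, and nothing here is taken from Celery's source:

```python
def unpack_three(values):
    """Unpack exactly three items; Python raises ValueError otherwise."""
    first, second, third = values
    return first, second, third


try:
    # An empty tuple stands in for whatever the worker expected to be filled in.
    unpack_three(())
except ValueError as exc:
    message = str(exc)
    print(message)
```

Seen this way, the traceback suggests that a three-item value inside the worker was still empty when it was unpacked, which fits a fix that changes how the worker pool is started rather than a change to the task code.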
Task processing status
Explanation from the official documentation:
http://docs.celeryproject.org/en/latest/faq.html#how-do-i-get-the-result-of-a-task-if-i-have-the-id-that-points-there
Answer: Use task.AsyncResult:
>>> result = my_task.AsyncResult(task_id)
>>> result.get()
This will give you an AsyncResult instance using the task's current result backend.
If you need to specify a custom result backend, or you want to use the current application’s default backend, you can use app.AsyncResult:
>>> result = app.AsyncResult(task_id)
>>> result.get()
StackOverflow
https://stackoverflow.com/questions/9034091/how-to-check-task-status-in-celery
Return the task_id (which is given from .delay()) and ask the celery instance afterwards about the state:
x = method.delay(1,2)
print(x.task_id)
When asking, get a new AsyncResult using this task_id:
from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
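The check above can be wrapped in a small polling loop. This is only a sketch under stated assumptions: `wait_for_result`, `timeout`, and `interval` are hypothetical names, and the helper relies on nothing beyond the `ready()` and `get()` methods already shown, so any object exposing those two methods works.

```python
import time


def wait_for_result(result, timeout=10.0, interval=0.5):
    """Poll an AsyncResult-like object until it is ready, then return its value.

    `result` only needs ready() and get(), as celery.result.AsyncResult provides.
    """
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task not ready after {timeout} seconds")
        time.sleep(interval)
    return result.get()


# Usage with Celery (needs a configured app and result backend):
#   from celery.result import AsyncResult
#   value = wait_for_result(AsyncResult("your-task-id"))
```

Calling `result.get(timeout=...)` directly blocks until the task finishes; an explicit loop like this one is mainly useful when the caller wants to do other work, or report progress, between checks.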
API
http://docs.celeryproject.org/en/latest/reference/celery.result.html
Results can be retrieved through the collect interface; under the hood it is built on a Python generator (coroutine-style iteration).
from celery import group
from proj.celery import app

@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2

>>> from celery.result import ResultBase
>>> from proj.tasks import A
>>> result = A.delay(10)
>>> [v for v in result.collect()
...  if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
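The generator idea behind collect can be illustrated without Celery. The sketch below is an analogy, not Celery's implementation: `iter_leaves` is a hypothetical function that lazily walks a nested structure and yields only the leaf values, much as the list comprehension above keeps only the plain values and skips intermediate result objects.

```python
from collections import deque


def iter_leaves(node):
    """Lazily yield leaf values from nested lists/tuples, breadth-first."""
    queue = deque([node])
    while queue:
        current = queue.popleft()
        if isinstance(current, (list, tuple)):
            # Containers are expanded later instead of being yielded.
            queue.extend(current)
        else:
            yield current


# Each inner list stands in for one child task holding a single value.
nested = [[i ** 2] for i in range(10)]
print(list(iter_leaves(nested)))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Because the function is a generator, values are produced one at a time as the caller iterates, so a consumer can stop early without walking the whole tree.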
Scheduled tasks
https://realpython.com/asynchronous-tasks-with-django-and-celery/
from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery.utils.log import get_task_logger

from photos.utils import save_latest_flickr_image

logger = get_task_logger(__name__)


@periodic_task(
    run_every=(crontab(minute='*/15')),
    name="task_save_latest_flickr_image",
    ignore_result=True
)
def task_save_latest_flickr_image():
    """
    Saves latest image from Flickr
    """
    save_latest_flickr_image()
    logger.info("Saved image from Flickr")
Here, we run the save_latest_flickr_image() function every fifteen minutes by wrapping the function call in a task. The @periodic_task decorator abstracts out the code to run the Celery task, leaving the tasks.py file clean and easy to read!
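To make the `*/15` minute field concrete, here is a small standard-library sketch of which minutes such a step expression selects and when the next run falls. It does not use Celery's scheduler; `step_minutes` and `next_run` are hypothetical helpers, and `next_run` assumes the step divides 60 evenly.

```python
from datetime import datetime, timedelta


def step_minutes(step):
    """Minutes of the hour matched by a '*/step' minute field."""
    return [m for m in range(60) if m % step == 0]


def next_run(now, step=15):
    """First matching minute strictly after `now`; assumes step divides 60."""
    base = now.replace(second=0, microsecond=0)
    return base + timedelta(minutes=step - base.minute % step)


print(step_minutes(15))                           # [0, 15, 30, 45]
print(next_run(datetime(2024, 1, 1, 10, 7, 30)))  # 2024-01-01 10:15:00
```

So "every fifteen minutes" here means on the quarter hours of the clock, not fifteen minutes after the worker happened to start.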
References:
https://www.liaoxuefeng.com/article/00137760323922531a8582c08814fb09e9930cede45e3cc000