Celery

http://docs.celeryproject.org/en/latest/index.html

Celery - Distributed Task Queue

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

It’s a task queue with focus on real-time processing, while also supporting task scheduling.

Celery has a large and diverse community of users and contributors, you should come join us on IRC or our mailing-list.

Celery is Open Source and licensed under the BSD License.

 

Demo installation and running:

https://github.com/fanqingsong/celery_runningapp

# install dependencies
pipenv install

# run the worker (tasks process)
pipenv run celery -A tasks worker --loglevel=info -P eventlet

# run the producer
pipenv run python taskscaller.py

 

Task client (worker side):
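
The repo's tasks.py defines the worker-side tasks. A minimal sketch of what such a module looks like (the Redis broker URL and the add task here are illustrative assumptions, not the repo's exact contents):

from celery import Celery

# Broker and result backend are placeholders; point them at your own Redis/RabbitMQ.
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def add(x, y):
    # Runs inside the worker started with `celery -A tasks worker`.
    return x + y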

 

Task sender (producer):
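
taskscaller.py is the producer that pushes work onto the queue. A minimal sketch, assuming the add task from the sketch above:

from tasks import add

# .delay() serializes the call and sends it to the broker; it returns immediately.
result = add.delay(4, 4)
print(result.task_id)          # save this ID to query the status later
print(result.get(timeout=10))  # blocks until the worker stores the result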

 

 

Error handling

https://blog.csdn.net/qq_30242609/article/details/79047660

Running the tasks raises an error: "Celery ValueError: not enough values to unpack (expected 3, got 0)"

 

This error typically occurs when running Celery 4.x with the default prefork pool on Windows; work around it by adding a parameter when starting the worker, as follows:

celery -A <mymodule> worker -l info -P eventlet

 

Task status

Official documentation:

http://docs.celeryproject.org/en/latest/faq.html#how-do-i-get-the-result-of-a-task-if-i-have-the-id-that-points-there

How do I get the result of a task if I have the ID that points there?

Answer: Use task.AsyncResult:

>>> result = my_task.AsyncResult(task_id)
>>> result.get()

This will give you an AsyncResult instance using the task's current result backend.

If you need to specify a custom result backend, or you want to use the current application’s default backend you can use app.AsyncResult:

>>> result = app.AsyncResult(task_id)
>>> result.get()

 

Stack Overflow

https://stackoverflow.com/questions/9034091/how-to-check-task-status-in-celery

 

Return the task_id (which is given from .delay()) and ask the celery instance afterwards about the state:

x = method.delay(1, 2)
print(x.task_id)

When asking, get a new AsyncResult using this task_id:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
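
Putting the two answers together, a minimal polling sketch (task_id is assumed to have been saved from an earlier .delay() call, and app is your Celery instance):

import time
from celery.result import AsyncResult

res = AsyncResult(task_id, app=app)
while not res.ready():      # ready() is True for SUCCESS, FAILURE, REVOKED
    time.sleep(1)
if res.state == 'SUCCESS':
    print(res.get())        # the task's return value
else:
    print(res.state)        # e.g. FAILURE; res.get() would re-raise the error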

 

API

http://docs.celeryproject.org/en/latest/reference/celery.result.html

Results can also be retrieved through the collect interface; under the hood it is built on Python coroutines (generators).

from celery import group
from proj.celery import app

@app.task(trail=True)
def A(how_many):
    return group(B.s(i) for i in range(how_many))()

@app.task(trail=True)
def B(i):
    return pow2.delay(i)

@app.task(trail=True)
def pow2(i):
    return i ** 2

>>> from celery.result import ResultBase
>>> from proj.tasks import A
>>> result = A.delay(10)
>>> [v for v in result.collect()
...     if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

 

Periodic tasks

https://realpython.com/asynchronous-tasks-with-django-and-celery/

from celery.task.schedules import crontab
from celery.decorators import periodic_task
from celery.utils.log import get_task_logger

from photos.utils import save_latest_flickr_image

logger = get_task_logger(__name__)

@periodic_task(
    run_every=(crontab(minute='*/15')),
    name="task_save_latest_flickr_image",
    ignore_result=True
)
def task_save_latest_flickr_image():
    """Saves latest image from Flickr."""
    save_latest_flickr_image()
    logger.info("Saved image from Flickr")

Here, we run the save_latest_flickr_image() function every fifteen minutes by wrapping the function call in a task. The @periodic_task decorator abstracts out the code to run the Celery task, leaving the tasks.py file clean and easy to read!
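
Note that celery.decorators and @periodic_task were removed in later Celery releases (5.x); the modern equivalent declares the schedule on the app's beat_schedule. A rough sketch of the same 15-minute job in the newer style (the module paths here are assumptions):

from celery import Celery
from celery.schedules import crontab

app = Celery('photos')

app.conf.beat_schedule = {
    'task_save_latest_flickr_image': {
        # Registered name of the task to run
        'task': 'photos.tasks.task_save_latest_flickr_image',
        'schedule': crontab(minute='*/15'),
    },
}

The schedule is then driven by a beat process, e.g. pipenv run celery -A photos beat --loglevel=info.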

 

References:

https://www.liaoxuefeng.com/article/00137760323922531a8582c08814fb09e9930cede45e3cc000
