Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it provides the tools needed to operate such a system. It is a task queue focused on real-time processing that also supports task scheduling.
pip install celery
Create the s1.py file:
from celery import Celery

# "tasks": the name of the current module; it can be anything, but it must be given
# broker: the message broker to use; see:
# http://docs.jinkan.org/docs/celery/getting-started/first-steps-with-celery.html#id4
app = Celery("tasks", broker="redis://10.0.0.23:6379/1")

@app.task
def add():
    return "add"

@app.task
def add1():
    return "add1"
Run the Celery worker:
celery worker -A s1 -l info    # s1 is the module name; info is the log level
Create a new file and call the task with the delay() method:
from s1 import add

s = add.delay()
print(s)
At this point the s1 worker will raise an error, because Celery 4.0+ no longer supports running on Windows. Installing eventlet works around the error; alternatively, use a version earlier than 4.0.
Traceback (most recent call last):
  File "c:\program files\python36\lib\site-packages\billiard\pool.py", line 358, in workloop
    result = (True, prepare_result(fun(*args, **kwargs)))
  File "c:\program files\python36\lib\site-packages\celery\app\trace.py", line 544, in _fast_trace_task
    tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
pip install eventlet
Start the worker with the eventlet pool:
celery worker -A s1 -l info -P eventlet    # on Windows you must use -P eventlet
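If you would rather avoid the extra dependency, Celery's built-in solo pool is another option that runs on Windows; it executes one task at a time in the main process:

celery worker -A s1 -l info -P solo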
To keep task results, add a result backend to s1.py:

from celery import Celery

# use a backend to store task results; Redis is used here, RabbitMQ is also recommended
app = Celery("tasks",
             broker="redis://10.0.0.23:6379/1",
             backend="redis://10.0.0.23:6379/2")

@app.task
def add():
    return "add"

@app.task
def add1():
    return "add1"
Check the Redis database:
[root@node1 ~]# redis-cli -h 10.0.0.23
10.0.0.23:6379> SELECT 2
10.0.0.23:6379[2]> KEYS *
 1) "celery-task-meta-908f60a1-2c23-4403-ab89-518691d76f48"
 2) "celery-task-meta-8172677a-a5ad-48ac-93e6-8473ba564766"
 3) "celery-task-meta-4e116ff9-f51f-4797-beb0-b106ce609bac"
 4) "celery-task-meta-282dafcf-1bca-4414-bbc1-87759140bcd6"
 5) "celery-task-meta-6fe880a6-e9e2-4609-9fc3-6d284169c8f7"
 6) "celery-task-meta-1ad7acd9-bdf7-4a55-a7ca-4a7157975036"
 7) "celery-task-meta-6382ea5d-e328-427b-be22-4a6cca74078a"
 8) "celery-task-meta-7a260767-6246-457f-aa91-5598007a7dc2"
 9) "celery-task-meta-8d7b6686-ed63-473b-b4c9-db3ebc807421"
10) "celery-task-meta-29acbc9d-539c-4db3-b07b-7ce6b7365825"
View the stored data:
10.0.0.23:6379[2]> get celery-task-meta-908f60a1-2c23-4403-ab89-518691d76f48
"{\"status\": \"SUCCESS\", \"result\": \"add\", \"traceback\": null, \"children\": [], \"task_id\": \"908f60a1-2c23-4403-ab89-518691d76f48\", \"date_done\": \"2019-04-16T12:26:04.267013\"}"
Fetch results in code with AsyncResult:

from celery.result import AsyncResult   # import AsyncResult
from s1 import add, app                 # import the app from s1

for i in range(10):
    s = add.delay()
    r = AsyncResult(id=s.id, app=app)
    print(r.get())
Tasks can take arguments as well. s1.py:
from celery import Celery

# use a backend to store task results; Redis is used here, RabbitMQ is also recommended
app = Celery("tasks",
             broker="redis://10.0.0.23:6379/1",
             backend="redis://10.0.0.23:6379/2")

@app.task
def add(a, b):
    return ("add", a + b)

@app.task
def add1():
    return "add1"
s2.py:
from celery.result import AsyncResult   # import AsyncResult
from s1 import add, app                 # import the app from s1

for i in range(10):
    s = add.delay(3, 5)
    # r = AsyncResult(id=str(s), app=app)   # str(s) works as the id
    r = AsyncResult(id=s.id, app=app)       # and so does s.id
    print(r.get())          # return value
    print(r.status)         # execution state
    print(r.successful())   # whether the task succeeded
If a task raises, get() re-raises the exception by default; propagate=False and traceback give you control over that:

from celery.result import AsyncResult   # import AsyncResult
from s1 import add, app                 # import the app from s1

s = add.delay(3, 5)
print(s.id)
r = AsyncResult(id=s.id, app=app)

# return the error instead of re-raising it
print(r.get(propagate=False))
# the full traceback from the worker
print(r.traceback)
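Note that add(3, 5) succeeds, so traceback comes back as None. To see these two calls do something useful you need a task that raises. A minimal sketch, assuming a hypothetical fail task is added to s1.py:

# added to s1.py for demonstration (hypothetical task)
@app.task
def fail():
    raise ValueError("boom")

# then, in the calling script:
from celery.result import AsyncResult
from s1 import fail, app

s = fail.delay()
r = AsyncResult(id=s.id, app=app)
print(r.get(propagate=False))   # prints the ValueError instead of raising it
print(r.traceback)              # the worker-side traceback as a string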
t = add.apply_async((1, 2), countdown=5)   # run the task 5 seconds from now
print(t)
print(t.get())

Question: is the message sent after a 5-second delay, or sent immediately with the worker waiting 5 seconds before executing? The latter: apply_async publishes the message right away with an ETA attached, and the worker that receives it holds it until that time before executing.
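You can observe this yourself; a small check, assuming the s1 worker from above is running:

t = add.apply_async((1, 2), countdown=5)
print(t.status)   # PENDING immediately: the message has already been published
print(t.get())    # blocks for roughly 5 seconds until the worker executes it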
delay() is a shortcut for apply_async(), which supports extra options:
countdown : wait the given number of seconds before executing.
add.apply_async((2,3), countdown=5)
eta : set an explicit start time for the task. Celery expects UTC here, which is a common pitfall.
from datetime import datetime, timedelta
add.apply_async((2,3), eta=datetime.utcnow() + timedelta(seconds=10))
expires : set an expiry time; the task is discarded if it has not started by then.
add.apply_async((2,3), expires=60)
retry : whether to retry sending the message if the broker connection fails (this is about delivery, not the task itself failing).
add.apply_async((2,3), retry=False)
retry_policy : how those send retries behave (a combined example follows this list).
max_retries : maximum number of retries, 3 by default.
interval_start : seconds to wait before the first retry, 0 by default (retry immediately).
interval_step : seconds added to the wait between consecutive retries; int or float, 0.2 by default.
interval_max : the ceiling the wait grows toward via interval_step, after which it stops increasing; int or float, 0.2 by default.
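A combined sketch of retry together with an explicit retry_policy (the values shown are just the defaults spelled out):

add.apply_async((2, 3), retry=True, retry_policy={
    'max_retries': 3,       # give up after 3 attempts
    'interval_start': 0,    # first retry immediately
    'interval_step': 0.2,   # wait 0.2s longer after each retry
    'interval_max': 0.2,    # but never wait more than 0.2s
})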
Celery can also run tasks periodically through celery beat. Configure beat_schedule on the app (the app object here is named task and lives in c.py); note that timezone is a separate app setting, not a key inside the schedule dict:

from c import task   # "task" is the Celery app defined in c.py

task.conf.timezone = 'Asia/Shanghai'
task.conf.beat_schedule = {
    "each10s_task": {
        "task": "c.add",
        "schedule": 3,        # run every 3 seconds
        "args": (10, 10),
    },
}
Celery also supports crontab-style schedules, just like cron on Linux:
from celery.schedules import crontab

task.conf.timezone = 'Asia/Shanghai'
task.conf.beat_schedule = {
    "each3m_task": {
        "task": "c.add",
        "schedule": crontab(minute=3),       # at minute 3 of every hour
        "args": (10, 10),
    },
    "every3m_task": {                        # keys must be unique, or later entries silently overwrite earlier ones
        "task": "c.add",
        "schedule": crontab(minute='*/3'),   # every 3 minutes
        "args": (10, 10),
    },
}
Start beat:
celery beat -A s2 -l info
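Note that beat only publishes the scheduled messages; a worker still has to be running to actually execute them:

celery worker -A s2 -l info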
To use Celery inside a Django project, create a celery.py next to settings.py (the project here is named tests):

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program;
# this ties Celery to the project, where "tests" is the project name
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tests.settings')

# create the Celery app
app = Celery('tests',
             backend='redis://10.211.55.19/3',
             broker='redis://10.211.55.19/4')
# if Redis requires a password, substitute it for "password":
# app = Celery('tests',
#              backend='redis://:password@10.211.55.19/3',
#              broker='redis://:password@10.211.55.19/4')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
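Because of namespace='CELERY', Celery settings can also live in Django's settings.py under a CELERY_ prefix. A minimal sketch with illustrative values (options passed directly to Celery() above take precedence):

# tests/settings.py
CELERY_BROKER_URL = 'redis://10.211.55.19/4'
CELERY_RESULT_BACKEND = 'redis://10.211.55.19/3'
CELERY_TASK_SERIALIZER = 'json'      # maps to the task_serializer setting
CELERY_TIMEZONE = 'Asia/Shanghai'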
tests/__init__.py, so the app is loaded when Django starts:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']
tasks.py inside a Django app; shared_task lets reusable apps define tasks without importing a concrete Celery instance:

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)
Call the task from a view (HttpResponse must be imported):

from django.http import HttpResponse
from .tasks import add

def index(request):
    result = add.delay(2, 3)
    return HttpResponse('Result: {}'.format(result.get()))
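Be aware that result.get() blocks the request until the task finishes, which defeats the purpose of an async queue. A common alternative, sketched here with made-up view names, is to hand the task id back to the client and poll it from a second view:

from django.http import HttpResponse
from celery.result import AsyncResult
from tests.celery import app
from .tasks import add

def start(request):
    result = add.delay(2, 3)
    return HttpResponse(result.id)   # client keeps the id

def check(request, task_id):
    r = AsyncResult(id=task_id, app=app)
    if r.successful():
        return HttpResponse('Done: {}'.format(r.get()))
    return HttpResponse('Status: {}'.format(r.status))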
Start the worker:

celery -A tests worker -l info
To manage periodic tasks from Django, install an extra Django component:
pip install django-celery-beat
Add django_celery_beat to INSTALLED_APPS:
INSTALLED_APPS = (
    ...,
    'django_celery_beat',
)
Sync the schedule tables to the database:
python3 manage.py makemigrations   # skipping this step can cause problems
python3 manage.py migrate
Configure schedules in the Django admin (or programmatically, as sketched below).
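With the tables in place, schedules can be created under "Periodic tasks" in the admin, or through django-celery-beat's models. A sketch, where app01.tasks.add is a hypothetical dotted path to the add task above:

import json
from django_celery_beat.models import PeriodicTask, IntervalSchedule

# an interval of every 10 seconds
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=10,
    period=IntervalSchedule.SECONDS,
)

PeriodicTask.objects.create(
    interval=schedule,
    name='add every 10s',          # must be unique
    task='app01.tasks.add',        # hypothetical dotted path to the task
    args=json.dumps([2, 3]),       # positional args, JSON-encoded
)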
Start beat:
celery -A tests beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
Start the worker:
celery -A tests worker -l info