Celery is an asynchronous task queue focused on real-time processing; it also supports task scheduling.
Architecture in detail:
Message broker: Celery does not provide a message service of its own, but it integrates easily with third-party message brokers, including RabbitMQ, Redis, and others.
Worker: the task execution unit provided by Celery; workers run concurrently across the nodes of a distributed system.
Task result store: stores the results of the tasks executed by workers. Celery supports storing results in several backends, including AMQP, Redis, and others.
2: Use cases
Scheduled tasks: run something on a schedule, for example computing daily statistics.
3: Installing and configuring Celery
Message broker: RabbitMQ/Redis
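A minimal install, assuming Redis is used as both broker and result backend (the redis client package is required by Celery's Redis transport):

pip install celery
pip install redis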
app = Celery('task_name', backend='xxx', broker='xxx')
Demo walkthrough:
Create a py file: celery_app_task.py
import celery
import time

# broker = 'redis://127.0.0.1:6379/2'  # without a password
backend = 'redis://:123456@127.0.0.1:6379/1'
broker = 'redis://:123456@127.0.0.1:6379/2'

cel = celery.Celery('test', backend=backend, broker=broker)

@cel.task
def add(x, y):
    return x + y
Create a py file: add_task.py, to submit a task
from celery_app_task import add

result = add.delay(4, 5)
print(result.id)
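delay() returns an AsyncResult right away; if blocking until the task finishes is acceptable, the result can also be fetched in place. A minimal sketch reusing the same add task (the timeout value is just an illustration):

from celery_app_task import add

result = add.delay(4, 5)
# block until a worker has executed the task, or raise after 10 seconds
print(result.get(timeout=10))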
Note: on Windows, use: celery worker -A celery_app_task -l info -P eventlet
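The -P eventlet pool used on Windows relies on the eventlet package, which is not installed together with Celery:

pip install eventlet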
Create a py file: run.py, to start the worker:

from celery_app_task import cel

if __name__ == '__main__':
    cel.worker_main()
    # cel.worker_main(argv=['--loglevel=info'])
Create a py file: result.py, to check the result of the task
from celery.result import AsyncResult
from celery_app_task import cel

# "async" is a reserved keyword since Python 3.7, so use a different variable name
async_result = AsyncResult(id="e919d97d-2938-4d0f-9265-fd8237dc2aa3", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget()  # delete the result
elif async_result.failed():
    print('task failed')
elif async_result.status == 'PENDING':
    print('task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('task is being retried after an error')
elif async_result.status == 'STARTED':
    print('task has started executing')
Run run.py, or run the command: celery worker -A celery_app_task -l info
Run result.py to check the task status and fetch the result.
Multi-task structure:

pro_cel
├── celery_task          # celery-related package
│   ├── celery.py        # celery connection and configuration; must use this file name
│   ├── tasks1.py        # task functions
│   └── tasks2.py        # task functions
├── check_result.py      # check task results
└── send_task.py         # submit tasks
celery.py
from celery import Celery

cel = Celery('celery_demo',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             # include the two task modules below; celery looks for tasks in
             # these py files, so tasks can be grouped into separate modules
             include=['celery_task.tasks1',
                      'celery_task.tasks2'
                      ])

# timezone
cel.conf.timezone = 'Asia/Shanghai'
# whether to use UTC
cel.conf.enable_utc = False
tasks1.py
import time
from celery_task.celery import cel

@cel.task
def test_celery(res):
    time.sleep(5)
    return "test_celery task result: %s" % res
tasks2.py
import time
from celery_task.celery import cel

@cel.task
def test_celery2(res):
    time.sleep(5)
    return "test_celery2 task result: %s" % res
check_result.py
from celery.result import AsyncResult
from celery_task.celery import cel

# "async" is a reserved keyword since Python 3.7, so use a different variable name
async_result = AsyncResult(id="08eb2778-24e1-44e4-a54b-56990b3519ef", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget()                        # delete the result; it is not removed automatically after execution
    # async_result.revoke(terminate=True)    # terminate the task no matter what state it is in
    # async_result.revoke(terminate=False)   # revoke the task only if it has not started yet
elif async_result.failed():
    print('task failed')
elif async_result.status == 'PENDING':
    print('task is waiting to be executed')
elif async_result.status == 'RETRY':
    print('task is being retried after an error')
elif async_result.status == 'STARTED':
    print('task has started executing')
send_task.py
from celery_task.tasks1 import test_celery
from celery_task.tasks2 import test_celery2

# tell celery to execute the test_celery task immediately, passing in one argument
result = test_celery.delay('first run')
print(result.id)

result = test_celery2.delay('second run')
print(result.id)
Submit the tasks (run send_task.py), start the worker with celery worker -A celery_task -l info -P eventlet, then check the results (run check_result.py).
To have celery run a task at a specific time, pass eta to apply_async; add_task.py becomes:

from celery_app_task import add
from datetime import datetime

# method 1
# v1 = datetime(2019, 2, 13, 18, 19, 56)
# print(v1)
# v2 = datetime.utcfromtimestamp(v1.timestamp())
# print(v2)
# result = add.apply_async(args=[1, 3], eta=v2)
# print(result.id)

# method 2
ctime = datetime.now()
# eta expects UTC time by default
utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
from datetime import timedelta
time_delay = timedelta(seconds=10)
task_time = utc_ctime + time_delay

# use apply_async and set the execution time
result = add.apply_async(args=[4, 3], eta=task_time)
print(result.id)
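As an alternative to computing a UTC eta by hand, apply_async also accepts a countdown given in seconds; a minimal sketch reusing the same add task:

from celery_app_task import add

# run the task roughly 10 seconds from now
result = add.apply_async(args=[4, 3], countdown=10)
print(result.id)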
For periodic tasks in the multi-task structure, modify celery.py as follows:
from datetime import timedelta
from celery import Celery
from celery.schedules import crontab

cel = Celery('tasks',
             broker='redis://127.0.0.1:6379/1',
             backend='redis://127.0.0.1:6379/2',
             include=[
                 'celery_task.tasks1',
                 'celery_task.tasks2',
             ])

cel.conf.timezone = 'Asia/Shanghai'
cel.conf.enable_utc = False

cel.conf.beat_schedule = {
    # the schedule entry can have any name
    'add-every-10-seconds': {
        # run the test_celery function in tasks1
        'task': 'celery_task.tasks1.test_celery',
        # 'schedule': 1.0,
        # 'schedule': crontab(minute="*/1"),
        # run every 2 seconds
        'schedule': timedelta(seconds=2),
        # arguments passed to the task
        'args': ('test',)
    },
    # 'add-every-12-seconds': {
    #     'task': 'celery_task.tasks1.test_celery',
    #     # run every year on April 11 at 08:42
    #     'schedule': crontab(minute=42, hour=8, day_of_month=11, month_of_year=4),
    #     'args': ('test2',)
    # },
}
Start the worker: celery worker -A celery_task -l info -P eventlet
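The worker only executes tasks; the beat_schedule above is dispatched by a separate beat process, started with the same old-style CLI used throughout this post:

celery beat -A celery_task -l info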
Install django-celery
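django-celery is a separate package on PyPI (it provides the djcelery app used below); assuming pip is available:

pip install django-celery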
Create celeryconfig.py in the project directory
import djcelery
djcelery.setup_loader()

CELERY_IMPORTS = (
    'app01.tasks',
)

# can prevent deadlocks in some situations
CELERYD_FORCE_EXECV = True

# number of concurrent worker processes
CELERYD_CONCURRENCY = 4

# acknowledge tasks only after they finish, so they can be retried if the worker dies
CELERY_ACKS_LATE = True

# recycle each worker after it has executed at most 100 tasks, to guard against memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 100

# task timeout
CELERYD_TASK_TIME_LIMIT = 12 * 30
Create tasks.py in the app01 directory
from celery import task

@task
def add(a, b):
    with open('a.text', 'a', encoding='utf-8') as f:
        f.write('a')
    print(a + b)
View function, views.py
from django.shortcuts import render, HttpResponse
from app01.tasks import add
from datetime import datetime

def test(request):
    # result = add.delay(2, 3)
    ctime = datetime.now()
    # eta expects UTC time by default
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    from datetime import timedelta
    time_delay = timedelta(seconds=5)
    task_time = utc_ctime + time_delay
    result = add.apply_async(args=[4, 3], eta=task_time)
    print(result.id)
    return HttpResponse('ok')
settings.py
INSTALLED_APPS = [
    ...
    'djcelery',
    'app01'
]

...

# import the celery config from the project package
from djagocele import celeryconfig
BROKER_BACKEND = 'redis'
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
urls.py
from django.conf.urls import url
from django.contrib import admin
from app01 import views

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^test/', views.test),
]
Start the worker: python3 manage.py celery worker