Celery 是一個強大的 分佈式任務隊列 的 異步處理框架,它可讓任務的執行徹底脫離主程序,甚至能夠被分配到其餘主機上運行。咱們一般使用它來實現異步任務(async task)和定時任務(crontab)。linux
Celery的架構由三部分組成,消息中間件(message broker),任務執行單元(worker)和任務執行結果存儲(task result store)組成。redis
能夠看到,Celery 主要包含如下幾個模塊:mongodb
任務模塊 Task數據庫
包含異步任務和定時任務。其中,異步任務一般在業務邏輯中被觸發併發往任務隊列,而定時任務由 Celery Beat 進程週期性地將任務發往任務隊列。django
消息中間件 Brokerwindows
Broker,即爲任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。Celery 自己不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。架構
任務執行單元 Worker併發
Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。app
任務結果存儲 Backend框架
Backend 用於存儲任務的執行結果,以供查詢。同消息中間件同樣,存儲也可以使用 RabbitMQ, redis 和 MongoDB 等。
因此總結一下celery:它是一個處理大量消息的分佈式系統,能異步任務、定時任務,使用場景通常用於耗時操做的多任務或者定時性的任務
pycharm安裝:
pip3 install celery
① 實例化一個celery對象,使用該對象.task裝飾須要管理的任務函數:
# celery_task.py from celery import Celery """ # 若是redis沒有設置密碼 broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' """ broker = 'redis://:12345@127.0.0.1:6379/1' backend = 'redis://:12345@127.0.0.1:6379/2' # c1是實例化產生的celery的名字,由於會存在多個celery app = Celery('c1', broker=broker, backend=backend) # 須要使用一個裝飾器,來管理該任務(函數) @app.task def add(x, y): import time time.sleep(1) return x + y
② 將裝飾的任務函數條件到消息隊列中,此時提交的任務函數並無執行,只是提交到worker,它會返回一個標識任務的字符串
# submit.task.py # 用於提交任務 from celery_task import add # 提交任務到消息隊列中,這裏只是將任務提交,並無執行 res = add. delay(3, 8) print(res) # 結果是標識任務的字符串(id號) # 7811a028-428c-4dd5-9135-788e26e694a7
③ 使用命令啓動worker去剛纔提交的執行任務
linux: celery worker -A celery_task -l info windows下:celery worker -A celery_task -l info -P eventlet
④ 查看結果,根據提交任務返回的字符串去查詢
# check_res.py from celery.result import AsyncResult from celery_task import app async = AsyncResult(id='bd600820-9366-4220-a679-3e435ae91e71', app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中') elif async.status == 'RETRY': print('任務異常後重試') elif async.status == 'STARTED': print('任務正在執行')
celery簡單使用流程:
-celery的使用 -pip3 install celery -寫一個py文件:celery_task -1 指定broker(消息中間件),指定backend(結果存儲) -2 實例化產生一個Celery對象 app=Celery('名字',broker,backend) -3 加裝飾器綁定任務,在函數(add)上加裝飾器app.task -4 其餘程序提交任務,先導入add,add.delay(參,參數),會將該函數提交到消息中間件,可是並不會執行,有個返回值,直接print會打印出任務的id,之後用id去查詢任務是否執行完成 -5 啓動worker去執行任務: linux: celery worker -A celery_task_s1 -l info windows下:celery worker -A celery_task_s1 -l info -P eventlet -6 查看結果:根據id去查詢 async = AsyncResult(id="bd600820-9366-4220-a679-3e435ae91e71", app=app) if async.successful(): #取出它return的值 result = async.get() print(result)
# celery的多任務結構 -項目結構: pro_cel ├── celery_task# celery相關文件夾 │ ├── celery.py # celery鏈接和配置相關文件,必須叫這個名字 │ └── tasks1.py # 全部任務函數 │ └── tasks2.py # 全部任務函數 ├── check_result.py # 檢查結果 └── send_task.py # 觸發任務 -啓動worker,celery_task是包的名字 celery worker -A celery_task -l info -P eventlet
按照多任務文件結構建立文件:
注意celery.py這個文件的文件名是固定的,不能改,task_1和task_2能夠本身定義,他倆表明自定義的任務分類,還能夠再建立task_3。。。等其它名字的任務文件,send_task.py是提交任務到worker,check_result.py是查看結果的
# celery.py from celery import Celery broker = 'redis://:12345@127.0.0.1:6379/1' backend = 'redis://:12345@127.0.0.1:6379/2' # c1是實例化產生的celery的名字,由於會存在多個celery app = Celery('c1', broker=broker, backend=backend, # 包含一些2個任務文件,去相應的py文件找任務,對多個任務進行分類 include=[ 'celery_task.task_1', 'celery_task.task_2', ]) # celery提供一些配置,具體可查看官方文檔 # app.conf.timezone = 'Asia/Shanghai'
在send_task.py種右鍵運行,提交任務到worker(這裏打印了提交的2個任務的id)
# task_1.py from celery_task.celery import app @app.task def add1(x, y): import time time.sleep(0.5) return x + y # task_2.py from celery_task.celery import app @app.task def add2(x, y): import time time.sleep(1) return x * y
# send_task.py from celery_task.task_1 import add1 from celery_task.task_2 import add2 res1 = add1.delay(3, 8) print(res1) # 16e847f3-fc14-4391-89e2-e2b3546872cf res2 = add2.delay(4, 9) print(res2) # 858c0ae5-8516-4473-8be5-7501fb856ff4
啓動worker,celery_task是包的名字
celery worker -A celery_task -l info -P eventlet
而後將打印的2個id在check_result.py中進行查詢結果
# check_reslut.py from celery.result import AsyncResult from celery_task.celery import app for i in ['16e847f3-fc14-4391-89e2-e2b3546872cf', '858c0ae5-8516-4473-8be5-7501fb856ff4']: async = AsyncResult(id=i, app=app) if async.successful(): result = async.get() print(result) elif async.failed(): print('執行失敗') elif async.status == 'PENDING': print('任務等待中') elif async.status == 'RETRY': print('任務異常後重試') elif async.status == 'STARTED': print('任務正在執行')
在提交任務的地方修改:
# send_task.py from celery_task.task_1 import add1 from celery_task.task_2 import add2 # 執行定時任務,3s之後執行add一、add2任務 from datetime import datetime # 設置任務執行時間2019年7月12日21點45分12秒 v1 = datetime(2019, 7, 12, 21, 48, 12) print(v1) # 2019-07-12 21:45:12 # 將v1時間轉成utc時間 v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) # 2019-07-12 13:45:12 # 取出要執行任務的時間對象,調用apply_async方法,args是任務函數傳的參數,eta是執行的時間 result1 = add1.apply_async(args=[3, 8], eta=v2) result2 = add2.apply_async(args=[4, 9], eta=v2) print(result1.id) print(result2.id)
在提交任務的地方修改:
# send_task.py # 方式二:實際上和方法一相似,多了一個延遲時間,也就是用如今時間和推遲執行的時間計算出任務執行的最終utc時間 # 而後也是調用apply_async方法。 from datetime import datetime ctime = datetime.now() # 默認使用utc時間 utc_ctime = datetime.utcfromtimestamp(ctime.timestamp()) from datetime import timedelta # 使用timedelta模塊,拿到10秒後的時間對象,這裏參數能夠傳秒、毫秒、微秒、分、小時、周、天 time_delay = timedelta(seconds=10) # 獲得任務運行時間: task_time = utc_ctime + time_delay result1 = add1.apply_async(args=[3, 8], eta=task_time) result2 = add2.apply_async(args=[4, 9], eta=task_time) print(result1.id) print(result2.id)
計劃任務須要在celery.py中添加代碼,而後須要beat一下,才能將計劃開啓
# celery.py中 from celery import Celery broker = 'redis://:12345@127.0.0.1:6379/1' backend = 'redis://:12345@127.0.0.1:6379/2' # c1是實例化產生的celery的名字,由於會存在多個celery app = Celery('c1', broker=broker, backend=backend, # 包含一些2個任務文件,去相應的py文件找任務,對多個任務進行分類 include=[ 'celery_task.task_1', 'celery_task.task_2', 'celery_task.task_3', ]) # celery提供一些配置,具體可查看官方文檔 # app.conf.timezone = "Asia/Shanghai" # app.conf.enable_utc = True # 計劃任務 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'submit_every_2_seconds': { # 計劃的任務執行函數 'task': 'celery_task.task_1.add1', # 每一個2秒執行一次 'schedule': timedelta(seconds=2), # 傳遞的任務函數參數 'args': (3, 9) }, 'submit_every_3_seconds': { # 計劃的任務執行函數 'task': 'celery_task.task_2.add2', # 每一個3秒執行一次 'schedule': timedelta(seconds=3), # 傳遞的任務函數參數 'args': (4, 7) }, 'submit_in_fix_datetime': { 'task': 'celery_task.task_3.add3', # 好比每一年的7月13日10點53分執行 # 注意:默認使用utc時間,當前的時間中的小時必需要-8個小時纔會到點提交 'schedule': crontab(minute=53, hour=2, day_of_month=13, month_of_year=7), ''' # 若是不想-8,能夠先設置時區,再按正常時間設置 app.conf.timezone = "Asia/Shanghai" app.conf.enable_utc = True ''' 'args': ('Hello World',) } } # 上面寫完後,須要起一個進程,啓動計劃任務 # celery beat -A celery_task -l info # 啓動worker: # celery worker -A celery_task -l info -P eventlet
Django中使用celery
django-celery:因爲djang-celery模塊對版本的要求過於嚴格,並且容易出現不少bug,因此不建議使用
直接使用celery多任務結構的,將celery多任務結構的代碼文件夾celery_task拷貝到Django項目中,而後在視圖函數中進行任務提交、而後進行結構查看。(啓動項目時候記得將worker啓動起來,注意啓動路徑要跟你拷貝的celery_task文件同級)
注意:當咱們在Django項目中使用celery,在celery的任務函數中不能直接調用django的環境(好比orm方法查詢數據庫),須要添加代碼調用Django環境
在Python腳本中調用Django環境
import os # 加載Django環境,bbs是所在的Django項目名稱 os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'bbs.settings') # 引入Django模塊 import django # 初始化Django環境 django.setup() # 從app當中導入models from app01 import models # 調用操做,拿到數據庫中的全部Book數據對象 books = models.Books.objects.all()