Celery是一個專一於實時處理和任務調度的分佈式任務隊列。所謂任務就是消息,消息中的有效載荷中包含要執行任務須要的所有數據。html
使用Celery常見場景:python
Celery特性:redis
Celery架構圖: 數據庫
產生任務的方式有兩種:django
Celery組件介紹:json
Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、Zookeeper、SQLAlchemy等做爲消息代理,但用於生產環境只有RabbitMQ和Redis,官方推薦RabbitMQ來做爲Celery的消息代理。後端
在客戶端和消費者之間傳輸數據須要序列化和反序列化,Celery支持的序列化方案有pickle,json,yaml,msgpack,通常使用json瀏覽器
爲了提供更高的性能,採用以下方案:緩存
sudo apt-get install rabbitmq-server sudo apt-get install redis-server sudo pip install "celery[librabbitmq,redis,msgpack]"
項目目錄結構服務器
tree project project ├── celeryconfig.py ├── celery.py ├── __init__.py └── tasks.py
主程序celery.py:
#!/usr/bin/env python # -*- coding:utf-8 -*- # 拒絕隱式引入,由於celery.py的名字和celery的包名衝突,須要使用這條語句讓程序正確運行 from __future__ import absolute_import from celery import Celery # app是 Celery類的實例,建立的時候添加了project.tasks這個模塊,也就是包含了project/tasks.py這個文件 app = Celery('project', include=['project.tasks']) # 把Celery配置存放進project/celeryconfig文件,使用app.config_from_object加載配置 app.config_from_object('project.celeryconfig') if __name__ == '__main__': app.start()
任務函數文件tasks.py:
#!/usr/bin/env python # -*- coding:utf-8 -*- from __future__ import absolute_import from project.celery import app # 讓任務函數生效的方法是添加app.task裝飾器 @app.task def add(x, y): return x + y
配置文件celeryconfig.py:
# -*- coding:utf-8 -*- BROKER_URL = 'amqp://guest:guest@localhost:5672//' # 使用RabbitMQ做爲消息代理 CELERY_TASK_PROTOCOL = 1 # 如今celery升級到了4.0,是老版本的librabbitmq與最新的celery4.0 Message Protocol協議不兼容,celery4.0默認使用Task messages Version 2 ,而librabbitmq使用Task messages Version 1 CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把結果存在Redis CELERY_TASK_SERIALIZER = 'msgpack' # 任務序列化肯反序列化使用msgpack方案 CELERY_RESULT_SERIALIZER = 'json' # 讀取任務結果通常性能要求不高,因此使用可讀性更好的json CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任務過時時間 CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接收的內容類型
啓動消費者:
celery -A project.celery worker -l info
終端界面提供了消息代理和存儲結果的地址、併發數量、任務列表、交換類型等
開啓另外一個終端,用ipython調用add函數
In [1]: from project.tasks import add In [2]: r = add.delay(1,3) In [3]: r Out[3]: <AsyncResult: a14d2045-ad40-4240-bbcf-1a8f07899485> In [4]: r.result Out[4]: 4 In [5]: r.status Out[5]: u'SUCCESS' In [6]: r.successful() Out[6]: True In [7]: r.backend Out[7]: <celery.backends.redis.RedisBackend at 0x7faae433a450> # 保存在redis中
任務的task_id根據上面提到的task_id得到,能夠用下面方法得到結果
# 方法一: In [9]: task_id = 'a14d2045-ad40-4240-bbcf-1a8f07899485' In [10]: add.AsyncResult(task_id).get() Out[10]: 4 # 方法二: In [12]: from celery.result import AsyncResult In [13]: AsyncResult(task_id).get() Out[13]: 4
下面在django中模擬一下如何用celery:
首先先看沒有用celery的程序
這就形成不良好的用戶體驗了,那你先給用戶返回程序執行成功,再在後臺執行這5秒
下面就用selery
1.安裝selery ,不寫版本號默認最新,新版本只須要安裝一個celery
pip install celery==3.1.25
pip install celery-with-redis==3.0
pip install django-celery==3.2.1
2.在應用目錄下建立名爲task.py的文件,該文件用於封裝耗時任務的
3.配置setting.py
3.1註冊進app
INSTALLED_APP=[
. . . . +
‘djcelery’]
3.2 再加上,
import djcelery
djcelery.setup_loader()
BROKER_URL='redis://:密碼@數據庫ip地址:6379/0' #格式不能錯 ,注意分號
CELERY_IMPORTS=(‘myApp.task’) #任務文件
4.遷移,生成celery須要的表
python manage.migrate
5.將名爲celery.py的文件加入到同工程目錄同名的目錄下,這個文件是官網給咱們寫好的
from __future__ import absolute_import import os from celery import Celery from django.conf import settings os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'whthas_home.settings') app = Celery('project') #這裏改爲本身工程名 app.config_from_object('django.conf:settings') app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
6.修改與工程目錄同名目錄下的__init__.py文件,上圖紅色箭頭指處
from project.celery import app as celery_app
7.將耗時程序封裝成任務,在task.py文件裏
在view.py文件裏
這時你去跑程序,會發現很快,但卻沒有打印開始與結束這兩句,其實它沒有執行到耗時程序,還缺最後一步
8.啓動redis,確保服務器啓動
9.啓動worker,在黑窗口進入工程目錄下,執行命令
python manage.py celery worker --loglevel=info
訪問瀏覽器,就能夠看到很快的返回程序執行成功,而耗時程序在黑窗口進行打印:開始程序執行,等待5秒,結束程序執行
相互不影響
瀏覽器返回
黑窗口打印