Celery 初步使用心得

1、 基本介紹

Celery是一個專一於實時處理和任務調度的分佈式任務隊列。所謂任務就是消息,消息中的有效載荷中包含要執行任務須要的所有數據。html

使用Celery常見場景:python

  1. Web應用。當用戶觸發的一個操做須要較長時間才能執行完成時,能夠把它做爲任務交給Celery去異步執行,執行完再返回給用戶。這段時間用戶不須要等待,提升了網站的總體吞吐量和響應時間。
  2. 定時任務。生產環境常常會跑一些定時任務。假若有上千臺的服務器、上千種任務,定時任務的管理會很困難,Celery能夠幫助咱們快速在不一樣的機器設定不一樣種任務。
  3. 其餘能夠異步執行的任務。爲了充分提升網站性能,對於請求和響應以外的那些不要求必須同步完成的附加工做均可以異步完成。好比發送郵件/短信、推送消息、清理/設置緩存等。

Celery特性:redis

  • 方便地查看定時任務的執行狀況,好比執行是否成功、當前狀態、執行任務花費的時間等。
  • 可使用功能齊備的管理後臺或者命令行添加、更新、刪除任務。
  • 方便把任務和配置管理相關聯。
  • 可選多進程、Evenlent和Gevent三種模式併發執行。
  • 提供多種錯誤處理機制。
  • 提供多種任務原語,方便實現任務分組、拆分和調用鏈。
  • 支持多種消息代理和存儲後端。

Celery架構圖: 輸入圖片說明數據庫

產生任務的方式有兩種:django

  1. 發佈者發佈任務(Web應用)
  2. 任務調度定期發佈任務(定時任務)

Celery組件介紹:json

  • Celery Beat: 任務調度器,Beat進程會讀取配置文件的內容,週期性地將配置中到期須要執行的任務發送給任務隊列。
  • Celery Worker: 執行任務的消費者,一般會在多臺服務器運行多個消費者來提升執行效率。
  • Broker: 消息代理,或者叫作消息中間件,接受任務生產者發送過來的任務消息,存進隊列在按序分發給任務消費方。
  • Producer: 調用Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
  • Result Backend: 任務處理完後保存狀態信息和結果,以供查詢。Celery默認支持Redis、RabbitMQ、MongoDB、Django ORM SQLAlchemy等方式。

Celery目前支持RabbitMQ、Redis、MongoDB、Beanstalk、Zookeeper、SQLAlchemy等做爲消息代理,但用於生產環境只有RabbitMQ和Redis,官方推薦RabbitMQ來做爲Celery的消息代理。後端

在客戶端和消費者之間傳輸數據須要序列化和反序列化,Celery支持的序列化方案有pickle,json,yaml,msgpack,通常使用json瀏覽器

 2、安裝配置Celery

爲了提供更高的性能,採用以下方案:緩存

  • 選擇RabbitMQ做爲消息代理。
  • RabbitMQ的Python客戶端選擇librabbitmq這個C庫。
  • 選擇Msgpack作序列化
  • 選擇Redis作結果存儲
sudo apt-get install rabbitmq-server
sudo apt-get install redis-server
sudo pip install "celery[librabbitmq,redis,msgpack]"

3、示例演示

項目目錄結構服務器

tree project
project
├── celeryconfig.py
├── celery.py
├── __init__.py
└── tasks.py

主程序celery.py:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# 拒絕隱式引入,由於celery.py的名字和celery的包名衝突,須要使用這條語句讓程序正確運行
from __future__ import absolute_import

from celery import Celery

# app是 Celery類的實例,建立的時候添加了project.tasks這個模塊,也就是包含了project/tasks.py這個文件
app = Celery('project', include=['project.tasks'])

# 把Celery配置存放進project/celeryconfig文件,使用app.config_from_object加載配置
app.config_from_object('project.celeryconfig')


if __name__ == '__main__':
    app.start()

任務函數文件tasks.py:

#!/usr/bin/env python
# -*- coding:utf-8 -*-


from __future__ import absolute_import

from project.celery import app

# 讓任務函數生效的方法是添加app.task裝飾器
@app.task
def add(x, y):
    return x + y

配置文件celeryconfig.py:

# -*- coding:utf-8 -*-
BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # 使用RabbitMQ做爲消息代理
CELERY_TASK_PROTOCOL = 1  # 如今celery升級到了4.0,是老版本的librabbitmq與最新的celery4.0 Message Protocol協議不兼容,celery4.0默認使用Task messages Version 2 ,而librabbitmq使用Task messages Version 1
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0' # 把結果存在Redis
CELERY_TASK_SERIALIZER = 'msgpack'  # 任務序列化肯反序列化使用msgpack方案
CELERY_RESULT_SERIALIZER = 'json'   # 讀取任務結果通常性能要求不高,因此使用可讀性更好的json
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # 任務過時時間
CELERY_ACCEPT_CONTENT = ['json', 'msgpack'] # 指定接收的內容類型

啓動消費者:

celery -A project.celery  worker -l info
終端界面提供了消息代理和存儲結果的地址、併發數量、任務列表、交換類型等

開啓另外一個終端,用ipython調用add函數

In [1]: from project.tasks import add

In [2]: r = add.delay(1,3)

In [3]: r
Out[3]: <AsyncResult: a14d2045-ad40-4240-bbcf-1a8f07899485>

In [4]: r.result
Out[4]: 4

In [5]: r.status
Out[5]: u'SUCCESS'

In [6]: r.successful()
Out[6]: True

In [7]: r.backend
Out[7]: <celery.backends.redis.RedisBackend at 0x7faae433a450>  # 保存在redis中

任務的task_id根據上面提到的task_id得到,能夠用下面方法得到結果

# 方法一:

In [9]: task_id = 'a14d2045-ad40-4240-bbcf-1a8f07899485'

In [10]: add.AsyncResult(task_id).get()
Out[10]: 4

# 方法二:

In [12]: from celery.result import AsyncResult

In [13]: AsyncResult(task_id).get()
Out[13]: 4 

4、Django中測試Celery異步郵件

下面在django中模擬一下如何用celery:

首先先看沒有用celery的程序

        

這就形成不良好的用戶體驗了,那你先給用戶返回程序執行成功,再在後臺執行這5秒

 

下面就用selery

 

1.安裝selery ,不寫版本號默認最新,新版本只須要安裝一個celery

pip install celery==3.1.25

pip install celery-with-redis==3.0

pip install django-celery==3.2.1

 

2.在應用目錄下建立名爲task.py的文件,該文件用於封裝耗時任務的

 

3.配置setting.py

 

  3.1註冊進app

  INSTALLED_APP=[

    . . . .  +

 ‘djcelery’]

 

   3.2 再加上,

   import djcelery

   djcelery.setup_loader()

   BROKER_URL='redis://:密碼@數據庫ip地址:6379/0'      #格式不能錯 ,注意分號

   CELERY_IMPORTS=(‘myApp.task’)        #任務文件

 

4.遷移,生成celery須要的表

python manage.migrate

 

5.將名爲celery.py的文件加入到同工程目錄同名的目錄下,這個文件是官網給咱們寫好的

 

from __future__ import absolute_import

import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'whthas_home.settings')

app = Celery('project')     #這裏改爲本身工程名 app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

 

 

6.修改與工程目錄同名目錄下的__init__.py文件,上圖紅色箭頭指處

 

from project.celery import app as celery_app
 
 

7.將耗時程序封裝成任務,在task.py文件裏

 

在view.py文件裏

這時你去跑程序,會發現很快,但卻沒有打印開始與結束這兩句,其實它沒有執行到耗時程序,還缺最後一步

 

8.啓動redis,確保服務器啓動

 

9.啓動worker,在黑窗口進入工程目錄下,執行命令

python manage.py celery worker --loglevel=info

 

訪問瀏覽器,就能夠看到很快的返回程序執行成功,而耗時程序在黑窗口進行打印:開始程序執行,等待5秒,結束程序執行

相互不影響

瀏覽器返回

 

 黑窗口打印

相關文章
相關標籤/搜索