celery簡介

Celery簡介

celery userguidehtml

知乎大神解釋celerypython

Celery(芹菜)是基於Python開發的分佈式任務隊列。它支持使用任務隊列的方式在分佈的機器/進程/線程上執行任務調度。redis

Celery架構

架構圖以下:
Celery架構數據庫

Celery包括以下組件:json

  • Celery Beat

任務調度器,Beat進程會讀取配置文件的內容,週期性的將配置中到期須要執行的任務發送給任務隊列安全

  • celery Worker

執行任務的消費者,一般會在多臺服務器運行多個消費者來提升執行效率bash

  • Broker

消息代理,或者叫作消息中間件,接受任務生產者發送過來的任務消息,存進隊列在按序分發給任務消費方(一般是消息隊列或者數據庫)服務器

  • Producer

調用了Celery提供的API,函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者數據結構

  • Result Backend

任務處理完後保存狀態信息和結果,以供查詢。Celery默認已支持 Redis,RabbitMQ, MongoDB,Django ORM,SQLAIchemy等方式架構

中間件選擇

Celery目前支持不少第三方軟件做爲消息代理,但適用於生產環境的只有RabbitMQ和Redis,至於其餘的方式,一是支持有限,二是可能得不到更好的技術支持。Celery官方推薦的是RabbitMQ。

Celery序列化

在客戶端和消費者之間傳輸數據須要序列化和反序列化,Celery支持以下表的序列化方案:

方案 說明
pickle pickle是Python標準庫中的一個模塊,支持Python內置的數據結構,可是它是Python的專有協議。
從Celery3.2開始,因爲安全性等緣由Celery將拒絕pickle這個方案
json json支持多種語言,可用於跨語言方案
yaml yaml的表達能力更強,支持的數據類型比json多,可是python客戶端的性能不如json
msgpack msgpack是一個二進制的類json的序列化方案,可是比json的數據結構更小、更快

簡單項目

項目目錄結構以下:

/root/test/proj/celery
├── celeryconfig.py
├── celery.py
├── __init__.py
└── tasks.py

先看一下主程序celery.py:

#!/usr/bin/env python
#coding:utf8

#拒絕隱式引入,由於celery.py的名字和celery的包名衝突,須要使用這條語句讓程序正常運行,不然「from celery import Celery」這條語句將會報錯,由於首先找到的celery.py文件中並無Celery這個類
from __future__ import absolute_import
from celery import Celery


# app是Celery類的實例,建立的時候添加了celery.tasks這個模塊,也就是包含了celery/tasks.py這個文件
app = Celery('celery',include=['celery.tasks'])

# 把Celery配置存放進celery/celeryconfig.py文件,使用app.config_from_object加載配置
app.config_from_object('celery.celeryconfig')

if __name__ == "__main__":
    app.start()

存聽任務函數的文件tasks.py:

#!/usr/bin/env python
#coding:utf8

from __future__ import absolute_import
from celery.celery import app



@app.task
def add(x, y):
    return x+y

tasks.py只有一個任務函數add,讓它生效的最直接的方法就是添加app.task這個裝飾器。

celery配置文件celeryconfig.py:

# 使用Redis做爲消息代理
BROKER_URL = 'redis://192.168.189.100:6379/0'

# 把任務結果保存在Redis中
CELERY_RESULT_BACKEND = 'redis://192.168.189.100:6379/1'

# 任務序列化和反序列化使用msgpack方案
CELERY_TASK_SERIALIZER = 'msgpack'

# 讀取任務結果通常性能要求不高,因此使用了可讀性更好的JSON
CELERY_RESULT_SERIALIZER = 'json'

# 任務過時時間,這樣寫更加明顯
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24

# 指定接受的內容類型
CELERY_ACCEPT_CONTENT = ['json', 'msgpack']

這個例子中沒有任務調度相關的內容,若是有的話就要使用Queue類了,因此只須要啓動消費者:

celery -A proj worker -l info

-A參數默認會尋找proj.celery這個模塊,其實使用celery做爲模塊文件名字不怎麼合理。可使用其餘名字。舉個例子,假如是proj/app.py,可使用以下命令啓動:

celery -A proj.app worker -l info
相關文章
相關標籤/搜索