Python開發異步任務Celery的使用教程!

1. 生產者消費者設計模式html


最經常使用的解耦方式之一,尋找中間人(broker)搭橋,保證兩個業務沒有直接關聯。
咱們稱這一解耦方式爲:生產者消費者設計模式redis

2.中間人broker數據庫


示例:此處演示Redis數據庫做爲中間人broker
Celery須要一種解決消息的發送和接受的方式,咱們把這種用來存儲消息的的中間裝置叫作message broker, 也可叫作消息中間人。
做爲中間人,咱們有幾種方案可選擇:django


1.RabbitMQ設計模式


RabbitMQ是一個功能完備,穩定的而且易於安裝的broker. 它是生產環境中最優的選擇。
使用RabbitMQ的細節參照如下連接:http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq
若是使用的是Ubuntu或者Debian發行版的Linux,能夠直接經過命令安裝RabbitMQ:服務器

sudo apt-get install rabbitmq-server

 

安裝完畢以後,RabbitMQ-server服務器就已經在後臺運行。
若是用的並非Ubuntu或Debian, 能夠在如下網址:
http://www.rabbitmq.com/download.html
去查找本身所須要的版本軟件。app


2.Redis異步


Redis也是一款功能完備的broker可選項,可是其更可能因意外中斷或者電源故障致使數據丟失的狀況。
關因而由那個Redis做爲Broker,可訪下面網址:http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis分佈式


1. Celery介紹url


Celery介紹:
一個簡單、靈活且可靠、處理大量消息的分佈式系統,能夠在一臺或者多臺機器上運行。
單個 Celery 進程每分鐘可處理數以百萬計的任務。
經過消息進行通訊,使用消息隊列(broker)在客戶端和消費者之間進行協調。
安裝Celery:

$ pip install -U Celery

 

Celery官方文檔
2. 建立Celery實例並加載配置


1.定義Celery包

 

2.建立Celery實例

 

 

celery_tasks.main.py

# celery啓動文件
from celery import Celery

# 爲celery使用django配置文件進行設置
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xxx.settings")

# 建立celery實例
celery_app = Celery('celery_tasks')

 

3.加載Celery配置

 

 

celery_tasks.config.py

broker_url = "redis://127.0.0.1/14"
result_backend = "redis://127.0.0.1/15"

 

celery_tasks.main.py

# celery啓動文件
from celery import Celery


# 建立celery實例
celery_app = Celery('xxx')
# 加載celery配置
celery_app.config_from_object('celery_tasks.config')

 

3. 定義發送短信任務

 

1.註冊任務:celery_tasks.main.py

# celery啓動文件
from celery import Celery


# 建立celery實例
celery_app = Celery('celery_tasks')
# 加載celery配置
celery_app.config_from_object('celery_tasks.config')
# 自動註冊celery任務
celery_app.autodiscover_tasks(['celery_tasks.sms'])

 

2.定義任務:celery_tasks.sms.tasks.py
tasks

from apps.verifications import constants
from celery_tasks.main import celery_app
from libs.yuntongxun.sms import CCP
import logging
logger = logging.getLogger('django')

# bind:保證task對象會做爲第一個參數自動傳入
# name:異步任務別名
# retry_backoff:異常自動重試的時間間隔 第n次(retry_backoff×2^(n-1))s
# max_retries:異常自動重試次數的上限
@celery_app.task(bind=True, name='send_sms_code', retry_backoff=3)
def send_sms_code(self, mobile, sms_code):
"""
發送短信異步任務
:param mobile: 手機號
:param sms_code: 短信驗證碼
:return: 成功0 或 失敗-1
"""
try:
send_ret = CCP().send_template_sms(mobile, [sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID)
except Exception as e:
logger.error(e)
# 有異常自動重試三次
raise self.retry(exc=e, max_retries=3)
if send_ret != 0:
# 有異常自動重試三次
raise self.retry(exc=Exception('發送短信失敗'), max_retries=3)

return send_ret

 

4. 啓動Celery服務

$ cd ~/xxx_project/xxx
$ celery -A celery_tasks.main worker -l info

 

-A指對應的應用程序, 其參數是項目中 Celery實例的位置。
worker指這裏要啓動的worker。
-l指日誌等級,好比info等級。

 

 

5. 調用發送短信任務

# 發送短信驗證碼
# CCP().send_template_sms(mobile,[sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID)
# Celery異步發送短信驗證碼

send_sms_code.delay(mobile, sms_code)

 

6. 補充celery worker的工做模式
默認是進程池方式,進程數以當前機器的CPU核數爲參考,每一個CPU開四個進程。
如何本身指定進程數:

celery worker -A proj --concurrency=4
如何改變進程池方式爲協程方式:

celery worker -A proj --concurrency=1000 -P eventlet -c 1000

# 安裝eventlet模塊
$ pip install eventlet

# 啓用 Eventlet 池
$ celery -A celery_tasks.main worker -l info -P eventlet -c 1000
相關文章
相關標籤/搜索