1. 生產者消費者設計模式html
最經常使用的解耦方式之一,尋找中間人(broker)搭橋,保證兩個業務沒有直接關聯。
咱們稱這一解耦方式爲:生產者消費者設計模式redis
2.中間人broker數據庫
示例:此處演示Redis數據庫做爲中間人broker
Celery須要一種解決消息的發送和接受的方式,咱們把這種用來存儲消息的的中間裝置叫作message broker, 也可叫作消息中間人。
做爲中間人,咱們有幾種方案可選擇:django
1.RabbitMQ設計模式
RabbitMQ是一個功能完備,穩定的而且易於安裝的broker. 它是生產環境中最優的選擇。
使用RabbitMQ的細節參照如下連接:http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#broker-rabbitmq
若是使用的是Ubuntu或者Debian發行版的Linux,能夠直接經過命令安裝RabbitMQ:服務器
sudo apt-get install rabbitmq-server
安裝完畢以後,RabbitMQ-server服務器就已經在後臺運行。
若是用的並非Ubuntu或Debian, 能夠在如下網址:
http://www.rabbitmq.com/download.html
去查找本身所須要的版本軟件。app
2.Redis異步
Redis也是一款功能完備的broker可選項,可是其更可能因意外中斷或者電源故障致使數據丟失的狀況。
關因而由那個Redis做爲Broker,可訪下面網址:http://docs.celeryproject.org/en/latest/getting-started/brokers/redis.html#broker-redis分佈式
1. Celery介紹url
Celery介紹:
一個簡單、靈活且可靠、處理大量消息的分佈式系統,能夠在一臺或者多臺機器上運行。
單個 Celery 進程每分鐘可處理數以百萬計的任務。
經過消息進行通訊,使用消息隊列(broker)在客戶端和消費者之間進行協調。
安裝Celery:
$ pip install -U Celery
Celery官方文檔
2. 建立Celery實例並加載配置
1.定義Celery包
2.建立Celery實例
celery_tasks.main.py
# celery啓動文件 from celery import Celery # 爲celery使用django配置文件進行設置 import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xxx.settings") # 建立celery實例 celery_app = Celery('celery_tasks')
3.加載Celery配置
celery_tasks.config.py
broker_url = "redis://127.0.0.1/14" result_backend = "redis://127.0.0.1/15"
celery_tasks.main.py
# celery啓動文件 from celery import Celery # 建立celery實例 celery_app = Celery('xxx') # 加載celery配置 celery_app.config_from_object('celery_tasks.config')
3. 定義發送短信任務
1.註冊任務:celery_tasks.main.py
# celery啓動文件 from celery import Celery # 建立celery實例 celery_app = Celery('celery_tasks') # 加載celery配置 celery_app.config_from_object('celery_tasks.config') # 自動註冊celery任務 celery_app.autodiscover_tasks(['celery_tasks.sms'])
2.定義任務:celery_tasks.sms.tasks.py
tasks
from apps.verifications import constants from celery_tasks.main import celery_app from libs.yuntongxun.sms import CCP import logging logger = logging.getLogger('django') # bind:保證task對象會做爲第一個參數自動傳入 # name:異步任務別名 # retry_backoff:異常自動重試的時間間隔 第n次(retry_backoff×2^(n-1))s # max_retries:異常自動重試次數的上限 @celery_app.task(bind=True, name='send_sms_code', retry_backoff=3) def send_sms_code(self, mobile, sms_code): """ 發送短信異步任務 :param mobile: 手機號 :param sms_code: 短信驗證碼 :return: 成功0 或 失敗-1 """ try: send_ret = CCP().send_template_sms(mobile, [sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID) except Exception as e: logger.error(e) # 有異常自動重試三次 raise self.retry(exc=e, max_retries=3) if send_ret != 0: # 有異常自動重試三次 raise self.retry(exc=Exception('發送短信失敗'), max_retries=3) return send_ret
4. 啓動Celery服務
$ cd ~/xxx_project/xxx
$ celery -A celery_tasks.main worker -l info
-A指對應的應用程序, 其參數是項目中 Celery實例的位置。
worker指這裏要啓動的worker。
-l指日誌等級,好比info等級。
5. 調用發送短信任務
# 發送短信驗證碼 # CCP().send_template_sms(mobile,[sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID) # Celery異步發送短信驗證碼 send_sms_code.delay(mobile, sms_code)
6. 補充celery worker的工做模式
默認是進程池方式,進程數以當前機器的CPU核數爲參考,每一個CPU開四個進程。
如何本身指定進程數:
celery worker -A proj --concurrency=4
如何改變進程池方式爲協程方式:
celery worker -A proj --concurrency=1000 -P eventlet -c 1000
# 安裝eventlet模塊 $ pip install eventlet # 啓用 Eventlet 池 $ celery -A celery_tasks.main worker -l info -P eventlet -c 1000