Celery配置實踐筆記

說點什麼:

整理下工做中配置celery的一些實踐,寫在這裏,一方面是備忘,另一方面是整理成文檔給其餘同事使用。html

演示用的項目,同時也發佈在Github上: https://github.com/blackmatrix7/celery-demopython

這份筆記會隨着經驗的積累,逐步調整完善,不過一般狀況下,Github上的更新會比較快。git


Celery 配置實踐筆記

Celery 配置實踐筆記,目前已記錄:github

  • 異步執行任務
  • 爲不一樣任務分配不一樣的隊列
  • 計劃任務

須要補充:redis

  • 爲不一樣的任務配置不一樣的優先級
  • Celey任務的返回結果

建立Celery

配置Celery參數

在建立celery實例以前,須要對celery的參數進行一些配置。後端

在這裏列出一些比較經常使用的Celery配置項:併發

配置項名稱 說明
CELERY_DEFAULT_QUEUE 默認的隊列名稱,當沒有爲task特別指定隊列時,採用此隊列
CELERY_BROKER_URL 消息代理,用於發佈者傳遞消息給消費者,推薦RabbitMQ
CELERY_RESULT_BACKEND 後端,用於存儲任務執行結果,推薦redis
CELERY_TASK_SERIALIZER 任務的序列化方式
CELERY_RESULT_SERIALIZER 任務執行結果的序列化方式
CELERY_ACCEPT_CONTENT
CELERYD_CONCURRENCY 任務消費者的併發數
CELERY_TIMEZONE 時區設置,計劃任務須要,推薦 Asia/Shanghai
CELERY_QUEUES Celery隊列設定
CELERY_ROUTES Celery路由設定,用於給不一樣的任務指派不一樣的隊列
CELERYBEAT_SCHEDULE Celery計劃任務設定

爲了使用方便,在這裏引入了以前寫的config模塊(支持本地配置不提交到github),自己是非必須的,用dict存儲celery配置同樣能夠。因此在下面的示例代碼中,會以註釋的形式,演示不使用config模塊實現celery配置。異步

# 等價於直接定義一個broker,如
# broker = 'amqp://user:password@127.0.0.1:5672//'
broker = current_config.CELERY_BROKER_URL
# 將config模塊的配置轉換成dict,等價於直接經過dict定義一組celery配置
celery_conf = {k: v for k, v in current_config.items()}

建立celery實例

將以前建立的broker和celery_conf傳遞給celery,用於建立celery實例。async

manage.pyide

# 建立celery實例
celery = Celery('demo',  broker=broker)
# 載入celery配置
celery.config_from_object(celery_conf)

定義模擬函數

在項目的handlers下,定義一些用來異步任務或計劃任務的調用的函數。

這裏定義了兩個py文件,分別是async_tasks.pyschedules.py,前者用於異步任務,後者用於計劃任務。

異步任務下有兩個函數,模擬發送郵件和推送消息;計劃任務下有數個根據計劃任務執行事件定義的模擬函數。

建立celery worker

推薦以celery.start的方式來啓動celery

啓動參數

在建立完celery實例後,調用celery實例的start方法,來啓動celery。

在demo中,在manage.py中建立celery實例。

celery = Celery('demo',  broker=broker)
celery.config_from_object(celery_conf)
# argv中傳入啓動celery所需的參數
celery.start(argv=['celery', 'worker', '-l', 'info', '-f', 'logs/celery.log'])

第一個參數是固定的,用於啓動celery

第二個參數是啓動的celery組件,這裏啓動的是worker,用於執行任務

第三個參數和第四個參數爲一組,指定日誌的級別,這裏記錄級別爲info的日子

第五個參數和第六個參數爲一組,指定日誌文件的位置,這裏將日誌記錄在log/celery.log

指定消費的隊列

在啓動命令中,增長一個-Q的參數,用於指定消費的隊列。

celery.start(argv=['celery', 'worker', '-Q', 'message_queue', '-l', 'info', '-f', 'logs/message.log'])

上述的參數中,'-Q', 'message_queue'兩個參數,是指定這個worker消費名爲「message_queue」的隊列。

建立異步任務

對於須要在celery中異步執行的函數,只須要在函數上增長一個裝飾器。

# 導入manage.py中建立的celery實例
from manage import celery

# 須要在celery中異步執行的函數,增長celery.task裝飾器
@celery.task
def async_send_email(send_from, send_to, subject, content):
    """
    忽略方法實現
    """
    pass

異步執行任務

對於以celery.task裝飾的函數,能夠同步運行,也能夠異步運行。

接上一個例子的async_send_email函數。

同步執行時,直接調用這個函數便可。

async_push_message(send_to=['張三', '李四'], content='老闆喊你來背鍋了')
async_push_message(send_to=['劉五', '趙六'], content='老闆喊你來領獎金了')

須要異步執行時,調用函數的delay()方法執行,此時會將任務委託給celery後臺的worker執行。

async_push_message.delay(send_to=['張三', '李四'], content='老闆喊你來背鍋了')
async_push_message.delay(send_to=['劉五', '趙六'], content='老闆喊你來領獎金了')

爲任務指定不一樣隊列

其實在以前配置celery參數時,就已經實現爲不一樣任務指定不一樣隊列的目標。

在這裏詳細說明下配置項:

# 導入Queue
from kombu import Queue
# 導入Task所在的模塊,全部使用celery.task裝飾器裝飾過的函數,所須要把所在的模塊導入
# 咱們以前建立的幾個測試用函數,都在handlers.async_tasks和handlers.schedules中
# 因此在這裏須要導入這兩個模塊,以str表示模塊的位置,模塊組成tuple後賦值給CELERY_IMPORTS
# 這樣Celery在啓動時,會自動找到這些模塊,並導入模塊內的task
CELERY_IMPORTS = ('handlers.async_tasks', 'handlers.schedules')
# 爲Celery設定多個隊列,CELERY_QUEUES是個tuple,每一個tuple的元素都是由一個Queue的實例組成
# 建立Queue的實例時,傳入name和routing_key,name即隊列名稱
CELERY_QUEUES = (
    Queue(name='email_queue', routing_key='email_router'),
    Queue(name='message_queue', routing_key='message_router'),
    Queue(name='schedules_queue', routing_key='schedules_router'),
)
# 最後,爲不一樣的task指派不一樣的隊列
# 將全部的task組成dict,key爲task的名稱,即task所在的模塊,及函數名
# 如async_send_email所在的模塊爲handlers.async_tasks
# 那麼task名稱就是handlers.async_tasks.async_send_email
# 每一個task的value值也是爲dict,設定須要指派的隊列name,及對應的routing_key
# 這裏的name和routing_key須要和CELERY_QUEUES設定的徹底一致
CELERY_ROUTES = {
    'handlers.async_tasks.async_send_email': {
        'queue': 'email_queue',
        'routing_key': 'vcan.email_router',
    },
    'handlers.async_tasks.async_push_message': {
        'queue': 'message_queue',
        'routing_key': 'message_router',
    },
    'handlers.schedules.every_30_seconds': {
        'queue': 'schedules_queue',
        'routing_key': 'schedules_router',
    }
}

配置完成後,不一樣的task會根據CELERY_ROUTES的設置,指派到不一樣的消息隊列。

計劃任務

celery除用於異步執行任務外,還能夠用於執行計劃任務。

設定celery時區

在開始配置計劃任務以前對配置文件中設定celery時區。

# 設定 Celery 時區
CELERY_TIMEZONE = 'Asia/Shanghai'

定義計劃任務

在配置文件中定義計劃任務

計劃任務也是異步執行任務的一種方式,因此也須要參考以前的說明,爲計劃任務分配不一樣的隊列。

在配置文件中,新增一項CELERYBEAT_SCHEDULE的配置

from datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    # 給計劃任務取一個獨一無二的名字吧
    'every-30-seconds': {
        # task就是須要執行計劃任務的函數
         'task': 'handlers.schedules.every_30_seconds',
         # 配置計劃任務的執行時間,這裏是每30秒執行一次
         'schedule': timedelta(seconds=30),
         # 傳入給計劃任務函數的參數
         'args': {'value': '2333333'}
    }
}

CELERYBEAT_SCHEDULE是一個dict,每一個key爲計劃任務的名稱,value也是dict,包含task、schedule、args。

task即須要執行計劃任務的函數,這裏是handlers.schedules.every_30_seconds,即handlers的schedules的every_30_seconds函數,如需配置其餘的函數,依照此規則定義便可。

args是傳遞給計劃任務函數的參數,在這個例子中,即傳遞給every_30_seconds的參數,若是無需參數,則args配置爲None。

schedule即配置計劃任務的執行時間,例子中使用的是timedelta實例,用於實現固定間隔某些時間執行。除此以外,還能夠設定某個時間點執行,或重複某個時間點執行。這個就須要用到celery的crontab類。

'push_occupancy_rates': {
            'task': 'handlers.schedules.test_func_b',
            # 天天中午12點執行
            'schedule': crontab(hour='12', minute='0'),
            'args': None
        },

關於crontab更詳細的配置方式,能夠參考官方手冊

Example Meaning
crontab() Execute every minute.
crontab(minute=0, hour=0) Execute daily at midnight.
crontab(minute=0, hour='*/3') Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour='0,3,6,9,12,15,18,21') Same as previous.
crontab(minute='*/15') Execute every 15 minutes.
crontab(day_of_week='sunday') Execute every minute (!) at Sundays.
crontab(minute='', hour='', day_of_week='sun') Same as previous.
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0,hour='/2,/3') Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm
crontab(minute=0, hour='*/5') Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of 「15」, which is divisible by 5).
crontab(minute=0,hour='*/3,8-17') Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0,day_of_month='2') Execute on the second day of every month.
crontab(0, 0, day_of_month='2-30/3') Execute on every even numbered day.
crontab(0, 0, day_of_month='1-7,15-21') Execute on the first and third weeks of the month.
crontab(0, 0,day_of_month='11', month_of_year='5') Execute on the eleventh of May every year.
crontab(0, 0, month_of_year='*/3') Execute on the first month of every quarter.
相關文章
相關標籤/搜索