整理下工做中配置celery的一些實踐,寫在這裏,一方面是備忘,另一方面是整理成文檔給其餘同事使用。html
演示用的項目,同時也發佈在Github上: https://github.com/blackmatrix7/celery-demopython
這份筆記會隨着經驗的積累,逐步調整完善,不過一般狀況下,Github上的更新會比較快。git
Celery 配置實踐筆記,目前已記錄:github
須要補充:redis
在建立celery實例以前,須要對celery的參數進行一些配置。後端
在這裏列出一些比較經常使用的Celery配置項:併發
配置項名稱 | 說明 |
---|---|
CELERY_DEFAULT_QUEUE | 默認的隊列名稱,當沒有爲task特別指定隊列時,採用此隊列 |
CELERY_BROKER_URL | 消息代理,用於發佈者傳遞消息給消費者,推薦RabbitMQ |
CELERY_RESULT_BACKEND | 後端,用於存儲任務執行結果,推薦redis |
CELERY_TASK_SERIALIZER | 任務的序列化方式 |
CELERY_RESULT_SERIALIZER | 任務執行結果的序列化方式 |
CELERY_ACCEPT_CONTENT | |
CELERYD_CONCURRENCY | 任務消費者的併發數 |
CELERY_TIMEZONE | 時區設置,計劃任務須要,推薦 Asia/Shanghai |
CELERY_QUEUES | Celery隊列設定 |
CELERY_ROUTES | Celery路由設定,用於給不一樣的任務指派不一樣的隊列 |
CELERYBEAT_SCHEDULE | Celery計劃任務設定 |
爲了使用方便,在這裏引入了以前寫的config模塊(支持本地配置不提交到github),自己是非必須的,用dict存儲celery配置同樣能夠。因此在下面的示例代碼中,會以註釋的形式,演示不使用config模塊實現celery配置。異步
# 等價於直接定義一個broker,如 # broker = 'amqp://user:password@127.0.0.1:5672//' broker = current_config.CELERY_BROKER_URL # 將config模塊的配置轉換成dict,等價於直接經過dict定義一組celery配置 celery_conf = {k: v for k, v in current_config.items()}
將以前建立的broker和celery_conf傳遞給celery,用於建立celery實例。async
manage.pyide
# 建立celery實例 celery = Celery('demo', broker=broker) # 載入celery配置 celery.config_from_object(celery_conf)
在項目的handlers下,定義一些用來異步任務或計劃任務的調用的函數。
這裏定義了兩個py文件,分別是async_tasks.py和schedules.py,前者用於異步任務,後者用於計劃任務。
異步任務下有兩個函數,模擬發送郵件和推送消息;計劃任務下有數個根據計劃任務執行事件定義的模擬函數。
推薦以celery.start的方式來啓動celery
在建立完celery實例後,調用celery實例的start方法,來啓動celery。
在demo中,在manage.py中建立celery實例。
celery = Celery('demo', broker=broker) celery.config_from_object(celery_conf) # argv中傳入啓動celery所需的參數 celery.start(argv=['celery', 'worker', '-l', 'info', '-f', 'logs/celery.log'])
第一個參數是固定的,用於啓動celery
第二個參數是啓動的celery組件,這裏啓動的是worker,用於執行任務
第三個參數和第四個參數爲一組,指定日誌的級別,這裏記錄級別爲info的日子
第五個參數和第六個參數爲一組,指定日誌文件的位置,這裏將日誌記錄在log/celery.log
在啓動命令中,增長一個-Q的參數,用於指定消費的隊列。
celery.start(argv=['celery', 'worker', '-Q', 'message_queue', '-l', 'info', '-f', 'logs/message.log'])
上述的參數中,'-Q', 'message_queue'兩個參數,是指定這個worker消費名爲「message_queue」的隊列。
對於須要在celery中異步執行的函數,只須要在函數上增長一個裝飾器。
# 導入manage.py中建立的celery實例 from manage import celery # 須要在celery中異步執行的函數,增長celery.task裝飾器 @celery.task def async_send_email(send_from, send_to, subject, content): """ 忽略方法實現 """ pass
對於以celery.task裝飾的函數,能夠同步運行,也能夠異步運行。
接上一個例子的async_send_email函數。
同步執行時,直接調用這個函數便可。
async_push_message(send_to=['張三', '李四'], content='老闆喊你來背鍋了') async_push_message(send_to=['劉五', '趙六'], content='老闆喊你來領獎金了')
須要異步執行時,調用函數的delay()方法執行,此時會將任務委託給celery後臺的worker執行。
async_push_message.delay(send_to=['張三', '李四'], content='老闆喊你來背鍋了') async_push_message.delay(send_to=['劉五', '趙六'], content='老闆喊你來領獎金了')
其實在以前配置celery參數時,就已經實現爲不一樣任務指定不一樣隊列的目標。
在這裏詳細說明下配置項:
# 導入Queue from kombu import Queue # 導入Task所在的模塊,全部使用celery.task裝飾器裝飾過的函數,所須要把所在的模塊導入 # 咱們以前建立的幾個測試用函數,都在handlers.async_tasks和handlers.schedules中 # 因此在這裏須要導入這兩個模塊,以str表示模塊的位置,模塊組成tuple後賦值給CELERY_IMPORTS # 這樣Celery在啓動時,會自動找到這些模塊,並導入模塊內的task CELERY_IMPORTS = ('handlers.async_tasks', 'handlers.schedules') # 爲Celery設定多個隊列,CELERY_QUEUES是個tuple,每一個tuple的元素都是由一個Queue的實例組成 # 建立Queue的實例時,傳入name和routing_key,name即隊列名稱 CELERY_QUEUES = ( Queue(name='email_queue', routing_key='email_router'), Queue(name='message_queue', routing_key='message_router'), Queue(name='schedules_queue', routing_key='schedules_router'), ) # 最後,爲不一樣的task指派不一樣的隊列 # 將全部的task組成dict,key爲task的名稱,即task所在的模塊,及函數名 # 如async_send_email所在的模塊爲handlers.async_tasks # 那麼task名稱就是handlers.async_tasks.async_send_email # 每一個task的value值也是爲dict,設定須要指派的隊列name,及對應的routing_key # 這裏的name和routing_key須要和CELERY_QUEUES設定的徹底一致 CELERY_ROUTES = { 'handlers.async_tasks.async_send_email': { 'queue': 'email_queue', 'routing_key': 'vcan.email_router', }, 'handlers.async_tasks.async_push_message': { 'queue': 'message_queue', 'routing_key': 'message_router', }, 'handlers.schedules.every_30_seconds': { 'queue': 'schedules_queue', 'routing_key': 'schedules_router', } }
配置完成後,不一樣的task會根據CELERY_ROUTES的設置,指派到不一樣的消息隊列。
celery除用於異步執行任務外,還能夠用於執行計劃任務。
在開始配置計劃任務以前對配置文件中設定celery時區。
# 設定 Celery 時區 CELERY_TIMEZONE = 'Asia/Shanghai'
計劃任務也是異步執行任務的一種方式,因此也須要參考以前的說明,爲計劃任務分配不一樣的隊列。
在配置文件中,新增一項CELERYBEAT_SCHEDULE的配置
from datetime import timedelta from celery.schedules import crontab CELERYBEAT_SCHEDULE = { # 給計劃任務取一個獨一無二的名字吧 'every-30-seconds': { # task就是須要執行計劃任務的函數 'task': 'handlers.schedules.every_30_seconds', # 配置計劃任務的執行時間,這裏是每30秒執行一次 'schedule': timedelta(seconds=30), # 傳入給計劃任務函數的參數 'args': {'value': '2333333'} } }
CELERYBEAT_SCHEDULE是一個dict,每一個key爲計劃任務的名稱,value也是dict,包含task、schedule、args。
task即須要執行計劃任務的函數,這裏是handlers.schedules.every_30_seconds
,即handlers的schedules的every_30_seconds函數,如需配置其餘的函數,依照此規則定義便可。
args是傳遞給計劃任務函數的參數,在這個例子中,即傳遞給every_30_seconds的參數,若是無需參數,則args配置爲None。
schedule即配置計劃任務的執行時間,例子中使用的是timedelta實例,用於實現固定間隔某些時間執行。除此以外,還能夠設定某個時間點執行,或重複某個時間點執行。這個就須要用到celery的crontab類。
'push_occupancy_rates': { 'task': 'handlers.schedules.test_func_b', # 天天中午12點執行 'schedule': crontab(hour='12', minute='0'), 'args': None },
關於crontab更詳細的配置方式,能夠參考官方手冊:
Example | Meaning |
---|---|
crontab() | Execute every minute. |
crontab(minute=0, hour=0) | Execute daily at midnight. |
crontab(minute=0, hour='*/3') | Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
crontab(minute=0, hour='0,3,6,9,12,15,18,21') | Same as previous. |
crontab(minute='*/15') | Execute every 15 minutes. |
crontab(day_of_week='sunday') | Execute every minute (!) at Sundays. |
crontab(minute='', hour='', day_of_week='sun') | Same as previous. |
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') | Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays. |
crontab(minute=0,hour='/2,/3') | Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
crontab(minute=0, hour='*/5') | Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of 「15」, which is divisible by 5). |
crontab(minute=0,hour='*/3,8-17') | Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
crontab(0, 0,day_of_month='2') | Execute on the second day of every month. |
crontab(0, 0, day_of_month='2-30/3') | Execute on every even numbered day. |
crontab(0, 0, day_of_month='1-7,15-21') | Execute on the first and third weeks of the month. |
crontab(0, 0,day_of_month='11', month_of_year='5') | Execute on the eleventh of May every year. |
crontab(0, 0, month_of_year='*/3') | Execute on the first month of every quarter. |