celery的使用

介紹

Celery 是一個基於python開發的分佈式異步消息任務隊列,經過它能夠輕鬆的實現任務的異步處理, 若是你的業務場景中須要用到異步任務,就能夠考慮使用celery。
異步任務介紹
在寫項目過程當中常常會遇到一些耗時的任務, 好比:發送郵件、發送短信等等~。這些操做若是都同步執行耗時長對用戶體驗不友好,在這種狀況下就能夠把任務放在後臺異步執行
celery就是用於處理異步任務的框架,celery能完成的功能遠不止異步任務,還有一個很經常使用的功能定時任務.python

架構
 
image.png
Celery包含以下組件:

Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,週期性地將配置中到期須要執行的任務發送給任務隊列。
Celery Worker:執行任務的消費者,一般會在多臺服務器運行多個消費者來提升執行效率。
Broker:消息代理,或者叫做消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(一般是消息隊列或者數據庫)。
Producer:調用了Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
Result Backend:任務處理完後保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。redis

特色
  • 簡單:一單熟悉了celery的工做流程後,配置和使用仍是比較簡單的
  • 高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery 會自動嘗試從新執行任務
  • 快速:一個單進程的celery每分鐘可處理上百萬個任務
  • 靈活: 幾乎celery的各個組件均可以被擴展及自定製
多任務執行方式

後臺多任務方式:
啓動:celery multi start w1 -A proj -l info
重啓:celery multi restart w1 -A proj -l info
中止:celery multi stop w1 -A proj -l info
任務執行完畢後才退出:celery multi stopwait w1 -A proj -l info數據庫

定時任務

celery支持定時任務,設定好任務的執行時間,celery就會定時自動幫你執行, 這個定時任務模塊叫celery beatdjango

方式一:函數方式
from celery import Celery from celery.schedules import crontab app = Celery() @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # Calls test('world') every 30 seconds sender.add_periodic_task(30.0, test.s('world'), expires=10) # Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg) 
方式二:配置文件形式
app.conf.beat_schedule = {
    'add-every-30-seconds': { 'task': 'tasks.add', 'schedule': 30.0, 'args': (16, 16) }, } app.conf.timezone = 'UTC' 

任務添加好了,須要讓celery單獨啓動一個進程來定時發起這些任務, 注意, 這裏是發起任務,不是執行,這個進程只會不斷的去檢查你的任務計劃, 每發現有任務須要執行了,就發起一個任務調用消息,交給celery worker去執行
啓動任務調度器celery beat:celery -A periodic_task beat
此時還差一步,就是還須要啓動一個worker,負責執行celery beat發起的任務
啓動celery worker來執行任務:celery -A periodic_task workerbash

經常使用的定時任務配置
Example Meaning
crontab() 默認是每分鐘
crontab(minute=0, hour=0) 天天0點執行
crontab(minute=0, hour='*/3') 每隔3個小時執行
crontab(minute=0,hour='0,3,6') 0點、3點、6點執行
crontab(minute='*/15') 每隔15分鐘
crontab(day_of_week='sunday') 每逢週日的每一分鐘執行
crontab(minute='',hour='',day_of_week='sun') 與上一個意義一致
crontab(minute='*/10',hour='3,17,22',day_of_week='fri') 在週五3-4點、17-18點、22-23點之間每隔10分鐘執行
crontab(minute=0,hour='/2,/3') 能被2,3整除的小時執行
crontab(minute=0, hour='*/5') 能被5整除的小時執行
crontab(minute=0, hour='*/3,8-17') 能被3整除或者8-17之間的小時執行
crontab(0, 0,day_of_month='2') 每月的次日
crontab(0, 0,day_of_month='2-30/3') 偶數天執行
crontab(0, 0,day_of_month='1-7,15-21') 每月的第一週和第三週
crontab(0, 0,day_of_month='11',month_of_year='5') 每一年的5月11日執行
crontab(0, 0,month_of_year='*/3') 在每一個季度的第一個月執行

上面能知足你絕大多數定時任務需求了,甚至還能根據潮起潮落來配置定時任務服務器

最佳實踐之與django結合

django 能夠輕鬆跟celery結合實現異步任務,只需簡單配置便可
項目目錄架構

- proj /
   - proj / __init__ . py
   - proj / settings . py
   - proj / urls . py
- manage . py

proj/proj/celery.pyapp

from __future__ import absolute_import, unicode_literals import os from celery import Celery # 爲celery程序設置默認的Django設置模塊 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') app = Celery('proj') # 從配置文件中引入celery相關參數,每一個參數使用CELERY_開頭 app.config_from_object('django.conf:settings', namespace='CELERY') # 從每一個app中的tasks.py文件加載異步任務 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request)) 

proj/proj/__init__.py框架

from __future__ import absolute_import, unicode_literals # 確保celery文件必定被引用 from .celery import app as celery_app __all__ = ['celery_app'] 

proj/xxx/tasks.py異步

from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers) 

proj/xxx/views.py

from django.shortcuts import render,HttpResponse from bernard import tasks def task_test(request): res = tasks.add.delay(228,24) print("start running task") print("async task res",res.get() ) return HttpResponse('res %s'%res.get()) 
在django中使用計劃任務功能

安裝:pip install django-celery-beat
加載:在settings.py中INSTALLED_APPS加入django_celery_beat
生成相關數據庫:python manage.py migrate
開啓定時任務:celery -A proj beat -l info -S django
結果:

 
admin中的3張表

 
image.png

此時啓動你的celery beat 和worker,會發現每隔2分鐘,beat會發起一個任務消息讓worker執行scp_task任務
注意,經測試,每添加或修改一個任務,celery beat都須要重啓一次,要否則新的配置不會被celery beat進程讀到

 

celery flower

(1).查看任務歷史,任務具體參數,開始時間等信息。
(2).提供圖表和統計數據。
(3).實現全面的遠程控制功能, 包括但不限於 撤銷/終止任務, 關閉重啓 worker, 查看正在運行任務。
(4).提供一個 HTTP API , 方便集成。
終端執行:celery flower --broker=redis://localhost:6379/6

相關文章
相關標籤/搜索