Celery 是一個 基於python開發的分佈式異步消息任務隊列,經過它能夠輕鬆的實現任務的異步處理, 若是你的業務場景中須要用到異步任務,就能夠考慮使用celery.python
1.你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等着結果返回,而是給你返回 一個任務ID,你過一段時間只須要拿着這個任務id就能夠拿到任務執行結果, 在任務執行ing進行時,你能夠繼續作其它的事情。linux
2.你想作一個定時任務,好比天天檢測一下大家全部客戶的資料,若是發現今天 是客戶的生日,就給他發個短信祝福web
Celery 在執行任務時須要經過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 通常使用rabbitMQ or Redis 或者是數據庫來存放消息的中間結果redis
Celery優勢:數據庫
Celery缺點:django
1.目前只能在Linux系統上有較好的支持ubuntu
Celery工做流程圖:bash
1. Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,週期性地將配置中到期須要執行的任務發送給任務隊列。服務器
2. Celery Worker:執行任務的消費者,一般會在多臺服務器運行多個消費者來提升執行效率。架構
3. Broker:消息代理,或者叫做消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(一般是消息隊列或者數據庫)。
4. Producer:調用了Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。
5. Result Backend:任務處理完後保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
Celery的架構圖如圖所示:
在傳統的web應用中,Django的web頁面經過url的映射到view,view再執行方法,若是方法須要調用大量的腳本,執行大量的任務,頁面就會阻塞,若是在項目中使用Celery隊列.首先用戶的任務會被celery放到broker中進行中轉,而後將任務分爲一個個的task來執行,因爲celery是異步機制,因此會直接給用戶返回task_id,頁面拿到task_id就能夠執行後續的操做,好比查看任務進度,暫停任務,而無需等待全部任務所有執行完畢,才能看到頁面。
1.在linux(ubuntu)系統上首先安裝Celery隊列
pip3 install Celery
2.在linux安裝redis
sudo apt-get install redis-server
3.在linux上安裝redis-celery中間件
pip3 install -U "celery[redis]"
4.啓動redis
sudo /etc/init.d/redis-server start
命名爲tasks.py
from celery import Celery app = Celery('tasks', broker='redis://localhost', backend='redis://localhost') @app.task def add(x,y): print("running...",x,y) return x+y
啓動監聽並開始執行該服務
celery -A tasks worker -l debug
在開啓一個終端進行測試任務
進入python環境
from tasks import add t = add.delay(3,3) #此時worker會生成一個任務和任務id t.get() #獲取任務執行的結果 t.get(propagate=False) #若是任務執行中出現異常,在client端不會異常退出 t.ready()#查看任務是否執行完畢 t.traceback #打印異常詳細信息
在當前的目錄下建立文件夾celery_pro
mkdir celery_pro
在此目錄下建立兩個文件
目錄結構:
celery_proj /__init__.py /celery.py /tasks.py
celery.py(定義了celery的一些元信息)
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='redis://localhost', #消息中間接收 backend='redis://localhost', #消息結果存放 include=['proj.tasks']) #執行任務的文件 # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
tasks.py (定義任務執行的具體邏輯和調用的具體方法)
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
啓動worker
celery -A celery_pro worker -l debug
再另外一個窗口打開python命令模式進行測試
from celery_pro import tasks t = tasks.add.delay(3,4) t.get()
Celery的分佈式:多啓動worker就能夠自動實現負載均衡,無需手動管理
Celery永駐後臺(開啓&重啓&關閉)
celery multi start w1 -A celery_pro -l info #開啓後臺celery任務 celery multi restart w1 -A proj -l info #重啓該服務 celery multi stop w1 -A proj -l info #關閉該服務
在celery_pro文件夾下建立periodic_tasks.py
目錄結構:
celery_proj /__init__.py /celery.py /tasks.py /periodic_tasks.py
文件內容以下:
from __future__ import absolute_import, unicode_literals from .celery import app from celery.schedules import crontab @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # Calls test('world') every 30 seconds sender.add_periodic_task(30.0, test.s('world'), expires=10) # Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=21, minute=42, day_of_week=5), test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg)
修改celery.py,加入periodic_task.py
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='redis://localhost', backend='redis://localhost', include=['celery_pro.tasks','celery_pro.periodic_tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
在服務端啓動 celery -A celery_pro worker -l debug
在客戶端啓動 celery -A celery_pro.periodic_tasks beat -l debug
在服務端若是看到打印的hell ,world說明定時任務配置成功
上面是經過調用函數添加定時任務,也能夠像寫配置文件 同樣的形式添加, 下面是每30s執行的任務
在celery.py中添加
app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'cerely_pro.tasks.add', #執行的具體方法 'schedule': 5.5, #每秒鐘執行 'args': (16, 16) #執行的具體動做的參數 }, } app.conf.timezone = 'UTC'
更多定製
上面的定時任務比較簡單,但若是你想要每週一三五的早上8點給你發郵件怎麼辦呢?用crontab功能,跟linux自帶的crontab功能是同樣的,能夠個性化定製任務執行時間
rom celery.schedules import crontab app.conf.beat_schedule = { #在每週一早上7:30執行 'add-every-monday-morning': { 'task': 'celery_pro.tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), },
還有更多定時配置方式以下:
Example | Meaning |
crontab() |
Execute every minute. |
crontab(minute=0, hour=0) |
Execute daily at midnight. |
crontab(minute=0, hour='*/3') |
Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm. |
|
Same as previous. |
crontab(minute='*/15') |
Execute every 15 minutes. |
crontab(day_of_week='sunday') |
Execute every minute (!) at Sundays. |
|
Same as previous. |
|
Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays. |
crontab(minute=0,hour='*/2,*/3') |
Execute every even hour, and every hour divisible by three. This means: at every hour except: 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
crontab(minute=0, hour='*/5') |
Execute hour divisible by 5. This means that it is triggered at 3pm, not 5pm (since 3pm equals the 24-hour clock value of 「15」, which is divisible by 5). |
crontab(minute=0, hour='*/3,8-17') |
Execute every hour divisible by 3, and every hour during office hours (8am-5pm). |
crontab(0, 0,day_of_month='2') |
Execute on the second day of every month. |
|
Execute on every even numbered day. |
|
Execute on the first and third weeks of the month. |
|
Execute on the eleventh of May every year. |
|
Execute on the first month of every quarter. |
1.在setting.py的文件同一級別建立celery.py
1 from __future__ import absolute_import, unicode_literals 2 import os 3 from celery import Celery 4 5 # 設置Django的環境變量 6 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'PerfectCRM.settings') 7 8 #設置app的默認處理方式,若是不設置默認是rabbitMQ 9 app = Celery('proj', 10 broker='redis://localhost', 11 backend='redis://localhost' 12 ) 13 14 #配置前綴 15 app.config_from_object('django.conf:settings', namespace='CELERY') 16 17 #自動掃描app下的tasks文件 18 app.autodiscover_tasks() 19 20 21 @app.task(bind=True) 22 def debug_task(self): 23 print('Request: {0!r}'.format(self.request))
2.修改當前目錄下的__init__文件
1 from __future__ import absolute_import, unicode_literals 2 3 #啓動時檢測celery文件 4 from .celery import app as celery_app 5 6 __all__ = ['celery_app']
3.在app下新增tasks文件,寫要執行的任務
from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
4.在另外一個app下新增tasks文件
1 from __future__ import absolute_import, unicode_literals 2 from celery import shared_task 3 import time,random 4 5 @shared_task 6 def randnum(start, end): 7 time.sleep(3) 8 return random.ranint(start,end)
5.在views下增長處理邏輯
1 from crm import tasks 2 from celery.result import AsyncResult 3 import random 4 #計算結果 5 def celery_call(request): 6 randnum =random.randint(0,1000) 7 t = tasks.add.delay(randnum,6) 8 print('randum',randnum) 9 return HttpResponse(t.id) 10 11 #獲取結果 12 def celery_result(request): 13 task_id = request.GET.get('id') 14 res = AsyncResult(id=task_id) 15 if res.ready(): 16 return HttpResponse(res.get()) 17 else: 18 return HttpResponse(res.ready())
6.測試
首先啓動Django,從web端輸入url調用celery_call方法
例:http://192.168.17.133:9000/crm/celery_call,此方法會返回一個task_id(41177118-3647-4830-b8c8-7be76d9819d7)
帶着這個task_id 訪問http://192.168.17.133:9000/crm/celery_result?id=41177118-3647-4830-b8c8-7be76d9819d7若是能夠看到結果說明配置成功
Dnango+Celery實現定時任務
pip3 install django-celery-beat
INSTALLED_APPS = (
.....,
'django_celery_beat', #新增的app
)
python manage.py migrate #建立與Django有關定時計劃任務的新表
celery -A PrefectCRM beat -l info -S django
python3 manager.py runserver 0.0.0.0:9000
並設置settings.py中的
ALLOW_HOSTS=['*']
後記:經測試,每添加或修改一個任務,celery beat都須要重啓一次,要否則新的配置不會被celery beat進程讀到