I. Introduction to Celery and basic usage
1. What is Celery?
Celery is a distributed, asynchronous message task queue written in Python. It makes asynchronous task processing easy: if your business scenario needs async tasks, Celery is worth considering. To receive and send task messages and to store task results, Celery needs a message broker, typically RabbitMQ or Redis.
2. Advantages of Celery:
(1) Simple: once you are familiar with Celery's workflow, configuration and usage are fairly straightforward
(2) Highly available: if a task fails or the connection is interrupted during execution, Celery automatically retries the task
(3) Fast: a single Celery process can handle millions of tasks per minute
(4) Flexible: almost every Celery component can be extended or customized
3. Celery's basic workflow
(1) The client hands a task to the celery app, which puts it on RabbitMQ (the broker) and generates a task id
(2) A worker (a node that executes tasks) pulls the task from the broker, executes it, and writes the result back to the result store
(3) The client then fetches the result through the celery app using the task id
4. Basic usage of Celery
(1) Create a celery application to define your task list: celery_test.py

#!/bin/env python3
# _*_coding:utf-8_*_
from celery import Celery

app = Celery('tasks',                                  # name of the app
             broker='redis://:123456@192.168.1.232',   # broker connection (Redis here)
             backend='redis://:123456@192.168.1.232')  # store results in Redis

# The decorator marks this function as a task a worker can execute
@app.task
def add(x, y):
    print("running...", x, y)
    return x + y
(2) Start a Celery worker to listen for and execute tasks
Command: celery -A celery_test worker -l debug
(3) Open another terminal and call the task:

>>> from celery_test import add
>>> t = add.delay(18, 18)
Celery worker log output:
[2019-06-17 12:34:09,503: WARNING/ForkPoolWorker-1] running...
[2019-06-17 12:34:09,503: WARNING/ForkPoolWorker-1] 18
[2019-06-17 12:34:09,503: WARNING/ForkPoolWorker-1] 18
(4) Then fetch the result:

>>> t.get()
Returns: 36
Celery worker log output:
[2019-06-17 12:34:09,509: INFO/ForkPoolWorker-1] Task celery_test.add[4b9e24ef-b148-4e81-a019-d9b28309b93d] succeeded in 0.007139017805457115s: 36
II. Using Celery in a project
1. Celery can be organised as an application directory, laid out like this:

proj/__init__.py
proj/celery.py     # celery configuration
proj/tasks.py      # task module 1
proj/tasks2.py     # task module 2
(1) Celery configuration: proj/celery.py

from __future__ import absolute_import, unicode_literals
# absolute_import makes "from celery import Celery" resolve to the installed
# celery package rather than this proj/celery.py module itself
from celery import Celery

app = Celery('proj',
             broker='redis://:123456@192.168.1.232',   # broker connection
             backend='redis://:123456@192.168.1.232',  # result backend
             include=['proj.tasks', 'proj.tasks2'])    # task modules under proj/

# Optional app settings
app.conf.update(
    result_expires=3600,   # results are discarded if not fetched within an hour
)

if __name__ == '__main__':
    app.start()
(2) Task module 1: proj/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app   # the app defined in celery.py of this package

# addition
@app.task
def add(x, y):
    return x + y
(3) Task module 2: proj/tasks2.py

from __future__ import absolute_import, unicode_literals
from .celery import app   # the app defined in celery.py of this package

# summation
@app.task
def xsum(numbers):
    return sum(numbers)
(4) From the parent directory of proj, start the worker: celery -A proj worker -l debug
(5) Open another terminal and call the tasks:

>>> from proj import tasks2, tasks
>>> t2 = tasks2.xsum.delay([3, 45, 5, 6, 7, 4, 88, 21, 5])
>>> t1 = tasks.add.delay(2, 5)
>>> t1.get()
Returns: 7
>>> t2.get()
Returns: 184
2. Starting workers in the background
(1) Start a node named w1: celery multi start w1 -A celery_django -l info
Output:
celery multi v4.3.0 (rhubarb)
> Starting nodes...
> w1@localhost: OK
(2) Start a second node w2: celery multi start w2 -A celery_django -l info
Output:
celery multi v4.3.0 (rhubarb)
> Starting nodes...
> w2@localhost: OK
(3) Stop node w2: celery multi stop w2 -A proj -l info
Output:
celery multi v4.3.0 (rhubarb)
> Stopping nodes...
> w2@localhost: TERM -> 11221
III. Celery periodic tasks
Celery supports periodic tasks: set the execution schedule and Celery runs the task automatically at those times. The scheduler module is called celery beat.
1. Use scheduled tasks to run a job every 10 seconds, every 30 seconds, and once a week
(1) Celery configuration: proj/celery.py

from __future__ import absolute_import, unicode_literals
# absolute_import makes "from celery import Celery" resolve to the installed
# celery package rather than this proj/celery.py module itself
from celery import Celery

app = Celery('proj',
             broker='redis://:123456@192.168.1.232',   # broker connection
             backend='redis://:123456@192.168.1.232',  # result backend
             include=['proj.periodic_task'])           # the periodic_task module under proj/

# Optional app settings
app.conf.update(
    result_expires=3600,   # results are discarded if not fetched within an hour
)

if __name__ == '__main__':
    app.start()
(2) The periodic tasks: proj/periodic_task.py

from __future__ import absolute_import, unicode_literals
from celery.schedules import crontab
from .celery import app

# Runs as soon as the app is configured; sender is used to register the tasks
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Run test('hello') every 10 seconds; .s() packs the task's arguments,
    # name= sets the schedule entry's name
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
    # Run test('xixi') every 30 seconds; the result expires after 10 seconds
    sender.add_periodic_task(30.0, test.s('xixi'), expires=10)
    # Run test('Happy Mondays!') every Monday morning at 7:30
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)
(3) Start the worker: celery -A proj worker -l debug
(4) Start the beat scheduler: celery -A proj.periodic_task beat -l debug
Output every 10 seconds:
[2019-06-18 14:01:54,236: WARNING/ForkPoolWorker-3] hello
[2019-06-18 14:01:54,243: INFO/ForkPoolWorker-3] Task proj.periodic_task.test[89b349b8-3c19-4968-a09b-214a72164cee] succeeded in 0.007367603946477175s: None
[2019-06-18 14:02:14,192: INFO/MainProcess] Received task: proj.periodic_task.test[1803e491-5a41-4fe1-bfe2-18b8335ae7d5]
Output every 30 seconds:
[2019-06-18 14:03:14,192: WARNING/ForkPoolWorker-3] xixi
[2019-06-18 14:03:14,192: DEBUG/MainProcess] Task accepted: proj.periodic_task.test[8fe2e9f2-e07e-4f87-bd80-daa4d3c64605] pid:27865
[2019-06-18 14:03:14,194: INFO/ForkPoolWorker-3] Task proj.periodic_task.test[8fe2e9f2-e07e-4f87-bd80-daa4d3c64605] succeeded in 0.0014079520478844643s: None
[2019-06-18 14:03:34,196: INFO/MainProcess] Received task: proj.periodic_task.test[80b7fcfa-b1e6-4c11-8390-3e3a0cc68f4e]
2. Use a schedule to run the add task every five seconds
(1) Celery configuration: proj/celery.py

from __future__ import absolute_import, unicode_literals
# absolute_import makes "from celery import Celery" resolve to the installed
# celery package rather than this proj/celery.py module itself
from celery import Celery

app = Celery('proj',
             broker='redis://:123456@192.168.1.232',          # broker connection
             backend='redis://:123456@192.168.1.232',         # result backend
             include=['proj.tasks', 'proj.periodic_task'])    # task and schedule modules under proj/

# Optional app settings
app.conf.update(
    result_expires=3600,   # results are discarded if not fetched within an hour
)

if __name__ == '__main__':
    app.start()
(2) The add task: proj/tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app   # the app defined in celery.py of this package

# addition
@app.task
def add(x, y):
    return x + y
(3) The schedule: proj/periodic_task.py

from __future__ import absolute_import, unicode_literals
from celery.schedules import crontab
from .celery import app

app.conf.beat_schedule = {
    '每隔五秒執行相加': {             # schedule entry name ("add every five seconds")
        'task': 'proj.tasks.add',
        'schedule': 5.0,
        'args': (10, 10),
    },
}
app.conf.timezone = 'UTC'

@app.task
def test(arg):
    print(arg)
(4) Start the worker: celery -A proj worker -l debug
(5) Start the beat scheduler: celery -A proj.periodic_task beat -l debug
Worker output:
[2019-06-18 14:29:02,377: DEBUG/MainProcess] Task accepted: proj.tasks.add[03c85e0e-973c-4607-882f-44bb1951cd7a] pid:28339
[2019-06-18 14:29:02,378: INFO/ForkPoolWorker-3] Task proj.tasks.add[03c85e0e-973c-4607-882f-44bb1951cd7a] succeeded in 0.0011271699331700802s: 20
Scheduler output:
[2019-06-18 14:28:22,392: INFO/MainProcess] Scheduler: Sending due task 每隔五秒執行相加 (proj.tasks.add)
[2019-06-18 14:28:22,415: DEBUG/MainProcess] proj.tasks.add sent. id->083a5d92-f6ff-47ef-8bb8-be800a7edfa8
IV. Using Celery with Django
Django integrates easily with Celery to run asynchronous tasks.
(1) In the project, create celery_django\celery_django\celery.py:

# celery configuration
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# celery_django is the Django project name
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_django.settings')

app = Celery('celery_task')
# Load celery settings from Django settings; the keys must start with CELERY_
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks.py modules in the installed apps
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
(2) In celery_django\celery_django\__init__.py write:

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']
(3) Add to the settings file, celery_django\celery_django\settings.py:

# Redis connection
CELERY_BROKER_URL = 'redis://:123456@192.168.1.232'
CELERY_RESULT_BACKEND = 'redis://:123456@192.168.1.232'
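Because celery.py loads these with namespace='CELERY', any other lowercase Celery setting can also be set here by upper-casing it and adding the CELERY_ prefix. A few optional examples (the values are illustrative, not from the original):

```python
CELERY_TIMEZONE = 'Asia/Shanghai'    # maps to the timezone setting
CELERY_TASK_SERIALIZER = 'json'      # maps to task_serializer
CELERY_RESULT_EXPIRES = 3600         # maps to result_expires
```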
(4) Root URLconf: celery_django\celery_django\urls.py

from django.conf.urls import url, include
from django.contrib import admin

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^celery/', include('celery_task.urls')),
]
(5) App URLconf: celery_django\celery_task\urls.py

from django.conf.urls import url
from celery_task import views

urlpatterns = [
    url(r'^celery_test/', views.celery_test),   # start a task and get its id
    url(r'^celery_res/', views.celery_res),     # fetch the result by task id
]
(6) Create a celery application to define your task list: celery_django\celery_task\tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

@shared_task
def add(x, y):
    print("running task add", x, y)
    time.sleep(10)   # simulate a slow task
    return x + y
(7) From the project directory, start a Celery worker to listen for and execute tasks
Command: celery -A celery_django worker -l debug
(8) Call the task and fetch its result by id: celery_django\celery_task\views.py

from django.shortcuts import HttpResponse
from celery_task.tasks import add
from celery.result import AsyncResult

# Kick off a task and return its id
def celery_test(request):
    task = add.delay(4, 22)
    return HttpResponse(task.id)    # return the task id

# Look up the execution result by task id
def celery_res(request):
    # the task id obtained from celery_test (hard-coded here)
    task_id = '2edbcc88-12ad-4709-b770-d38179c67ef5'
    res = AsyncResult(id=task_id)   # fetch the task result by id
    return HttpResponse(res.get())  # return the result to the front end
Visit http://192.168.1.232:8080/celery/celery_test/
to get the task id: 2edbcc88-12ad-4709-b770-d38179c67ef5
Visit http://192.168.1.232:8080/celery/celery_res/
to get the result for that id: 26
V. Using periodic tasks in Django
Add django_celery_beat to INSTALLED_APPS in settings.py:

INSTALLED_APPS = [
    # ... existing apps ...
    "django_celery_beat",
]

Create the tables that store the periodic tasks: python3 manage.py migrate
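Once the tables exist, schedules created through the Django admin take effect when beat is started with django_celery_beat's database scheduler (standard django_celery_beat usage; celery_django is the project name from section IV):

```shell
celery -A celery_django beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
```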