Celery-分佈式任務隊列

一.Celery介紹和基本使用
1.什麼是Celery:
是一個基於python開發的分佈式異步消息任務隊列,經過它能夠輕鬆的實現任務的異步處理,若是你的業務場景中須要用到異步任務,就能夠考慮使用celery,Celery在執行任務時須要經過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 通常使用rabbitMQ or Redis
2.Celery有如下優勢:
(1)簡單:一單熟悉了celery的工做流程後,配置和使用仍是比較簡單的
(2)高可用:當任務執行失敗或執行過程當中發生鏈接中斷,celery 會自動嘗試從新執行任務
(3)快速:一個單進程的celery每分鐘可處理上百萬個任務
(4)靈活: 幾乎celery的各個組件均可以被擴展及自定製
3.Celery基本工做流程
(1)請求把任務給---celerv組件,他把任務給放到rabbitmq(broker中間商)同時生成一個任務id
(2)worker(執行任務的節點)從rabbitmq(broker中間商)裏取數據把執行好的任務放到rabbitmq(broker中間商)裏面
(3)請求再經過celerv組件去rabbitmq(broker中間商)裏取結果
4.Celery的基本使用
(1)建立一個celery application用來定義你的任務列表前端

#!/bin/env python3
#_*_coding:utf-8_*_
from celery import Celery

app = Celery('tasks',                                      #給app取得名字
             broker='redis://:123456@192.168.1.232',   #鏈接rabbitmq(broker中間商)
             backend='redis://:123456@192.168.1.232'   #把結果寫到redis裏
             )

#加裝飾器表明這是worker能夠執行的一個任務
@app.task
#定義加法任務
def add(x,y):
    print("running...",x,y)
    return x+y

(2)啓動Celery Worker來開始監聽並執行任務
命令:celery -A celery_test worker -l debug
(3)開一個終端,調用任務node

>>> from celery_test import add
>>> t = add.delay(18,18)

Celery Worker日誌打印:
[2019-06-17 12:34:09,503: WARNING/ForkPoolWorker-1] running...
[2019-06-17 12:34:09,503: WARNING/ForkPoolWorker-1] 18
[2019-06-17 12:34:09,503: WARNING/ForkPoolWorker-1] 18
(4)在執行python

>>> t.get()

返回:36
Celery Worker日誌打印:
[2019-06-17 12:34:09,509: INFO/ForkPoolWorker-1] Task celery_test.add[4b9e24ef-b148-4e81-a019-d9b28309b93d] succeeded in 0.007139017805457115s: 36
二.在項目中如何使用celery 
1.能夠把celery配置成一個應用目錄格式以下:redis

proj/__init__.py
proj/celery.py       #celery的配置
proj/tasks.py        #任務1
proj/tasks2.py       #任務2

(1)celery配置:django

from __future__ import absolute_import, unicode_literals  #from __future__ import absolute_import是從python包的絕對路徑裏取import咱們安裝的包而不是當前目錄
from celery import Celery                                 #默認import當前目錄的Celery,當前目錄沒有
 
app = Celery('proj',
             broker='redis://:123456@192.168.1.232', #連上rabbitMQ的參數
             backend='redis://:123456@192.168.1.232',#連上rabbitMQ的參數
             include=['proj.tasks','proj.tasks2'])   #定義proj目錄下的tasks腳本和proj目錄下的tasks2腳本
 
#能夠給app設置參數
app.conf.update(
    result_expires=3600,                             #全部任務結果一個小時以內結果沒被取走就沒了
)
 
if __name__ == '__main__':
    app.start()

(2)任務1:app

from __future__ import absolute_import, unicode_literals
from .celery import app                                   #導入當前目錄下celery.py裏的app,用裝飾器

#相加
@app.task
def add(x, y):
    return x + y

(3)任務2:異步

from __future__ import absolute_import, unicode_literals
from .celery import app                                   #導入當前目錄下celery.py裏的app,用裝飾器

#求和
@app.task
def xsum(numbers):
    return sum(numbers)

(3)退出上層目錄啓動:celery -A proj worker -l debug
(4)開一個終端,調用任務分佈式

>>> from proj import tasks2,tasks
>>> t2 = tasks2.xsum.delay([3,45,5,6,7,4,88,21,5])
>>> t1 = tasks.add.delay(2,5)
>>> t1.get()

返回:7函數

>>> t2.get()

返回:184
2.後臺啓動worker
(1)啓動1個w1任務:celery multi start w1 -A celery_django -l info
返回:
celery multi v4.3.0 (rhubarb)
> Starting nodes...
    > w1@localhost: OK
(2)啓動1個w2任務:celery multi start w2 -A celery_django -l info
返回:
celery multi v4.3.0 (rhubarb)
> Starting nodes...
    > w2@localhost: OK
(3)中止w2任務:celery multi stop w2 -A proj -l info
返回:
celery multi v4.3.0 (rhubarb)
> Stopping nodes...
    > w2@localhost: TERM -> 11221
三.Celery定時任務
celery支持定時任務,設定好任務的執行時間,celery就會定時自動幫你執行, 這個定時任務模塊叫celery beat
1.經過計劃任務實現每隔10秒,30秒,一週執行任務
(1)celery配置:    url

from __future__ import absolute_import, unicode_literals  #from __future__ import absolute_import是從python包的絕對路徑裏取import咱們安裝的包而不是當前目錄
from celery import Celery                                 #默認import當前目錄的Celery,當前目錄沒有

app = Celery('proj',
             broker='redis://:123456@192.168.1.232', #連上rabbitMQ的參數
             backend='redis://:123456@192.168.1.232',#連上rabbitMQ的參數
             include=['proj.periodic_task'])         #定義proj目錄下的periodic_task腳本

#能夠給app設置參數
app.conf.update(
    result_expires=3600,                             #全部任務結果一個小時以內結果沒被取走就沒了
)

if __name__ == '__main__':
    app.start()

(2)定時任務:periodic_task.py

from __future__ import absolute_import, unicode_literals
from celery.schedules import crontab
from .celery import app

#只要腳本一啓動馬上執行這個函數,這個函數自動有兩個參數sender(添加任務), **kwargs
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    #每隔十秒鐘執行test這個函數,.s是給test這個函數傳的參數
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') #name='add every 10'任務名

    #每三十秒執行test這個函數,.s是給test這個函數傳的參數
    sender.add_periodic_task(30.0, test.s('xixi'), expires=10)           #expires=10任務結果保存10秒鐘

    #每週一早上七點半執行test這個函數,.s是給test這個函數傳的參數
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)    

(3)啓動worker:celery -A proj worker -l debug    
(4)啓動任務調度器:celery -A proj.periodic_task beat -l debug    
十秒返回:    
[2019-06-18 14:01:54,236: WARNING/ForkPoolWorker-3] hello
[2019-06-18 14:01:54,243: INFO/ForkPoolWorker-3] Task proj.periodic_task.test[89b349b8-3c19-4968-a09b-214a72164cee] succeeded in 0.007367603946477175s: None
[2019-06-18 14:02:14,192: INFO/MainProcess] Received task: proj.periodic_task.test[1803e491-5a41-4fe1-bfe2-18b8335ae7d5]    
三十秒返回:
[2019-06-18 14:03:14,192: WARNING/ForkPoolWorker-3] xixi
[2019-06-18 14:03:14,192: DEBUG/MainProcess] Task accepted: proj.periodic_task.test[8fe2e9f2-e07e-4f87-bd80-daa4d3c64605] pid:27865
[2019-06-18 14:03:14,194: INFO/ForkPoolWorker-3] Task proj.periodic_task.test[8fe2e9f2-e07e-4f87-bd80-daa4d3c64605] succeeded in 0.0014079520478844643s: None
[2019-06-18 14:03:34,196: INFO/MainProcess] Received task: proj.periodic_task.test[80b7fcfa-b1e6-4c11-8390-3e3a0cc68f4e]
2.經過計劃任務實現每五秒作一次相加操做
(1)celery配置:celery.py        

from __future__ import absolute_import, unicode_literals  #from __future__ import absolute_import是從python包的絕對路徑裏取import咱們安裝的包而不是當前目錄
from celery import Celery                                 #默認import當前目錄的Celery,當前目錄沒有
 
app = Celery('proj',
             broker='redis://:123456@192.168.1.232',       #連上rabbitMQ的參數
             backend='redis://:123456@192.168.1.232',      #連上rabbitMQ的參數
             include=['proj.tasks','proj.periodic_task'])  #定義proj目錄下的periodic_task腳本
 
#能夠給app設置參數
app.conf.update(
    result_expires=3600,                             #全部任務結果一個小時以內結果沒被取走就沒了
)
 
if __name__ == '__main__':
    app.start()    

(2)相加任務:tasks.py

from __future__ import absolute_import, unicode_literals
from .celery import app                                   #導入當前目錄下celery.py裏的app,用裝飾器

#相加
@app.task
def add(x, y):
    return x + y

(3)計劃任務:    

from __future__ import absolute_import, unicode_literals
from celery.schedules import crontab
from .celery import app

app.conf.beat_schedule = {
    '每隔五秒執行相加': {
        'task': 'proj.tasks.add',
        'schedule': 5.0,
        'args': (10, 10)
    },
}

app.conf.timezone = 'UTC'

@app.task
def test(arg):
    print(arg)    

(4)啓動worker:celery -A proj worker -l debug    
(5)啓動任務調度器:celery -A proj.periodic_task beat -l debug    
worker輸出:
[2019-06-18 14:29:02,377: DEBUG/MainProcess] Task accepted: proj.tasks.add[03c85e0e-973c-4607-882f-44bb1951cd7a] pid:28339
[2019-06-18 14:29:02,378: INFO/ForkPoolWorker-3] Task proj.tasks.add[03c85e0e-973c-4607-882f-44bb1951cd7a] succeeded in 0.0011271699331700802s: 20
任務調度器輸出:
[2019-06-18 14:28:22,392: INFO/MainProcess] Scheduler: Sending due task 每隔五秒執行相加 (proj.tasks.add)
[2019-06-18 14:28:22,415: DEBUG/MainProcess] proj.tasks.add sent. id->083a5d92-f6ff-47ef-8bb8-be800a7edfa8
四.Celery與django結合
django能夠輕鬆跟celery結合實現異步任務
(1)在項目文件夾裏建立celery_django\celery_django\celery.py
#celery配置

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery


os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celery_django.settings')   #celery_django項目名字

app = Celery('celery_task')

app.config_from_object('django.conf:settings', namespace='CELERY')              #必須以CELERY大寫開頭

app.autodiscover_tasks()                                                             #能夠自動刷新

@app.task(bind=True)
def debug_task(self):                                                               #
    print('Request: {0!r}'.format(self.request))

(2)在項目文件夾celery_django\celery_django\__init__.py裏寫入

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ['celery_app']

(3)配置文件寫入:celery_django\celery_django\settings.py

#鏈接redis
CELERY_BROKER_URL = 'redis://:123456@192.168.1.232'
CELERY_RESULT_BACKEND = 'redis://:123456@192.168.1.232'

(4)總url寫入:celery_django\celery_django\urls.py

from django.conf.urls import url,include
from django.contrib import admin
urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^celery/', include('celery_task.urls')),

(5)項目url寫入:celery_django\celery_task\urls.py

from django.conf.urls import url
from celery_task import views
urlpatterns = [
    url(r'^celery_test/', views.celery_test  ),        #獲取任務id
    url(r'^celery_res/', views.celery_res  ),          #經過任務id獲取到執行結果

]

(6)建立一個celery application用來定義你的任務列表:celery_django\celery_task\tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task                             #

import time
@shared_task
def add(x, y):
    print("running task add",x,y )
    time.sleep(10)
    return x + y

(7)進入項目目錄裏啓動Celery Worker來開始監聽並執行任務
命令:celery -A celery_django worker -l debug
(8)調用任務:celery_django\celery_task\views.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task                             #

import time
@shared_task
def add(x, y):
    print("running task add",x,y )
    time.sleep(10)
    return x + y

(9)執行任務獲取id和經過id取得執行的結果函數
/celery/celery_django/celery_task/views.py

from django.shortcuts import HttpResponse
from celery_task.tasks import add
from celery.result import AsyncResult

#執行任務獲取id號
def celery_test(request):
    task = add.delay(4,22)
    return HttpResponse(task.id)  #返回id號


#經過id好獲取執行結果
def celery_res(request):
    #獲取id號
    task_id = '2edbcc88-12ad-4709-b770-d38179c67ef5'
    res = AsyncResult(id=task_id)                    #經過id號獲取任務結果
    return HttpResponse(res.get())                   #把結果返回給前端

訪問:http://192.168.1.232:8080/celery/celery_test/
獲得任務id:2edbcc88-12ad-4709-b770-d38179c67ef5
訪問:http://192.168.1.232:8080/celery/celery_res/
經過任務id獲得結果:26
五.在django中使用計劃任務功能 
settings.py配置文件添加

INSTALLED_APPS = [
    "django_celery_beat",
]

建立表存定時任務:python3 manage.py migrate

相關文章
相關標籤/搜索