關於celery的介紹

1.Celery是什麼

  • 1.1 Celery 是一個由 Python 編寫的簡單、靈活、可靠的用來處理大量信息的分佈式系統,它同時提供操做和維護分佈式系統所需的工具(它自己不是一個任務隊列, 它是 任務隊列管理的工具, 它提供的接口能夠幫助咱們實現分佈式任務隊列)。python

  • 1.2 Celery 專一於實時任務處理,支持任務調度(跟rabbitMQ可實現多種exchange。)redis

說白了,它是一個分佈式隊列的管理工具,咱們能夠用 Celery 提供的接口快速實現並管理一個分佈式的任務隊列。mongodb

  • 1.3 Celery 架構 shell

    註釋

    • 消息中間件(message broker)(郵箱, 郵局): 自己不提供消息服務,能夠和第三方消息中間件集成,經常使用的有 redis mongodb rabbitMQdjango

    • 任務執行單元(worker)(寄件人): 是Celery提供的任務執行單元, worker併發的運行在分佈式的系統節點中bash

    • 任務執行結果存儲(task result store)(收件人):用來存儲Worker執行的任務的結果,Celery支持以不一樣方式存儲任務的結果,包括Redis,MongoDB,Django ORM,AMQP等服務器

  • 1.4 任務隊列和消息隊列架構

    • 任務隊列是一種在線或機器分發任務的機制併發

    • 消息隊列輸入是工做的一個單元, 能夠認爲是一個任務,獨立的職程(Worker)進程持續監視隊列中是否有須要處理的新任務。app

    • 圖解

      註釋


2.簡單示例

2.1 建立一個celery實例 建立tasks.py文件

import time
from celery import Celery

app = Celery('tasks', broker='redis:////127.0.0.1:6379/6', backend='redis:////127.0.0.1:6379/7')


@app.task
def add(x, y):
    time.sleep(10)
    return x + y
複製代碼

ps: tasks爲任務名稱 設置reids爲中間件

2.2 建立一個index.py文件調用而且檢測任務、查看任務執行狀態

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from tasks import add, app
from celery.result import AsyncResult
import time

# 當即告知celery去執行add任務,並傳入兩個參數
result = add.delay(4, 4)
print(result.id)
async = AsyncResult(id=result.id, app=app)

time.sleep(3)
if async.successful():
    result = async.get()
    print(result, "執行成功")
    # result.forget() # 將結果刪除
elif async.failed():
    print('執行失敗')
elif async.status == 'PENDING':
    print('任務等待中被執行')
elif async.status == 'RETRY':
    print('任務異常後正在重試')
elif async.status == 'STARTED':
    print('任務已經開始被執行')

複製代碼
  • ps 若是使用redis做爲任務隊列中間人,在redis中存在兩個鍵 celery 和 _kombu.binding.celery , _kombu.binding.celery 表示有一名爲 celery 的任務隊列(Celery 默認),而 celery爲默認隊列中的任務列表,使用list類型,能夠看看添加進去的任務數據。

2.3 執行命令詳解

  • celery -A app.celery_tasks.celery worker -Q queue --loglevel=info
    • A參數指定celery對象的位置,該app.celery_tasks.celery指的 是app包下面的celery_tasks.py模塊的celery實例,注意必定是初始化後的實例,

    • Q參數指的是該worker接收指定的隊列的任務,這是爲了當多個隊列有不一樣的任務時能夠獨立;若是不設會接收全部的隊列的任務;

    • l參數指定worker的日誌級別;

執行完畢後結果存儲在redis中,查看redis中的數據,發現存在一個string類型的鍵值對 celery-task-meta-064e4262-e1ba-4e87-b4a1-52dd1418188f:data 該鍵值對的失效時間爲24小時

2.4 消息主體分析

  • body : 是序列化後使用base64編碼的信息,包括具體的任務參數,其中包括了須要執行的方法、參數和一些任務基本信息
  • content-encoding: 序列化數據編碼方式
  • content-type: 任務數據的序列化方式,默認使用python內置的序列化模塊pickle(ps: pickle模塊支持的類型 全部python支持的原生類型:布爾值,整數,浮點數,複數,字符串,字節,None。 由任何原生類型組成的列表,元組,字典和集合。 函數,類,類的實例, 經常使用的方法:dumps,dump,loads,load)
{
    "body": "gAJ9cQAoWAQAAAB0YXNrcQFYCQAAAHRhc2tzLmFkZHECWAIAAABpZHEDWCQAAABjNDMwMzZkMi03Yzc3LTQ0MDUtOTYwNC1iZDc3ZTcyNzNlN2FxBFgEAAAAYXJnc3EFSwRLBIZxBlgGAAAAa3dhcmdzcQd9cQhYBwAAAHJldHJpZXNxCUsAWAMAAABldGFxCk5YBwAAAGV4cGlyZXNxC05YAwAAAHV0Y3EMiFgJAAAAY2FsbGJhY2tzcQ1OWAgAAABlcnJiYWNrc3EOTlgJAAAAdGltZWxpbWl0cQ9OToZxEFgHAAAAdGFza3NldHERTlgFAAAAY2hvcmRxEk51Lg==",
    "content-encoding": "binary",
    "content-type": "application/x-python-serialize",
    "headers": {},
    "properties": {
        "reply_to": "caa78c3a-618a-31f0-84a9-b79db708af02",
        "correlation_id": "c43036d2-7c77-4405-9604-bd77e7273e7a",
        "delivery_mode": 2,
        "delivery_info": {
            "priority": 0,
            "exchange": "celery",
            "routing_key": "celery"
        },
        "body_encoding": "base64",
        "delivery_tag": "e7e288b5-ecbb-4ec6-912c-f42eb92dbd72"
    }
}
複製代碼

2.5 Celery配置

CELERY_DEFAULT_QUEUE:默認隊列
BROKER_URL  : 代理人的網址
CELERY_RESULT_BACKEND:結果存儲地址
CELERY_TASK_SERIALIZER:任務序列化方式
CELERY_RESULT_SERIALIZER:任務執行結果序列化方式
CELERY_TASK_RESULT_EXPIRES:任務過時時間
CELERY_ACCEPT_CONTENT:指定任務接受的內容序列化類型(序列化),一個列表;
複製代碼

2.6 獲取執行任務執行結果的方法

r = func.delay(...)
r.ready()     			# 查看任務狀態,返回布爾值, 任務執行完成, 返回 True, 不然返回 False.
r.wait()      			# 等待任務完成, 返回任務執行結果,不多使用;
r.get(timeout=1)       # 獲取任務執行結果,能夠設置等待時間
r.result      			# 任務執行結果.
r.state       			# PENDING, START, SUCCESS,任務當前的狀態
r.status     				# PENDING, START, SUCCESS,任務當前的狀態
r.successful  			# 任務成功返回true
r.traceback 				# 若是任務拋出了一個異常,你也能夠獲取原始的回溯信息
複製代碼

2.7 celery的裝飾方法celery.task

  • task()把任務(函數)裝飾成異步
@celery.task()
def func():
	# do something
    pass
    
複製代碼
  • 能夠從新定義任務的基類
class MyTask(celery.Task):
    # 任務失敗時執行
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))
    # 任務成功時執行
    def on_success(self, retval, task_id, args, kwargs):
        pass
    # 任務重試時執行
    def on_retry(self, exc, task_id, args, kwargs, einfo):
        pass


複製代碼

參數

  • task_id : 任務id
  • einfo:執行失敗時任務詳情
  • exc: 失敗時的錯誤類型
  • retval: 任務成功時返回的執行結果

2.8 一份完整的配置文件

# 注意,celery4版本後,CELERY_BROKER_URL改成BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虛擬主機名'
# 指定結果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任務序列化方式
CELERY_TASK_SERIALIZER = 'msgpack' 
# 指定結果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任務過時時間,celery任務執行結果的超時時間
CELERY_TASK_RESULT_EXPIRES = 60 * 20   
# 指定任務接受的序列化類型.
CELERY_ACCEPT_CONTENT = ["msgpack"]   
# 任務發送完成是否須要確認,這一項對性能有一點影響 
CELERY_ACKS_LATE = True  
# 壓縮方案選擇,能夠是zlib, bzip2,默認是發送沒有壓縮的數據
CELERY_MESSAGE_COMPRESSION = 'zlib' 
# 規定完成任務的時間
CELERYD_TASK_TIME_LIMIT = 5  # 在5s內完成任務,不然執行該任務的worker將被殺死,任務移交給父進程
# celery worker的併發數,默認是服務器的內核數目,也是命令行-c參數指定的數目
CELERYD_CONCURRENCY = 4 
# celery worker 每次去rabbitmq預取任務的數量
CELERYD_PREFETCH_MULTIPLIER = 4 
# 每一個worker執行了多少任務就會死掉,默認是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40 
# 設置默認的隊列名稱,若是一個消息不符合其餘的隊列就會放在默認隊列裏面,若是什麼都不設置的話,數據都會發送到默認的隊列中
CELERY_DEFAULT_QUEUE = "default" 
# 設置詳細的隊列
CELERY_QUEUES = {
    "default": { # 這是上面指定的默認隊列
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": { # 設置扇形交換機
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
    
}
複製代碼

2.8 Celery定時任務

  • 指定定時任務並加入配置 從新啓動worker
# config.py
from datetime import timedelta
from celery.schedules import crontab
 
CELERYBEAT_SCHEDULE = {
    'ptask': {
        'task': 'tasks.period_task',
        'schedule': timedelta(seconds=5),
    },
}

# 添加定時任務
@app.task(bind=True)
def period_task(self):
    print 'period task done: {0}'.format(self.request.id)
 
複製代碼

PS:時間若是涉及到datatime最好設置爲UTC時間

  • 啓動定時任務進程
celery -A task beat

複製代碼

2.9 鏈式任務

鏈式任務就是異步或者定時執行的任務由多個子任務執行完成

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()
 
@app.task()
def fetch_page(url):
    return myhttplib.get(url)
 
@app.task()
def parse_page(page):
    return myparser.parse_document(page)
 
@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

 fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])
複製代碼

3 Celery, rabbitmq實現exchange三種模式

  • celery的工做流程

celery流程

  • borker : 消息中間件
  • worker : 任務執行單元
  • storgae: 任務執行結果存儲

3.1 exchange 經常使用模式介紹(direct, fanout, topic, header)

當咱們執行的任務須要根據特定的須要進行分類時,咱們能夠對任務建立多個隊列進行, 每個隊列交換方式能夠指定,須要注意的是:redis只能提供 direct exchange 方式, 也是默認指定的方式,因此咱們把中間人換成了rabbitmq。

首先咱們來了解一下交換模式有哪些?

  • Direct Exchange 模式

    direct

    這種模式是rabbitmq(redis)自帶的一種模式,因此咱們在實際使用過程當中只要指定routing_key就能夠了, 或者是指定隊列名稱便可。

    ps: 若是咱們指定的隊列名稱不在配置裏面,那咱們建立的這條消息任務會被自動廢除,因此須要檢查下配置裏的隊列是否正確,由於rabbitmq只具有存儲隊列的能力,不能存儲消息信息。

  • Fanout Exchange 模式

    fanout

    • fanout模式不用指定routing_key, 全部exchange_ type 是fanout的 Queue都會執行
    • 因此隊列和fanout能夠多對多綁定。
  • Topic Exchange 模式

    topic

    任何發送到Topic Exchange的消息都會被轉發到全部關心RouteKey中指定話題的Queue上

    • 這種模式較爲複雜,簡單來講,就是每一個隊列都有其關心的主題,全部的消息都帶有一個「標題」(RouteKey),Exchange會將消息轉發到全部關注主題能與RouteKey模糊匹配的隊列。

    • 這種模式須要RouteKey,也許要提早綁定Exchange與Queue。

    • 在進行綁定時,要提供一個該隊列關心的主題,如「#.log.#」表示該隊列關心全部涉及log的消息(一個RouteKey爲」MQ.log.error」的消息會被轉發到該隊列)。

    • 「#」表示0個或若干個關鍵字,「」表示一個關鍵字。如「log.」能與「log.warn」匹配,沒法與「log.warn.timeout」匹配;可是「log.#」能與上述二者匹配。

    • 一樣,若是Exchange沒有發現可以與RouteKey匹配的Queue,則會拋棄此消息。

3.2 在實際例子中如何去運用這三種模式

  • 首先要安裝rabitmq而且啓動 rabbitmq-server

  • 建立rabbitmq_config.py 文件, 而且把以前在tasks.py中引用的配置修改成rabbitmq_config,代碼以下

#coding:utf-8
from celery.schedules import crontab
import sys
import os


sys.path.insert(0, os.getcwd())
CELERY_IMPORTS = ("tasks", )
CELERY_RESULT_BACKEND = "amqp"
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/"

複製代碼
  • 建立須要的交換方式
default_exchange = Exchange('dedfault', type='direct')

# 定義一個媒體交換機,類型是直連交換機
media_exchange = Exchange('media', type='direct')

# 定義一個image交換機,類型是fanout交換機
image_exchange = Exchange('media', type='direct')

# 建立三個隊列,一個是默認隊列,一個是video、一個image
CELERY_QUEUES = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
# 定義默認隊列和默認的交換機routing_key
CELERY_DEFAULT_QUEUE = 'default'
CELERY_DEFAULT_EXCHANGE = 'default'
CELERY_DEFAULT_ROUTING_KEY = 'default'

複製代碼
  • 在tasks.py中指定任務
# 視頻壓縮
@app.task
def video_compress(video_name):
    time.sleep(10)
    print('Compressing the:', video_name)
    return 'success'


@app.task
def video_upload(video_name):
    time.sleep(5)
    print( u'正在上傳視頻')
    return 'success'


# 壓縮照片
@app.task
def image_compress(image_name):
    time.sleep(10)
    print('Compressing the:', image_name)
    return 'success'


# 其餘任務
@app.task
def other(str):
    time.sleep(10)
    print ('Do other things')
    return 'success'


複製代碼
  • 指定路由
CELERY_ROUTES = ({'tasks.image_compress': {
                        'queue': 'images',
                        'routing_key': 'media.image'
                 }},{'tasks.video_upload': {
                        'queue': 'videos',
                        'routing_key': 'media.video'
                 }},{'tasks.video_compress': {
                        'queue': 'videos',
                        'routing_key': 'media.video'
                 }}, )


複製代碼

如今執行建立的任務

在啓動worker的時候能夠分兩種啓動方式

第一種: 指定Queue

第二種 : 不指定(所有執行)

ps 爲了更好的看到咱們添加的隊列,還有相應的交換模式,啓動所有的隊列

啓動worker

[queue]中包含了建立的隊列,其餘參數本文前面能夠對照 [tasks]中顯示了咱們全部的任務

---- **** ----- 
--- * ***  * -- Darwin-18.2.0-x86_64-i386-64bit 2018-12-28 15:38:00
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x104e78d68
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> default          exchange=dedfault(direct) key=default
                .> images           exchange=media(direct) key=media.image
                .> others           exchange=other(fanout) key=other.others
                .> videos           exchange=media(direct) key=media.video

[tasks]
  . tasks.add
  . tasks.dr
  . tasks.image_compress
  . tasks.other
  . tasks.period_task
  . tasks.task
  . tasks.video_compress
  . tasks.video_upload

[2018-12-28 15:38:00,906: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//

複製代碼

執行結果(能夠在rabbimq後臺管理中看相關執行結果):

[2018-12-28 15:38:00,906: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2018-12-28 15:38:00,933: INFO/MainProcess] mingle: searching for neighbors
[2018-12-28 15:38:02,013: INFO/MainProcess] mingle: all alone
[2018-12-28 15:38:02,091: INFO/MainProcess] celery@zhanlingjiedeMacBook-Pro.local ready.
[2018-12-28 15:38:42,386: INFO/MainProcess] Received task: tasks.add[1fdfbc23-e106-49ab-ac25-d46c2b5e8960]  
[2018-12-28 15:38:42,429: INFO/ForkPoolWorker-3] Task tasks.add[1fdfbc23-e106-49ab-ac25-d46c2b5e8960] succeeded in 0.040455893002217636s: 5
[2018-12-28 15:38:46,397: INFO/MainProcess] Received task: tasks.image_compress[cab797c5-eaae-4f11-b55c-041f4256ead9]  
[2018-12-28 15:38:46,410: INFO/MainProcess] Received task: tasks.other[0b00fd52-2251-42ef-9743-49df3f2906ed]  
[2018-12-28 15:38:56,401: WARNING/ForkPoolWorker-4] Compressing the:
[2018-12-28 15:38:56,402: WARNING/ForkPoolWorker-4] 這是我上傳的圖片
[2018-12-28 15:38:56,412: WARNING/ForkPoolWorker-3] Do other things
[2018-12-28 15:38:56,447: INFO/ForkPoolWorker-3] Task tasks.other[0b00fd52-2251-42ef-9743-49df3f2906ed] succeeded in 10.036200570997607s: 'success'
[2018-12-28 15:38:56,461: INFO/ForkPoolWorker-4] Task tasks.image_compress[cab797c5-eaae-4f11-b55c-041f4256ead9] succeeded in 10.061314186998061s: 'success'

複製代碼

補充

通常在使用celery爲了和實際場景結合會使用框架去使用 django + celery + redis(rabbitmq)

相關文章
相關標籤/搜索