在windows環境利用celery實現簡單的任務隊列

測試使用環境:python

  一、Python==3.6.1linux

  二、MongoDB==3.6.2web

  三、celery==4.1.1redis

  四、eventlet==0.23.0sql

 

Celery分爲3個部分mongodb

    (1)worker部分負責任務的處理,即工做進程(個人理解工做進程就是你寫的python代碼,固然還包括python調用系統工具功能)數據庫

    (2)broker部分負責任務消息的分發以及任務結果的存儲,這部分任務主要由中間數據存儲系統完成,好比消息隊列服務器RabbitMQ、redis、django

Amazon SQS、MongoDB、IronMQ等或者關係型數據庫,使用關係型數據庫依賴sqlalchemy或者django的ORMflask

    (3)Celery主類,進行任務最開始的指派與執行控制,他能夠是單獨的python腳本,也能夠和其餘程序結合,應用到django或者flask等web框架裏面以及你能想到的任何應用windows

 

上代碼

  這裏將celery封裝成一個Python包,結構以下圖

 

 

 celery.py

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 
 4 """
 5 Celery主類
 6 啓動文件名必須爲celery.py!!!
 7 """
 8 
 9 from __future__ import absolute_import  # 爲兼容Python版本
10 from celery import Celery, platforms
11 
12 platforms.C_FORCE_ROOT = True  # linux環境下,用於開啓root也能夠啓動celery服務,默認是不容許root啓動celery的
13 app = Celery(
14     main='celery_tasks',  # celery啓動包名稱
15     # broker='redis://localhost',
16     # backend='redis://localhost',
17     include=['celery_tasks.tasks', ]  # celery全部任務
18 )
19 app.config_from_object('celery_tasks.config')  # celery使用文件配置
20 
21 if __name__ == '__main__':
22     app.start()

 

 config.py

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 from __future__ import absolute_import
 4 
 5 CELERY_TIMEZONE = 'Asia/Shanghai'
 6 # CELERY_RESULT_BACKEND='redis://localhost:6379/1'
 7 # BROKER_URL='redis://localhost:6379/2'
 8 BROKER_BACKEND = 'mongodb'  # mongodb做爲任務隊列(或者說是緩存)
 9 BROKER_URL = 'mongodb://localhost:27017/for_celery'  # 隊列地址
10 CELERY_RESULT_BACKEND = 'mongodb://localhost:27017/for_celery'  # 消息結果存儲地址
11 CELERY_MONGODB_BACKEND_SETTINGS = {  # 消息結果存儲配置
12     'host': 'localhost',
13     'port': 27017,
14     'database': 'for_celery',
15     # 'user':'root',
16     # 'password':'root1234',
17     'taskmeta_collection': 'task_meta',  # 任務結果的存放collection
18 }
19 CELERY_ROUTES = {  # 配置任務的前後順序
20     'celery_task.tasks.add': {'queue': 'for_add', 'router_key': 'for_add'},
21     'celery_task.tasks.subtract': {'queue': 'for_subtract', 'router_key': 'for_subtract'}
22 }

 

tasks.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
worker部分
"""

from __future__ import absolute_import
from celery import Celery, group
from .celery import app
from time import sleep


@app.task
def add(x, y):
    sleep(5)
    return x + y


@app.task
def substract(x, y):
    sleep(5)
    return x - y

  

 

 接下來演示,演示以前先把config中mongdb的用到的database和collection配置好,並啓動mongodb服務

 

首先啓動consumer

 

 注意啓動目錄爲celery_tasks同一級,啓動命令爲

celery -A celery_tasks worker --loglevel=info -P eventlet

參數解釋,命令中-A參數表示的是Celery APP的名稱celery_tasks,這個實例中指的就是tasks.py,後面的tasks就是APP的名稱,worker是一個執行任務角色,後面的loglevel=info記錄日誌類型默認是info,這個命令啓動了一個worker,用來執行程序中add這個加法任務(task),-P eventlet是防止在windows環境下出現

[2018-06-02 15:08:15,550: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)

Traceback (most recent call last):

File "d:\programmingsoftware\python35\lib\site-packages\billiard\pool.py", line 358, in workloop result = (True, prepare_result(fun(*args, **kwargs)))

File "d:\programmingsoftware\python35\lib\site-packages\celery\app\trace.py", line 525, in _fast_trace_task tasks, accept, hostname = _loc

ValueError: not enough values to unpack (expected 3, got 0)

若啓動成功,結果以下

 

 

 再啓動produce

在另外一個終端terminal,首先啓動Python

以下

再導入並調用任務,使用delay方法

以下

 

 調用以後,回到consumer終端,發現

 

 收到任務。

再到mongodb中查看任務結果

相關文章
相關標籤/搜索