1、安裝

因爲celery4.0不支持window,若是在window上安裝celery4.0將會出現下面的錯誤
flask_cleryflask_cleryhtml

你如今只能安裝
pip install celery==3.1
python

2、安裝py for redis 模塊

pip install redisgit

3、安裝redis服務

網上不少文章都寫得模棱兩可,把人坑的不要不要的!!!

Redis對於Linux是官方支持的,可是不支持window,網上不少做者寫文章都不寫具體的系統環境,大多數直接說pip install redis就可使用redis了,真的是坑人的玩意,本人深受其毒害github

對於windows,若是你須要使用redis服務,那麼進入該地址下載
https://github.com/MSOpenTech/redis/releases

redis安裝包,雙擊完成就能夠了web

若是你在window上不安裝該redis包,將會提示

redis.exceptions.ConnectionError: Error 10061 connecting to localhost:6379.

或者

redis.exceptions.ConnectionError redis

須要注意是:安裝目錄不能安裝在C盤,不然會出現權限依賴錯誤flask

4、添加redis環境變量

D:\Program Files\Rediswindows

5、初始化redis

進入redis安裝目錄,打開cmd運行命令redis-server.exe redis.windows.conf,若是出錯服務器

  • 雙擊目錄下的redis-cli.exe
  • 在出現的窗口中輸入shutdown
  • 繼續輸入exit

6、lask 集成celyer

在Flask配置中添加配置

1
2
3
 # Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0' # broker是一個消息傳輸的中間件
CELERY_RESULT_BACKEND = 'redis://localhost:6379/1' # 任務執行器
  • 在flask工程的__init__目錄下生產celery實例
    注意!!如下代碼必須在 flask app讀取完配置文件後編寫,不然會報錯
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)
TaskBase = celery.Task

class ContextTask(TaskBase):
abstract = True

def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)

celery.Task = ContextTask
return celery


celery = make_celery(app)

完整示例以下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
app = Flask(__name__)
app.config.from_object(config['default'])

def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)
TaskBase = celery.Task

class ContextTask(TaskBase):
abstract = True

def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)

celery.Task = ContextTask
return celery


celery = make_celery(app)

一份比較經常使用的配置文件

# 注意,celery4版本後,CELERY_BROKER_URL改成BROKER_URL
BROKER_URL = 'amqp://username:passwd@host:port/虛擬主機名'
# 指定結果的接受地址
CELERY_RESULT_BACKEND = 'redis://username:passwd@host:port/db'
# 指定任務序列化方式
CELERY_TASK_SERIALIZER = 'msgpack' 
# 指定結果序列化方式
CELERY_RESULT_SERIALIZER = 'msgpack'
# 任務過時時間,celery任務執行結果的超時時間
CELERY_TASK_RESULT_EXPIRES = 60 * 20   
# 指定任務接受的序列化類型.
CELERY_ACCEPT_CONTENT = ["msgpack"]   
# 任務發送完成是否須要確認,這一項對性能有一點影響     
CELERY_ACKS_LATE = True  
# 壓縮方案選擇,能夠是zlib, bzip2,默認是發送沒有壓縮的數據
CELERY_MESSAGE_COMPRESSION = 'zlib' 
# 規定完成任務的時間
CELERYD_TASK_TIME_LIMIT = 5  # 在5s內完成任務,不然執行該任務的worker將被殺死,任務移交給父進程
# celery worker的併發數,默認是服務器的內核數目,也是命令行-c參數指定的數目
CELERYD_CONCURRENCY = 4 
# celery worker 每次去rabbitmq預取任務的數量
CELERYD_PREFETCH_MULTIPLIER = 4 
# 每一個worker執行了多少任務就會死掉,默認是無限的
CELERYD_MAX_TASKS_PER_CHILD = 40 
# 設置默認的隊列名稱,若是一個消息不符合其餘的隊列就會放在默認隊列裏面,若是什麼都不設置的話,數據都會發送到默認的隊列中
CELERY_DEFAULT_QUEUE = "default" 
# 設置詳細的隊列
CELERY_QUEUES = {
    "default": { # 這是上面指定的默認隊列
        "exchange": "default",
        "exchange_type": "direct",
        "routing_key": "default"
    },
    "topicqueue": { # 這是一個topic隊列 凡是topictest開頭的routing key都會被放到這個隊列
        "routing_key": "topic.#",
        "exchange": "topic_exchange",
        "exchange_type": "topic",
    },
    "task_eeg": { # 設置扇形交換機
        "exchange": "tasks",
        "exchange_type": "fanout",
        "binding_key": "tasks",
    },
    
}

 

在cmd中啓動celery服務

執行命令celery -A your_application.celery worker loglevel=info,your_application爲你工程的名字,在這裏爲 get_tieba_film

調用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@app.route('/')
@app.route('/index')
def index():
print ("耗時的任務")
# 任務已經交給異步處理了
result = get_film_content.apply_async(args=[1])
# 若是須要等待返回值,可使用get()或wait()方法
# result.wait()
return '耗時的任務已經交給了celery'


@celery.task()
def get_film_content(a):
util = SpiderRunUtil.SpiderRun(TieBaSpider.FilmSpider())
util.start()

綁定

一個綁定任務意味着任務函數的第一個參數老是任務實例自己(self),就像 Python 綁定方法相似:併發

1
2
3
@task(bind=True) 
def add(self, x, y):
logger.info(self.request.id)

 

任務繼承

任務裝飾器的 base 參數能夠聲明任務的基類

1
2
3
4
5
6
7
import celery
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print('{0!r} failed: {1!r}'.format(task_id, exc))
@task(base=MyTask)
def add(x, y):
raise KeyError()

 

任務名稱

每一個任務必須有不一樣的名稱。
若是沒有顯示提供名稱,任務裝飾器將會自動產生一個,產生的名稱會基於這些信息:
1)任務定義所在的模塊,
2)任務函數的名稱

顯示設置任務名稱的例子:

1
2
3
4
5
>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
... return x + y
>>> add.name
'sum-of-two-numbers'

最佳實踐是使用模塊名稱做爲命名空間,這樣的話若是有一個同名任務函數定義在其餘模塊也不會產生衝突。

1
2
3
>>> @app.task(name='tasks.add')
>>> def add(x, y):
... return x + y

 

7、安裝flower

將各個任務的執行狀況、各個worker的健康狀態進行監控並以可視化的方式展示

1
pip install flower

 

啓動flower(默認會啓動一個webserver,端口爲5555):

1

 

指定broker並啓動:

celery flower --broker=amqp://guest:guest@localhost:5672//  或
celery flower --broker=redis://guest:guest@localhost:6379/0

 

 

進入http://localhost:5555便可查看。

doc

8、常見錯誤

1
ERROR/MainProcess] consumer: Cannot connect to redis://localhost:6379/0:

緣由是:redis-server 沒有啓動
解決方案:到redis安裝目錄下執行redis-server.exe redis.windows.conf
檢查redis是否啓動:redis-cli ping

1
line 442, in on_task_received

解決:

1
2
3
4
5
6
7
8
9
10
Did you remember to import the module containing this task?
Or maybe you are using relative imports?
Please see http://bit.ly/gLye1c for more information.
The full contents of the message body was:
{'timelimit': (None, None), 'utc': True, 'chord': None, 'args': [4, 4], 'retries': 0, 'expires': None, 'task': 'main.add', 'callbacks': None,
'errbacks': None, 'taskset': None, 'kwargs': {}, 'eta': None, 'id': '97000322-93be-47e9-a082-4620e123dc5e'} (210b)
Traceback (most recent call last):
File "d:\vm_env\flask_card\lib\site-packages\celery\worker\consumer.py", line 442, in on_task_received
strategies[name](message, body,
KeyError: 'main.add'

緣由:任務沒有註冊或註冊不成功,只有在啓動的時候提示有任務的時候,才能使用該任務
flask_celeryflask_celery
解決:

  1. 你在那個類中使用celery就在哪一個類中執行celery -A 包名.類名.celery worker -l info
  2. 根據上一部提示的任務列表給任務設置對應的名稱
    如在Test中
1
2
3
4
5
6
from main import app, celery

@celery.task(name="main.Test.add".)
def add(x, y):
print "ddddsws"
return x + y

目錄結構:

1
2
3
4
5
6
+ Card  # 工程
+ main
+ admin
- Task.py
- __init__.py
- Test.py

 

則應該啓動的命令爲:

1
celery -A main.Test.celery worker -l info

 

同時,若是你的Task.py也有任務,那麼你還應該從新建立一個cmd窗口執行

1
celery -A main.admin.Task.celery worker -l info

 

celery的工做進程能夠建立多個
flask_celeryflask_celery
flask_celeryflask_celery

 

參考:

https://www.laoyuyu.me/2018/02/10/python_flask_celery/

https://www.cnblogs.com/cwp-bg/p/8759638.html

celery用戶指南,強烈推薦看

redis安裝

celery使用
https://redis.io/topics/quickstart

http://einverne.github.io/post/2017/05/celery-best-practice.html  Celery 最佳實踐 

http://orangleliu.info/2014/08/09/celery-best-practice/  Celery最佳實踐-正確使用celery的7條建議