基於之前的一篇文章,celery+Rabbit MQ的安裝和使用,html
本文更加詳細的介紹如何安裝和使用celey, Rabbit MQ。
並記錄在使用celery時遇到的一些問題。python
在 OS X上,直接執行以下命令:後端
$ brew install rabbitmq
其餘操做系統下的安裝能夠參考安裝 RabbitMQapi
啓動RabbitMQ服務器
$ sudo rabbitmq-server
你也能夠添加 -detached 屬性來讓它在後臺運行(注意:只有一個破折號):app
$ sudo rabbitmq-server -detached
查看RabbitMQ狀態異步
$ sudo rabbitmqctl status
中止RabbitMQsocket
永遠不要用 kill 中止 RabbitMQ 服務器,而是應該用 rabbitmqctl 命令:async
$ sudo rabbitmqctl stop
添加用戶tcp
默認用戶guest,密碼guest,只容許本地訪問,如需遠程訪問,須要設置.
$ sudo rabbitmqctl add_user test 123456 Adding user "test" ...
添加虛擬主機,並賦予用戶test權限
$ sudo rabbitmqctl add_vhost myvhost Adding vhost "myvhost" ...
$ sudo rabbitmqctl set_permissions -p myvhost test ".*" ".*" ".*" Setting permissions for user "test" in vhost "myvhost" ...
$ mkdir celery_demo $ cd celery_demo $ virtualenv -p python3 venv3
$ ./venv3/bin/pip install celery
建立配置文件 celeryconfig.py,裏面包含BROKER_URL
、CELERYD_LOG_FORMAT
、CELERY_ROUTES
.
# celeryconfig.py RABBIT_MQ = { 'HOST': '127.0.0.1', 'PORT': 5672, 'USER': 'test', 'PASSWORD': '123456' } # broker BROKER_URL = 'amqp://%s:%s@%s:%s/myvhost' % (RABBIT_MQ['USER'], RABBIT_MQ['PASSWORD'], RABBIT_MQ['HOST'], RABBIT_MQ['PORT']) # celery日誌格式 CELERYD_LOG_FORMAT = '[%(asctime)s] [%(levelname)s] %(message)s' CELERY_ROUTES = { 'demo_task.add': {'queue': 'sunday'}, }
其中,參數定義以下:
BROKER_URL
指定了broker信息,即消息隊列的地址。
CELERYD_LOG_FORMAT
指定了日誌格式。
CELERY_ROUTES
指定了路由信息,即調用demo_task.add
後,消息具體放入哪一個隊列,這裏是隊列名稱爲sunday
。
消費者
消費者代碼以下:
# demo_task.py from celery import Celery app = Celery("orange", backend='amqp') app.config_from_object("celeryconfig") @app.task def add(x, y): return x + y
首先,建立Celery
實例,從文件中讀取配置。
接着,定義task。
其中,在建立Celery
實例時,參數backend
指定告終果存儲後端,用於追蹤task執行狀態和結果。這裏使用amqp
,即便用RabbitMQ保存結果。默認狀況下,backend
參數是關閉的。
使用celery worker
啓動消費者
./venv3/bin/celery worker -A demo_task -Q sunday --loglevel=info -f app.log celery@admindeMacBook-Pro-2.local v4.3.0 (rhubarb) Darwin-18.2.0-x86_64-i386-64bit 2019-04-20 17:49:40 [config] .> app: orange:0x109ca0400 .> transport: amqp://test:**@127.0.0.1:5672/myvhost .> results: disabled:// .> concurrency: 8 (prefork) .> task events: OFF (enable -E to monitor tasks in this worker) [queues] .> sunday exchange=sunday(direct) key=sunday [tasks] . demo_task.add
其中,參數定義以下:
參數-A
是app name,即定義celery的文件。
參數-Q
指定了隊列的名稱,若是不指定,默認爲celery
。
參數-f
指定了日誌打印文件。
能夠經過如下命令查看更多幫助信息:
celery help
查看celery的選項celery worker --help
查看worker的選項生產者
代碼以下api.py所示:
# api.py from demo_task import add print("start...") result = add.apply_async((1, 2)) print("result:", result) print(result.ready()) print("end...")
代碼中,將1
、2
參數放入消息隊列。
celery配置backend
參數後, 調用任務時,會返回 AsyncResult
實例。
AsyncResult
的ready()
方法能夠查看任務是否完成處理。
執行api.py
執行結果分兩種狀況:worker啓動和沒有啓動。
當celery worker啓動的時候,結果以下:
./venv3/bin/python api.py start... result: 60dd0ab6-5fa2-4190-954e-584eb519384f True end...
能夠看到,task很快獲得執行,結果狀態爲True。
當celery worker沒有啓動的時候,結果以下:
./venv3/bin/python api.py start... result: 7280466f-73cd-44ec-85e7-5ad4f079a797 False end...
能夠看到,結果狀態爲False。
直接獲取結果
可使用get()
函數等待任務完成,但這不多使用,由於它把異步調用變成了同步調用。
對於get()
的使用,按照是否設置超時參數,分爲兩種:不使用超時參數timeout和使用超時參數timeout。
(1)不使用超時參數timeout
默認timeout爲None
,不會超時,而是阻塞住。
經過tcpdump抓包,能夠發現,get
調用後,每隔30s,RabbitMQ(server端)向生產者API(client端)發送心跳包,共發了5次。
最後,過了3分鐘,RabbitMQ(server端)關閉鏈接,生產者API(client端)也關閉鏈接,報錯:
start... result: aae4fb20-823c-4674-8bff-28429f14d5d7 False Traceback (most recent call last): File "api.py", line 13, in <module> result.get() File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/result.py", line 226, in get on_message=on_message, File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/base.py", line 496, in wait_for_pending no_ack=no_ack, File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 146, in wait_for on_interval=on_interval) File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 223, in consume conn, consumer, timeout, on_interval)[task_id] File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 204, in drain_events wait(timeout=1) File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/kombu/connection.py", line 315, in drain_events return self.transport.drain_events(self.connection, **kwargs) File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/kombu/transport/pyamqp.py", line 103, in drain_events return connection.drain_events(**kwargs) File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/amqp/connection.py", line 500, in drain_events while not self.blocking_read(timeout): File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/amqp/connection.py", line 505, in blocking_read frame = self.transport.read_frame() File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/amqp/transport.py", line 256, in read_frame frame_header = read(7, True) File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/amqp/transport.py", line 448, in _read raise IOError('Server unexpectedly closed connection') OSError: Server unexpectedly closed connection
(2)使用超時參數timeout
若是使用timeout
參數,get(timeout)
調用發起後, 當超過timeout
指定的時間仍然沒有得到結果,會超時報錯。
例如,調用get(timeout=1)
:
... ... # sync print(result.get(timeout=1)) print("end...")
若是worker沒有啓動或者worker處理超時,get
會報超時錯誤:
start... result: 190ad871-aae2-48ac-a18e-962cd2d537b7 False Traceback (most recent call last): File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 146, in wait_for on_interval=on_interval) File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 223, in consume conn, consumer, timeout, on_interval)[task_id] File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 202, in drain_events raise socket.timeout() socket.timeout During handling of the above exception, another exception occurred: Traceback (most recent call last): File "api.py", line 14, in <module> print(result.get(timeout=1)) File "/workspace//celery_demo/venv3/lib/python3.6/site-packages/celery/result.py", line 226, in get on_message=on_message, File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/base.py", line 496, in wait_for_pending no_ack=no_ack, File "/workspace/celery_demo/venv3/lib/python3.6/site-packages/celery/backends/amqp.py", line 148, in wait_for raise TimeoutError('The operation timed out.') celery.exceptions.TimeoutError: The operation timed out.
查看app.log中的日誌
[2019-04-20 20:53:59,327] [INFO] Connected to amqp://test:**@127.0.0.1:5672/myvhost [2019-04-20 20:53:59,345] [INFO] mingle: searching for neighbors [2019-04-20 20:54:00,379] [INFO] mingle: all alone [2019-04-20 20:54:00,441] [INFO] celery@admindeMacBook-Pro-2.local ready. [2019-04-20 20:55:46,498] [INFO] Received task: demo_task.add[1c8d47bd-449d-4bd9-b4db-819777081d23] [2019-04-20 20:55:46,561] [INFO] Task demo_task.add[1c8d47bd-449d-4bd9-b4db-819777081d23] succeeded in 0.060540573904290795s: 3
調用apply_async
時使用參數expires
,則表示任務有超時時間,超過這個時間後,task不會獲得執行。
result = add.apply_async((1, 2), expires=10)
當worker沒有啓動或者其餘異常狀況下,會出現任務超時,不被執行。
查看日誌,能夠看到task過時,不會被執行。
[2019-04-20 21:32:45,828] [INFO] Received task: demo_task.add[12691c27-3f2f-4c96-9b4e-54636e20d0eb] expires:[2019-04-20 13:31:51.030013+00:00] [2019-04-20 21:32:45,829] [INFO] Discarding revoked task: demo_task.add[12691c27-3f2f-4c96-9b4e-54636e20d0eb]