環境說明:python
window7 X64redis
python 2.7.6 、celery 3.1.2五、redis 2.10.6數據庫
本地安裝的redis服務端版本號:Redis-x64-3.2.100app
工程結構說明:源文件下載請訪問https://i.cnblogs.com/Files.aspx異步
一、tasks.py:實例化celery,並定義生成任務的方法add()測試
# -*- coding: utf-8 -*- ''' Created on 2019年8月27日 @author: lenovo ''' import time from celery import Celery from celery.bin.multi import celery_exe #實例化一個celery broker='redis://localhost:6379/1' #接收發送過來的任務,並等待celery的worker進行消費 backend='redis://localhost:6379/2' #celery的worker消費完任務後,backend保存任務執行結果 app=Celery('my_task',broker=broker,backend=backend) #在add()方法上方添加裝飾器,將該方法轉換爲異步的 @app.task def add(x,y): print 'enter call func...' time.sleep(4) return x+y
二、app.py:spa
# -*- coding: utf-8 -*- ''' Created on 2019年8月27日 @author: lenovo ''' from tasks import add if __name__ == '__main__': print 'start task...' result=add.delay(3, 8) #將任務發送給tasks.py中celery的broker print 'end task...' print result
三、執行app.py生成新任務,在未啓動worker前,該任務保存在broker指定的redis數據庫中等待worker進行消費;3d
{"body": "gAJ9cQEoVQdleHBpcmVzcQJOVQN1dGNxA4hVBGFyZ3NxBEsDSwiGcQVVBWNob3JkcQZOVQljYWxsYmFja3NxB05VCGVycmJhY2tzcQhOVQd0YXNrc2V0cQlOVQJpZHEKVSQyMTlhOWU2My0yZjExLTRiMmEtYmNjZi0yYTY2MTMzZjY3NGRxC1UHcmV0cmllc3EMSwBVBHRhc2txDVUJdGFza3MuYWRkcQ5VCXRpbWVsaW1pdHEPTk6GVQNldGFxEE5VBmt3YXJnc3ERfXESdS4=", "headers": {}, "content-type": "application/x-python-serialize", "properties": {"body_encoding": "base64", "correlation_id": "219a9e63-2f11-4b2a-bccf-2a66133f674d", "reply_to": "fa6b65cc-0efa-3ddc-bd74-246023f65e8d", "delivery_info": {"priority": 0, "routing_key": "celery", "exchange": "celery"}, "delivery_mode": 2, "delivery_tag": "aa0f8035-a256-46b3-84a9-acf8e9d490db"}, "content-encoding": "binary"}日誌
四、啓動worker,消費該任務;code
cmd到celery實例化文件tasks.py所在路徑下,執行命令:python -m celery -A tasks worker --loglevel=info 或者 celery -A tasks worker --loglevel=info
從日誌打印和下圖redis數據庫能夠看到,worker啓動後,馬上從redis中將未消費的任務進行了消費;
任務執行的結果保存在backend設定的redis數據庫中,以下圖所示:
五、測試新任務:
發送任務:
查看worker執行狀況:
查看backend保存結果:
六、異步與同步效果比對:
同步處理任務:
工程結構:
app.py:同步處理任務
# -*- coding: utf-8 -*- import time ''' Created on 2019年8月27日 @author: lenovo ''' def add(x,y): print 'enter call func...' time.sleep(4) return x+y if __name__ == '__main__': print 'start task...' result=add(2, 8) print 'end task...' print result
異步處理任務:
能夠看出,同步執行任務時須要等待任務過程執行完畢後纔會繼續,執行任務過程當中存在阻塞現象;而採用異步處理,生成任務后里面就結束了,執行任務不存在阻塞現象;