上一篇介紹了rabbitmq的安裝和經典的hello world!實例。這裏將對工做隊列(Work Queues)作一個瞭解。由於是接上一篇說明的,因此若是沒看過上一篇,看這篇可能會比較難理解。上一篇的地址是:ubuntu安裝rabbitmq和python的使用實現python
消息也能夠理解爲任務,消息發送者能夠理解爲任務分配者,消息接收者能夠理解爲工做者,當工做者接收到一個任務,還沒完成的時候,任務分配者又發一個任務過來,那就忙不過來了,因而就須要多個工做者來共同處理這些任務,這些工做者,就稱爲工做隊列。結構圖以下:ubuntu
rabbitmq的python實例工做隊列併發
準備工做(Preparation)app
在實例程序中,用new_task.py來模擬任務分配者, worker.py來模擬工做者。函數
修改send.py,從命令行參數裏接收信息,併發送fetch
1spa 2命令行 3code 4cdn 5 6 7 |
|
修改receive.py的回調函數。
1 2 3 4 5 6 |
|
這邊先打開兩個終端,都運行worker.py,處於監聽狀態,這邊就至關於兩個工做者。打開第三個終端,運行new_task.py
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
觀察worker.py接收到任務,其中一個工做者接收到3個任務 :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
另一個工做者接收到2個任務 :
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
從上面來看,每一個工做者,都會依次分配到任務。那麼若是一個工做者,在處理任務的時候掛掉,這個任務就沒有完成,應當交由其餘工做者處理。因此應當有一種機制,當一個工做者完成任務時,會反饋消息。
消息確認(Message acknowledgment)
消息確認就是當工做者完成任務後,會反饋給rabbitmq。修改worker.py中的回調函數:
1 2 3 4 5 |
|
這邊停頓5秒,能夠方便ctrl+c退出。
去除no_ack=True參數或者設置爲False也能夠。
1 |
|
用這個代碼運行,即便其中一個工做者ctrl+c退出後,正在執行的任務也不會丟失,rabbitmq會將任務從新分配給其餘工做者。
消息持久化存儲(Message durability)
雖然有了消息反饋機制,可是若是rabbitmq自身掛掉的話,那麼任務仍是會丟失。因此須要將任務持久化存儲起來。聲明持久化存儲:
1 |
|
可是這個程序會執行錯誤,由於hello這個隊列已經存在,而且是非持久化的,rabbitmq不容許使用不一樣的參數來從新定義存在的隊列。從新定義一個隊列:
1 |
|
在發送任務的時候,用delivery_mode=2來標記任務爲持久化存儲:
1 2 3 4 5 6 |
|
公平調度(Fair dispatch)
上面實例中,雖然每一個工做者是依次分配到任務,可是每一個任務不必定同樣。可能有的任務比較重,執行時間比較久;有的任務比較輕,執行時間比較短。若是能公平調度就最好了,使用basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工做者分配多個任務,即只有工做者完成任務以後,纔會再次接收到任務。
1 |
|
new_task.py完整代碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
|
worker.py完整代碼
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
|