Python rabbitmq的使用(二)

上一篇介紹了rabbitmq的安裝和經典的hello world!實例。這裏將對工做隊列(Work Queues)作一個瞭解。由於是接上一篇說明的,因此若是沒看過上一篇,看這篇可能會比較難理解。上一篇的地址是:ubuntu安裝rabbitmq和python的使用實現python

消息也能夠理解爲任務,消息發送者能夠理解爲任務分配者,消息接收者能夠理解爲工做者,當工做者接收到一個任務,還沒完成的時候,任務分配者又發一個任務過來,那就忙不過來了,因而就須要多個工做者來共同處理這些任務,這些工做者,就稱爲工做隊列。結構圖以下:ubuntu

rabbitmq的python實例工做隊列

rabbitmq的python實例工做隊列併發

準備工做(Preparation)app

在實例程序中,用new_task.py來模擬任務分配者, worker.py來模擬工做者。函數

修改send.py,從命令行參數裏接收信息,併發送fetch

1spa

2命令行

3code

4cdn

5

6

7

import sys

 

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',

                      routing_key='hello',

                      body=message)

print " [x] Sent %r" % (message,)

修改receive.py的回調函數。

1

2

3

4

5

6

import time

 

def callback(ch, method, properties, body):

    print " [x] Received %r" % (body,)

    time.sleep( body.count('.') )

    print " [x] Done"

這邊先打開兩個終端,都運行worker.py,處於監聽狀態,這邊就至關於兩個工做者。打開第三個終端,運行new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....

觀察worker.py接收到任務,其中一個工做者接收到3個任務 :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'

另一個工做者接收到2個任務 :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'

從上面來看,每一個工做者,都會依次分配到任務。那麼若是一個工做者,在處理任務的時候掛掉,這個任務就沒有完成,應當交由其餘工做者處理。因此應當有一種機制,當一個工做者完成任務時,會反饋消息。

消息確認(Message acknowledgment)

消息確認就是當工做者完成任務後,會反饋給rabbitmq。修改worker.py中的回調函數:

1

2

3

4

5

def callback(ch, method, properties, body):

    print " [x] Received %r" % (body,)

    time.sleep(5)

    print " [x] Done"

    ch.basic_ack(delivery_tag = method.delivery_tag)

這邊停頓5秒,能夠方便ctrl+c退出。

去除no_ack=True參數或者設置爲False也能夠。

1

channel.basic_consume(callback, queue='hello', no_ack=False)

用這個代碼運行,即便其中一個工做者ctrl+c退出後,正在執行的任務也不會丟失,rabbitmq會將任務從新分配給其餘工做者。

消息持久化存儲(Message durability)

雖然有了消息反饋機制,可是若是rabbitmq自身掛掉的話,那麼任務仍是會丟失。因此須要將任務持久化存儲起來。聲明持久化存儲:

1

channel.queue_declare(queue='hello', durable=True)

可是這個程序會執行錯誤,由於hello這個隊列已經存在,而且是非持久化的,rabbitmq不容許使用不一樣的參數來從新定義存在的隊列。從新定義一個隊列:

1

channel.queue_declare(queue='task_queue', durable=True)

在發送任務的時候,用delivery_mode=2來標記任務爲持久化存儲:

1

2

3

4

5

6

channel.basic_publish(exchange='',

                      routing_key="task_queue",

                      body=message,

                      properties=pika.BasicProperties(

                         delivery_mode = 2# make message persistent

                      ))

公平調度(Fair dispatch)

上面實例中,雖然每一個工做者是依次分配到任務,可是每一個任務不必定同樣。可能有的任務比較重,執行時間比較久;有的任務比較輕,執行時間比較短。若是能公平調度就最好了,使用basic_qos設置prefetch_count=1,使得rabbitmq不會在同一時間給工做者分配多個任務,即只有工做者完成任務以後,纔會再次接收到任務。

1

channel.basic_qos(prefetch_count=1)

new_task.py完整代碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

#!/usr/bin/env python

import pika

import sys

 

connection = pika.BlockingConnection(pika.ConnectionParameters(

        host='localhost'))

channel = connection.channel()

 

channel.queue_declare(queue='task_queue', durable=True)

 

message = ' '.join(sys.argv[1:]) or "Hello World!"

channel.basic_publish(exchange='',

                      routing_key='task_queue',

                      body=message,

                      properties=pika.BasicProperties(

                         delivery_mode = 2# make message persistent

                      ))

print " [x] Sent %r" % (message,)

connection.close()

worker.py完整代碼

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

#!/usr/bin/env python

import pika

import time

 

connection = pika.BlockingConnection(pika.ConnectionParameters(

        host='localhost'))

channel = connection.channel()

 

channel.queue_declare(queue='task_queue', durable=True)

print ' [*] Waiting for messages. To exit press CTRL+C'

 

def callback(ch, method, properties, body):

    print " [x] Received %r" % (body,)

    time.sleep( body.count('.') )

    print " [x] Done"

    ch.basic_ack(delivery_tag = method.delivery_tag)

 

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,

                      queue='task_queue')

 

channel.start_consuming()

相關文章
相關標籤/搜索