RabbitMQ(python實現)學習之二:Producer發送消息至多個消息隊列queue(廣播消息)

1.1本部份內容簡介python

這部分咱們將要發送一個消息到多個Consumer,這部分稱之爲「publish/subscribe」緩存

咱們實現的方式就是發送端,發送一個消息,與此同時,多個接收端將同時接收到消息並打印在屏幕上面。函數

1.2exchange簡介測試

在前面的博文中,咱們的講解是:發送端發送消息至消息隊列,接收端從消息隊列獲取消息。如今咱們來介紹一下rabbitmq的完整消息傳送模型。spa

>Producer:用來發送消息的應用程序命令行

>queue:用來存儲消息的緩存3d

>Consumer:用來接收消息的應用程序code

消息傳送模型的核心是,Producer從不會直接將消息傳送給queue,而是,將消息傳送給exchange,exchange是個很簡單的東西,在一側,他接收來自Producer的消息,另外一側將消息傳送給queue。exchange將消息傳送給你個queue,仍是傳送給多個queue,這主要是由exchange的type決定。模型圖以下:blog

        

 

exchange有不少type可用,以下:direct、topic、headers、fanout。本博客針對fanout講解,後續博文對其餘類型有所講解,讓咱們建立一個exchange,type爲fanout,名字爲logs,代碼以下:rabbitmq

channel.exchange(exchange='logs',type='fanout')

對於type爲fanout的exchange,理解起來很是簡單,它將接收到的消息,廣播給他所知道的全部的queue,即全部和他創建鏈接的queue。前面的博文降到了命令行查看list_exchanges的命令以下:

 $ :sudo rabbitmqctl list_exchanges
 Listing exchanges ...
 logs      fanout
 amq.direct      direct
 amq.topic       topic
 amq.fanout      fanout
 amq.headers     headers
 ...done.

對於上圖中,你會看到不少amq.*的exchange,這些是系統默認創建的,在你不創建exchange時,系統默認創建上面幾個。

對於消息的發佈函數basic_publish()也隨之變爲:

channel.basic_publish(exchange='logs',routing_key='',body=message)

1.3臨時queue

 正如你前面學到的,對於一個queue,會有本身的名字(hello什麼的),

首先:

result = channel.queue_declare()

而後經過result.method.queue,系統會隨機給queue命名。

若是咱們想Producer與Consumer斷開鏈接時,隊列queue刪除,那麼須要改爲下面的代碼:

result = channel.queue_declare(exclusive=True)

1.4Bingings(將queue與exchange綁定)

模型圖以下:

        

咱們已經建立了一個type爲fanout的exchange,如今,咱們要告訴exchange,將消息發送給咱們本身定義的queue,在exchange與queue之間創建鏈接的是binding,代碼以下:

channel.queue_bind(exchange='logs',queue=result.method.queue)

在命令行查看binding的列表,命令以下:

$: sudo rabbitmqctl list_bindings

 1.5最終代碼

最終的模型以下:

           

send.py代碼以下:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"  #若是鍵盤有輸入,message爲鍵盤輸入,若是鍵盤沒有輸入,消息message="info: Hello World!";
 channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()

 

receive.py代碼

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

1.6代碼測試

開啓一個命令行窗口,運行send.py:

$: python send.py   #(此時你傳送的內容爲info: Hello World!)或者  

$: python send.py message
#message爲你想發送的內容

開啓兩個命令行窗口,分別運行receive.py,兩個窗口你會看到有相同的消息輸出:

$: python receive.py
相關文章
相關標籤/搜索