在下圖中的x即是exchangepython
P是生產者,紅色爲queuespa
X能夠將P的task進行過濾,從而決定將task作如何處理:例如:code
(1),捨棄任務it
(2),將任務發送到某個taskio
(3),將任務發送到全部taskclass
exchange有4中類型:direct, topic, headers and fanoutimport
此次主要使用fanoutcoding
emit_log.py:task將被髮送到exchange
循環
# -*- coding: UTF-8 -*- import pika if __name__ == '__main__': connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel = connection.channel() channel.exchange_declare(exchange="logs",type="fanout") message = "You are awsome!" for i in range(0, 100): # 循環100次發送消息 channel.basic_publish(exchange="logs", routing_key='', body=message + " " + str(i),) print "sending ", message
receive_log.pychannel
# -*- coding: UTF-8 -*- import pika __author__ = 'Yue' def callback(ch, method, properties, body): print body if __name__ == '__main__': connection=pika.BlockingConnection(pika.ConnectionParameters("localhost")) channel=connection.channel() channel.exchange_declare(exchange="logs",type="fanout") #隨機生成Queue result=channel.queue_declare(exclusive=True) #獲取queue的name queue_name=result.method.queue print "queue_name",queue_name channel.queue_bind(exchange="logs",queue=queue_name) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()