RabbitMQ和Kafka同樣,都是消息中間件。RabbitMQ中有多個隊列,每一個隊列稱之爲一個Topic。生產者Producer首先須要鏈接到RabbitMQ中的指定隊列,即在鏈接的時候指明消息發送到哪一個Topic中。消費者Consumer也須要鏈接到RabbitMQ,並指定訂閱哪一個Topic,即從哪一個隊列拉取消息進行消費。在Consumer中有一個回調函數,用於說明消費完這個消息以後該作什麼事,一般就是向RabbitMQ發送一個ACK信號,表示消息已經被成功消費,能夠從隊列中刪除了。若是消費者在處理消息的時候忽然宕機了,那麼RabbitMQ中的隊列依然存在,能夠繼續發送給其餘消費者進行處理。python
只要消費者一直不斷運行着(監聽),那麼生產者只要一發送消息到RabbitMQ中,就會被當即分發到對應的消費者中進行消費。此外,RabbitMQ中的消息還能夠持久化到磁盤中。數據結構
在Ubuntu中安裝RabbitMQ: sudo apt-get install rabbitmq-server函數
python程序要鏈接到RabbitMQ中,須要import pika,pika是AMQP協議的開源python實現:server
sudo pip install pika中間件
在RabbitMQ中,能夠經過 sudo rabbitmqctl list_queues 命令查看到當前RabbitMQ中的隊列及其對應的消息個數,<"Topic",num>,數據結構相似於Map。blog
消息發送端:rabbitmq
#!/usr/bin/env python #coding=utf8 import sys import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='libi',durable=True) message = ' '.join(sys.argv[1:]) or "bili111111111" channel.basic_publish(exchange='', routing_key='libi', body=message,properties=pika.BasicProperties(delivery_mode = 2,)) print "Sent %r" % (message,) connection.close()
消息消費端:隊列
# !/usr/bin/env python # coding=utf8 import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='libi') def callback(ch, method, properties, body): print "Received %r" % (body,) time.sleep(5) print "done" channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='libi', no_ack=False) channel.start_consuming()