RabbitMQ

RabbitMQ和Kafka同樣,都是消息中間件。RabbitMQ中有多個隊列,每一個隊列稱之爲一個Topic。生產者Producer首先須要鏈接到RabbitMQ中的指定隊列,即在鏈接的時候指明消息發送到哪一個Topic中。消費者Consumer也須要鏈接到RabbitMQ,並指定訂閱哪一個Topic,即從哪一個隊列拉取消息進行消費。在Consumer中有一個回調函數,用於說明消費完這個消息以後該作什麼事,一般就是向RabbitMQ發送一個ACK信號,表示消息已經被成功消費,能夠從隊列中刪除了。若是消費者在處理消息的時候忽然宕機了,那麼RabbitMQ中的隊列依然存在,能夠繼續發送給其餘消費者進行處理。python

只要消費者一直不斷運行着(監聽),那麼生產者只要一發送消息到RabbitMQ中,就會被當即分發到對應的消費者中進行消費。此外,RabbitMQ中的消息還能夠持久化到磁盤中。數據結構

在Ubuntu中安裝RabbitMQ:   sudo apt-get install rabbitmq-server函數

python程序要鏈接到RabbitMQ中,須要import pika,pika是AMQP協議的開源python實現:server

  sudo pip install pika中間件

在RabbitMQ中,能夠經過 sudo rabbitmqctl list_queues 命令查看到當前RabbitMQ中的隊列及其對應的消息個數,<"Topic",num>,數據結構相似於Map。blog

 

消息發送端:rabbitmq

#!/usr/bin/env python
#coding=utf8
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()

channel.queue_declare(queue='libi',durable=True)
message = ' '.join(sys.argv[1:]) or "bili111111111"
channel.basic_publish(exchange='', routing_key='libi', body=message,properties=pika.BasicProperties(delivery_mode = 2,))
print "Sent %r" % (message,)
connection.close()

  

消息消費端:隊列

# !/usr/bin/env python
# coding=utf8
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
channel.queue_declare(queue='libi')

def callback(ch, method, properties, body):
    print "Received %r" % (body,)
    time.sleep(5)
    print "done"
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback, queue='libi', no_ack=False)
channel.start_consuming()
相關文章
相關標籤/搜索