Python RabbitMQ

RabbitMQjava

RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。python

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。服務器

RabbitMQ安裝ide

安裝配置epel
  # rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm

安裝erlang
 
# yum -y install erlang

安裝RabbitMQ
 
# yum -y install rabbitmq-server

win下安裝推薦http://blog.csdn.net/a__java___a/article/details/17614797fetch


啓動/中止 service rabbitmq-server start/stopui


安裝APIgoogle

    pip install pikaspa

    or.net

    easy_install pikaorm

    or

    源碼

    https://pypi.python.org/pypi/pika


使用API操做RabbitMQ

基於Queue能夠實現生產者消費者模型,而對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。

import pika

# ######################### 生產者 #########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='MQ1')

channel.basic_publish(exchange='',routing_key='MQ1',body='Hello World!')

print(" [x] Sent 'Hello World!'")

connection.close()

import pika

# ########################## 消費者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='MQ1')

def callback(ch, method, properties, body):
   
print(" [x] Received %r" % body)

channel.basic_consume(callback,queue='MQ1',no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


1. acknowledgment 消息不丟失

no-ack = False,若是生產者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。

import pika
# ########################## 消費者 ##########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='MQ1')

def callback(ch, method, properties, body):
   
print(" [x] Received %r" % body)
   
import time
   time.sleep(10)
   
print('ok')
   
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='MQ1',no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


2. durable   消息不丟失

生產者消息持久化

import pika
# ######################### 生產者 #########################

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 消息持久化
channel.queue_declare(queue='MQ2', durable=True)

channel.basic_publish(exchange='',routing_key='MQ2',body='Hello World!',
                     properties=pika.BasicProperties(
                         
delivery_mode=2, # 消息持久化
                     
))
print(" [x] Sent 'Hello World!'")
connection.close()

import pika
# ########################## 消費者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='MQ2', durable=True)  # 消息持久化

def callback(ch, method, properties, body):
   
print(" [x] Received %r" % body)
   
import time
   time.sleep(10)
   
print('ok')
   
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,queue='MQ2',no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


3.  消息獲取順序

默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。

若是Rabbit只管按順序把消息發到各個消費者身上,不考慮消費者負載的話,極可能出現,一個機器配置不高的消費者那裏堆積了不少消息處理不完,同時配置高的消費者卻一直很輕鬆。爲解決此問題,能夠在各個消費者端,配置perfetch=1,意思就是告訴RabbitMQ在我這個消費者當前消息還沒處理完的時候就不要再給我發新消息了。

channel.basic_qos(prefetch_count=1) 表示誰來誰取,再也不按照奇偶數排列


import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='MQ2',durable=True)  # 消息持久化

def callback(ch, method, properties, body):
   
print(" [x] Received %r" % body)
   
import time
   time.sleep(10)
   
print('ok')
   
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,
                     queue='hello',
                     no_ack=False)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()


4. 發佈訂閱

發佈訂閱和簡單的消息隊列區別在於,發佈訂閱會將消息發送給全部的訂閱者,而消息隊列中的數據被消費一次便消失。因此,RabbitMQ實現發佈和訂閱時,會爲每個訂閱者建立一個隊列,而發佈者發佈消息時,會將消息放置在全部相關隊列中。

 exchange type = fanout

import pika,sys

##------------------------發佈者-----------------

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',routing_key='',body=message)

print(" [x] Sent %r" % message)
connection.close()

import pika

##------------------------訂閱者-----------------
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue    #獲取隨機隊列名

channel.queue_bind(exchange='logs',queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   
print(" [x] %r" % body)

channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()



5.  關鍵字發送

 exchange type = direct

以前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 斷定應該將數據發送至指定隊列。

import pika

# ######################### 生產者 #########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',type='direct')

message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                     routing_key= 'mingyue',   #綁定的關鍵字
                     
body=message)
print(" [x] Sent %r" % (message))
connection.close()

import pika
# # ########################## 消費者 ##########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 綁定兩個不一樣的關鍵字
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key= 'shuoming')
channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key= 'mingyue')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()

6.  模糊匹配

 exchange type = topic

在topic類型下,可讓隊列綁定幾個模糊的關鍵字,以後發送者將數據發送到exchange,exchange將傳入」路由值「和 」關鍵字「進行匹配,匹配成功,則將數據發送到指定隊列。

  • # 表示能夠匹配 0 個 或 多個 單詞

  • *  表示只能匹配 一個 單詞

發送者路由值              隊列中

www.google.com          www.*  -- 不匹配

www.baidu.com           www.#  -- 匹配

import pika

# ######################### 生產者 #########################
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',type='topic')


message = 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                     routing_key= 'www.baidu.com',   #匹配模糊關鍵字
                     
body=message)
print(" [x] Sent %r" % (message))
connection.close()

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 綁定兩個不一樣的關鍵字
channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key= '
www.*')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
   
print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,queue=queue_name,no_ack=True)

channel.start_consuming()


Remote procedure call (RPC)

RPC 遠程過程調用。客戶端C,向服務端S請求一項服務,官網舉了一個計算fibonacci值的例子。C 向S請求計算fib(x),S 則計算,算完以後發給,或者說反饋給C。C收到反饋信息以後才能想S 繼續請求服務。

一個RPC流程是:

  • C 啓動,而後建立一個匿名私有(exclusive=Ture)的反饋隊列。

  • C 發送請求的時候,要附上reply_to(用戶反饋的隊列名)和correlation_id(反饋的***ID)。

  • 請求被髮送到 rpc_queue 隊列.

  • S 等待隊列. 發現有消息到達則計算fib(x)而後經過反饋隊列反饋給C.

  • C 等待反饋隊列,發現有反饋信息到達,比對反饋***ID。符合,發送下一個計算請求。不符合,再等。

  • wKioL1byJDihA7kpAAA5qk8GcaI689.png

RPC Server

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
   
if n == 0:
       
return 0
   
elif n == 1:
       
return 1
   
else:
       
return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
   
n = int(body)

   
print(" [.] fib(%s)" % n)
   
response = fib(n)

   
ch.basic_publish(exchange='',
                    routing_key=props.reply_to,
                    properties=pika.BasicProperties(correlation_id = \
                                                        props.correlation_id),
                    body=str(response))
   
ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()


RPC Client

import pika,uuid

class FibonacciRpcClient(object):
   
def __init__(self):
       
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
               
host='localhost'))

       
self.channel = self.connection.channel()

       
result = self.channel.queue_declare(exclusive=True)
       
self.callback_queue = result.method.queue

       self.channel.basic_consume(self.on_response, no_ack=True,
                                  queue=self.callback_queue)

   
def on_response(self, ch, method, props, body):
       
if self.corr_id == props.correlation_id:
           
self.response = body

   def call(self, n):
       
self.response = None
       
self.corr_id = str(uuid.uuid4())
       
self.channel.basic_publish(exchange='',
                                  routing_key='rpc_queue',
                                  properties=pika.BasicProperties(
                                       
reply_to = self.callback_queue,
                                        correlation_id = self.corr_id,
                                        ),
                                  body=str(n))
       
while self.response is None:
           
self.connection.process_data_events()
       
return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
相關文章
相關標籤/搜索