python 使用 RabbitMQ

1、RabbitMQ消息隊列介紹

RabbitMQ是在兩個獨立得python程序,或其餘語言交互時使用。
RabbitMQ:erlang語言 開發的。
Python中鏈接RabbitMQ的模塊:pika 、Celery(分佈式任務隊列) 、haigha
能夠維護不少得隊列
RabbitMQ 教程官網:http://www.rabbitmq.com/getstarted.html
幾個概念說明:html

Broker:簡單來講就是消息隊列服務器實體。
Exchange:消息交換機,他制定消息按什麼規則,路由到哪一個隊列。
Queue:消息隊列載體,每一個消息都會被投入一個或多個隊列。
Binding:綁定,他的做用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠設多個vhost,用做不一樣用戶得權限分離。
producer:消息生產者,就是投遞消息得程序。
consumer:消息消費者,就是接受消息得程序。
channel:消息通道,在客戶端得每一個鏈接裏。能夠創建多個channel,每一個channel表明一個會話任務。python


2、RabbitMQ基本示範

1.Rabbitmq安裝

ubuntu系統mysql

install rabbitmq-server # 直接搞定
---
centos系統
1)Install Erlangsql

1)Install Erlang
 # For EL5:
 rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm
 # For EL6:
 rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 # For EL7:
 rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-8.noarch.rpm
 yum install erlang

2)Install RabbitMQ Serverapache

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpmubuntu

3)use RabbitMQ Servercentos

chkconfig rabbitmq-server on
service rabbitmq-server stop/start服務器

2.基本示例子

發送端producerdom

import pika
 
# 創建一個實例
connection = pika.BlockingConnection(
  pika.ConnectionParameters('localhost',5672) # 默認端口5672,可不寫
  )
# 聲明一個管道,在管道里發消息
channel = connection.channel()
# 在管道里聲明queue
channel.queue_declare(queue='hello')
# RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
channel.basic_publish(exchange='',
           routing_key='hello', # queue名字
           body='Hello World!') # 消息內容
print(" [x] Sent 'Hello World!'")
connection.close() # 隊列關閉

接收端consumersocket

import pika
import time
# 創建實例
connection = pika.BlockingConnection(pick.ConnectionParameters('hocalhost'))
# 聲明管道
channel = connection.channel()

# 爲何聲明瞭一個‘hello’隊列?
# 若是肯定已經聲明瞭,能夠不聲明。可是你不知道那個機器先運行,因此要聲明兩次。
# 一般是先運行消費者
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):#四個參數爲標準格式
    print("[x]Received %r"%body)
    time.sleep(15)
    ch.basic_ack(delivry_tay = method.delivery_tay)# 告訴生成者,消息處理完成
channel.basic_consume(# 消費消息
    callback, # 若是收到消息,就調用callback函數來處理消息
    queue='hello',# 要消費的隊列
    # no ack=True # 消息確認
    # 通常不寫。宕機則生產者檢測到發給其餘消費者
    )
print('[*]Waiting for messages.To exit press CTRL+C')
channel.start_consuming() # 開始消費消息

3.RabbitMQ消息分發輪詢

一個生產者多個消費者
採用輪詢機制;把消息依次分發
假如消費者處理雄安熙須要15秒,若是宕機了,那這個消息處理尚未處理完,怎麼處理?
(能夠模擬消費端斷了,分別註釋和不註釋no_ack=True看一下)
沒有回覆,就表明消息沒有處理完,
上面的效果消費端斷了就轉到另一個消費端去了,可是生產者怎麼知道消費端斷了呢?
由於生產者和消費者是經過socket鏈接的,socket斷了,就說明消費端斷開了。

上面的模式只是依次分發,實際狀況是機器配置不同。怎麼設置相似權重的操做?
RabbitMQ怎麼辦呢,RabbitMQ作了簡單的處理就能實現公平的分發。
就是RabbitMQ給消費者發消息的時候檢測下消費者裏的消息數量,若是超過指定值(好比1條),就不給你發了。
只須要在消費者端,channel.basic_consume前加上就能夠了。

channel.basic_qos(prefetch_count=1)# 相似權重,按能力分發,若是有一個消息,就不在給你發
channel.basic_consume( # 消費消息

3、Rabbit MQ消息持久化(durable、properties)

1.RabbitMQ相關命令

rabbitmqctl list_queues # 查看當前queue數量及queue裏消息數量

2.消息持久化

若是隊列裏還有消息,RabbitMQ服務端宕機了呢?消息還在不在?
把RabbitMQ服務重啓,看一下消息在不在。
上面的狀況下,宕機了,消息就久了,下面看看如何把消息持久化。
每次聲明隊列的時候,都加上durable,注意每一個隊列都得寫,客戶端、服務端聲明的時候都得寫。

在管道里聲明queue
channel.queue_declare(queue='hello2', durable=True)
測試結果發現,只是把隊列持久化了,但隊列裏的消息沒了。
durable的做用只是把隊列持久化。離消息持久話還差一步:
發送端發送消息時,加上properties

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
             type='direct')
# 重要程度級別,這裏默認定義爲 info
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
           routing_key=severity,
           body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

接收端subscriber

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
             type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
           routing_key=routing_key,
           body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

接收端 consumer

import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello2', durable=True)
 
def callback(ch, method, properties, body):
  print(" [x] Received %r" % body)
  time.sleep(10)
  ch.basic_ack(delivery_tag = method.delivery_tag) # 告訴生產者,消息處理完成
 
channel.basic_qos(prefetch_count=1) # 相似權重,按能力分發,若是有一個消息,就不在給你發
channel.basic_consume( # 消費消息
           callback, # 若是收到消息,就調用callback
           queue='hello2',
           # no_ack=True # 通常不寫,處理完接收處理結果。宕機則發給其餘消費者
           )
 
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

4、RabbitMQ廣播模式(exchange)

前面的效果都是一對一發,若是作一個廣播效果可不能夠,這時候就要用到exchange了
exchange必須精確的知道收到的消息要發給誰。exchange的類型決定了怎麼處理,
類型有如下幾種:

fanout: 全部綁定到此exchange的queue均可以接收消息
direct: 經過routingKey和exchange決定的那個惟一的queue能夠接收消息
topic: 全部符合routingKey(此時能夠是一個表達式)的routingKey所bind的queue能夠接收消息

1.fanout 純廣播、all

須要queue和exchange綁定,由於消費者不是和exchange直連的,消費者是連在queue上,queue綁定在exchange上,消費者只會在queu裏度消息

|------------------------|
     |      /—— queue <—|—> consumer1
producer —|—exchange1 <bind    |         
    \ |      \—— queue <—|—> consumer2
    \-|-exchange2  ……    |
     |------------------------|

發送端 publisher 發佈、廣播

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
# 注意:這裏是廣播,不須要聲明queue
channel.exchange_declare(exchange='logs', # 聲明廣播管道
             type='fanout')
 
# message = ' '.join(sys.argv[1:]) or "info: Hello World!"
message = "info: Hello World!"
channel.basic_publish(exchange='logs',
           routing_key='', # 注意此處空,必須有
           body=message)
print(" [x] Sent %r" % message)
connection.close()

接收端 subscriber 訂閱

import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='logs',
             type='fanout')
# 不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除
result = channel.queue_declare(exclusive=True)
# 獲取隨機的queue名字
queue_name = result.method.queue
print("random queuename:", queue_name)
 
channel.queue_bind(exchange='logs', # queue綁定到轉發器上
          queue=queue_name)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
  print(" [x] %r" % body)
 
channel.basic_consume(callback,
           queue=queue_name,
           no_ack=True)
 
channel.start_consuming()

注意:廣播,是實時的,收不到就沒了,消息不會存下來,相似收音機。

2.direct 有選擇的接收消息

接收者能夠過濾消息,只收我想要的消息
發送端publisher

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
             type='direct')
# 重要程度級別,這裏默認定義爲 info
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
           routing_key=severity,
           body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

接收端subscriber

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
             type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 獲取運行腳本全部的參數
severities = sys.argv[1:]
if not severities:
  sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
  sys.exit(1)
# 循環列表去綁定
for severity in severities:
  channel.queue_bind(exchange='direct_logs',
            queue=queue_name,
            routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
  print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
           queue=queue_name,
           no_ack=True)
 
channel.start_consuming()

運行接收端,指定接收級別的參數,例:

python direct_sonsumer.py info warning
python direct_sonsumer.py warning error

3.topic 更細緻的過濾

好比把error中,apache和mysql的分別或取出來

發送端publisher

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='topic_logs',
             type='topic')
 
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
           routing_key=routing_key,
           body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

接收端 subscriber

import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.exchange_declare(exchange='direct_logs',
             type='direct')
 
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 獲取運行腳本全部的參數
severities = sys.argv[1:]
if not severities:
  sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
  sys.exit(1)
# 循環列表去綁定
for severity in severities:
  channel.queue_bind(exchange='direct_logs',
            queue=queue_name,
            routing_key=severity)
 
print(' [*] Waiting for logs. To exit press CTRL+C')
 
def callback(ch, method, properties, body):
  print(" [x] %r:%r" % (method.routing_key, body))
 
channel.basic_consume(callback,
           queue=queue_name,
           no_ack=True)
 
channel.start_consuming()

運行接收端,指定接收哪些消息,例:

python topic_sonsumer.py *.info
python topic_sonsumer.py *.error mysql.*
python topic_sonsumer.py '#' # 接收全部消息
 
# 接收全部的 logs run:
# python receive_logs_topic.py "#"
 
# To receive all logs from the facility "kern":
# python receive_logs_topic.py "kern.*"
 
# Or if you want to hear only about "critical" logs:
# python receive_logs_topic.py "*.critical"
 
# You can create multiple bindings:
# python receive_logs_topic.py "kern.*" "*.critical"
 
# And to emit a log with a routing key "kern.critical" type:
# python emit_log_topic.py "kern.critical" "A critical kernel error"

4.RabbitMQ RPC 實現(Remote procedure call)

不知道你有沒有發現,上面的流都是單向的,若是遠程的機器執行完返回結果,就實現不了了。
若是返回,這種模式叫什麼呢,RPC(遠程過程調用),snmp就是典型的RPC
RabbitMQ能不能返回呢,怎麼返回呢?既是發送端又是接收端。
可是接收端返回消息怎麼返回?能夠發送到發過來的queue裏麼?不能夠。
返回時,再創建一個queue,把結果發送新的queue裏
爲了服務端返回的queue不寫死,在客戶端給服務端發指令的的時候,同時帶一條消息說,你結果返回給哪一個queue

RPC client

import pika
import uuid
import time
 
class FibonacciRpcClient(object):
  def __init__(self):
    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
 
    self.channel.basic_consume(self.on_response, # 只要一收到消息就調用on_response
                  no_ack=True,
                  queue=self.callback_queue) # 收這個queue的消息
 
  def on_response(self, ch, method, props, body): # 必須四個參數
    # 若是收到的ID和本機生成的相同,則返回的結果就是我想要的指令返回的結果
    if self.corr_id == props.correlation_id:
      self.response = body
 
  def call(self, n):
    self.response = None # 初始self.response爲None
    self.corr_id = str(uuid.uuid4()) # 隨機惟一字符串
    self.channel.basic_publish(
        exchange='',
        routing_key='rpc_queue', # 發消息到rpc_queue
        properties=pika.BasicProperties( # 消息持久化
          reply_to = self.callback_queue, # 讓服務端命令結果返回到callback_queue
          correlation_id = self.corr_id, # 把隨機uuid同時發給服務器
        ),
        body=str(n)
    )
    while self.response is None: # 當沒有數據,就一直循環
      # 啓動後,on_response函數接到消息,self.response 值就不爲空了
      self.connection.process_data_events() # 非阻塞版的start_consuming()
      # print("no msg……")
      # time.sleep(0.5)
    # 收到消息就調用on_response
    return int(self.response)
 
if __name__ == '__main__':
  fibonacci_rpc = FibonacciRpcClient()
  print(" [x] Requesting fib(7)")
  response = fibonacci_rpc.call(7)
  print(" [.] Got %r" % response)

RPC server

import pika
import time
 
def fib(n):
  if n == 0:
    return 0
  elif n == 1:
    return 1
  else:
    return fib(n-1) + fib(n-2)
 
def on_request(ch, method, props, body):
  n = int(body)
  print(" [.] fib(%s)" % n)
  response = fib(n)
 
  ch.basic_publish(
      exchange='', # 把執行結果發回給客戶端
      routing_key=props.reply_to, # 客戶端要求返回想用的queue
      # 返回客戶端發過來的correction_id 爲了讓客戶端驗證消息一致性
      properties=pika.BasicProperties(correlation_id = props.correlation_id),
      body=str(response)
  )
  ch.basic_ack(delivery_tag = method.delivery_tag) # 任務完成,告訴客戶端
 
if __name__ == '__main__':
  connection = pika.BlockingConnection(pika.ConnectionParameters(
      host='localhost'))
  channel = connection.channel()
  channel.queue_declare(queue='rpc_queue') # 聲明一個rpc_queue ,
 
  channel.basic_qos(prefetch_count=1)
  # 在rpc_queue裏收消息,收到消息就調用on_request
  channel.basic_consume(on_request, queue='rpc_queue')
  print(" [x] Awaiting RPC requests")
  channel.start_consuming()
相關文章
相關標籤/搜索