python 之 rabbitMQ

RabbitMQ

RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。他遵循Mozilla Public License開源協議。html

MQ全稱爲Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通訊方法。應用程序經過讀寫出入隊列的消息(針對應用程序的數據)來通訊,而無需專用鏈接來連接它們。消 息傳遞指的是程序之間經過在消息中發送數據進行通訊,而不是經過直接調用彼此來通訊,直接調用一般是用於諸如遠程過程調用的技術。排隊指的是應用程序經過 隊列來通訊。隊列的使用除去了接收和發送應用程序同時執行的要求。python

RabbitMQ安裝算法

安裝配置epel源
   # 64位源
   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/x86_64/epel-release-6-8.noarch.rpm
   # 32位源
   $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
 
安裝erlang(依賴包)
   $ yum -y install erlang
 
安裝RabbitMQ
   $ yum -y install rabbitmq-server
默認端口爲:5672

啓動rabbitmq:  service rabbitmq-server start/stopjson

安裝API緩存

pip3 install pika
or
easy_install pika
or
源碼
https://pypi.python.org/pypi/pika

使用API操做RabbitMQ服務器

MQ是消費-生產者模型的一個典型的表明,一端往消息隊列中不斷寫入消息,而另外一端則能夠讀取或者訂閱隊列中的消息。下面以消費-生產者模型爲例:網絡

基礎概念

  Exchange:交換機,決定了消息路由規則;
  Queue:消息隊列;
  Channel:進行消息讀寫的通道;
  Bind:綁定了Queue和Exchange,意即爲符合什麼樣路由規則的消息,將會放置入哪個 消息隊列

消息持久

  1) 將交換機置爲可持久;
  2) 將通道置爲可持久
  3) 消息發送時設置可持久。
  當咱們「生產」了一條可持久化的消息,嘗試中斷MQ服務,啓動消費者獲取消息,消息依然可以恢復。相反,則拋出異常。

RabbitMQ 的結構圖:app

  

生產者-消費者模型的簡單實例異步

對於RabbitMQ來講,生產和消費再也不針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。ide

-----------------------------生產者-----------------------------------
# /usr/bin/env python
# -*- coding:utf8 -*-
# auth rain

import pika
# 建立鏈接,鏈接到消息隊列服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
# 建立通道
channel = connection.channel()
# 建立任務隊列
channel.queue_declare(queue='task_queue')
# 發佈消息
# exchange -- 它使咱們可以確切地指定消息應該到哪一個隊列去。
# 向隊列插入數值 routing_key是隊列名 body 是要插入的內容
channel.basic_publish(
    exchange='',
    routing_key='task_queue',
    body='test rabbitMQ'
)
print("[X] sent 'test rabbitMq'")
# 緩衝區已經flush並且消息已經確認發送到了RabbitMQ中,關閉連接
connection.close()

# [X] sent 'test rabbitMq'
----------------------------消費者-----------------------------------
# /usr/bin/env python
# -*- coding:utf8 -*-
# auth rain

import pika

# 建立鏈接,鏈接到消息隊列服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
# 建立通道
channel = connection.channel()
# 若是生產者沒有運行建立隊列,那麼消費者也許就找不到隊列了。爲了不這個問題因此消費者也建立這個隊列
channel.queue_declare(queue='task_queue')

# 接收消息須要使用callback這個函數來接收,他會被pika庫來調用
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# 從隊列取數據 callback是回調函數 若是拿到數據 那麼將執行callback函數
channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 永遠循環等待數據處理和callback處理的數據
channel.start_consuming()

'''
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'test rabbitMQ'
[x] Received b'test rabbitMQ'
'''
概念說明:
  Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
  Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
  Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
  Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
  publish:消息生產者,就是投遞消息的程序。
  consumer:消息消費者,就是接受消息的程序。
  channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
消息隊列的使用過程大概以下:
  (1)客戶端鏈接到消息隊列服務器,打開一個channel。
  (2)客戶端聲明一個exchange,並設置相關屬性。
  (3)客戶端聲明一個queue,並設置相關屬性。
  (4)客戶端使用routing key,在exchange和queue之間創建好綁定關係。
  (5)客戶端投遞消息到exchange。
工做隊列(Work Queue)
   循環調度:
    默認來講,RabbitMQ會按順序得把消息發送給每一個消費者(consumer)。平均每一個消費者都會收到同等數量得消息。這種發送消息得方式叫作——輪詢(round-robin)。
    試着添加三個或更多得工做者(workers)。
import pika
import sys
import time

# 建立鏈接,鏈接到消息隊列服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.10.36.101'))
# 建立通道
channel = connection.channel()
# 建立任務隊列
channel.queue_declare(queue='task_queue')
# 發佈消息
# exchange -- 它使咱們可以確切地指定消息應該到哪一個隊列去。
# 向隊列插入數值 routing_key是隊列名 body 是要插入的內容

message = ' '.join(sys.argv[1:]) or "Hello World!"
# 循環發送數據
for i in range(20):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message
    )
    time.sleep(0.5)
    print("[X] sent %s " % message)
# 緩衝區已經flush並且消息已經確認發送到了RabbitMQ中,關閉連接
connection.close()
循環發送數據
import pika
import time

# 建立鏈接,鏈接到消息隊列服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))
# 建立通道
channel = connection.channel()
# 若是生產者沒有運行建立隊列,那麼消費者也許就找不到隊列了。爲了不這個問題因此消費者也建立這個隊列
channel.queue_declare(queue='task_queue')

# 接收消息須要使用callback這個函數來接收,他會被pika庫來調用
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # time.sleep(body.count('.'))
    # print('[x] Done')

# 從隊列取數據 callback是回調函數 若是拿到數據 那麼將執行callback函數
channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
# 永遠循環等待數據處理和callback處理的數據
channel.start_consuming()
開啓多個消費者

一、acknowledgment 消息不丟失

消息確認

當處理一個比較耗時得任務的時候,你也許想知道消費者(consumers)是否運行到一半就掛掉。當前的代碼中,當消息被RabbitMQ發送給消費者(consumers)以後,立刻就會在內存中移除。這種狀況,你只要把一個工做者(worker)中止,正在處理的消息就會丟失。同時,全部發送到這個工做者的尚未處理的消息都會丟失。

咱們不想丟失任何任務消息。若是一個工做者(worker)掛掉了,咱們但願任務會從新發送給其餘的工做者(worker)。

爲了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)。消費者會經過一個ack(響應),告訴RabbitMQ已經收到並處理了某條消息,而後RabbitMQ就會釋放並刪除這條消息。

若是消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認爲消息沒有被徹底處理,而後從新發送給其餘消費者(consumer)。這樣,及時工做者(workers)偶爾的掛掉,也不會丟失消息。

消息是沒有超時這個概念的;當工做者與它斷開連的時候,RabbitMQ會從新發送消息。這樣在處理一個耗時很是長的消息任務的時候就不會出問題了。

消息響應默認是開啓的。以前的例子中咱們可使用no_ack=True標識把它關閉。是時候移除這個標識了,當工做者(worker)完成了任務,就發送一個響應。
消息確認基礎講解

no-ack = False,若是消費者遇到狀況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會從新將該任務添加到隊列中。

消息響應默認是開啓的。以前的例子中咱們可使用no_ack=True標識把它關閉。是時候移除這個標識了,當工做者(worker)完成了任務,就發送一個響應。

ch.basic_ack(delivery_tag=method.delivery_tag)
一個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。消息在你的程序退出以後就會從新發送,若是它不可以釋放沒響應的消息,RabbitMQ就會佔用愈來愈多的內存。
爲了排除這種錯誤,你可使用rabbitmqctl命令,輸出messages_unacknowledged字段:

  sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

import pika
import sys
import time

# 建立鏈接,鏈接到消息隊列服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.10.36.101'))
# 建立通道
channel = connection.channel()
# 建立任務隊列
channel.queue_declare(queue='task_queue')
# 發佈消息
# exchange -- 它使咱們可以確切地指定消息應該到哪一個隊列去。
# 向隊列插入數值 routing_key是隊列名 body 是要插入的內容

message = ' '.join(sys.argv[1:]) or "Hello World!"
# 循環發送數據
for i in range(20):
    channel.basic_publish(
        exchange='',
        routing_key='task_queue',
        body=message
    )
    time.sleep(0.5)
    print("[X] sent %s " % message)
# 緩衝區已經flush並且消息已經確認發送到了RabbitMQ中,關閉連接
connection.close()
生產者 basic_publish
import pika
import time

# 建立鏈接,鏈接到消息隊列服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))
# 建立通道
channel = connection.channel()
# 若是生產者沒有運行建立隊列,那麼消費者也許就找不到隊列了。爲了不這個問題因此消費者也建立這個隊列
channel.queue_declare(queue='task_queue')

# 接收消息須要使用callback這個函數來接收,他會被pika庫來調用
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.decode().count('...'))
    print(" [x] Done")

    # 消息不丟失
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 從隊列取數據 callback是回調函數 若是拿到數據 那麼將執行callback函數
channel.basic_consume(callback,
                      queue='task_queue',
                      )

print(' [*] Waiting for messages. To exit press CTRL+C')
# 永遠循環等待數據處理和callback處理的數據
channel.start_consuming()
消費者 basic_consume

二、消息持久化

若是你沒有特地告訴RabbitMQ,那麼在它退出或者崩潰的時候,將會丟失全部隊列和消息。爲了確保信息不會丟失,有兩個事情是須要注意的:咱們必須把「隊列」和「消息」設爲持久化。

首先,爲了避免讓隊列消失,須要把隊列聲明爲持久化(durable):

channel.queue_declare(queue='hello', durable=True)

這個queue_declare必須在生產者(producer)和消費者(consumer)對應的代碼中修改。

這時候,咱們就能夠確保在RabbitMq重啓以後queue_declare隊列不會丟失。另外,咱們須要把咱們的消息也要設爲持久化——將delivery_mode的屬性設爲2

# 生產者端
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # 即便服務端掛了, 消息也能持久化
                      ))

# 消費者端
def callback(ch, method, properties, body):
    print(body)
    time.sleep(body.count())
    ch.basic_ack(delivery_tag=method.delivery_tag)     

注意:消息持久化

將消息設爲持久化並不能徹底保證不會丟失。以上代碼只是告訴了RabbitMq要把消息存到硬盤,但從RabbitMq收到消息到保存之間仍是有一個很小的間隔時間。由於RabbitMq並非全部的消息都使用fsync(2)——它有可能只是保存到緩存中,並不必定會寫到硬盤中。並不能保證真正的持久化,但已經足夠應付咱們的簡單工做隊列。若是你必定要保證持久化,你須要改寫你的代碼來支持事務(transaction)。

三、公平調度

你應該已經發現,它仍舊沒有按照咱們指望的那樣進行分發。好比有兩個工做者(workers),處理奇數消息的比較繁忙,處理偶數消息的比較輕鬆。然而RabbitMQ並不知道這些,它仍然一如既往的派發消息。

這時由於RabbitMQ只管分發進入隊列的消息,不會關心有多少消費者(consumer)沒有做出響應。它盲目的把第n-th條消息發給第n-th個消費者

 

咱們可使用basic.qos方法,並設置prefetch_count=1。這樣是告訴RabbitMQ,再同一時刻,不要發送超過1條消息給一個工做者(worker),直到它已經處理了上一條消息而且做出了響應。這樣,RabbitMQ就會把消息分發給下一個空閒的工做者(worker)。

channel.basic_qos(prefetch_count=1)

注意:關於隊列大小

若是全部的工做者都處理繁忙狀態,你的隊列就會被填滿。你須要留意這個問題,要麼添加更多的工做者(workers),要麼使用其餘策略。

----------------------------------生產者--------------------------------------------
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

# 隊列持久化 durable=True
channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2,   # 消息持久化
                      ))
print " [x] Sent %r" % (message,)
connection.close()
 
 
----------------------------------消費者--------------------------------------------

#
!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() # 隊列持久化 durable=True channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()

四、發佈/訂閱

  分發一個消息給多個消費者(consumers)。這種模式被稱爲「發佈/訂閱」。

爲了描述這種模式,咱們將會構建一個簡單的日誌系統。它包括兩個程序——第一個程序負責發送日誌消息,第二個程序負責獲取消息並輸出內容。

在咱們的這個日誌系統中,全部正在運行的接收方程序都會接受消息。咱們用其中一個接收者(receiver)把日誌寫入硬盤中,另一個接受者(receiver)把日誌輸出到屏幕上。

最終,日誌消息被廣播給全部的接受者(receivers)。

交換機(Exchanges)

RabbitMQ消息模型的核心理念是:發佈者(producer)不會直接發送任何消息給隊列。事實上,發佈者(producer)甚至不知道消息是否已經被投遞到隊列。

發佈者(producer)只須要把消息發送給一個交換機(exchange)。交換機很是簡單,它一邊從發佈者方接收消息,一邊把消息推送到隊列。交換機必須知道如何處理它接收到的消息,是應該推送到指定的隊列仍是是多個隊列,或者是直接忽略消息。這些規則是經過交換機類型(exchange type)來定義的。

交換機類型:

  直連交換機(direct),

  主題交換機(topic),

  頭交換機  (headers),

  扇型交換機(fanout)。

 

channel.exchange_declare(exchange='logs',
                         type='fanout')

扇型交換機(fanout)

  扇型交換機(fanout)很簡單,你可能從名字上就能猜想出來,它把消息發送給它所知道的全部隊列。這正是咱們的日誌系統所須要的。

匿名的交換器

前面的教程中咱們對交換機一無所知,但仍然可以發送消息到隊列中。由於咱們使用了命名爲空字符串("")默認的交換機。

回想咱們以前是如何發佈一則消息:

channel.basic_publish(exchange='', routing_key='hello', body=message)

exchange參數就是交換機的名稱。空字符串表明默認或者匿名交換機:消息將會根據指定的routing_key分發到指定的隊列。

如今,咱們就能夠發送消息到一個具名交換機了:

channel.basic_publish(exchange='logs', routing_key='', body=message)

臨時隊列

第一步, 當咱們鏈接上RabbitMQ的時候,咱們須要一個全新的、空的隊列。咱們能夠手動建立一個隨機的隊列名,或者讓服務器爲咱們選擇一個隨機的隊列名(推薦)。咱們只須要在調用queue_declare方法的時候,不提供queue參數就能夠了:

  result = channel.queue_declare()

這時候咱們能夠經過result.method.queue得到已經生成的隨機隊列名。它多是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。

第二步,當與消費者(consumer)斷開鏈接的時候,這個隊列應當被當即刪除。exclusive標識符便可達到此目的。

  result = channel.queue_declare(exclusive=True)

 

綁定(Bindings)

咱們已經建立了一個扇型交換機(fanout)和一個隊列。如今咱們須要告訴交換機如何發送消息給咱們的隊列。交換器和隊列之間的聯繫咱們稱之爲綁定(binding)。

  channel.queue_bind(exchange='logs', queue=result.method.queue)

如今,logs交換機將會把消息添加到咱們的隊列中。

綁定(binding)列表

你可使用rabbitmqctl list_bindings 列出全部現存的綁定。

一個發送日誌的實例

#!/bin/bin/env python
# -*-coding:utf-8 -*-
# Author : rain

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))
channel = connection.channel()

# 指定exchange,其類型爲fanout
channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ''.join(sys.argv[1:] or 'hello world')
# 發送給指定的exchange
channel.publish(
    exchange='logs',
    routing_key='',
    body='message'
)

print(" [x] Sent %r" % (message,))

connection.close()
發佈者
#!/bin/bin/env python
# -*-coding:utf-8 -*-
# Author : rain

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))
channel = connection.channel()

# 指定exchange,其類型爲fanout
channel.exchange_declare(exchange='logs',
                         type='fanout')

# 建立臨時(queue)隊列
result = channel.queue_declare(exclusive=True)

queue_name = result.method.queue


# 將queue綁定到指定的exchange上
channel.queue_bind(exchange='logs', queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % (body,))


channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True
                      )

channel.start_consuming()
訂閱者

直連交換機(Direct exchange)

綁定(binding)是指交換機(exchange)和隊列(queue)的關係。能夠簡單理解爲:這個隊列(queue)對這個交換機(exchange)的消息感興趣。

綁定的時候能夠帶上一個額外的routing_key參數。爲了不與basic_publish的參數混淆,咱們把它叫作綁定鍵(binding key)。如下是如何建立一個帶綁定鍵的綁定。

channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')

綁定鍵的意義取決於交換機(exchange)的類型。咱們以前使用過的扇型交換機(fanout exchanges)會忽略這個值

應用場景:  

咱們的日誌系統廣播全部的消息給全部的消費者(consumers)。咱們打算擴展它,使其基於日誌的嚴重程度進行消息過濾。例如咱們也許只是但願將比較嚴重的錯誤(error)日誌寫入磁盤,以避免在警告(warning)或者信息(info)日誌上浪費磁盤空間。

咱們使用的扇型交換機(fanout exchange)沒有足夠的靈活性 —— 它能作的僅僅是廣播。

咱們將會使用直連交換機(direct exchange)來代替。路由的算法很簡單 —— 交換機將會對綁定鍵(binding key)和路由鍵(routing key)進行精確匹配,從而肯定消息該分發到哪一個隊列。

 

 下圖可以很好的描述這個場景:

在這個場景中,咱們能夠看到直連交換機 X和兩個隊列進行了綁定。第一個隊列使用orange做爲綁定鍵,第二個隊列有兩個綁定,一個使用black做爲綁定鍵,另一個使用green。

這樣以來,當路由鍵爲orange的消息發佈到交換機,就會被路由到隊列Q1。路由鍵爲black或者green的消息就會路由到Q2。其餘的全部消息都將會被丟棄。

多個綁定(Multiple bindings)

多個隊列使用相同的綁定鍵是合法的。這個例子中,咱們能夠添加一個X和Q1之間的綁定,使用black綁定鍵。這樣一來,直連交換機就和扇型交換機的行爲同樣,會將消息廣播到全部匹配的隊列。帶有black路由鍵的消息會同時發送到Q1和Q2。

發送日誌

咱們將會發送消息到一個直連交換機,把日誌級別做爲路由鍵。這樣接收日誌的腳本就能夠根據嚴重級別來選擇它想要處理的日誌。咱們先看看發送日誌。

severity = ['info','warning','error']
咱們須要建立一個交換機(exchange):

channel.exchange_declare(exchange='direct_logs',
                         type='direct')
而後咱們發送一則消息:

channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
咱們先假設「severity」的值是info、warning、error中的一個。
訂閱

處理接收消息的方式和以前差很少,只有一個例外,咱們將會爲咱們感興趣的每一個嚴重級別分別建立一個新的綁定。
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                      routing_key=severity)

 

1.生產者:

#!/bin/bin/env python
# -*-coding:utf-8 -*-
# Author : rain

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))  
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

 2.消費者:

#!/bin/bin/env python
# -*-coding:utf-8 -*-
# Author : rain

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

 主題交換機(topic exchange)

發送到主題交換機(topic exchange)的消息不能夠攜帶隨意什麼樣子的路由鍵(routing_key),它的路由鍵必須是一個由.分隔開的詞語列表。這些單詞隨即是什麼均可以,可是最好是跟攜帶它們的消息有關係的詞彙。如下是幾個推薦的例子:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。詞語的個數能夠隨意,可是不要超過255字節。

綁定鍵也必須擁有一樣的格式。主題交換機背後的邏輯跟直連交換機很類似 —— 一個攜帶着特定路由鍵的消息會被主題交換機投遞給綁定鍵與之想匹配的隊列。可是它的綁定鍵和路由鍵有兩個特殊應用方式:

  • * (星號) 用來表示一個單詞.
  • # (井號) 用來表示任意數量(零個或多個)單詞。

 

這個例子裏,咱們發送的全部消息都是用來描述小動物的。發送的消息所攜帶的路由鍵是由三個單詞所組成的,這三個單詞被兩個 . 分割開。路由鍵裏的第一個單詞描述的是動物的手腳的利索程度,第二個單詞是動物的顏色,第三個是動物的種類。因此它看起來是這樣的:  <celerity>.<colour>.<species> 。

咱們建立了三個綁定:Q1的綁定鍵爲  *.orange.* ,Q2的綁定鍵爲  *.*.rabbit  和  lazy.#

這三個綁定鍵被能夠總結爲:
•Q1 對全部的桔黃色動物都感興趣。
•Q2 則是對全部的兔子和全部懶惰的動物感興趣。

一個攜帶有  quick.orange.rabbit  的消息將會被分別投遞給這兩個隊列。攜帶着  lazy.orange.elephant  的消息一樣也會給兩個隊列都投遞過去。另外一方面攜帶有  quick.orange.fox  的消息會投遞給第一個隊列,攜帶有  lazy.brown.fox  的消息會投遞給第二個隊列。攜帶有  lazy.pink.rabbit  的消息只會被投遞給第二個隊列一次,即便它同時匹配第二個隊列的兩個綁定。攜帶着  quick.brown.fox  的消息不會投遞給任何一個隊列。

若是咱們違反約定,發送了一個攜帶有一個單詞或者四個單詞( "orange"  or  "quick.orange.male.rabbit" )的消息時,發送的消息不會投遞給任何一個隊列,並且會丟失掉。

可是另外一方面,即便  "lazy.orange.male.rabbit"  有四個單詞,他仍是會匹配最後一個綁定,而且被投遞到第二個隊列中。
關於topic exchange
主題交換機

主題交換機是很強大的,它能夠表現出跟其餘交換機相似的行爲

當一個隊列的綁定鍵爲 "#"(井號) 的時候,這個隊列將會無視消息的路由鍵,接收全部的消息。

當  *  (星號) 和  #  (井號) 這兩個特殊字符都未在綁定鍵中出現的時候,此時主題交換機就擁有的直連交換機的行爲。
#!/bin/bin/env python
# -*-coding:utf-8 -*-
# Author : rain

import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
topic-rabbitmq-server
#!/bin/bin/env python
# -*-coding:utf-8 -*-
# Author : rain

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
topic-rabbitmq-client
執行下邊命令 接收全部日誌:
 python receive_logs_topic.py "#" 

執行下邊命令 接收來自」kern「設備的日誌:
 python receive_logs_topic.py "kern.*" 

執行下邊命令 只接收嚴重程度爲」critical「的日誌:
 python receive_logs_topic.py "*.critical" 

執行下邊命令 創建多個綁定:
 python receive_logs_topic.py "kern.*" "*.critical" 

執行下邊命令 發送路由鍵爲 "kern.critical" 的日誌:
 python emit_log_topic.py "kern.critical" "A critical kernel error" 

執行上邊命令試試看效果吧。另外,上邊代碼不會對路由鍵和綁定鍵作任何假設,因此你能夠在命令中使用超過兩個路由鍵參數。

 遠程過程調用(RPC Remote Procedure Call)

 使用RabbitMQ來構建一個RPC系統:包含一個客戶端和一個RPC服務器。

消息屬性

AMQP協議給消息預約義了一系列的14個屬性。大多數屬性不多會用到,除了如下幾個:

  • delivery_mode(投遞模式):將消息標記爲持久的(值爲2)或暫存的(除了2以外的其餘任何值)。第二篇教程裏接觸過這個屬性,記得吧?
  • content_type(內容類型):用來描述編碼的mime-type。例如在實際使用中經常使用application/json來描述JOSN編碼類型。
  • reply_to(回覆目標):一般用來命名回調隊列。
  • correlation_id(關聯標識):用來將RPC的響應和請求關聯起來。
關聯標識
咱們建議給每個RPC請求新建一個回調隊列。這不是一個高效的作法,幸虧這兒有一個更好的辦法 —— 咱們能夠爲每一個客戶端只創建一個獨立的回調隊列。

這就帶來一個新問題,當此隊列接收到一個響應的時候它沒法辨別出這個響應是屬於哪一個請求的。correlation_id 就是爲了解決這個問題而來的。咱們給每一個請求設置一個獨一無二的值。稍後,當咱們從回調隊列中接收到一個消息的時候,咱們就能夠查看這條屬性從而將響應和請求匹配起來。若是咱們接手到的消息的correlation_id是未知的,那就直接銷燬掉它,由於它不屬於咱們的任何一條請求。

你也許會問,爲何咱們接收到未知消息的時候不拋出一個錯誤,而是要將它忽略掉?這是爲了解決服務器端有可能發生的競爭狀況。儘管可能性不大,但RPC服務器仍是有可能在已將應答發送給咱們但還未將確認消息發送給請求的狀況下死掉。若是這種狀況發生,RPC在重啓後會從新處理請求。這就是爲何咱們必須在客戶端優雅的處理重複響應,同時RPC也須要儘量保持冪等性。
關聯標識

RPC工做流程:

  • 當客戶端啓動的時候,它建立一個匿名獨享的回調隊列。
  • 在RPC請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另外一個是設置惟一值的 correlation_id 屬性。
  • 將請求發送到一個 rpc_queue 隊列中。
  • RPC工做者(又名:服務器)等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工做而且將帶有執行結果的消息發送給reply_to字段指定的隊列。
  • 客戶端等待回調隊列裏的數據。當有消息出現的時候,它會檢查correlation_id屬性。若是此屬性的值與請求匹配,將它返回給應用。
#!/bin/bin/env python
# -*-coding:utf-8 -*-
# Author : rain

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))
channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print("[.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(
                         correlation_id=props.correlation_id,),
                     body=str(response)
                     )

    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()
rpc_server
#!/bin/bin/env python
# -*-coding:utf-8 -*-
# Author : rain


import pika
import uuid

class FibonacciRpcClient:
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.10.36.101'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % (response,))
rpm_client.py

此處呈現的設計並非實現RPC服務的惟一方式,可是他有一些重要的優點:

  • 若是RPC服務器運行的過慢的時候,你能夠經過運行另一個服務器端輕鬆擴展它。試試在控制檯中運行第二個 rpc_server.py 。
  • 在客戶端,RPC請求只發送或接收一條消息。不須要像 queue_declare 這樣的異步調用。因此RPC客戶端的單個請求只須要一個網絡往返。

咱們的代碼依舊很是簡單,並且沒有試圖去解決一些複雜(可是重要)的問題,如:

  • 當沒有服務器運行時,客戶端如何做出反映。
  • 客戶端是否須要實現相似RPC超時的東西。
  • 若是服務器發生故障,而且拋出異常,應該被轉發到客戶端嗎?
  • 在處理前,防止混入無效的信息(例如檢查邊界)

rabbitMQ中文文檔: http://rabbitmq.mr-ping.com/tutorials_with_python/[1]Hello_World.html

相關文章
相關標籤/搜索