探索 OpenStack 之(14):OpenStack 中 RabbitMQ 的使用

本文是 OpenStack 中的 RabbitMQ 使用研究 兩部分中的第一部分,將介紹 RabbitMQ 的基本概念,即 RabbitMQ 是什麼。第二部分將介紹其在 OpenStack 中的使用。html

 

1 RabbitMQ 的基本概念

RabbitMQ 是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。node

AMQP 是一個定義了在應用或者組織之間傳送消息的協議的開放標準 (an open standard for passing business messages between applications or organizations),它最新的版本是 1.0。AMQP 目標在於解決在兩個應用之間傳送消息存在的下列問題:python

  • 網絡是不可靠的 =>消息須要保存後再轉發並有出錯處理機制
  • 與本地調用相比,網絡速度慢 =>得異步調用
  • 應用之間是不一樣的(好比不一樣語言實現、不一樣操做系統等) =>得與應用無關
  • 應用會常常變化 =>同上

AMQP 使用異步的、應用對應用的、二進制數據通訊來解決這些問題。git

RabbitMQ 是 AMQP 的一種實現,它包括Server (服務器端)、Client (客戶端) 和 Plugins (插件)。RabbitMQ 服務器是用 Erlang 語言編寫的,其最新版本是剛剛(2015/02/11)發佈的 3.4.4,而 OpenStack Juno 中使用的 Server 是 2014年3月發佈的 3.2.4 版本。如今 RabbitMQ 支持的 AMQP 版本依然是0.9.1。github

1.1 RabbitMQ 的概念很是清晰、簡潔

其基本概念參見下圖:編程

 

RabbitMQ 官網 和其它網站上有不少文章來描述其基本概念。簡單說明以下:服務器

  • Message (消息):RabbitMQ 轉發的二進制對象,包括Headers(頭)、Properties (屬性)和 Data (數據),其中數據部分不是必要的。具體見 1.2 部分的描述。
  • Producer(生產者): 消息的生產者,負責產生消息並把消息發到交換機 Exhange的應用。
  • Consumer (消費者):使用隊列 Queue 從 Exchange 中獲取消息的應用。
  • Exchange (交換機):負責接收生產者的消息並把它轉到到合適的隊列 Queue 。下面有 1.3 部分描述。
  • Queue (隊列):一個存儲Exchange 發來的消息的緩衝,並將消息主動發送給Consumer,或者 Consumer 主動來獲取消息。見 1.4 部分的描述。網絡

  • Binding (綁定):隊列 和 交換機 之間的關係。Exchange 根據消息的屬性和 Binding 的屬性來轉發消息。綁定的一個重要屬性是 binding_key。
  • Connection (鏈接)和 Channel (通道):生產者和消費者須要和 RabbitMQ 創建 TCP 鏈接。一些應用須要多個connection,爲了節省TCP 鏈接,可使用 Channel,它能夠被認爲是一種輕型的共享 TCP 鏈接的鏈接。鏈接須要用戶認證,而且支持 TLS (SSL)。鏈接須要顯式關閉。
  • Virtual Host (虛擬主機) :RabbitMQ 用來進行資源隔離的機制。一個虛機主機會隔離用戶、exchange、queue 等。默認的虛擬主機爲 "/"。

1.2 關於消息 message

消息結構:app

消息的幾個重要屬性:異步

  • routing_key:Direct 和 Topic 類型的 exchange 會根據本屬性來轉發消息。
  • delivery_mode: 將其值設置爲 2 將用於消息的持久化,持久化的消息會被保存到磁盤上來防止其丟失。下面章節 3 有描述。

  • reply_to:通常用來表示RPC實現中客戶端的回調隊列的名字。下面章節 4 有描述。
  • correlation_id:用於使用 RabbitMQ 來實現 RPC的情形。下面章節 4 有描述。
  • content_type:表示消息data的編碼格式名稱。實際上RabbitMQ只負責原樣傳送消息所以不會使用該屬性,該屬性只被 Publisher 和 Consumer 使用。

消息的確認/刪除機制:

Consumer 處理消息可能會失敗,那麼 RabbitMQ 怎麼知道何時來刪除 queue 中的消息呢?它使用兩種機制:

  • 當 RabbitMQ 主動將消息發給 Consumer 之後,它會刪除消息
  • 當 Consumer 發回一個確認後,RabbitMQ 會刪除消息。

第二種狀況下,若是 RabbitMQ 沒收到確認,它會把消息從新放進隊列(re-queued)並添加標識 'redelivered' 代表該消息以前已經發送過 ,若是沒有Consumer的話,消息將保持到有下一個 Consumer 爲止。

Consumer 能夠主動告訴 RabbitMQ 消息處理失敗了(拒絕消息),並告知RabbitMQ 是刪除消息仍是從新放進隊列。

1.3 exchange 交換機

exchange 有幾個重要的屬性:

  • Name 名字:交換機名字。空字符串名字的exchange爲默認的exchange。
  • Type 類型:Direct, Fanout, Topic, Headers。類型決定 exchange 的消息轉發能力。下面 章節2 有描述。
  • durable:值爲 True/False。值爲 true 的 exchange 在 rabbitmq 重啓後會被自動建立。OpenStack 使用的exchange的該值都爲false。
  • auto_delete:值爲 True/False。設置爲 true 的話,當全部消費者的鏈接都關閉後,該 exchange 會被自動刪除。OpenStack 使用的exchange的該值都爲false。
  • exclusive:值爲 True/False。設置爲 true 的話,該 exchange 只容許被建立的connection使用,而且在該 connection 關閉後它會被自動刪除。

RabbitMQ 默認會爲每一種類型生成一個或者兩個的默認的 exchange:

  • Fanout 類型:名字爲 amq.fanout
  • Topic 類型: 名字爲 amq.topic
  • Headers 類型:名字爲 amq.match 和 amq.headers
  • Direct 類型:名字爲空字符串的exchange 以及 amq.direct。其中名字爲空的exchange比較特殊。在一個 Queue 被建立後,RabbitMQ 會自動創建它和該 exchange 之間的binding,而且設置其 binding_key 爲該queue 的名字。這樣,該語句 channel.basic_publish(exchange=''routing_key='hello',             body=message) 會讓該默認的 exchange 將該 message 轉發到名字爲 'hello' 的隊列中。

1.4 隊列 Queue

隊列一樣有相似於 exchange 的 name、durable、auto_delete 和 exclusive 等屬性,而且含義相同。

Exchange 會將消息分發(copy)到符合要求的全部隊列中。

Consumer 能夠主動獲取或者被動接受Queue裏面的消息:

一個 Queue 容許有多個 Consumer,好比利用 RabbitMQ 來實現一個簡單的 load balancer。這時候,消息會在這些 Consumer 之間根據 channel 的 prefetch level 作分發(請參見AQMP: QoS or message prefetching),若是該值同樣的話,消息會被平均分發給這些Consumer。

1.5 rabbitmqctl  Cli

RabbitMQ 提供Cli  rabbitmqctl [-n <node>] [-q] <command> [<command options>] 來進行管理和配置。經常使用到的命令有:

  • stop/start_app
  • add/delete/list_vhosts
  • list_queues/exchanges/bindings/connections/channels
  • trace_on/off

2 消息轉發機制

Exchange 根據它自身的類型 type、消息的屬性 routing_key 或者 headers,以及 Binding 的屬性 binding_key 來轉發消息。

Exchange 的類型 Type 使用的消息屬性 使用的Binding 屬性 轉發模式
Fanout - (忽略消息的轉發屬性) - (忽略binding的轉發屬性)

Exchange 將消息轉發到全部與它有 binding 關係的隊列中。

這種方法轉發效率較高。OpenStack 大量使用這種類型的 exchange。

Direct routing_key (任意的字符串,好比 "abc") binding_key (任意的字符串,好比 "abc") Exchange 只將消息轉到 binding 的 binding_key 等於消息的 routing_key 的隊列中。
Topic routing_key (以 "." 分割的多單詞字符串,好比 abc.efg.hij) binding_key (包含 "#" 和 "*" 的以 「.」 分割的多單詞字符串,好比 *.efg.*)

Exchange 只將消息轉到消息的 routing_key 和 binding 的 binding_key 匹配的隊列中。匹配規則以下:

(1)二者以"."分割的單詞數目相同

(2)"*"可表明一個單詞

(3)"#「可表明零個或多個單詞

Headers headers (消息頭) binding_key

Exchange 只將消息轉到消息的 headers 和 binding 的 binding_key 匹配的隊列中。匹配規則待研究。

OpenStack不使用該類型的exchange。

參考文檔:

https://www.rabbitmq.com/getstarted.html 這裏有詳細的闡述和示例源代碼。

http://www.cnblogs.com/starof/p/4173413.html 這裏有官網文檔的中文版。

3 持久化

消息的持久化意味着在 RabbitMQ 被重啓後,消息依然還在。要實現持久化,得實現幾個相關組件的持久化:

(1). 交換機的持久化,須要將其 durable 屬性設爲 true。chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False,)

(2). 隊列的持久化,須要將其 durable 屬性設置爲 true。chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)

(3). 消息的持久化,須要將其 Delivery Mode 屬性設置成2 。msg.properties["delivery_mode"] = 2

4 RPC

可使用 RabbitMQ 來實現 RPC 機制,這裏說說其實現原理:

過程:

(1). 客戶端 Client 設置消息的 routing key 爲 Service 的隊列 op_q;設置消息的 reply-to 屬性爲返回的 response 的目標隊列 reponse_q,設置其 correlation_id 爲以隨機UUID,而後將消息發到 exchange。好比 channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)

(2). Exchange 將消息轉發到 Service 的 op_q

(3). Service 收到該消息後進行處理,而後將response 發到 exchange,並設置消息的 routing_key 爲原消息的 reply_to 屬性,以及設置其 correlation_id 爲原消息的 correlation_id 。

ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))

(4). Exchange 將消息轉發到 reponse_q

(5). Client 逐一接受 response_q 中的消息,檢查消息的 correlation_id 是否爲等於它發出的消息的correlation_id,是的話代表該消息爲它須要的response。

 這裏有詳細的闡述。

5 Python AMQP SDK

經常使用的Python AMQP SDK包括:

5.1 一個簡單的使用 py-amqplib 的 Consumer 實現

#建立Connection和Channel鏈接到 RabbitMQ 服務器
conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False)
chan = conn.channel()

#建立 queue
result = chan.queue_declare(queue="debug", durable=True, exclusive=False, auto_delete=False)

#建立 exchange
result = chan.exchange_declare(exchange="sorting_room2", type="topic", durable=True, auto_delete=False,)

#建立 binding
result = chan.queue_bind(queue="debug", exchange="sorting_room2", routing_key="*.debug")

#回調函數,當有 message 到達 queue 後,該函數會被調用
def recv_callback(msg):
    print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
#
lChannel.basic_ack(msg.delivery_tag) #若是no_ack=False的話,能夠須要發回一個確認

#啓動一個 consumer,consumer_tag 是該 consumer 的一個惟一標識符
#
no_ack = True 表示該 consumer 不會發回確認
chan.basic_consume(queue='debug', no_ack=True, callback=recv_callback, consumer_tag="debugtag")
#等待有消息發到 queue while True: chan.wait()

#終止該 consumer chan.basic_cancel("testtag") #關閉 connection 和 channel chan.close() conn.close()

5.2 一個簡單的使用 py-amqplib 的 Producer 實現代碼

from amqplib import client_0_8 as amqp
import sys

#建立 connection 和 channel conn
= amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel()
#建立 message msg
= amqp.Message(sys.argv[1]) msg.properties["delivery_mode"] = 2

#發送 message chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))
#關閉 connection 和 channel
chan.close() conn.close()

5.3 使用 pika

5.3.1 安裝 pika

 

wget https://pypi.python.org/packages/source/p/pika/pika-0.9.14.tar.gz #md5=b99aad4b88961d3c7e4876b8327fc97c  
tar zxvf pika-0.9.14.tar.gz  
cd pika-0.9.14
python setup.py install

 

5.3.2 使用 pika 編程(來源

#!/usr/bin/env python
'''
rabbitmq trace scripts.
require (rabbitmq_tracing):
    $ sudo rabbitmq-plugins enable rabbitmq_tracing
usage:
    $ sudo rabbitmqctl trace_on
    $ ./rabbitmqtrace.py
    << output >>
'''
import sys
import time
from optparse import OptionParser
import pika

__AUTHOR__  = 'smallfish'
__VERSION__ = '0.0.1'

def _out(args):
    print time.strftime('%Y-%m-%d %H:%M:%S'), args

def _run(host, port, vhost, user, password):
    conn = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port, virtual_host=vhost,
        credentials=pika.PlainCredentials(user, password)))
    chan = conn.channel()
    def _on_message(ch, method, properties, body):
        ret = {}
        ret['routing_key'] = method.routing_key
        ret['headers'] = properties.headers
        ret['body'] = body
        _out(ret)
    _out('start subscribe amq.rabbitmq.trace')
    ret = chan.queue_declare(exclusive=False, auto_delete=True)
    queue = ret.method.queue
    chan.queue_bind(exchange='amq.rabbitmq.trace', queue=queue, routing_key='#')
    chan.queue_bind(exchange='amq.rabbitmq.log', queue=queue, routing_key='#')
    chan.basic_consume(_on_message, queue=queue, no_ack=True)
    chan.start_consuming()

def main():
    parser = OptionParser('usage: %prog')
    parser.add_option('', '--host', metavar='host', default='localhost', help='rabbitmq host address, default: %default')
    parser.add_option('', '--port', metavar='port', default=5672, type='int', help='rabbitmq port, default: %default')
    parser.add_option('', '--vhost', metavar='vhost', default='/', help='rabbitmq vhost, default: %default')
    parser.add_option('', '--user', metavar='user', default='guest', help='rabbitmq user, default: %default')
    parser.add_option('', '--password', metavar='password', default='guest', help='rabbitmq password, default: %default')
    (options, args) = parser.parse_args()
    _run(options.host, options.port, options.vhost, options.user, options.password)

if __name__ == '__main__':
    main()

 

6 插件和消息追蹤

RabbitMQ 支持使用插件來支持 Management, Federation, Shovel  和 STOMP。全部的插件都在這裏

6.1 rabbitmq-management 插件

它提供 HTTP-based API 和 browser-based UI 以及 CLI 來管理 RabbitMQ。它的GUI的訪問地址是 http://<rabbitmq server ip>:15672/#/traces。它的GUI上,提供了一個 overview,還能夠經過它來管理connection、channel、exchange 和 queue,以及 virtual host,tracing 和 policy等。

6.2 RabbitMQ 的 firehose 機制

該機制提供了一個查看被轉發的消息的途徑。當打開 firehose 的時候,RabbitMQ 會自動創建 amq.rabbitmq.trace 和 amq.rabbitmq.log 兩個exchange。你能夠編程建立queue 從這兩個 exchange 裏面獲取 trace 和 log,從而觀察每個被處理的消息。這裏有一個開源代碼實現

6.3 rabbitmq_tracing 插件

rabbitmq_tracing 插件在 management 插件增長了消息追蹤的方法,它是從 firehose 中獲取數據。在激活了 rabbitmq-management,firehose 和 rabbitmq_tracing,你能夠在 management GUI  中追蹤消息:

 

自此,RabbitMQ 基本上算熟悉了,接下來能夠開始分析 OpenStack 中是如何使用 RabbitMQ 了。

相關文章
相關標籤/搜索