本文是 OpenStack 中的 RabbitMQ 使用研究 兩部分中的第一部分,將介紹 RabbitMQ 的基本概念,即 RabbitMQ 是什麼。第二部分將介紹其在 OpenStack 中的使用。html
RabbitMQ 是實現了高級消息隊列協議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。node
AMQP 是一個定義了在應用或者組織之間傳送消息的協議的開放標準 (an open standard for passing business messages between applications or organizations),它最新的版本是 1.0。AMQP 目標在於解決在兩個應用之間傳送消息存在的下列問題:python
AMQP 使用異步的、應用對應用的、二進制數據通訊來解決這些問題。git
RabbitMQ 是 AMQP 的一種實現,它包括Server (服務器端)、Client (客戶端) 和 Plugins (插件)。RabbitMQ 服務器是用 Erlang 語言編寫的,其最新版本是剛剛(2015/02/11)發佈的 3.4.4,而 OpenStack Juno 中使用的 Server 是 2014年3月發佈的 3.2.4 版本。如今 RabbitMQ 支持的 AMQP 版本依然是0.9.1。github
其基本概念參見下圖:編程
RabbitMQ 官網 和其它網站上有不少文章來描述其基本概念。簡單說明以下:服務器
Queue (隊列):一個存儲Exchange 發來的消息的緩衝,並將消息主動發送給Consumer,或者 Consumer 主動來獲取消息。見 1.4 部分的描述。網絡
消息結構:app
消息的幾個重要屬性:異步
delivery_mode: 將其值設置爲 2 將用於消息的持久化,持久化的消息會被保存到磁盤上來防止其丟失。下面章節 3 有描述。
消息的確認/刪除機制:
Consumer 處理消息可能會失敗,那麼 RabbitMQ 怎麼知道何時來刪除 queue 中的消息呢?它使用兩種機制:
第二種狀況下,若是 RabbitMQ 沒收到確認,它會把消息從新放進隊列(re-queued)並添加標識 'redelivered' 代表該消息以前已經發送過 ,若是沒有Consumer的話,消息將保持到有下一個 Consumer 爲止。
Consumer 能夠主動告訴 RabbitMQ 消息處理失敗了(拒絕消息),並告知RabbitMQ 是刪除消息仍是從新放進隊列。
exchange 有幾個重要的屬性:
RabbitMQ 默認會爲每一種類型生成一個或者兩個的默認的 exchange:
隊列一樣有相似於 exchange 的 name、durable、auto_delete 和 exclusive 等屬性,而且含義相同。
Exchange 會將消息分發(copy)到符合要求的全部隊列中。
Consumer 能夠主動獲取或者被動接受Queue裏面的消息:
一個 Queue 容許有多個 Consumer,好比利用 RabbitMQ 來實現一個簡單的 load balancer。這時候,消息會在這些 Consumer 之間根據 channel 的 prefetch level 作分發(請參見AQMP: QoS or message prefetching),若是該值同樣的話,消息會被平均分發給這些Consumer。
RabbitMQ 提供Cli rabbitmqctl [-n <node>] [-q] <command> [<command options>] 來進行管理和配置。經常使用到的命令有:
Exchange 根據它自身的類型 type、消息的屬性 routing_key 或者 headers,以及 Binding 的屬性 binding_key 來轉發消息。
Exchange 的類型 Type | 使用的消息屬性 | 使用的Binding 屬性 | 轉發模式 |
Fanout | - (忽略消息的轉發屬性) | - (忽略binding的轉發屬性) | Exchange 將消息轉發到全部與它有 binding 關係的隊列中。 這種方法轉發效率較高。OpenStack 大量使用這種類型的 exchange。 |
Direct | routing_key (任意的字符串,好比 "abc") | binding_key (任意的字符串,好比 "abc") | Exchange 只將消息轉到 binding 的 binding_key 等於消息的 routing_key 的隊列中。 |
Topic | routing_key (以 "." 分割的多單詞字符串,好比 abc.efg.hij) | binding_key (包含 "#" 和 "*" 的以 「.」 分割的多單詞字符串,好比 *.efg.*) | Exchange 只將消息轉到消息的 routing_key 和 binding 的 binding_key 匹配的隊列中。匹配規則以下: (1)二者以"."分割的單詞數目相同 (2)"*"可表明一個單詞 (3)"#「可表明零個或多個單詞 |
Headers | headers (消息頭) | binding_key | Exchange 只將消息轉到消息的 headers 和 binding 的 binding_key 匹配的隊列中。匹配規則待研究。 OpenStack不使用該類型的exchange。 |
參考文檔:
https://www.rabbitmq.com/getstarted.html 這裏有詳細的闡述和示例源代碼。
http://www.cnblogs.com/starof/p/4173413.html 這裏有官網文檔的中文版。
消息的持久化意味着在 RabbitMQ 被重啓後,消息依然還在。要實現持久化,得實現幾個相關組件的持久化:
(1). 交換機的持久化,須要將其 durable 屬性設爲 true。chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False,)
(2). 隊列的持久化,須要將其 durable 屬性設置爲 true。chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)
可使用 RabbitMQ 來實現 RPC 機制,這裏說說其實現原理:
過程:
(1). 客戶端 Client 設置消息的 routing key 爲 Service 的隊列 op_q;設置消息的 reply-to 屬性爲返回的 response 的目標隊列 reponse_q,設置其 correlation_id 爲以隨機UUID,而後將消息發到 exchange。好比 channel.basic_publish(exchange='', routing_key='op_q', properties=pika.BasicProperties(reply_to = reponse_q, correlation_id = self.corr_id),body=request)
(2). Exchange 將消息轉發到 Service 的 op_q
(3). Service 收到該消息後進行處理,而後將response 發到 exchange,並設置消息的 routing_key 爲原消息的 reply_to 屬性,以及設置其 correlation_id 爲原消息的 correlation_id 。
ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = props.correlation_id), body=str(response))
(4). Exchange 將消息轉發到 reponse_q
(5). Client 逐一接受 response_q 中的消息,檢查消息的 correlation_id 是否爲等於它發出的消息的correlation_id,是的話代表該消息爲它須要的response。
這裏有詳細的闡述。
經常使用的Python AMQP SDK包括:
#建立Connection和Channel鏈接到 RabbitMQ 服務器 conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel() #建立 queue result = chan.queue_declare(queue="debug", durable=True, exclusive=False, auto_delete=False) #建立 exchange result = chan.exchange_declare(exchange="sorting_room2", type="topic", durable=True, auto_delete=False,) #建立 binding result = chan.queue_bind(queue="debug", exchange="sorting_room2", routing_key="*.debug") #回調函數,當有 message 到達 queue 後,該函數會被調用 def recv_callback(msg): print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)
# lChannel.basic_ack(msg.delivery_tag) #若是no_ack=False的話,能夠須要發回一個確認
#啓動一個 consumer,consumer_tag 是該 consumer 的一個惟一標識符
#no_ack = True 表示該 consumer 不會發回確認
chan.basic_consume(queue='debug', no_ack=True, callback=recv_callback, consumer_tag="debugtag")
#等待有消息發到 queue while True: chan.wait()
#終止該 consumer chan.basic_cancel("testtag") #關閉 connection 和 channel chan.close() conn.close()
from amqplib import client_0_8 as amqp import sys
#建立 connection 和 channel conn = amqp.Connection(host="localhost:5672", userid="guest", password="1111", virtual_host="/", insist=False) chan = conn.channel()
#建立 message msg = amqp.Message(sys.argv[1]) msg.properties["delivery_mode"] = 2
#發送 message chan.basic_publish(msg,exchange="sorting_room2",routing_key=(sys.argv[2]))
#關閉 connection 和 channel chan.close() conn.close()
wget https://pypi.python.org/packages/source/p/pika/pika-0.9.14.tar.gz #md5=b99aad4b88961d3c7e4876b8327fc97c tar zxvf pika-0.9.14.tar.gz cd pika-0.9.14 python setup.py install
#!/usr/bin/env python ''' rabbitmq trace scripts. require (rabbitmq_tracing): $ sudo rabbitmq-plugins enable rabbitmq_tracing usage: $ sudo rabbitmqctl trace_on $ ./rabbitmqtrace.py << output >> ''' import sys import time from optparse import OptionParser import pika __AUTHOR__ = 'smallfish' __VERSION__ = '0.0.1' def _out(args): print time.strftime('%Y-%m-%d %H:%M:%S'), args def _run(host, port, vhost, user, password): conn = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port, virtual_host=vhost, credentials=pika.PlainCredentials(user, password))) chan = conn.channel() def _on_message(ch, method, properties, body): ret = {} ret['routing_key'] = method.routing_key ret['headers'] = properties.headers ret['body'] = body _out(ret) _out('start subscribe amq.rabbitmq.trace') ret = chan.queue_declare(exclusive=False, auto_delete=True) queue = ret.method.queue chan.queue_bind(exchange='amq.rabbitmq.trace', queue=queue, routing_key='#') chan.queue_bind(exchange='amq.rabbitmq.log', queue=queue, routing_key='#') chan.basic_consume(_on_message, queue=queue, no_ack=True) chan.start_consuming() def main(): parser = OptionParser('usage: %prog') parser.add_option('', '--host', metavar='host', default='localhost', help='rabbitmq host address, default: %default') parser.add_option('', '--port', metavar='port', default=5672, type='int', help='rabbitmq port, default: %default') parser.add_option('', '--vhost', metavar='vhost', default='/', help='rabbitmq vhost, default: %default') parser.add_option('', '--user', metavar='user', default='guest', help='rabbitmq user, default: %default') parser.add_option('', '--password', metavar='password', default='guest', help='rabbitmq password, default: %default') (options, args) = parser.parse_args() _run(options.host, options.port, options.vhost, options.user, options.password) if __name__ == '__main__': main()
RabbitMQ 支持使用插件來支持 Management, Federation, Shovel 和 STOMP。全部的插件都在這裏。
它提供 HTTP-based API 和 browser-based UI 以及 CLI 來管理 RabbitMQ。它的GUI的訪問地址是 http://<rabbitmq server ip>:15672/#/traces。它的GUI上,提供了一個 overview,還能夠經過它來管理connection、channel、exchange 和 queue,以及 virtual host,tracing 和 policy等。
該機制提供了一個查看被轉發的消息的途徑。當打開 firehose 的時候,RabbitMQ 會自動創建 amq.rabbitmq.trace 和 amq.rabbitmq.log 兩個exchange。你能夠編程建立queue 從這兩個 exchange 裏面獲取 trace 和 log,從而觀察每個被處理的消息。這裏有一個開源代碼實現。
rabbitmq_tracing 插件在 management 插件增長了消息追蹤的方法,它是從 firehose 中獲取數據。在激活了 rabbitmq-management,firehose 和 rabbitmq_tracing,你能夠在 management GUI 中追蹤消息:
自此,RabbitMQ 基本上算熟悉了,接下來能夠開始分析 OpenStack 中是如何使用 RabbitMQ 了。