AMQP消息隊列的測試方法

做者簡介html

羊老師,目前就任於餓了麼物流研發部,運單與服務業務線的測試負責人,同時也負責測試基礎設施的開發與維護,致力於自動化測試及工程效率的提高工做前端

前言

在大型互聯網架構中常常會用到消息隊列(Message Queue)這種中間件,在服務端測試時,許多測試同窗經過工具對API和數據庫都能熟練地進行測試,一說到消息隊列的測試就有點不知道怎麼入手了。那麼對於看不見摸不着的消息隊列,如何進行有效的測試呢?在介紹測試方法以前,咱們先來了解一下消息隊列的原理與機制,這裏以常見的AMQP協議的消息隊列爲例。python

1. AMQP消息隊列簡介

1.1 什麼是消息隊列

消息隊列,簡單來講,就是咱們經過網絡向對方發送了一封短消息,短消息經過運營商網絡發送到接收者,被對方讀取。消息隊列則是由生產者(消息的發送者)經過消息隊列服務器向消費者發送一個消息,消息體能夠爲字符串或者更多的數據結構,由消費者在消費端讀取消息。sql

1.2 什麼是AMQP

當前各類應用大量使用異步消息模型,並隨之產生衆多消息中間件產品及協議,標準的不一導致應用與中間件之間的耦合限制產品的選擇,並增長維護成本。AMQP(Advanced Message Queuing Protocol)是一個提供統一消息服務的應用層標準協議,基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣開發語言等條件的限制。shell

RabbitMQ是比較常見的一種基於AMQP的消息服務端。數據庫

1.3 AMQP 0.9.1 工做模型

image.png-187.6kB

工做過程

  1. Publisher將message發佈到exchange(exchange能夠看做是一個郵局或者郵件系統,也就是Broker)
  2. 將queue註冊到exchange上監聽某種類型的消息,這個過程稱之爲bingding(綁定)
  3. exchange將message投遞到queue,這個過程稱之爲routing(路由)。
  4. AMQP Broker將message發送給訂閱(subscribed)message的consumer,或者consumer按需將message從對應的queue中取出來。

名詞解釋

  • Broker: 接收和分發消息的應用,其實就是AMQP服務器端
  • Exchange: message到達broker的第一站,至關於一個路由器,匹配查詢表中的routing key,分發消息到queue中去。exchange主要有四種類型:direct (點對點)、 topic (主題訂閱) 、 fanout (廣播)和 headers(頭信息匹配)。
  • Queue: 是一個消息的載體,消息最終被送到這裏等待consumer取走。
  • Binding: exchange和queue之間的虛擬鏈接,binding中能夠包含routing key。Binding信息被保存到exchange中的查詢表中,用於message的分發依據。
  • Virtual host: 當多個不一樣的用戶使用同一個AMQP服務時,能夠劃分出多個vhost,每一個用戶在本身的vhost建立exchange/queue等。
  • Connection: publisher/consumer和broker之間的TCP鏈接。斷開鏈接的操做只會在client端進行,Broker不會斷開鏈接,除非出現網絡故障或broker服務出現問題。
  • Channel: Channel是在connection內部創建的邏輯鏈接,若是應用程序支持多線程,一般每一個thread建立單獨的channel進行通信,AMQP method包含了channel id幫助客戶端和message broker識別channel,因此channel之間是徹底隔離的。

校稿人注json

關於channel和多線程服務器

現在不多會有單進程單線程的應用,大多數狀況下生產者、消費者都是多進程多線程的,固然每一個線程均可以建立一個connection,一樣能夠知足連接broker、投遞或消費消息的工做。可是對操做系統而言,tcp連接是有代價的,並且建立和銷燬tcp連接的代價很昂貴。所以一些具體的消息隊列實現(例如RabbitMQ)選擇相似NIO的作法,複用tcp連接。經過在connection之上創建channel邏輯連接,每一個線程持有本身的channel,實現和broker的通訊,這樣提高了性能,也更便於管理網絡

消息投遞的幾種方式

1. 點對點(direct)數據結構

點對點模式相似於咱們發短消息,由指定的人來接收。

在AMQP中,一個direct類型的exchange基於routing key將投遞消息到queue中,若是routing key和queue的名字相同,那麼帶有該routing key的消息會直接被投遞到名字相同的queue中。

2. 廣播(fanout)

顧名思義,廣播模式相似於咱們在一個商場裏,忽然收到了整個商場的廣播消息,無論咱們願不肯意聽,只要在這個商場的人都會聽到。

在AMQP中,一個fanout類型的exchange會忽略routing key,將消息投遞到全部與之綁定的queue中。

3. 訂閱(topic)

訂閱模式相似於咱們刷微博,對於關注的人進行訂閱,當被關注者發佈一條新微博時,全部關注他的人都可以收到。

在AMQP中,一個topic類型的exchange會對routing key進行模式匹配,將消息投遞到綁定了對應的routing key的queue中。匹配的時候能夠用一些通配符,好比「#」表示匹配一個或多個字符。訂閱模式是最經常使用到的一種消息投遞方式。

4. 頭信息匹配(headers)

在AMQP中,一個header類型的exchange會使用消息的頭部屬性來進行匹配,再也不使用routing key。 若是匹配規則比較複雜,須要經過一個hash或者dictionary來匹配的話,可使用headers這種模式,對頭屬性的值來進行解析和匹配。

校稿人注

就一些特定的AMQP實現而言,例如RabbitMQ,headers模式的路由也是在exchange完成的,可是這種路由模式下exchange的性能會不好,並且這種模式也不實用,因此工程實現上通常都不使用這種模式

2. 消息隊列的經常使用測試方法

因爲消息隊列的應用場景主要是圍繞着 生產者消費者 展開的,因此測試思路其實很是簡單,若是被測應用是消息的生產者,那咱們就模擬消費者去接收消息,驗證發出的消息內容的正確性。若是被測應用是消息的消費者,那咱們就模擬消息的生產者去發送消息,而後驗證被測應用收到消息後的處理邏輯。

這裏咱們以最多見的RabbitMQ和最多見的Topic Exchange投遞方式爲例,介紹一下幾種主要的測試方式。

2.1 被測應用爲生產者

2.1.1 手工測試

  • 日誌法 若是被測應用是生產者,可讓開發將發送消息的內容打印在日誌中,經過查看日誌的方式進行驗證,這也是比較經常使用的方法。 可是這種方式若是發送消息頻次高、數據量大或者日誌級別設置的不合理的話可能會對應用的性能形成必定影響。

  • RabbitMQ管理面板 咱們須要模擬一個消費者去接收消息,直接從已有的queue中去取消息會和其餘的消費者產生衝突,因此咱們要新建一個測試queue,經過綁定相同的exchangerouting key,也拿到一份消息的拷貝。

具體步驟以下:

  1. 使用和被測應用相同的vhost帳號登錄RabbitMQ管理面板
  2. 新建一個測試queue,命名保證惟一性
    image.png-44.5kB
  3. 在測試queue的bindings中,綁定相同的exchangerouting key
    1541572245434.jpg-40.8kB
  4. 觸發被測系統發送消息,在測試queue中Get Messages來獲取消息

2.1.2 自動化測試

自動化測試的思路其實也是和手工測試同樣,惟一的不一樣是手工測試時把消息取出來後是肉眼進行驗證,而自動化測試則須要將消息落到一個可測的數據載體中,好比數據庫。總體的思路以下圖,是手工測試的一個延展。

這裏咱們使用到了python的pika庫(官方文檔:pypi.org/project/pik… ) 首先,咱們須要安裝pika:

pip install pika
複製代碼

而後,咱們模擬一個阻塞型的消費者來接收消息,代碼以下:

# coding:utf-8

__author__ = "小肥羊"

import pika
import json

username = 'username'    # 鏈接RabbitMQ服務器的用戶名
password = 'password'      # 鏈接RabbitMQ服務器的密碼
host = 'mq_server_host'  # 鏈接RabbitMQ服務器的地址
port = 'port'        # 鏈接RabbitMQ服務器的端口號
vhost = 'vhost_name'   # vhost名稱

queue_name = 'test_queue'   # 新建的測試queue的名稱
exchange_name = 'exchange_name'   # exchange名稱
routing_key = 'routing_key_name'   # routing key名稱

# 第一步,鏈接RabbitMQ服務器
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, vhost, credentials, socket_timeout=120))
# 在鏈接上建立一個頻道
channel = connection.channel()

# 第二步,爲確保隊列存在,再次執行queue_declare建立一個隊列,咱們能夠屢次運行該命令,可是隻有一個隊列會建立
channel.queue_declare(queue=queue_name, durable=True)

# 第三步,爲建立的隊列綁定對應的exchange和routing key
channel.queue_bind(queue_name, exchange_name, routing_key)

print ' [*] Waiting for messages. To exit press CTRL+C'


# 第四步,定義一個回調函數,當得到消息時,Pika庫調用這個回調函數來處理消息,該回調函數將消息內容打印到屏幕
def callback(ch, method, properties, body):
    # 消息體body轉成json格式
    dumped = json.dumps(body, ensure_ascii=False)
    pure_json = json.loads(body)
    # 將接收到的消息打印到屏幕
    print " [x] Received queue: %r" % (body,)
    # 告訴服務器已經接收到消息
    channel.basic_ack(delivery_tag=method.delivery_tag)


# 第五步,告訴RabbitMQ回調函數將從queue隊列接收消息

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=False)

# 第六步,輸入一個無限循環來等待消息數據並運行回調函數
channel.start_consuming()
複製代碼

在callback回調函數中,只是將消息內容打印了出來,若是要運用在自動化測試中,咱們還須要將消息內容寫入數據庫中,能夠經過sqlalchemy等工具對DB進行寫入操做,這裏就不作詳細介紹了。 另外,因爲採用了阻塞型的鏈接,因此該腳本最好是部署在測試服務器上運行,以保證7*24小時的可用性。

2.2 被測應用爲消費者

2.2.1 手工測試

若是數據來源依賴於消息的生產者,那麼咱們能夠模擬生產者來發送消息。

在RabbitMQ的管理面板中,容許咱們經過exchange和綁定的routing key來廣播消息和推送訂閱消息(fanouttopic以及header模式),也能夠直接往queue裏面發送消息(direct模式),在這裏其實更推薦後者,由於經過前二者發出的消息可能有其餘的應用系統在消費,可能會對其餘應用形成影響,因此建議直接往被測應用監聽的queue裏發消息。

具體步驟以下:

  1. 使用和被測應用相同的vhost帳號登錄管理面板
  2. 在queue面板中,找到被測應用監聽的queue
  3. 在publish message中的,填入消息內容併發送
    1541582668268.jpg-37.7kB
  4. 驗證被測應用收到消息後的處理邏輯

2.2.2 自動化測試

自動化模擬生產者要比消費者簡單得多,只須要將消息發送到指定的隊列中去,也不須要阻塞式運行腳本。

1541582588251.jpg-49.9kB

實現代碼以下:

# coding:utf-8

__author__ = '小肥羊'

import pika

username = 'username'    # 鏈接RabbitMQ服務器的用戶名
password = 'password'      # 鏈接RabbitMQ服務器的密碼
host = 'mq_server_host'  # 鏈接RabbitMQ服務器的地址
port = 'port'        # 鏈接RabbitMQ服務器的端口號
vhost = 'vhost_name'   # vhost名稱

queue_name = 'queue_name'   # 被測系統監聽的隊列名稱

# 第一步,鏈接RabbitMQ服務器
credentials = pika.PlainCredentials(username, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, vhost, credentials, socket_timeout=120))
# 在鏈接上建立一個頻道
channel = connection.channel()

# 第二步,聲明一個隊列,生產者和消費者都要聲明一個相同的隊列,用來防止萬一某一方掛了,另外一方能正常運行
channel.queue_declare(queue=queue_name, durable=True)

# 第三步,發送消息,routing_key填的是queue的名稱,這裏exchange填空字符串,使用了default exchange
channel.basic_publish(exchange='', routing_key=queue_name, body='message you want to send')

# 第四步,關閉鏈接
connection.close()

複製代碼

其中,代碼中第三步使用了default exchange,對此,官方有一個說明:

The default exchange is a direct exchange with no name (empty string) pre-declared by the broker. It has one special property that makes it very useful for simple applications: every queue that is created is automatically bound to it with a routing key which is the same as the queue name.

3. 總結

本次分享中主要介紹了AMQP消息隊列的簡單運做機制和原理,以及針對生產者和消費者兩種場景的測試方法,包含了手工和自動化的方式。

爲何要單獨從消息中間件來進行測試呢?主要緣由有:

  1. 分層測試。在測試一個完整功能時有時須要採起分層測試策略,先進行服務端測試,再驗證前端UI。
  2. 測試解耦。消息隊列的設計自己就是爲了系統之間的解耦,若是每次測試時都要依賴上游或者下游一塊兒驗證,那麼協同工做的成本將會很高。

4. 相關資料

  1. AMQP官方網站
  2. AMQP Wiki
  3. Python Pika 官方文檔
  4. RabbitMQ Python Tutorial




閱讀博客還不過癮?

歡迎你們掃二維碼經過添加羣助手,加入交流羣,討論和博客有關的技術問題,還能夠和博主有更多互動

博客轉載、線下活動及合做等問題請郵件至 shadowfly_zyl@hotmail.com 進行溝通

相關文章
相關標籤/搜索