rabbitmq高級消息隊列

rabbitmq使用

什麼是消息隊列

消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠很複雜,能夠包含嵌入對象。node

消息隊列是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中不用考慮哪一個消費者來取數據,消息使用者只管從MQ 中取消息而不用管是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。python

爲什麼用消息隊列

消息隊列是一種應用間的異步協做機制,那麼何時使用 MQ呢後端

一常見的訂單系統爲例,用戶點擊下單按鈕以後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展出力這些邏輯可能放在一塊兒同步執行,隨着業務的發展訂單量增加,須要提高系統服務的性能,這時能夠將一些不須要當即生效的操做拆分出來異步執行,好比發紅包、發短信、郵件通知等。這種場景下能夠用 MQ,在下單的主流程(好比扣減庫存、生成相應單據)完成以後發送一條消息到 MQ 讓主流程快速完成,而由另外的單獨線程拉取 MQ 的消息(或者由 MQ 推送消息),當發現 MQ中有發紅包或其餘的消息時,執行相應的業務邏輯。安全

RabbitMQ 的特色

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。服務器

AMQP:Advanced Message Queue Protocol,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。網絡

RabbitMQ 最初起源於金融系統,用與在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現很好。具體特色以下:併發

  1. 可靠性(Reliability)app

    RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。框架

  2. 靈活的路由(Flexible Routing)運維

    在消息進入隊列以前,經過 Exchange 來路由消息。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也能夠經過插件實現本身的 Exchange。

  3. 消息集羣(Clustering)

    多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker。

  4. 高可用(Highly Available Queues)

    隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍可使用。

  5. 多種協議(Multi-Protocol)

    RabbitMQ 支持多種消息隊列協議,如 STOMP、MQTT 等。

  6. 多語言客戶端(Many Clients)

    RabbitMQ 幾乎支持全部經常使用語言,好比 Python、Java、.NET等。

  7. 管理界面(Management UI)

    RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。

  8. 跟蹤機制(Tracing)

    如消息異常,RabbitMQ 提供了消息追蹤機制,使用者能夠找出發生了什麼。

  9. 插件機制(Plugin System)

    RabbitMQ 提供了許多插件,能夠擴展插件,也能夠本身編寫插件。

RabbitMQ 中的概念模型

消息模型

全部 MQ 產品從模型抽象上來講都是同樣的過程:

消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費之。

消息流

RabbitMQ 基本概念

上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念須要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,因此其內部實際上也是 AMQP 中的基本概念:

RabbitMQ 內部結構
  1. Message

    消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。。

  2. Publisher

    消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。

  3. Exchange

    交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。

  4. Binding

    綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。

  5. Queue

    消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息能夠投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。

  6. Connection

    網絡鏈接,好比一個 TCP 鏈接。

  7. Channel

    信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的 TCP 鏈接內的虛擬鏈接,AMQP 命令都是經過信道發出去的,無論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。

  8. Consumer

    消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

  9. Virtual Host

    虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 是默認的 vhost。

  10. Broker

    表示消息隊列服務器實體。

AMQP 中的消息路由

AMQP 中消息的路由過程增長了 Exchange 和 Binding 的角色。生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接受,而 Binding 決定交換器的消息應該發送到哪一個隊列中。

AMQP 的消息路由過程

Exchange 類型

Exchange 分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了。

  1. direct

direct 交換器

消息中的路由鍵(routing key)若是和 Binding 中的 bind key 一致,交換器就將消息發送到相應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲‘’dog‘’,則只轉發 routing key 標記爲‘’dog‘’的消息,不會轉發‘’dog.puppy‘’等。這是徹底匹配、單薄的模式。

  1. fanout

fanout 交換器

每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單地將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到一份複製的消息。fanout 類型轉發消息是最快的。

  1. topic

topic 交換器

topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串分紅單詞,這些單詞之間用點隔開。它一樣會識別兩個通配符:符號#和符號*#匹配0個或多個單詞,*匹配一個單詞。

RabbitMQ 運行和管理

  1. 啓動

找到安裝後的 RabbitMQ 所在目錄下的 sbin 目錄,能夠看到該目錄下有7個以 rabbitmq 開頭的可執行文件,直接執行 rabbitmq-server 便可,下面

啓動會看到一些啓動過程和最後的 completed with 6 plugins,這說明啓動的時候默認加載了6個插件。

  1. 後臺啓動

    若是想讓RabbitMQ 以守護程序的方式在後臺運行,能夠在啓動的時候加上-detached 參數:

    rabbitmq-server -detached
  2. 查詢服務器狀態

    sbin 目錄下有個很重要的文件叫 rabbitmqctl,它提供了 RabbitMQ 管理須要的幾乎一站式解決方案,絕大部分的運維命令它均可以提供。

    查詢 RabbitMQ 服務器的狀態信息能夠用參數 status

    rabbitmqctl status

    該命令將輸出服務器不少信息,好比 RabbitMQ 和 Erlang的版本、os 名稱、內存等信息。

  3. 關閉 RabbitMQ 節點

    在 Erlang 中有兩個概念:節點和應用程序。節點就是 Erlang 虛擬機的每一個實例,而多個 Erlang 應用程序能夠運行在同一個節點之上。節點之間能夠進行本地通訊(無論他們是否是運行在同一臺服務器之上)。好比一個運行在節點 A 上的應用程序能夠調用節點 B 上應用程序的方法,就好像調用本地函數同樣。若是應用程序因爲某些緣由崩潰,Erlang 節點會自動嘗試重啓應用程序。

    若是要關閉整個 RabbitMQ 節點可使用參數 stop

    rabbitmqctl stop

    它會和本地節點通訊並指示其乾淨的關閉,也能夠指定關閉不一樣的節點,包括遠程節點,只須要傳入產生參數-n

    rabbitmqctl -n rabbit@server.example.com stop

    -n node默認 node 名稱是 rabbit@server,若是你的主機名是 server.example.com,那麼 node 名稱就是 rabbit@server.example.com。

  4. 關閉 RabbitMQ 應用程序

    若是隻想關閉應用程序,同時保持 Erlang 節點運行則可使用stop_app

    rabbitmqctl stop_app
  5. 啓動 RabbitMQ 節點

    rabbitmqctl start_app
  6. 重置RabbitMQ 節點

    rabbitmqctl reset
  7. 查看已聲明的隊列

    rabbitmqctl list_queues
  8. 查看交換器

    rabbitmqctl list_exchanges

    該命令還能夠附加參數,好比列出交換器的名稱、類型、是否持久化、是否自動刪除:

    rabbitmqctl list_exchanges name type durable auto_delete
  9. 查看綁定

    rabbitmqctl list_bindings

隊列的做用

應用解耦

實際案例:在用戶進行註冊的時候須要進行郵件發送和驗證碼發送,在沒有使用消息隊列以前,這些操做是同步進行的,在用戶量較少的狀況下,是不會影響用戶體驗的,可是一旦用戶量大了起來,仍是使用同步的話會極大地影響用戶體驗,在這種狀況下就可使用消息隊列。

當用戶註冊的時候先直接返回註冊成功頁面,以後將須要發送郵件和驗證碼的任務存在隊列裏面,以後由服務器來消費隊列裏面的任務已達到應用解耦的目的。

流量削峯

在淘寶雙十一或者春運時的12306的用戶訪問量是特別大的,這種狀況下若是全部的請求直接打到服務器上,服務器會直接掛掉。在這種狀況下,可使用動態擴容服務器的數量來處理高併發,可是並非全部時間都須要不少服務器進行處理,爲了成本的考慮能夠採用消息隊列進行處理併發,就是全部的請求並非直接請求服務器,而是先把用戶的請求存儲在消息隊列裏面,以後服務器在慢慢地進行處理請求。

消息隊列數據不丟失

  1. 在產生隊列時,設置durable=True,表明隊列持久化
  2. 在生產端,設置properties=pika.BasicProperties(delivery_mode=2)
  3. 在消費端,設置auto_ack=False, ch.basic_ack(delivery_tag=method.delivery_tag)

消息產生與消費

簡單消息發送

producer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : producer.py
# __time__  : 2019-04-03 16:56

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

# 聲明隊列
channel.queue_declare(queue='test', durable=True)

channel.basic_publish(exchange='',
                      routing_key='test',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=1,  # make message persistent
                      ))

print('[x] sent "hello world!"')
connection.close()

consumer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : consumer.py
# __time__  : 2019-04-03 16:56

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.queue_declare(queue='test', durable=True)


def callback(ch, method, properties, body):
    '''

    :param ch: 信道
    :param method:
    :param properties:
    :param body: message
    :return:
    '''
    print(ch, method, properties, body)

    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(
    queue='test', on_message_callback=callback, auto_ack=False
)

channel.start_consuming()

fanout 模式

producer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : producer.py
# __time__  : 2019-04-03 18:36

import sys

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout')

message = ''.join(sys.argv[1:]) or 'info: hello world!'

channel.basic_publish(exchange='logs', routing_key='', body=message)

print('[x] sent %r' % message)

connection.close()

consumer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : consumer.py
# __time__  : 2019-04-03 18:41

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare('', exclusive=True)  # exclusive 排他性,惟一性

queue_name = result.method.queue

print('queue_name:', queue_name)

channel.queue_bind(exchange='logs', queue=queue_name)

print('[*] waiting for logs. To exit press CTRL+c')


def callback(ch, method, properties, body):
    print('[x] %r' % body)


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True
)

channel.start_consuming()

direct 模式

producer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : producer.py
# __time__  : 2019-04-03 18:49

import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

log_levels = sys.argv[1] if len(sys.argv) > 1 else 'info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='direct_logs',
    routing_key=log_levels,
    body=message
)

print('[x] Sent %r:%r' % (log_levels, message))

connection.close()

consumer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : consumer.py
# __time__  : 2019-04-03 18:53

import sys

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare('', exclusive=True)

queue_name = result.method.queue

log_levels = sys.argv[1:]

if not log_levels:
    sys.stderr.write('Usage: %s [info] [warning] [error]\n' % sys.argv[0])

for severity in log_levels:
    channel.queue_bind(
        exchange='direct_logs',
        queue=queue_name,
        routing_key=severity
    )

print('[*] Waiting for logs. To exit press CTRL+c')


def callback(ch, method, properties, body):
    print('[x] %r:%r' % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

topic 模式

producer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : producer.py
# __time__  : 2019-04-03 19:07

import sys
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'

message = ' '.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(
    exchange='topic_logs',
    routing_key=routing_key,
    body=message
)

print('[x] Sent %r:%r' % (routing_key, message))

connection.close()

consumer

# -*- coding: utf-8 -*-
# __author__: MUSIBII
# __email__ : shaozuanzuan@gmail.com
# __file__  : consumer.py
# __time__  : 2019-04-03 19:11

import sys

import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost')
)

channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]

if not binding_keys:
    sys.stderr.write('Usage: %s [binding_key]...\n' % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs',
        queue=queue_name,
        routing_key=binding_key
    )

print('[*] Waiting for logs. To exit press CTRL+c')


def callback(ch, method, properties, body):
    print('[x %r:%r' % (method.routing_key, body))


channel.basic_consume(
    queue=queue_name,
    on_message_callback=callback,
    auto_ack=True
)

channel.start_consuming()

進行消息傳遞的時候先啓動 consumer,這樣當生產者發送消息的時候能再消費者後端看到消息記錄。接着運行 producer,發佈一條消息,在 consumer 的控制檯能看到接收的消息。

RabbitMQ 集羣

RabbitMQ 最優秀的功能之一就是內建集羣,這個功能設計的目的是容許消費者和生產者在節點崩潰的狀況下繼續運行,以及經過添加更多的節點來線性擴展消息通訊吞吐量。RabbitMQ 內部利用 Erlang 提供的分佈式通訊框架 OTP 來知足上述需求,使客戶端在失去一個 RabbitMQ 節點鏈接的狀況下,依舊可以從新鏈接到集羣中的任何其餘節點繼續生產、消費。

RabbitMQ 集羣中的一些概念

RabbitMQ 會始終記錄如下四種類型的內部元數據:

  1. 隊列元數據

    包括隊列名稱和它們的屬性,好比是否可持久化、是否自動刪除

  2. 交換器元數據

    交換器名稱、類型、屬性

  3. 綁定元數據

    內部是一張表格記錄如何將消息路由到隊列

  4. vhost 元數據

    爲 vhost 內部的隊列、交換器、綁定提供命名空間和安全屬性。

在單一節點中,RabbitMQ 會將全部這些信息存儲在內存中,同時將標記爲可持久化的隊列、交換器、綁定存儲在硬盤上。存到硬盤上能夠確保隊列和交換器在節點重啓後可以重建。而在集羣模式下一樣也提供兩種選擇:存到硬盤上(獨立節點的默認設置),存到內存中。

若是在集羣中建立隊列,集羣只會在單個節點而不是全部節點上建立完整的隊列信息(元數據、狀態、內容)。結果是隻有隊列的全部者節點知道有關隊列的全部信息,所以當集羣節點崩潰時,該節點的的隊列和綁定就消失了,而且任何匹配該隊列的綁定的新消息也會丟失。

RabbitMQ 集羣中能夠共享 user、vhost、exchange等,全部的數據和狀態都是必須在全部節點上覆制的,例外就是上面所說的消息隊列。RabbitMQ 節點能夠動態的加入到集羣中。

當在集羣中聲明隊列、交換器、綁定的時候,這些操做會直到全部集羣節點都成功提交元數據變動後才返回。集羣中有內存節點和磁盤節點兩種類型,內存節點雖然不寫入磁盤,可是它的執行比磁盤節點要好。內存節點能夠提供出色的性能,磁盤節點能保障配置信息在節點重啓後仍然可用,那集羣中如何平衡這二者呢?

RabbitMQ 只要求集羣中至少有一個磁盤節點,全部其餘節點能夠是內存節點,當節點加入火離開集羣時,它們必需要將該變動通知到至少一個磁盤節點。若是隻有一個磁盤節點,恰好又是該節點崩潰了,那麼集羣能夠繼續路由消息,但不能建立隊列、建立交換器、建立綁定、添加用戶、更改權限、添加或刪除集羣節點。換句話說集羣中的惟一磁盤節點崩潰的話,集羣仍然能夠運行,但知道該節點恢復,不然沒法更改任何東西。

RabbitMQ 集羣配置和啓動

若是是在一臺機器上同時啓動多個 RabbitMQ 節點來組建集羣的話,只用上面介紹的方式啓動第2、第三個節點將會由於節點名稱和端口衝突致使啓動失敗。因此在每次調用 rabbitmq-server 命令前,設置環境變量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 來明確指定惟一的節點名稱和端口。下面的例子端口號從5672開始,每一個新啓動的節點都加1,節點也分別命名爲test_rabbit_一、test_rabbit_二、test_rabbit_3。

  1. 啓動第一個節點:

    RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
  2. 啓動第二個節點:

    RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached

啓動第2個節點前建議將 RabbitMQ 默認激活的插件關掉,不然會存在使用了某個插件的端口號衝突,致使節點啓動不成功。

如今第2個節點和第1個節點都是獨立節點,它們並不知道其餘節點的存在。集羣中除第一個節點外後加入的節點須要獲取集羣中的元數據,因此要先中止 Erlang 節點上運行的 RabbitMQ 應用程序,並重置該節點元數據,再加入而且獲取集羣的元數據,最後從新啓動 RabbitMQ 應用程序。

  1. 中止第二個節點的應用程序:

    rabbitmqctl -n test_rabbit_2 stop_app
  2. 重置第二個節點元數據:

    rabbitmqctl -n test_rabbit_2 reset
  3. 第二個節點加入到第一個節點組成的集羣:

    rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
  4. 啓動第二個節點的應用程序

    rabbitmqctl -n test_rabbit_2 start_app
  5. 第三個節點的配置相似

    RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 rabbitmq-server -detached
    rabbitmqctl -n test_rabbit_3 stop_app
    rabbitmqctl -n test_rabbit_3 reset
    rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost
    rabbitmqctl -n test_rabbit_3 start_app

RabbitMQ 集羣運維

中止某個指定的節點,好比中止第二個節點

RABBITMQ_NODENAME=test_rabbit_2 rabbitmqctl stop

查看節點三的集羣狀態

rabbitmqctl -n test_rabbit_3 cluster_status
相關文章
相關標籤/搜索