轉 消息隊列之 RabbitMQ

轉 https://www.jianshu.com/p/79ca08116d57html

 

消息隊列之 RabbitMQ

2017.05.06 16:03* 字數 4884 閱讀 80990評論 18

關於消息隊列,從前年開始斷斷續續看了些資料,想寫好久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術選型,是時候把這塊的知識整理記錄一下了。java

市面上的消息隊列產品有不少,好比老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,還有 ZeroMQ ,去年末阿里巴巴捐贈給 Apache 的 RocketMQ ,連 redis 這樣的 NoSQL 數據庫也支持 MQ 功能。總之這塊知名的產品就有十幾種,就我本身的使用經驗和興趣只打算談談 RabbitMQ、Kafka 和 ActiveMQ ,本文先講 RabbitMQ ,在此以前先看下消息隊列的相關概念。node

什麼叫消息隊列

消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。redis

消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而無論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。數據庫

爲什麼用消息隊列

從上面的描述中能夠看出消息隊列是一種應用間的異步協做機制,那何時須要使用 MQ 呢?windows

以常見的訂單系統爲例,用戶點擊【下單】按鈕以後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一塊兒同步執行,隨着業務的發展訂單量增加,須要提高系統服務的性能,這時能夠將一些不須要當即生效的操做拆分出來異步執行,好比發放紅包、發短信通知等。這種場景下就能夠用 MQ ,在下單的主流程(好比扣減庫存、生成相應單據)完成以後發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。後端

以上是用於業務解耦的狀況,其它常見場景包括最終一致性、廣播、錯峯流控等等。安全

RabbitMQ 特色

RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。bash

AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。服務器

RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:

  1. 可靠性(Reliability)
    RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。

  2. 靈活的路由(Flexible Routing)
    在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。

  3. 消息集羣(Clustering)
    多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。

  4. 高可用(Highly Available Queues)
    隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。

  5. 多種協議(Multi-protocol)
    RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。

  6. 多語言客戶端(Many Clients)
    RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。

  7. 管理界面(Management UI)
    RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。

  8. 跟蹤機制(Tracing)
    若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。

  9. 插件機制(Plugin System)
    RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。

RabbitMQ 中的概念模型

消息模型

全部 MQ 產品從模型抽象上來講都是同樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。

消息流
RabbitMQ 基本概念

上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念須要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,因此其內部實際上也是 AMQP 中的基本概念:

RabbitMQ 內部結構
  1. Message
    消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。
  2. Publisher
    消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
  3. Exchange
    交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。
  4. Binding
    綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。
  5. Queue
    消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。
  6. Connection
    網絡鏈接,好比一個TCP鏈接。
  7. Channel
    信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內地虛擬鏈接,AMQP 命令都是經過信道發出去的,無論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。
  8. Consumer
    消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
  9. Virtual Host
    虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 / 。
  10. Broker
    表示消息隊列服務器實體。
AMQP 中的消息路由

AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差異,AMQP 中增長了 Exchange 和 Binding 的角色。生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。

AMQP 的消息路由過程
Exchange 類型

Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:

  1. direct


    direct 交換器

    消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式。

  2. fanout


    fanout 交換器

    每一個發到 fanout 類型交換器的消息都會分到全部綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout 類型轉發消息是最快的。

  3. topic
    topic 交換器

    topic 交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「」。#匹配0個或多個單詞,匹配很少很多一個單詞。

RabbitMQ 安裝

通常來講安裝 RabbitMQ 以前要安裝 Erlang ,能夠去Erlang官網下載。接着去RabbitMQ官網下載安裝包,以後解壓縮便可。根據操做系統不一樣官網提供了相應的安裝說明:WindowsDebian / UbuntuRPM-based LinuxMac

若是是Mac 用戶,我的推薦使用 HomeBrew 來安裝,安裝前要先更新 brew:

brew update

接着安裝 rabbitmq 服務器:

brew install rabbitmq

這樣 RabbitMQ 就安裝好了,安裝過程當中會自動其所依賴的 Erlang 。

RabbitMQ 運行和管理

  1. 啓動
    啓動很簡單,找到安裝後的 RabbitMQ 所在目錄下的 sbin 目錄,能夠看到該目錄下有6個以 rabbitmq 開頭的可執行文件,直接執行 rabbitmq-server 便可,下面將 RabbitMQ 的安裝位置以 . 代替,啓動命令就是:
./sbin/rabbitmq-server

啓動正常的話會看到一些啓動過程信息和最後的 completed with 7 plugins,這也說明啓動的時候默認加載了7個插件。


正常啓動
  1. 後臺啓動
    若是想讓 RabbitMQ 以守護程序的方式在後臺運行,能夠在啓動的時候加上 -detached 參數:
./sbin/rabbitmq-server -detached
  1. 查詢服務器狀態
    sbin 目錄下有個特別重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理須要的幾乎一站式解決方案,絕大部分的運維命令它均可以提供。
    查詢 RabbitMQ 服務器的狀態信息能夠用參數 status :
./sbin/rabbitmqctl status

該命令將輸出服務器的不少信息,好比 RabbitMQ 和 Erlang 的版本、OS 名稱、內存等等

  1. 關閉 RabbitMQ 節點
    咱們知道 RabbitMQ 是用 Erlang 語言寫的,在Erlang 中有兩個概念:節點和應用程序。節點就是 Erlang 虛擬機的每一個實例,而多個 Erlang 應用程序能夠運行在同一個節點之上。節點之間能夠進行本地通訊(無論他們是否是運行在同一臺服務器之上)。好比一個運行在節點A上的應用程序能夠調用節點B上應用程序的方法,就好像調用本地函數同樣。若是應用程序因爲某些緣由奔潰,Erlang 節點會自動嘗試重啓應用程序。
    若是要關閉整個 RabbitMQ 節點能夠用參數 stop :
./sbin/rabbitmqctl stop

它會和本地節點通訊並指示其乾淨的關閉,也能夠指定關閉不一樣的節點,包括遠程節點,只須要傳入參數 -n :

./sbin/rabbitmqctl -n rabbit@server.example.com stop 

-n node 默認 node 名稱是 rabbit@server ,若是你的主機名是 server.example.com ,那麼 node 名稱就是 rabbit@server.example.com

  1. 關閉 RabbitMQ 應用程序
    若是隻想關閉應用程序,同時保持 Erlang 節點運行則能夠用 stop_app:
./sbin/rabbitmqctl stop_app

這個命令在後面要講的集羣模式中將會頗有用。

  1. 啓動 RabbitMQ 應用程序
./sbin/rabbitmqctl start_app
  1. 重置 RabbitMQ 節點
./sbin/rabbitmqctl reset

該命令將清除全部的隊列。

  1. 查看已聲明的隊列
./sbin/rabbitmqctl list_queues
  1. 查看交換器
./sbin/rabbitmqctl list_exchanges

該命令還能夠附加參數,好比列出交換器的名稱、類型、是否持久化、是否自動刪除:

./sbin/rabbitmqctl list_exchanges name type durable auto_delete 
  1. 查看綁定
./sbin/rabbitmqctl list_bindings

Java 客戶端訪問

RabbitMQ 支持多種語言訪問,以 Java 爲例看下通常使用 RabbitMQ 的步驟。

  1. maven工程的pom文件中添加依賴
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.1.0</version> </dependency> 
  1. 消息生產者
package org.study.rabbitmq; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //建立鏈接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); //設置 RabbitMQ 地址 factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); String routingKey = "hola"; //發佈消息 byte[] messageBodyBytes = "quit".getBytes(); channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes); channel.close(); conn.close(); } } 
  1. 消息消費者
package org.study.rabbitmq; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class Consumer { public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setUsername("guest"); factory.setPassword("guest"); factory.setHost("localhost"); //創建到代理服務器到鏈接 Connection conn = factory.newConnection(); //得到信道 final Channel channel = conn.createChannel(); //聲明交換器 String exchangeName = "hello-exchange"; channel.exchangeDeclare(exchangeName, "direct", true); //聲明隊列 String queueName = channel.queueDeclare().getQueue(); String routingKey = "hola"; //綁定隊列,經過鍵 hola 將隊列和交換器綁定起來 channel.queueBind(queueName, exchangeName, routingKey); while(true) { //消費消息 boolean autoAck = false; String consumerTag = ""; channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消費的路由鍵:" + routingKey); System.out.println("消費的內容類型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //確認消息 channel.basicAck(deliveryTag, false); System.out.println("消費的消息體內容:"); String bodyStr = new String(body, "UTF-8"); System.out.println(bodyStr); } }); } } } 
  1. 啓動 RabbitMQ 服務器
./sbin/rabbitmq-server
  1. 運行 Consumer
    先運行 Consumer ,這樣當生產者發送消息的時候能在消費者後端看到消息記錄。
  2. 運行 Producer
    接着運行 Producer ,發佈一條消息,在 Consumer 的控制檯能看到接收的消息:


    Consumer 控制檯

RabbitMQ 集羣

RabbitMQ 最優秀的功能之一就是內建集羣,這個功能設計的目的是容許消費者和生產者在節點崩潰的狀況下繼續運行,以及經過添加更多的節點來線性擴展消息通訊吞吐量。RabbitMQ 內部利用 Erlang 提供的分佈式通訊框架 OTP 來知足上述需求,使客戶端在失去一個 RabbitMQ 節點鏈接的狀況下,仍是可以從新鏈接到集羣中的任何其餘節點繼續生產、消費消息。

RabbitMQ 集羣中的一些概念

RabbitMQ 會始終記錄如下四種類型的內部元數據:

  1. 隊列元數據
    包括隊列名稱和它們的屬性,好比是否可持久化,是否自動刪除
  2. 交換器元數據
    交換器名稱、類型、屬性
  3. 綁定元數據
    內部是一張表格記錄如何將消息路由到隊列
  4. vhost 元數據
    爲 vhost 內部的隊列、交換器、綁定提供命名空間和安全屬性

在單一節點中,RabbitMQ 會將全部這些信息存儲在內存中,同時將標記爲可持久化的隊列、交換器、綁定存儲到硬盤上。存到硬盤上能夠確保隊列和交換器在節點重啓後可以重建。而在集羣模式下一樣也提供兩種選擇:存到硬盤上(獨立節點的默認設置),存在內存中。

若是在集羣中建立隊列,集羣只會在單個節點而不是全部節點上建立完整的隊列信息(元數據、狀態、內容)。結果是隻有隊列的全部者節點知道有關隊列的全部信息,所以當集羣節點崩潰時,該節點的隊列和綁定就消失了,而且任何匹配該隊列的綁定的新消息也丟失了。還好RabbitMQ 2.6.0以後提供了鏡像隊列以免集羣節點故障致使的隊列內容不可用。

RabbitMQ 集羣中能夠共享 user、vhost、exchange等,全部的數據和狀態都是必須在全部節點上覆制的,例外就是上面所說的消息隊列。RabbitMQ 節點能夠動態的加入到集羣中。

當在集羣中聲明隊列、交換器、綁定的時候,這些操做會直到全部集羣節點都成功提交元數據變動後才返回。集羣中有內存節點和磁盤節點兩種類型,內存節點雖然不寫入磁盤,可是它的執行比磁盤節點要好。內存節點能夠提供出色的性能,磁盤節點能保障配置信息在節點重啓後仍然可用,那集羣中如何平衡這二者呢?

RabbitMQ 只要求集羣中至少有一個磁盤節點,全部其餘節點能夠是內存節點,當節點加入火離開集羣時,它們必需要將該變動通知到至少一個磁盤節點。若是隻有一個磁盤節點,恰好又是該節點崩潰了,那麼集羣能夠繼續路由消息,但不能建立隊列、建立交換器、建立綁定、添加用戶、更改權限、添加或刪除集羣節點。換句話說集羣中的惟一磁盤節點崩潰的話,集羣仍然能夠運行,但知道該節點恢復,不然沒法更改任何東西。

RabbitMQ 集羣配置和啓動

若是是在一臺機器上同時啓動多個 RabbitMQ 節點來組建集羣的話,只用上面介紹的方式啓動第2、第三個節點將會由於節點名稱和端口衝突致使啓動失敗。因此在每次調用 rabbitmq-server 命令前,設置環境變量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 來明確指定惟一的節點名稱和端口。下面的例子端口號從5672開始,每一個新啓動的節點都加1,節點也分別命名爲test_rabbit_一、test_rabbit_二、test_rabbit_3。

啓動第1個節點:

RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached

啓動第2個節點:

RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached

啓動第2個節點前建議將 RabbitMQ 默認激活的插件關掉,不然會存在使用了某個插件的端口號衝突,致使節點啓動不成功。

如今第2個節點和第1個節點都是獨立節點,它們並不知道其餘節點的存在。集羣中除第一個節點外後加入的節點須要獲取集羣中的元數據,因此要先中止 Erlang 節點上運行的 RabbitMQ 應用程序,並重置該節點元數據,再加入而且獲取集羣的元數據,最後從新啓動 RabbitMQ 應用程序。

中止第2個節點的應用程序:

./sbin/rabbitmqctl -n test_rabbit_2 stop_app

重置第2個節點元數據:

./sbin/rabbitmqctl -n test_rabbit_2 reset

第2節點加入第1個節點組成的集羣:

./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost 

啓動第2個節點的應用程序

./sbin/rabbitmqctl -n test_rabbit_2 start_app

第3個節點的配置過程和第2個節點相似:

RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached ./sbin/rabbitmqctl -n test_rabbit_3 stop_app ./sbin/rabbitmqctl -n test_rabbit_3 reset ./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost ./sbin/rabbitmqctl -n test_rabbit_3 start_app 
RabbitMQ 集羣運維

中止某個指定的節點,好比中止第2個節點:

RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop

查看節點3的集羣狀態:

./sbin/rabbitmqctl -n test_rabbit_3 cluster_status
相關文章
相關標籤/搜索