關於消息隊列,從前年開始斷斷續續看了些資料,想寫好久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術選型,是時候把這塊的知識整理記錄一下了。html
市面上的消息隊列產品有不少,好比老牌的 ActiveMQ、RabbitMQ ,目前我看最火的 Kafka ,還有 ZeroMQ ,去年末阿里巴巴捐贈給 Apache 的 RocketMQ ,連 redis 這樣的 NoSQL 數據庫也支持 MQ 功能。總之這塊知名的產品就有十幾種,就我本身的使用經驗和興趣只打算談談 RabbitMQ、Kafka 和 ActiveMQ ,本文先講 RabbitMQ ,在此以前先看下消息隊列的相關概念。java
消息(Message)是指在應用間傳送的數據。消息能夠很是簡單,好比只包含文本字符串,也能夠更復雜,可能包含嵌入對象。node
消息隊列(Message Queue)是一種應用間的通訊方式,消息發送後能夠當即返回,由消息系統來確保消息的可靠傳遞。消息發佈者只管把消息發佈到 MQ 中而不用管誰來取,消息使用者只管從 MQ 中取消息而無論是誰發佈的。這樣發佈者和使用者都不用知道對方的存在。redis
從上面的描述中能夠看出消息隊列是一種應用間的異步協做機制,那何時須要使用 MQ 呢?數據庫
以常見的訂單系統爲例,用戶點擊【下單】按鈕以後的業務邏輯可能包括:扣減庫存、生成相應單據、發紅包、發短信通知。在業務發展初期這些邏輯可能放在一塊兒同步執行,隨着業務的發展訂單量增加,須要提高系統服務的性能,這時能夠將一些不須要當即生效的操做拆分出來異步執行,好比發放紅包、發短信通知等。這種場景下就能夠用 MQ ,在下單的主流程(好比扣減庫存、生成相應單據)完成以後發送一條消息到 MQ 讓主流程快速完結,而由另外的單獨線程拉取MQ的消息(或者由 MQ 推送消息),當發現 MQ 中有發紅包或發短信之類的消息時,執行相應的業務邏輯。windows
以上是用於業務解耦的狀況,其它常見場景包括最終一致性、廣播、錯峯流控等等。後端
RabbitMQ 是一個由 Erlang 語言開發的 AMQP 的開源實現。安全
AMQP :Advanced Message Queue,高級消息隊列協議。它是應用層協議的一個開放標準,爲面向消息的中間件設計,基於此協議的客戶端與消息中間件可傳遞消息,並不受產品、開發語言等條件的限制。bash
RabbitMQ 最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特色包括:服務器
可靠性(Reliability) RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發佈確認。
靈活的路由(Flexible Routing) 在消息進入隊列以前,經過 Exchange 來路由消息的。對於典型的路由功能,RabbitMQ 已經提供了一些內置的 Exchange 來實現。針對更復雜的路由功能,能夠將多個 Exchange 綁定在一塊兒,也經過插件機制實現本身的 Exchange 。
消息集羣(Clustering) 多個 RabbitMQ 服務器能夠組成一個集羣,造成一個邏輯 Broker 。
高可用(Highly Available Queues) 隊列能夠在集羣中的機器上進行鏡像,使得在部分節點出問題的狀況下隊列仍然可用。
多種協議(Multi-protocol) RabbitMQ 支持多種消息隊列協議,好比 STOMP、MQTT 等等。
多語言客戶端(Many Clients) RabbitMQ 幾乎支持全部經常使用語言,好比 Java、.NET、Ruby 等等。
管理界面(Management UI) RabbitMQ 提供了一個易用的用戶界面,使得用戶能夠監控和管理消息 Broker 的許多方面。
跟蹤機制(Tracing) 若是消息異常,RabbitMQ 提供了消息跟蹤機制,使用者能夠找出發生了什麼。
插件機制(Plugin System) RabbitMQ 提供了許多插件,來從多方面進行擴展,也能夠編寫本身的插件。
全部 MQ 產品從模型抽象上來講都是同樣的過程: 消費者(consumer)訂閱某個隊列。生產者(producer)建立消息,而後發佈到隊列(queue)中,最後將消息發送到監聽的消費者。
上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念須要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,因此其內部實際上也是 AMQP 中的基本概念:
AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差異,AMQP 中增長了 Exchange 和 Binding 的角色。生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。
Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:
通常來講安裝 RabbitMQ 以前要安裝 Erlang ,能夠去Erlang官網下載。接着去RabbitMQ官網下載安裝包,以後解壓縮便可。根據操做系統不一樣官網提供了相應的安裝說明:Windows、Debian / Ubuntu、RPM-based Linux、Mac
若是是Mac 用戶,我的推薦使用 HomeBrew 來安裝,安裝前要先更新 brew:
brew update
複製代碼
接着安裝 rabbitmq 服務器:
brew install rabbitmq
複製代碼
這樣 RabbitMQ 就安裝好了,安裝過程當中會自動其所依賴的 Erlang 。
./sbin/rabbitmq-server
複製代碼
啓動正常的話會看到一些啓動過程信息和最後的 completed with 7 plugins,這也說明啓動的時候默認加載了7個插件。
./sbin/rabbitmq-server -detached
複製代碼
./sbin/rabbitmqctl status
複製代碼
該命令將輸出服務器的不少信息,好比 RabbitMQ 和 Erlang 的版本、OS 名稱、內存等等
./sbin/rabbitmqctl stop
複製代碼
它會和本地節點通訊並指示其乾淨的關閉,也能夠指定關閉不一樣的節點,包括遠程節點,只須要傳入參數 -n :
./sbin/rabbitmqctl -n rabbit@server.example.com stop
複製代碼
-n node 默認 node 名稱是 rabbit@server ,若是你的主機名是 server.example.com ,那麼 node 名稱就是 rabbit@server.example.com 。
./sbin/rabbitmqctl stop_app
複製代碼
這個命令在後面要講的集羣模式中將會頗有用。
./sbin/rabbitmqctl start_app
複製代碼
./sbin/rabbitmqctl reset
複製代碼
該命令將清除全部的隊列。
./sbin/rabbitmqctl list_queues
複製代碼
./sbin/rabbitmqctl list_exchanges
複製代碼
該命令還能夠附加參數,好比列出交換器的名稱、類型、是否持久化、是否自動刪除:
./sbin/rabbitmqctl list_exchanges name type durable auto_delete
複製代碼
./sbin/rabbitmqctl list_bindings
複製代碼
RabbitMQ 支持多種語言訪問,以 Java 爲例看下通常使用 RabbitMQ 的步驟。
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.1.0</version>
</dependency>
複製代碼
package org.study.rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立鏈接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
//設置 RabbitMQ 地址
factory.setHost("localhost");
//創建到代理服務器到鏈接
Connection conn = factory.newConnection();
//得到信道
Channel channel = conn.createChannel();
//聲明交換器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
String routingKey = "hola";
//發佈消息
byte[] messageBodyBytes = "quit".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
channel.close();
conn.close();
}
}
複製代碼
package org.study.rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setHost("localhost");
//創建到代理服務器到鏈接
Connection conn = factory.newConnection();
//得到信道
final Channel channel = conn.createChannel();
//聲明交換器
String exchangeName = "hello-exchange";
channel.exchangeDeclare(exchangeName, "direct", true);
//聲明隊列
String queueName = channel.queueDeclare().getQueue();
String routingKey = "hola";
//綁定隊列,經過鍵 hola 將隊列和交換器綁定起來
channel.queueBind(queueName, exchangeName, routingKey);
while(true) {
//消費消息
boolean autoAck = false;
String consumerTag = "";
channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
System.out.println("消費的路由鍵:" + routingKey);
System.out.println("消費的內容類型:" + contentType);
long deliveryTag = envelope.getDeliveryTag();
//確認消息
channel.basicAck(deliveryTag, false);
System.out.println("消費的消息體內容:");
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr);
}
});
}
}
}
複製代碼
./sbin/rabbitmq-server
複製代碼
RabbitMQ 最優秀的功能之一就是內建集羣,這個功能設計的目的是容許消費者和生產者在節點崩潰的狀況下繼續運行,以及經過添加更多的節點來線性擴展消息通訊吞吐量。RabbitMQ 內部利用 Erlang 提供的分佈式通訊框架 OTP 來知足上述需求,使客戶端在失去一個 RabbitMQ 節點鏈接的狀況下,仍是可以從新鏈接到集羣中的任何其餘節點繼續生產、消費消息。
RabbitMQ 會始終記錄如下四種類型的內部元數據:
在單一節點中,RabbitMQ 會將全部這些信息存儲在內存中,同時將標記爲可持久化的隊列、交換器、綁定存儲到硬盤上。存到硬盤上能夠確保隊列和交換器在節點重啓後可以重建。而在集羣模式下一樣也提供兩種選擇:存到硬盤上(獨立節點的默認設置),存在內存中。
若是在集羣中建立隊列,集羣只會在單個節點而不是全部節點上建立完整的隊列信息(元數據、狀態、內容)。結果是隻有隊列的全部者節點知道有關隊列的全部信息,所以當集羣節點崩潰時,該節點的隊列和綁定就消失了,而且任何匹配該隊列的綁定的新消息也丟失了。還好RabbitMQ 2.6.0以後提供了鏡像隊列以免集羣節點故障致使的隊列內容不可用。
RabbitMQ 集羣中能夠共享 user、vhost、exchange等,全部的數據和狀態都是必須在全部節點上覆制的,例外就是上面所說的消息隊列。RabbitMQ 節點能夠動態的加入到集羣中。
當在集羣中聲明隊列、交換器、綁定的時候,這些操做會直到全部集羣節點都成功提交元數據變動後才返回。集羣中有內存節點和磁盤節點兩種類型,內存節點雖然不寫入磁盤,可是它的執行比磁盤節點要好。內存節點能夠提供出色的性能,磁盤節點能保障配置信息在節點重啓後仍然可用,那集羣中如何平衡這二者呢?
RabbitMQ 只要求集羣中至少有一個磁盤節點,全部其餘節點能夠是內存節點,當節點加入或離開集羣時,它們必需要將該變動通知到至少一個磁盤節點。若是隻有一個磁盤節點,恰好又是該節點崩潰了,那麼集羣能夠繼續路由消息,但不能建立隊列、建立交換器、建立綁定、添加用戶、更改權限、添加或刪除集羣節點。換句話說集羣中的惟一磁盤節點崩潰的話,集羣仍然能夠運行,但直到該節點恢復,不然沒法更改任何東西。
若是是在一臺機器上同時啓動多個 RabbitMQ 節點來組建集羣的話,只用上面介紹的方式啓動第2、第三個節點將會由於節點名稱和端口衝突致使啓動失敗。因此在每次調用 rabbitmq-server 命令前,設置環境變量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 來明確指定惟一的節點名稱和端口。下面的例子端口號從5672開始,每一個新啓動的節點都加1,節點也分別命名爲test_rabbit_一、test_rabbit_二、test_rabbit_3。
啓動第1個節點:
RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached
複製代碼
啓動第2個節點:
RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached
複製代碼
啓動第2個節點前建議將 RabbitMQ 默認激活的插件關掉,不然會存在使用了某個插件的端口號衝突,致使節點啓動不成功。
如今第2個節點和第1個節點都是獨立節點,它們並不知道其餘節點的存在。集羣中除第一個節點外後加入的節點須要獲取集羣中的元數據,因此要先中止 Erlang 節點上運行的 RabbitMQ 應用程序,並重置該節點元數據,再加入而且獲取集羣的元數據,最後從新啓動 RabbitMQ 應用程序。
中止第2個節點的應用程序:
./sbin/rabbitmqctl -n test_rabbit_2 stop_app
複製代碼
重置第2個節點元數據:
./sbin/rabbitmqctl -n test_rabbit_2 reset
複製代碼
第2節點加入第1個節點組成的集羣:
./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost
複製代碼
啓動第2個節點的應用程序
./sbin/rabbitmqctl -n test_rabbit_2 start_app
複製代碼
第3個節點的配置過程和第2個節點相似:
RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached
./sbin/rabbitmqctl -n test_rabbit_3 stop_app
./sbin/rabbitmqctl -n test_rabbit_3 reset
./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost
./sbin/rabbitmqctl -n test_rabbit_3 start_app
複製代碼
中止某個指定的節點,好比中止第2個節點:
RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop
複製代碼
查看節點3的集羣狀態:
./sbin/rabbitmqctl -n test_rabbit_3 cluster_status
複製代碼