RabbitMQ核心組件及應用場景

1、適用場景

1.解耦
2.最終一致性
3.廣播
4.錯峯與流控(秒殺業務用於流量削峯場景)

秒殺場景html

2、核心組件,關鍵點(交換器、隊列、綁定)

AMPQ消息路由必要三部分:交換器、隊列、綁定。java

Java核心組件:ConnectionFactory、Connection、Channel、Delivery、DeliverCallback、CancelCallbacklinux

隊列

1. 創建鏈接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
2. 聲明隊列

若是在同一條信道上訂閱了另外一個隊列,那就不能再聲明隊列,必須先取消訂閱。c++

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;

queue:須要指定隊列名稱,若是不指定,MQ會隨機分配一個並在queue.declare命令中返回,git

durable:隊列將在服務器重啓後存在。github

exclusive:爲true時,隊列變成私有的。服務器

autoDelete: 爲true時,當最後一個消費者取消訂閱時,隊列自動移除。架構

3. 消費者經過AMQP的basic.consume命令訂閱消息,將信道置爲接收模式。

Java代碼Channel:併發

String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
4. 當有多個消費者存在時,隊列裏的消息將以循環的方式發送給消費者。消費者接收到消息後必須進行確認,可經過basic.ack顯示確認:

Java代碼Channel:tcp

void basicAck(long deliveryTag, boolean multiple) throws IOException;

上面的手動確認,第二個參數爲true,批量確認;若是爲false,會一次確認一條。當有耗時任務時,能夠利用手動確認延遲確認消息,防止消息大量涌入應用致使過載。
也能夠在訂閱隊列時就將basicConsume方法的autoAck參數設置爲true,開啓自動確認。確認成功後rabbitmq會從隊列中刪除消息。

5. 若是在確認過程當中和rabbitmq服務器斷鏈,那麼這條消息就會發送給下一個消費者。可使用basic.reject拒絕消息:

Java代碼Channel:

void basicReject(long deliveryTag, boolean requeue) throws IOException;

第二個參數requeue設置爲true,消息會從新排隊併發送給下一個消費者;爲false則會丟棄該條消息。能夠利用此性質丟棄錯誤格式的消息。

6. 發佈消息
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

exchange:第一個參數是交換的名稱。空字符串表示默認或無名交換,消息經過routingKey路由到指定隊列。

交換器和綁定

1. 交換器一共有四種類型:fanout、direct 、topic、headers。

Java中Channel申明交換器:

Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;

BuiltinExchangeType對應有四種枚舉類型:

DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
2. 隊列經過路由鍵(routing key)綁定到交換器。
channel.queueBind(String queue, String exchange, String routingKey)

RabbitMQ中消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列。實際上,生產者甚至不知道消息是否會被傳遞到哪一個隊列。

不指定隊列名時,經過服務器隨機生成隊列名稱:

String queueName = channel.queueDeclare().getQueue();

不傳參數時,queueDeclare生成一個非持久的,獨佔的自動刪除隊列

在linux服務器上能夠經過命令查看全部交換器:

rabbitmqctl list_exchanges
3. fanout廣播方式

廣播方式會將消息投遞給全部附加在此交換器的隊列。

4. direct模式

direct類型在綁定時設定一個routing_key,消息的routing_key匹配時, 纔會被交換器投遞到綁定的隊列中去.

5. topic模式

按規則投遞,經過通配符#和*組合

*(星號)能夠替代一個單詞。

#(hash)能夠替換零個或多個單詞。

持久化

將隊列和交換器的durable屬性設置爲true

申明隊列時:

channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;

申明交換器時:

channel.exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;

投遞消息時將投遞模式(delivery mode)設置爲2,Java代碼中MessageProperties.PERSISTENT_TEXT_PLAIN來設置:

channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

在MessageProperties源碼中以下所示:

/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
    new BasicProperties("text/plain",
                        null,
                        null,
                        2,
                        0, null, null, null,
                        null, null, null, null,
                        null, null);

實現事務功能(發送方確認)

使用事務會使rabbitmq的性能大大下降,爲了不這個問題,rabbitmq支持:發送方確認模式。經過這個模式來保證消息的投遞。當生產者P投遞消息後會等待消費者C發送確認,P收到確認後能夠調用回調函數處理相關業務。在生產者P等待確認的同時也能夠繼續發送下一條消息。

服務器管理

一、虛擬主機vhost

rabbitmq支持建立虛擬主機,默認的虛擬主機爲「/」,默認用戶guest;當在rabbitmq集羣中建立虛擬主機時,整個集羣都會建立。

二、錯誤日誌查看

rabbitmq的日誌文件在/var/log/rabbitmq/下的rabbit@[localhost].log

三、rabbitmq配置文件

配置文件在rpm安裝/usr/share/doc/rabbitmq-server-3.6.5/rabbitmq.config.example,複製一份:

cp /usr/share/doc/rabbitmq-server-3.5.3/rabbitmq.config.example /etc/rabbitmq.config

參考官網配置:https://www.rabbitmq.com/configure.html#configuration-file

3、底層原理,主要實現

應用程序和rabbitmq服務器之間創建一條tcp鏈接,tcp鏈接打開後,應用程序就能夠和rabbitmq建立多條AMQP信道,信道是創建在tcp鏈接上的虛擬鏈接。

4、同類技術產品比較

1.ActiveMQ

優勢

單機吞吐量:萬級
topic數量都吞吐量的影響:
時效性:ms級
可用性:高,基於主從架構實現高可用性
消息可靠性:有較低的機率丟失數據
功能支持:MQ領域的功能極其完備
缺點:

官方社區如今對ActiveMQ 5.x維護愈來愈少,較少在大規模吞吐的場景中使用。

2.Kafka

號稱大數據的殺手鐗,談到大數據領域內的消息傳輸,則繞不開Kafka,這款爲大數據而生的消息中間件,以其百萬級TPS的吞吐量名聲大噪,迅速成爲大數據領域的寵兒,在數據採集、傳輸、存儲的過程當中發揮着舉足輕重的做用。

Apache Kafka它最初由LinkedIn公司基於獨特的設計實現爲一個分佈式的提交日誌系統( a distributed commit log),以後成爲Apache項目的一部分。

目前已經被LinkedIn,Uber, Twitter, Netflix等大公司所採納。

優勢

性能卓越,單機寫入TPS約在百萬條/秒,最大的優勢,就是吞吐量高。
時效性:ms級
可用性:很是高,kafka是分佈式的,一個數據多個副本,少數機器宕機,不會丟失數據,不會致使不可用
消費者採用Pull方式獲取消息, 消息有序, 經過控制可以保證全部消息被消費且僅被消費一次;
有優秀的第三方Kafka Web管理界面Kafka-Manager;
在日誌領域比較成熟,被多家公司和多個開源項目使用;
功能支持:功能較爲簡單,主要支持簡單的MQ功能,在大數據領域的實時計算以及日誌採集被大規模使用
缺點:

Kafka單機超過64個隊列/分區,Load會發生明顯的飆高現象,隊列越多,load越高,發送消息響應時間變長
使用短輪詢方式,實時性取決於輪詢間隔時間;
消費失敗不支持重試;
支持消息順序,可是一臺代理宕機後,就會產生消息亂序;
社區更新較慢;

3.RabbitMQ

RabbitMQ 2007年發佈,是一個在AMQP(高級消息隊列協議)基礎上完成的,可複用的企業消息系統,是當前最主流的消息中間件之一。

RabbitMQ優勢:

因爲erlang語言的特性,mq 性能較好,高併發;
吞吐量到萬級,MQ功能比較完備
健壯、穩定、易用、跨平臺、支持多種語言、文檔齊全;
開源提供的管理界面很是棒,用起來很好用
社區活躍度高;
RabbitMQ缺點:

erlang開發,很難去看懂源碼,基本職能依賴於開源社區的快速維護和修復bug,不利於作二次開發和維護。
RabbitMQ確實吞吐量會低一些,這是由於他作的實現機制比較重。
須要學習比較複雜的接口和協議,學習和維護成本較高。

4.RocketMQ

RocketMQ出自 阿里公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,並作出了本身的一些改進。

RocketMQ在阿里集團被普遍應用在訂單,交易,充值,流計算,消息推送,日誌流式處理,binglog分發等場景。

RocketMQ優勢:

單機吞吐量:十萬級
可用性:很是高,分佈式架構
消息可靠性:通過參數優化配置,消息能夠作到0丟失
功能支持:MQ功能較爲完善,仍是分佈式的,擴展性好
支持10億級別的消息堆積,不會由於堆積致使性能降低
源碼是java,咱們能夠本身閱讀源碼,定製本身公司的MQ,能夠掌控
RocketMQ缺點:

支持的客戶端語言很少,目前是java及c++,其中c++不成熟;
社區活躍度通常
沒有在 mq 核心中去實現JMS等接口,有些系統要遷移須要修改大量代碼

github代碼

https://github.com/LighterTang/RabbitMQUtils/tree/master/rabbitmq_base/src/main/java/com/mq/mqutils/mqdoc

參考資料

https://www.rabbitmq.com

http://youzhixueyuan.com/comparison-of-kafka-rocketmq-rabbitmq.html

相關文章
相關標籤/搜索