RabbitMQ是一個開源的消息代理軟件(面向消息的中間件),它的核心做用就是建立消息隊列,異步接收和發送消息,MQ的全程是:Message Queue中文的意思是消息隊列。java
<!--more-->git
削峯填谷:用於應對間歇性流量提高對於系統的「破壞」,好比秒殺活動,能夠把請求先發送到消息隊列在平滑的交由系統去處理,當訪問量大於必定數量的時候,還能夠直接屏蔽後續操做,給前臺的用戶友好的顯示;github
延遲處理:能夠進行事件後置,好比訂單超時業務,用戶下單30分鐘未支付取消訂單;spring
系統解耦:消息隊列也能夠幫開發人員完成業務的解耦,好比用戶上傳頭像的功能,最初的設計是用戶上傳完以後才能發帖,後面有增長了經驗系統,須要在上傳頭像以後增長經驗值,到後來又上線了金幣系統,上傳頭像以後能夠增長金幣,像這種需求的不斷升級,若是在業務代碼裏面寫死每次該業務代碼是很不優雅的,這個時候若是使用消息隊列,那麼只須要增長一個訂閱器用於介紹用戶上傳頭像的消息,再執行經驗的增長和金幣的增長是很是簡單的,而且在不改動業務模塊業務代碼的基礎上能夠輕鬆實現,若是後期須要撤銷某個模塊了,只須要刪除訂閱器便可,就這樣就下降了系統開發的耦合性;docker
如今市面上比較主流的消息隊列還有Kafka、RocketMQ、RabbitMQ,它們的介紹和區別以下:數據庫
Kafka是LinkedIn開源的分佈式發佈-訂閱消息系統,目前歸屬於Apache定級項目。Kafka主要特色是基於Pull的模式來處理消息消費,追求高吞吐量,一開始的目的就是用於日誌收集和傳輸。0.8版本開始支持複製,對消息的重複、丟失、錯誤沒有嚴格要求,適合產生大量數據的互聯網服務的數據收集業務。安全
RabbitMQ是使用Erlang語言開發的開源消息隊列系統,基於AMQP協議來實現。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。AMQP協議更多用在企業系統內,對數據一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次。springboot
RocketMQ是阿里開源的消息中間件,它是純Java開發,具備高吞吐量、高可用性、適合大規模分佈式系統應用的特色。RocketMQ思路起源於Kafka,但並非Kafka的一個Copy,它對消息的可靠傳輸及事務性作了優化,目前在阿里集團被普遍應用於交易、充值、流計算、消息推送、日誌流式處理、binglog分發等場景。服務器
簡單總結: Kafka的性能最好,適用於對消息吞吐量達,對消息丟失不敏感的系統;RocketMQ借鑑了Kafka並提升了消息的可靠性,修復了Kafka的不足;RabbitMQ性能略低於Kafka,並實現了AMQP(Advanced Message Queuing Protocol)高級消息隊列協議的標準,有很是好的穩定性。app
支持語言對比
RabbitMQ的特色是易用、擴展性好(集羣訪問)、高可用,具體以下:
在瞭解消息通信以前首先要了解3個概念:生產者、消費者和代理。
生產者:消息的建立者,負責建立和推送數據到消息服務器;
消費者:消息的接收方,用於處理數據和確認消息;
代理:就是RabbitMQ自己,用於扮演「快遞」的角色,自己不生產消息,只是扮演「快遞」的角色。
首先你必須鏈接到Rabbit才能發佈和消費消息,那怎麼鏈接和發送消息的呢?
你的應用程序和Rabbit Server之間會建立一個TCP鏈接,一旦TCP打開,並經過了認證,認證就是你試圖鏈接Rabbit以前發送的Rabbit服務器鏈接信息和用戶名和密碼,有點像程序鏈接數據庫,使用Java有兩種鏈接認證的方式,後面代碼會詳細介紹,一旦認證經過你的應用程序和Rabbit就建立了一條AMQP信道(Channel)。
信道是建立在「真實」TCP上的虛擬鏈接,AMQP命令都是經過信道發送出去的,每一個信道都會有一個惟一的ID,不管是發佈消息,訂閱隊列或者接收消息都是經過信道完成的。
對於操做系統來講建立和銷燬TCP會話是很是昂貴的開銷,假設高峯期每秒有成千上萬條鏈接,每一個鏈接都要建立一條TCP會話,這就形成了TCP鏈接的巨大浪費,並且操做系統每秒能建立的TCP也是有限的,所以很快就會遇到系統瓶頸。
若是咱們每一個請求都使用一條TCP鏈接,既知足了性能的須要,又能確保每一個鏈接的私密性,這就是引入信道概念的緣由。
ConnectionFactory(鏈接管理器): 應用程序與Rabbit之間創建鏈接的管理器,程序代碼中使用;
Channel(信道): 消息推送使用的通道;
Exchange(交換器): 用於接受、分配消息;
Queue(隊列): 用於存儲生產者的消息;
RoutingKey(路由鍵): 用於把生成者的數據分配到交換器上;
BindingKey(綁定鍵): 用於把交換器的消息綁定到隊列上;
看到上面的解釋,最難理解的路由鍵和綁定鍵了,那麼他們具體怎麼發揮做用的,請看下圖:
RabbitMQ的Exchange(交換器)分爲四類:
其中headers交換器容許你匹配AMQP消息的header而非路由鍵,除此以外headers交換器和direct交換器徹底一致,但性能卻不好,幾乎用不到,因此咱們這裏不作解釋。
direct爲默認的交換器類型,也很是的簡單,若是路由鍵匹配的話,消息就投遞到相應的隊列,以下圖:
fanout有別於direct交換器,fanout是一種發佈/訂閱模式的交換器,當你發送一條消息的時候,交換器會把消息廣播到全部附加到這個交換器的隊列上。
注意: 對於fanout交換器來講routingKey(路由鍵)是無效的,這個參數是被忽略的。
topic交換器運行和fanout相似,可是能夠更靈活的匹配本身想要訂閱的信息,這個時候routingKey路由鍵就排上用場了,使用路由鍵進行消息(規則)匹配。
topic路由器的關鍵在於定義路由鍵,定義routingKey名稱不能超過255字節,使用「.」做爲分隔符,例如:com.mq.rabbit.error。
匹配規則
匹配表達式能夠用「*」和「#」匹配任何字符,具體規則以下:
例如發佈了一個「cn.mq.rabbit.error」的消息:
能匹配上的路由鍵:
cn.mq.rabbit.*
cn.mq.rabbit.#
#.error
cn.mq.#
#
不能匹配上的路由鍵:
cn.mq.*
*.error
*
RabbitMQ隊列和交換器有一個不可告人的祕密,就是默認狀況下重啓服務器會致使消息丟失,那麼怎麼保證Rabbit在重啓的時候不丟失呢?答案就是消息持久化。
當你把消息發送到Rabbit服務器的時候,你須要選擇你是否要進行持久化,但這並不能保證Rabbit能從崩潰中恢復,想要Rabbit消息能恢復必須知足3個條件:
持久化工做原理
Rabbit會將你的持久化消息寫入磁盤上的持久化日誌文件,等消息被消費以後,Rabbit會把這條消息標識爲等待垃圾回收。
持久化的缺點
消息持久化的優勢顯而易見,但缺點也很明顯,那就是性能,由於要寫入硬盤要比寫入內存性能較低不少,從而下降了服務器的吞吐量,儘管使用SSD硬盤可使事情獲得緩解,但他仍然吸乾了Rabbit的性能,當消息成千上萬條要寫入磁盤的時候,性能是很低的。
因此使用者要根據本身的狀況,選擇適合本身的方式。
學習更多RabbitMQ知識,訪問:https://gitbook.cn/gitchat/activity/5b558d54c28306099b47ae9c
(1)下載鏡像
https://hub.docker.com/r/library/rabbitmq/tags/
從鏡像的大小也能夠很直觀的看出來alpine是輕量版。
使用命令:
docker pull rabbitmq:3.7.7-management
下載帶management插件的版本。
(2)運行RabbitMQ
使用命令:
docker run -d --hostname myrabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.7-management
正常啓動以後,訪問:http://localhost:15672/
登陸網頁管理頁面,用戶名密碼:guest/guest,登陸成功以下圖:
若是用Idea建立新項目,能夠直接在建立Spring Boot的時候,點擊「Integration」面板,選擇RabbitMQ集成,以下圖:
若是是老Maven項目,直接在pom.xml添加以下代碼:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在application.properties設置以下信息:
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=test spring.rabbitmq.password=test
3.3 代碼
本節分別來看三種交換器:direct、fanout、topic的實現代碼。
建立DirectConfig.java代碼以下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DirectConfig { final static String QUEUE_NAME = "direct"; //隊列名稱 final static String EXCHANGE_NAME = "mydirect"; //交換器名稱 @Bean public Queue queue() { // 聲明隊列 參數一:隊列名稱;參數二:是否持久化 return new Queue(DirectConfig.QUEUE_NAME, false); } // 配置默認的交換機,如下部分均可以不配置,不設置使用默認交換器(AMQP default) @Bean DirectExchange directExchange() { // 參數一:交換器名稱;參數二:是否持久化;參數三:是否自動刪除消息 return new DirectExchange(DirectConfig.EXCHANGE_NAME, false, false); } // 綁定「direct」隊列到上面配置的「mydirect」路由器 @Bean Binding bindingExchangeDirectQueue(Queue directQueue, DirectExchange directExchange) { return BindingBuilder.bind(directQueue).to(directExchange).with(DirectConfig.QUEUE_NAME); } }
建立Sender.java代碼以下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * 消息發送者-生產消息 */ @Component public class Sender { @Autowired private AmqpTemplate rabbitTemplate; public void driectSend(String message) { System.out.println("Direct 發送消息:" + message); //參數一:交換器名稱,能夠省略(省略存儲到AMQP default交換器);參數二:路由鍵名稱(direct模式下路由鍵=隊列名稱);參數三:存儲消息 this.rabbitTemplate.convertAndSend("direct", message); } }
注意:
建立Receiver.java代碼以下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息接收者-消費消息 */ @Component @RabbitListener(queues = "direct") public class Receiver { @Autowired private AmqpTemplate rabbitTemplate; @RabbitHandler /** * 監聽消費消息 */ public void process(String message) { System.out.println("Direct 消費消息:" + message); } }
使用Spring Boot中的默認測試框架JUnit進行單元測試,不瞭解JUnit的能夠參考個人上一篇文章,建立MQTest.java代碼以下:
package com.example.rabbitmq.mq; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.text.SimpleDateFormat; import java.util.Date; import static org.junit.Assert.*; @RunWith(SpringRunner.class) @SpringBootTest public class MQTest { @Autowired private Sender sender; @Test public void driectTest() { SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd"); sender.driectSend("Driect Data:" + sf.format(new Date())); } }
執行以後,效果以下圖:
表示消息已經被髮送並被消費了。
建立FanoutConfig.java代碼以下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { final static String QUEUE_NAME = "fanout"; //隊列名稱 final static String QUEUE_NAME2 = "fanout2"; //隊列名稱 final static String EXCHANGE_NAME = "myfanout"; //交換器名稱 @Bean public Queue queueFanout() { return new Queue(FanoutConfig.QUEUE_NAME); } @Bean public Queue queueFanout2() { return new Queue(FanoutConfig.QUEUE_NAME2); } //配置交換器 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(FanoutConfig.EXCHANGE_NAME); } // 綁定隊列到交換器 @Bean Binding bindingFanoutExchangeQueue(Queue queueFanout, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueFanout).to(fanoutExchange); } // 綁定隊列到交換器 @Bean Binding bindingFanoutExchangeQueue2(Queue queueFanout2, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueFanout2).to(fanoutExchange); } }
建立FanoutSender.java代碼以下:
package com.example.rabbitmq.mq; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class FanoutSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String message) { System.out.println("發送消息:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME, message); } public void send2(String message) { System.out.println("發送消息2:" + message); this.rabbitTemplate.convertAndSend(FanoutConfig.EXCHANGE_NAME,FanoutConfig.QUEUE_NAME2, message); } }
建立兩個監聽類,第一個FanoutReceiver.java代碼以下:
package com.example.rabbitmq.mq; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component @RabbitListener(queues = "fanout") public class FanoutReceiver { @RabbitHandler public void process(String msg) { System.out.println("Fanout(FanoutReceiver)消費消息:" + msg); } }
第二個FanoutReceiver2.java代碼以下:
package com.example.rabbitmq.mq; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "fanout2") public class FanoutReceiver2 { @RabbitHandler public void process(String message) { System.out.println("Fanout(FanoutReceiver2)消費消息:" + message); } }
建立FanoutTest.java代碼以下:
package com.example.rabbitmq.mq; import com.example.rabbitmq.RabbitmqApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.text.SimpleDateFormat; import java.util.Date; @RunWith(SpringRunner.class) @SpringBootTest(classes = RabbitmqApplication.class) public class FanoutTest { @Autowired private FanoutSender sender; @Test public void Test() throws InterruptedException { SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd"); sender.send("Time1 => " + sf.format(new Date())); sender.send2("Date2 => " + sf.format(new Date())); } }
運行測試代碼,輸出結果以下:
發送消息:Time1 => 2018-09-11 發送消息2:Date2 => 2018-09-11 Fanout(FanoutReceiver2)消費消息:Time1 => 2018-09-11 Fanout(FanoutReceiver2)消費消息:Date2 => 2018-09-11 Fanout(FanoutReceiver)消費消息:Time1 => 2018-09-11 Fanout(FanoutReceiver)消費消息:Date2 => 2018-09-11
總結: 能夠看出fanout會把消息分發到全部訂閱到該交換器的隊列,fanout模式是忽略路由鍵的。
@Configuration public class TopicConfig { final static String QUEUE_NAME = "log"; final static String QUEUE_NAME2 = "log.all"; final static String QUEUE_NAME3 = "log.all.error"; final static String EXCHANGE_NAME = "topicExchange"; //交換器名稱 @Bean public Queue queuetopic() { return new Queue(TopicConfig.QUEUE_NAME); } @Bean public Queue queuetopic2() { return new Queue(TopicConfig.QUEUE_NAME2); } @Bean public Queue queuetopic3() { return new Queue(TopicConfig.QUEUE_NAME3); } // 配置交換器 @Bean TopicExchange topicExchange() { return new TopicExchange(TopicConfig.EXCHANGE_NAME); } // 綁定隊列到交換器,並設置路由鍵(log.#) @Bean Binding bindingtopicExchangeQueue(Queue queuetopic, TopicExchange topicExchange) { return BindingBuilder.bind(queuetopic).to(topicExchange).with("log.#"); } // 綁定隊列到交換器,並設置路由鍵(log.*) @Bean Binding bindingtopicExchangeQueue2(Queue queuetopic2, TopicExchange topicExchange) { return BindingBuilder.bind(queuetopic2).to(topicExchange).with("log.*"); } // 綁定隊列到交換器,並設置路由鍵(log.*.error) @Bean Binding bindingtopicExchangeQueue3(Queue queuetopic3, TopicExchange topicExchange) { return BindingBuilder.bind(queuetopic3).to(topicExchange).with("log.*.error"); } }
@Component public class TopicSender { @Autowired private AmqpTemplate rabbitTemplate; public void topicSender(String message) { String routingKey = "log.all.error"; System.out.println(routingKey + " 發送消息:" + message); this.rabbitTemplate.convertAndSend(TopicConfig.EXCHANGE_NAME, routingKey, message); } }
@Component @RabbitListener(queues = "log") public class TopicReceiver { @RabbitHandler public void process(String msg) { System.out.println("log.# 消費消息:" + msg); } }
@Component @RabbitListener(queues = "log.all") public class TopicReceiver2 { @RabbitHandler public void process(String msg) { System.out.println("log.* 消費消息:" + msg); } }
@Component @RabbitListener(queues = "log.all.error") public class TopicReceiver3 { @RabbitHandler public void process(String msg) { System.out.println("log.*.error 消費消息:" + msg); } }
@RunWith(SpringRunner.class) @SpringBootTest(classes = RabbitmqApplication.class) public class FanoutTest { @Autowired private FanoutSender fanoutSender; @Test public void Test() { SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd"); fanoutSender.send("Time1 => " + sf.format(new Date())); fanoutSender.send2("Date2 => " + sf.format(new Date())); } }
輸出結果:
log.all.error 發送消息:time => 2018-09-11 log.# 消費消息:time => 2018-09-11 log.*.error 消費消息:time => 2018-09-11
總結: 在Topic Exchange中「#」能夠匹配全部內容,而「*」則是匹配一個字符段的內容。
以上示例代碼Github地址:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq
參考文檔
阿里 RocketMQ 優點對比:https://juejin.im/entry/5a0abfb5f265da43062a4a91