RabbitMQ是一款基於AMQP(消息隊列協議),由Erlang開發的開源消息隊列組件。是一款優秀的消息隊列組件,他由兩部分組成:服務端和客戶端,客戶端支持多種語言的驅動,如:.Net、JAVA、Erlang等。
對於RabbitMQ來講,生產者和消息隊列之間存在隔離,生產者將消息發送給交換機,而交換機則根據調度策略把相應的消息轉發給對應的消息隊列。消費者經過讀取消息隊列從而實現消息的發送和接收。html
交換機的主要做用是接收相應的消息而且綁定到指定的隊列.交換機有四種類型,分別爲Direct,topic,headers,Fanout.git
Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即建立消息隊列的時候,指定一個BindingKey.當發送者發送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被髮送到該消息隊列中.github
topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字符串),而當發送消息的時候,只有指定的Key和該模式相匹配的時候,消息纔會被髮送到該消息隊列中.web
headers也是根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被髮送到匹配的消息隊列中.spring
Fanout是路由廣播的形式,將會把消息發給綁定它的所有隊列,即使設置了key,也會被忽略.c#
消息隊列兩個用處:服務間解耦,緩解壓力(削峯平谷);
RabbitMQ實現了AQMP協議,AQMP協議定義了消息路由規則和方式。生產端經過路由規則發送消息到不一樣queue,消費端根據queue名稱消費消息。
RabbitMQ既支持內存隊列也支持持久化隊列,消費端爲推模型,消費狀態和訂閱關係由服務端負責維護,消息消費完後當即刪除,不保留歷史消息。
(1)點對點
生產端發送一條消息經過路由投遞到Queue,只有一個消費者能消費到。springboot
(2)多訂閱
當RabbitMQ須要支持多訂閱時,發佈者發送的消息經過路由同時寫到多個Queue,不一樣訂閱組消費不一樣的Queue。因此支持多訂閱時,消息會多個拷貝。服務器
二、安裝RabbitMQ服務端
(1)下載Erlang安裝包:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度網盤地址)
(2)安裝和配置RabbitMQ服務端,3.6.0版本:https://pan.baidu.com/s/1bEbYnc#list/path=%2F(百度網盤地址)
(官方:安裝Erland,經過官方下載頁面http://www.erlang.org/downloads獲取exe安裝包,直接打開並完成安裝。
安裝RabbitMQ,經過官方下載頁面https://www.rabbitmq.com/download.html獲取exe安裝包。)
(3)啓用web管理插件:rabbitmq-plugins enable rabbitmq_management
(4)啓動RabbitMQ:chkconfig rabbitmq-server on /sbin/service rabbitmq-server start
(5)防火牆開通端口
# firewall-cmd --permanent --zone=public --add-port=5672/tcp
# firewall-cmd --permanent --zone=public --add-port=15672/tcp
# firewall-cmd --reload
(6)rabbitmq默認會建立guest帳號,只能用於localhost登陸頁面管理員,本機訪問地址:http://localhost:15672/app
三.SpringBoot整合RabbitMQ(Topic轉發模式)tcp
首先咱們看發送端,咱們須要配置隊列Queue,再配置交換機(Exchange),再把隊列按照相應的規則綁定到交換機上:
首先 配置pom.xml
<!-- 添加springboot對amqp的支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
接着在application.properties中,去編輯和RabbitMQ相關的配置信息,配置信息的表明什麼內容根據鍵就能很直觀的看出了.這裏端口是5672,不是15672...15672是管理端的端口!
server.port=17080
#mq
spring.application.name=spirng-boot-rabbitmq-sender
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=ncs
spring.rabbitmq.password=12345678
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
咱們看發送端,咱們須要配置隊列Queue,再配置交換機(Exchange),再把隊列按照相應的規則綁定到交換機上:
package com.example.demo.conf; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * Created by ningcs on 2017/10/30. */ @Configuration public class SenderConf { @Bean(name="message") public Queue queueMessage() { return new Queue("topic.message"); } @Bean(name="messages") public Queue queueMessages() { return new Queue("topic.messages"); } @Bean public TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一個詞,#表示零個或多個詞 } }
在SpringBoot中,咱們使用AmqpTemplate去發送消息!代碼以下:
package com.example.demo.sender; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Created by ningcs on 2017/10/30. */ @Component public class HelloSender { @Autowired private AmqpTemplate template; public void send(String msg) { System.out.println(msg); template.convertAndSend("exchange","topic.message","hello,rabbit~"); } }
測試
package com.example.demo.controller; import com.example.demo.sender.HelloSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; /** * Created by ningcs on 2017/10/30. */ @RestController @RequestMapping("rabbit") public class RabbitController { @Autowired private HelloSender helloSender; @RequestMapping(value = "/hello",method = {RequestMethod.GET, RequestMethod.POST}) public String helloSender(){ helloSender.send("hello,rabbit~"); return "發送成功"; } }
接收端(能夠放在兩個項目,配置文件同樣):
package com.example.demo.receiver;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Created by ningcs on 2017/10/30.
*/
@Component
public class HelloReceive {
// @RabbitListener(queues="queue") //監聽器監聽指定的Queue
// public void processC(String str) {
// System.out.println("Receive:"+str);
// }
@RabbitListener(queues="topic.message") //監聽器監聽指定的Queue
public void process1(String str) {
System.out.println("message:"+str);
}
@RabbitListener(queues="topic.messages") //監聽器監聽指定的Queue
public void process2(String str) {
System.out.println("messages:"+str);
}
}
訪問地址:http://localhost:17080/rabbit/hello
控制檯輸出如下日誌說明成功:
2017-11-02 11:30:11.982 INFO 4476 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17080 (http)
2017-11-02 11:30:11.994 INFO 4476 --- [ main] d.SpringbootRabbitTopicSenderApplication : Started SpringbootRabbitTopicSenderApplication in 12.292 seconds (JVM running for 12.829)
2017-11-02 11:31:06.712 INFO 4476 --- [io-17080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring FrameworkServlet 'dispatcherServlet'
2017-11-02 11:31:06.713 INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started
2017-11-02 11:31:06.729 INFO 4476 --- [io-17080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 16 ms
hello,rabbit~
hello,rabbit~
2017-11-02 11:30:13.165 INFO 11064 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 17081 (http)
2017-11-02 11:30:13.170 INFO 11064 --- [ main] SpringbootRabbitTopicReceiverApplication : Started SpringbootRabbitTopicReceiverApplication in 13.48 seconds (JVM running for 17.121)
messages:hello,rabbit~
message:hello,rabbit~
messages:hello,rabbit~
message:hello,rabbit~
Fanout 和Direct模式詳見最下面github地址。
術語解釋
Broker:簡單來講就是消息隊列服務器實體。
Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列。
Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列。
Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來。
Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。
vhost:虛擬主機,一個broker裏能夠開設多個vhost,用做不一樣用戶的權限分離。
producer:消息生產者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務。
RabbitMQ消息隊列詳細介紹(主要涉及術語)
http://blog.csdn.net/leyangjun/article/details/52529047
通訊協議AMQP(Advanced Message Queuing Protocol)
AMQP模型中,消息在producer中產生,發送到MQ的exchange上,exchange根據配置的路由方式發到相應的Queue上,Queue又將消息發送給consumer,消息從queue到consumer有push和pull兩種方式。 消息隊列的使用過程大概以下:
客戶端鏈接到消息隊列服務器,打開一個channel。
客戶端聲明一個exchange,並設置相關屬性。
客戶端聲明一個queue,並設置相關屬性。
客戶端使用routing key,在exchange和queue之間創建好綁定關係。
客戶端投遞消息到exchange。
exchange接收到消息後,就根據消息的key和已經設置的binding,進行消息路由,將消息投遞到一個或多個隊列裏。
RabbitMQ消息隊列-RabbitMQ的優劣勢及產生背景
推薦博客:
http://blog.csdn.net/super_rd/article/details/70229714
詳細代碼訪問github:https://github.com/ningcs/Springboot-rabbit-mq