一 RabbitMQ的介紹 java
RabbitMQ是消息中間件的一種,消息中間件即分佈式系統中完成消息的發送和接收的基礎軟件.這些軟件有不少,包括ActiveMQ(apache公司的),RocketMQ(阿里巴巴公司的,現已經轉讓給apache).程序員
消息中間件的工做過程能夠用生產者消費者模型來表示.即,生產者不斷的向消息隊列發送信息,而消費者從消息隊列中消費信息.具體過程以下:spring
從上圖可看出,對於消息隊列來講,生產者,消息隊列,消費者是最重要的三個概念,生產者發消息到消息隊列中去,消費者監聽指定的消息隊列,而且當消息隊列收到消息以後,接收消息隊列傳來的消息,而且給予相應的處理.消息隊列經常使用於分佈式系統之間互相信息的傳遞.apache
對於RabbitMQ來講,除了這三個基本模塊之外,還添加了一個模塊,即交換機(Exchange).它使得生產者和消息隊列之間產生了隔離,生產者將消息發送給交換機,而交換機則根據調度策略把相應的消息轉發給對應的消息隊列.那麼RabitMQ的工做流程以下所示:tomcat
緊接着說一下交換機.交換機的主要做用是接收相應的消息而且綁定到指定的隊列.交換機有四種類型,分別爲Direct,topic,headers,Fanout.springboot
Direct是RabbitMQ默認的交換機模式,也是最簡單的模式.即建立消息隊列的時候,指定一個BindingKey.當發送者發送消息的時候,指定對應的Key.當Key和消息隊列的BindingKey一致的時候,消息將會被髮送到該消息隊列中.app
topic轉發信息主要是依據通配符,隊列和交換機的綁定主要是依據一種模式(通配符+字符串),而當發送消息的時候,只有指定的Key和該模式相匹配的時候,消息纔會被髮送到該消息隊列中.maven
headers也是根據一個規則進行匹配,在消息隊列和交換機綁定的時候會指定一組鍵值對規則,而發送消息的時候也會指定一組鍵值對規則,當兩組鍵值對規則相匹配的時候,消息會被髮送到匹配的消息隊列中.分佈式
Fanout是路由廣播的形式,將會把消息發給綁定它的所有隊列,即使設置了key,也會被忽略.ide
二.SpringBoot整合RabbitMQ(Direct模式)
SpringBoot整合RabbitMQ很是簡單!感受SpringBoot真的極大簡化了開發的搭建環境的時間..這樣咱們程序員就能夠把更多的時間用在業務上了,下面開始搭建環境:
首先建立兩個maven工程,這是爲了模擬分佈式應用系統中,兩個應用之間互相交流的過程,一個發送者(Sender),一個接收者(Receiver)
緊接着,配置pom.xml文件,注意其中用到了springboot對於AMQP(高級消息隊列協議,即面向消息的中間件的設計)
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.0.RELEASE</version> </parent> <properties> <java.version>1.7</java.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> <scope>true</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!-- 添加springboot對amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.tomcat.embed</groupId> <artifactId>tomcat-embed-jasper</artifactId> <scope>provided</scope> </dependency> </dependencies>
緊接着,咱們編寫發送者相關的代碼.首先毫無疑問,要書寫啓動類:
@SpringBootApplication public class App{ public static void main(String[] args) { SpringApplication.run(App.class, args); } }
接着在application.properties中,去編輯和RabbitMQ相關的配置信息,配置信息的表明什麼內容根據鍵就能很直觀的看出了.這裏端口是5672,不是15672...15672是管理端的端口!
spring.application.name=spirng-boot-rabbitmq-sender spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
隨後,配置Queue(消息隊列).那注意因爲採用的是Direct模式,須要在配置Queue的時候,指定一個鍵,使其和交換機綁定.
@Configuration public class SenderConf { @Bean public Queue queue() { return new Queue("queue"); } }
接着就能夠發送消息啦!在SpringBoot中,咱們使用AmqpTemplate去發送消息!代碼以下:
@Component public class HelloSender { @Autowired private AmqpTemplate template; public void send() { template.convertAndSend("queue","hello,rabbit~"); } }
編寫測試類!這樣咱們的發送端代碼就編寫完了~
@SpringBootTest(classes=App.class) @RunWith(SpringJUnit4ClassRunner.class) public class TestRabbitMQ { @Autowired private HelloSender helloSender; @Test public void testRabbit() { helloSender.send(); } }
接着咱們編寫接收端.接收端的pom文件,application.properties(修改spring.application.name),Queue配置類,App啓動類都是一致的!這裏省略不計.主要在於咱們須要配置監聽器去監聽綁定到的消息隊列,當消息隊列有消息的時候,予以接收,代碼以下:
@Component public class HelloReceive { @RabbitListener(queues="queue") //監聽器監聽指定的Queue public void processC(String str) { System.out.println("Receive:"+str); } }
接下來就能夠測試啦,首先啓動接收端的應用,緊接着運行發送端的單元測試,接收端應用打印出來接收到的消息,測試即成功!
須要注意的地方,Direct模式至關於一對一模式,一個消息被髮送者發送後,會被轉發到一個綁定的消息隊列中,而後被一個接收者接收!
實際上RabbitMQ還能夠支持發送對象:固然因爲涉及到序列化和反序列化,該對象要實現Serilizable接口.HelloSender作出以下改寫:
public void send() { User user=new User(); //實現Serializable接口 user.setUsername("hlhdidi"); user.setPassword("123"); template.convertAndSend("queue",user); }
HelloReceiver作出以下改寫:
@RabbitListener(queues="queue") //監聽器監聽指定的Queue public void process1(User user) { //用User做爲參數 System.out.println("Receive1:"+user); }
三.SpringBoot整合RabbitMQ(Topic轉發模式)
首先咱們看發送端,咱們須要配置隊列Queue,再配置交換機(Exchange),再把隊列按照相應的規則綁定到交換機上:
@Configuration public class SenderConf { @Bean(name="message") public Queue queueMessage() { return new Queue("topic.message"); } @Bean(name="messages") public Queue queueMessages() { return new Queue("topic.messages"); } @Bean public TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean Binding bindingExchangeMessage(@Qualifier("message") Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一個詞,#表示零個或多個詞 } }
而在接收端,咱們配置兩個監聽器,分別監聽不一樣的隊列:
@RabbitListener(queues="topic.message") //監聽器監聽指定的Queue public void process1(String str) { System.out.println("message:"+str); } @RabbitListener(queues="topic.messages") //監聽器監聽指定的Queue public void process2(String str) { System.out.println("messages:"+str); }
好啦!接着咱們能夠進行測試了!首先咱們發送以下內容:
方法的第一個參數是交換機名稱,第二個參數是發送的key,第三個參數是內容,RabbitMQ將會根據第二個參數去尋找有沒有匹配此規則的隊列,若是有,則把消息給它,若是有不止一個,則把消息分發給匹配的隊列(每一個隊列都有消息!),顯然在咱們的測試中,參數2匹配了兩個隊列,所以消息將會被髮放到這兩個隊列中,而監聽這兩個隊列的監聽器都將收到消息!那麼若是把參數2改成topic.messages呢?顯然只會匹配到一個隊列,那麼process2方法對應的監聽器收到消息!
四.SpringBoot整合RabbitMQ(Fanout Exchange形式)
那前面已經介紹過了,Fanout Exchange形式又叫廣播形式,所以咱們發送到路由器的消息會使得綁定到該路由器的每個Queue接收到消息,這個時候就算指定了Key,或者規則(即上文中convertAndSend方法的參數2),也會被忽略!那麼直接上代碼,發送端配置以下:
@Configuration public class SenderConf { @Bean(name="Amessage") public Queue AMessage() { return new Queue("fanout.A"); } @Bean(name="Bmessage") public Queue BMessage() { return new Queue("fanout.B"); } @Bean(name="Cmessage") public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange");//配置廣播路由器 } @Bean Binding bindingExchangeA(@Qualifier("Amessage") Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(@Qualifier("Bmessage") Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(@Qualifier("Cmessage") Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
發送端使用以下代碼發送:
接收端監聽器配置以下:
@Component public class HelloReceive { @RabbitListener(queues="fanout.A") public void processA(String str1) { System.out.println("ReceiveA:"+str1); } @RabbitListener(queues="fanout.B") public void processB(String str) { System.out.println("ReceiveB:"+str); } @RabbitListener(queues="fanout.C") public void processC(String str) { System.out.println("ReceiveC:"+str); } }
運行測試代碼,發現三個監聽器都接收到了數據,測試成功!