1. 大多應用中,可經過消息服務中間件來提高系統異步通訊、擴展解耦能力java
2. 消息服務中兩個重要概念:spring
消息代理(message broker)和目的地(destination) json
當消息發送者發送消息之後,將由消息代理接管,消息代理保證消息傳遞到指定目 的地。springboot
3. 消息隊列主要有兩種形式的目的地服務器
1. 隊列(queue):點對點消息通訊(point-to-point)網絡
2. 主題(topic):發佈(publish)/訂閱(subscribe)消息通訊多線程
一、同步處理方式app
二、多線程處理方式異步
三、消息隊列ide
1. 點對點式:
– 消息發送者發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容, 消息讀取後被移出隊列 – 消息只有惟一的發送者和接受者,但並非說只能有一個接收者
2. 發佈訂閱式:
– 發送者(發佈者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那麼 就會在消息到達時同時收到消息
3. JMS(Java Message Service)JAVA消息服務:
– 基於JVM消息代理的規範。ActiveMQ、HornetMQ是JMS實現
4. AMQP(Advanced Message Queuing Protocol)
– 高級消息隊列協議,也是一個消息代理的規範,兼容JMS – RabbitMQ是AMQP的實現
JMS vs AMQP
– spring-jms提供了對JMS的支持
– spring-rabbit提供了對AMQP的支持
– 須要ConnectionFactory的實現來鏈接消息代理
– 提供JmsTemplate、RabbitTemplate來發送消息
– @JmsListener(JMS)、@RabbitListener(AMQP)註解在方法上監聽消息代理髮 布的消息
– @EnableJms、@EnableRabbit開啓支持
– JmsAutoConfiguration
– RabbitAutoConfiguration
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。
Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組 成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出 該消息可能須要持久性存儲)等。
Publisher
消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
Exchange
交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。 Exchange有4種類型:direct(默認),fanout, topic, 和headers,不一樣類型的Exchange轉發消息的策略有 所區別
Queue
消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息 可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。
Binding
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列連 接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。 Exchange 和Queue的綁定能夠是多對多的關係。
Connection
網絡鏈接,好比一個TCP鏈接。
Channel
信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內的虛 擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接收消息,這 些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,所 以引入了信道的概念,以複用一條 TCP 鏈接。
Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加 密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有 本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定, RabbitMQ 默認的 vhost 是 / 。
Broker
表示消息隊列服務器實體
AMQP 中的消息路由
• AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差異,AMQP 中增長了 Exchange 和 Binding 的角色。生產者把消息發佈到 Exchange 上,消息最終到達隊列並被 消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。
• Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型: direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵, headers 交換器和 direct 交換器徹底一致,但性能差不少, 目前幾乎用不到了,因此直接看另外三種類型:
這個找官網下載就好,而後在服務端配置好對應的exchange以及queue便可
a、建立exchange
b、建立queue
c、exchange綁定隊列
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
/** * 自動配置 * 一、RabbitAutoConfiguration * 二、有自動配置了鏈接工廠ConnectionFactory; * 三、RabbitProperties 封裝了 RabbitMQ的配置 * 四、 RabbitTemplate :給RabbitMQ發送和接受消息; * 五、 AmqpAdmin : RabbitMQ系統管理功能組件; * AmqpAdmin:建立和刪除 Queue,Exchange,Binding * 六、@EnableRabbit + @RabbitListener 監聽消息隊列的內容 * */ @EnableRabbit //開啓基於註解的RabbitMQ模式 @SpringBootApplication public class Springboot02AmqpApplication { public static void main(String[] args) { SpringApplication.run(Springboot02AmqpApplication.class, args); } }
spring.rabbitmq.host=118.24.44.169 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #spring.rabbitmq.virtual-host=
package com.atguigu.amqp.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class MyAMQPConfig {
//自定義消息轉換器,將對象轉換成json數據。默認使用的是jdk的序列化類,序列化以後的消息看着不是很友好 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
package com.atguigu.amqp.bean; public class Book { private String bookName; private String author; public Book() { } public Book(String bookName, String author) { this.bookName = bookName; this.author = author; } public String getBookName() { return bookName; } public void setBookName(String bookName) { this.bookName = bookName; } public String getAuthor() { return author; } public void setAuthor(String author) { this.author = author; } @Override public String toString() { return "Book{" + "bookName='" + bookName + '\'' + ", author='" + author + '\'' + '}'; } }
package com.atguigu.amqp.service; import com.atguigu.amqp.bean.Book; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class BookService { @RabbitListener(queues = "atguigu.news") public void receive(Book book){ System.out.println("收到消息:"+book); } @RabbitListener(queues = "atguigu") public void receive02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } }
package com.atguigu.amqp; import com.atguigu.amqp.bean.Book; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @RunWith(SpringRunner.class) @SpringBootTest public class Springboot02AmqpApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void createExchange(){ // amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); // System.out.println("建立完成"); // amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true)); //建立綁定規則 // amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null)); //amqpAdmin.de } /** * 一、單播(點對點) */ @Test public void contextLoads() { //Message須要本身構造一個;定義消息體內容和消息頭 //rabbitTemplate.send(exchage,routeKey,message); //object默認當成消息體,只須要傳入要發送的對象,自動序列化發送給rabbitmq; //rabbitTemplate.convertAndSend(exchage,routeKey,object); Map<String,Object> map = new HashMap<>(); map.put("msg","這是第一個消息"); map.put("data", Arrays.asList("helloworld",123,true)); //對象被默認序列化之後發送出去 rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",new Book("西遊記","吳承恩")); } //接受數據,如何將數據自動的轉爲json發送出去 @Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("atguigu.news"); System.out.println(o.getClass()); System.out.println(o); } /** * 廣播 */ @Test public void sendMsg(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("紅樓夢","曹雪芹")); } }
若是事先沒有建立exchange、queue等,能夠經過amqpadmin管理
import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; @RunWith(SpringRunner.class) @SpringBootTest public class Springboot02AmqpApplicationTests { @Autowired AmqpAdmin amqpAdmin; @Test public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); System.out.println("建立完成"); amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true)); //建立綁定規則 amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null)); } }