可經過消息服務中間件來提高系統異步通訊、擴展解耦能力html
消息服務中兩個重要概念:消息代理(message broker)和目的地(destination)當消息發送者發送消息之後,將由消息代理接管,消息代理保證消息傳遞到指定目的地。java
消息隊列主要有兩種形式的目的地:python
隊列(queue):點對點消息通訊(point-to-point)linux
主題(topic):發佈(publish)/訂閱(subscribe)消息通訊web
注:經過ActiveMQ的學習便可知道以上的概念redis
在未引入消息中間件的狀況下,響應時間並不能降到最低;在引入消息中間件時,響應時間由150ms下降爲55ms;spring
在秒殺系統中,咱們能夠引入消息隊列進行流量削峯。如,5件商品,100人搶購,若是搶購完了,則後面搶購的消息所有拒絕。docker
消息發送者發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,消息讀取後被移出隊列json
消息只有惟一的發送者和接受者,但並非說只能有一個接收者瀏覽器
發送者(發佈者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那麼就會在消息到達時同時收到消息.類比微信公衆號
JAVA消息服務(Java Message Service),基於JVM消息代理的規範。ActiveMQ、HornetMQ是JMS實現
高級消息隊列協議(Advanced Message Queuing Protocol),也是一個消息代理的規範,兼容JMS.RabbitMQ是AMQP的實現;
spring-jms提供了對JMS的支持;
spring-rabbit提供了對AMQP的支持;
須要ConnectionFactory的實現來鏈接消息代理;
提供JmsTemplate、RabbitTemplate來發送消息;
@JmsListener(JMS)、@RabbitListener(AMQP)註解在方法上監聽消息代理髮布的消息;
@EnableJms、@EnableRabbit開啓支持;
JmsAutoConfiguration
RabbitAutoConfiguration
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。
Message:消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。
Publisher:消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
Exchange:交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列。Exchange有4種類型:direct(默認),fanout, topic, 和headers,不一樣類型的Exchange轉發消息的策略有所區別.
Queue:消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。
Binding:綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。Exchange 和Queue的綁定能夠是多對多的關係。
Connection:網絡鏈接,好比一個TCP鏈接。
Channel:信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內的虛擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念,以複用一條 TCP 鏈接。
Consumer:消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
Virtual Host虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是/。
Broker:表示消息隊列服務器實體。
AMQP 中消息的路由過程和 Java 開發者熟悉的 JMS 存在一些差異,AMQP 中增長了 Exchange 和 Binding 的角色。生產者把消息發佈到 Exchange 上,消息最終到達隊列並被消費者接收,而 Binding 決定交換器的消息應該發送到那個隊列。
Exchange分發消息時根據類型的不一樣分發策略有區別,目前共四種類型:direct、fanout、topic、headers;headers 匹配 AMQP 消息的 header 而不是路由鍵, headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:
如,usa.news和usa.weather會匹配到usa.#上;而usa.weather和europe.weather就會匹配到#.weather;相似於模糊匹配
https://www.cnblogs.com/HOsystem/p/13789551.html |
https://www.cnblogs.com/HOsystem/p/13789551.html |
在docker上pull rabbitmq
[root@hosystem ~]# docker pull rabbitmq Using default tag: latest latest: Pulling from library/rabbitmq 171857c49d0f: Pull complete 419640447d26: Pull complete 61e52f862619: Pull complete 856781f94405: Pull complete 125d5ee3d600: Pull complete 42de77c4d197: Pull complete 4d65f87814dd: Pull complete f6c0bf06039f: Pull complete 01671add1b7b: Pull complete 088ff84cf8cb: Pull complete Digest: sha256:3da3bcd2167a1fc9bdbbc40ec0ae2b195df5df05e3c10c64569c969cb3d86435 Status: Downloaded newer image for rabbitmq:latest docker.io/library/rabbitmq:latest [root@hosystem ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE redis latest 62f1d3402b78 4 days ago 104MB rabbitmq latest ea2bf0a30abf 4 weeks ago 156MB hello-world latest bf756fb1ae65 10 months ago 13.3kB |
經過docker啓動rabbitmq
[root@hosystem ~]# docker images REPOSITORY TAG IMAGE ID CREATED SIZE rabbitmq latest ea2bf0a30abf 4 weeks ago 156MB [root@hosystem ~]# docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq ea2bf0a30abf e687835a6ea784d55717dc402d5d447d62e486e78f6c770ec703dfdec3d64f16 [root@hosystem ~]# |
-d:表示後臺啓動
-p:進行端口映射
--name:從新名,修改爲咱們想要的名字
訪問rabbitmq,由於個人ip爲192.168.188.198因此只要在瀏覽器上輸入192.168.188.198:15672便可;帳號:guest 密碼:guest
注:rabbitMQ啓動後用web訪問顯示服務器拒絕訪問,用如下方法解決
#添加防火牆規則 [root@hosystem ~]# firewall-cmd --permanent --zone=public --add-port=15672/tcp success [root@hosystem ~]# firewall-cmd --reload success |
#https://blog.csdn.net/tl1242616458/article/details/105586984 [root@hosystem ~]# docker exec -it myrabbitmq /bin/bash [root@e687835a6ea7:/]# rabbitmq-plugins enable rabbitmq_management |
按照上圖依次添加,exchange.direct、exchange.fanout、exchange.topic這三個exchange.效果以下
點擊須要的exchange,進去後在bingdings裏填寫與之綁定的queues。
咱們發送key爲hello.news的消息,由於咱們topic有#.news,因此只要有#.news均可以接收
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> |
#rabbitmq配置信息 spring.rabbitmq.host=192.168.188.198 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #spring.rabbitmq.port= #spring.rabbitmq.virtual-host= |
AmqpAdmin:管理組件
RabbitTemplate:消息發送處理組件
RabbitProperties是封裝RabbitMQ相關配置的類
RabbitTemplate是用於RabbitMQ發送和接收消息
AmqpAdmin是RabbitMQ系統管理功能組件
配置rabbitmq參數
#rabbitmq配置信息 spring.rabbitmq.host=192.168.188.198 spring.rabbitmq.username=guest spring.rabbitmq.password=guest #spring.rabbitmq.port= #spring.rabbitmq.virtual-host= |
發送消息到rabbitmq,默認使用java-serialized序列化
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 單播(點對點) */ @Test void contextLoads() { //message須要本身定義;定義消息體內容和消息頭(org.springframework.amqp.core.Message()) // rabbitTemplate.send(exchange,routekey,message);
//object默認當成消息體,只須要傳入要發送的對象,自動序列化發送給rabbitmq // rabbitTemplate.convertAndSend(exchange,routekey,message); Map<String,Object> map = new HashMap<>(); map.put("msg","test1"); map.put("data", Arrays.asList("helloworld",123,true)); //對象被默認序列化(java-serialized-object)後發送 // rabbitTemplate.convertAndSend("exchange.direct","hos.news",map);
} |
/** * 接收rabbitmsq消息 * 將數據轉爲json發送出去(private MessageConverter messageConverter = new SimpleMessageConverter();) */ @Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("hos.news"); System.out.println(o.getClass()); System.out.println(o); Object o1 = rabbitTemplate.receiveAndConvert("hos.news"); System.out.println(o1.getClass()); System.out.println(o1);
} |
將數據轉爲json發送出去MessageConverter messageConverter = new SimpleMessageConverter();(org.springframework.amqp.support.converter.MessageConverter)
package com.hosystem.amqp.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class MyAMQPConfig {
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } } |
package com.hosystem.amqp.bean;
public class Book {
private String bookName; private String author;
public Book() { }
public Book(String bookName, String author) {
this.bookName = bookName; this.author = author; }
public String getBookName() { return bookName; }
public String getAuthor() { return author; }
public void setBookName(String bookName) { this.bookName = bookName; }
public void setAuthor(String author) { this.author = author; } } |
使用自定義對象發送給rabbitmq
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 單播(點對點) */ @Test void contextLoads() { //message須要本身定義;定義消息體內容和消息頭(org.springframework.amqp.core.Message()) // rabbitTemplate.send(exchange,routekey,message);
//object默認當成消息體,只須要傳入要發送的對象,自動序列化發送給rabbitmq // rabbitTemplate.convertAndSend(exchange,routekey,message); Map<String,Object> map = new HashMap<>(); map.put("msg","test1"); map.put("data", Arrays.asList("helloworld",123,true)); //發送自定義對象 rabbitTemplate.convertAndSend("exchange.direct","hos.news",new Book("Linux","linux"));
} } |
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 接收rabbitmsq消息 * 將數據轉爲json發送出去(private MessageConverter messageConverter = new SimpleMessageConverter();) */ @Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("hos.news"); System.out.println(o.getClass()); System.out.println(o); } |
package com.hosystem.amqp;
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
/** * 廣播 */ @Test public void sendMsg(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("python書籍","python做者")); } } |
@EnableRabbit + @RabbitListener 監聽消息隊列內容
package com.hosystem.amqp.service;
import com.hosystem.amqp.bean.Book; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service;
@Service public class BookService {
@RabbitListener(queues = "hos.news") public void receive(Book book){ System.out.println("收到消息:"+book); }
@RabbitListener(queues = "hos") public void receive02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } } |
@EnableRabbit //開啓基於註解的rabbitmq模式 @SpringBootApplication public class Springboot02AmqpApplication {
public static void main(String[] args) { SpringApplication.run(Springboot02AmqpApplication.class, args); }
} |
AmqpAdmin(org.springframework.amqp.core.AmqpAdmin):RabbitMQ系統管理功能組件。
AmqpAdmin:建立和刪除Queue、exchange、binding
@SpringBootTest class Springboot02AmqpApplicationTests {
@Autowired RabbitTemplate rabbitTemplate;
// @Bean // @ConditionalOnSingleCandidate(ConnectionFactory.class) // @ConditionalOnProperty( // prefix = "spring.rabbitmq", // name = {"dynamic"}, // matchIfMissing = true // ) // @ConditionalOnMissingBean // public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { // return new RabbitAdmin(connectionFactory); // } @Autowired AmqpAdmin amqpAdmin;
//建立exchange queues binding @Test public void createExchange(){ //org.springframework.amqp.core.Exchange //org.springframework.amqp.core.DirectExchange //建立exchange // amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); // System.out.println("建立成功");
//建立queues //org.springframework.amqp.core.AmqpAdmin // amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
//org.springframework.amqp.core.Binding //建立binding amqpAdmin.declareBinding(new Binding("amqpadmin.queue",Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null)); } } |