大多數應用中,可經過消息服務中間件來提高系統異步通訊、擴展解耦的能力。html
消息服務中兩個重要概念:消息代理(message broker)和目的地(destination)。當消息發送者發送消息後,將由消息代理接管,消息代理保證消息傳遞到指定目的地。前端
消息隊列主要有兩種形式的目的地:java
點對點:web
發佈/訂閱:spring
JMS(Java Message Service):數據庫
AMQP(Advanced Message Queuing Protocol):springboot
JMS | AMQP | |
---|---|---|
定義 | Java API | 網絡級協議 |
跨語言 | 否 | 是 |
跨平臺 | 否 | 是 |
Model | 提供 2 種消息模型:
|
提供了 5 種消息模型:
|
支持消息類型 | 多種消息類型:
|
因其要支持跨語言跨平臺,因此僅支持 byte[],當實際應用中有複雜的消息時,能夠將消息序列化後發送。 |
綜合 | HMS 定義了 Java API 層面的標準,在 Java 體系中,多個 client 都可經過 JMS 進行交互,不須要修改應用代碼,可是其對跨平臺支持較差。 | AMQP 定義了 wire-level 層的協議標準,自然具備跨平臺、跨語言特性。 |
場景說明:用戶註冊後,須要發註冊郵件和註冊短信。傳統的作法有兩種:串行方式、並行方式。服務器
一、串行方式:將註冊信息寫入數據庫成功後,發送註冊郵件,再發送註冊短信。以上三個任務所有完成後,返回給客戶端。網絡
二、並行方式:將註冊信息寫入數據庫成功後,發送註冊郵件的同時,發送註冊短信。以上三個任務完成後,返回給客戶端。與串行的差異是,並行的方式能夠提升處理的時間。架構
假設三個業務節點每一個使用50毫秒鐘,不考慮網絡等其餘開銷,則串行方式的時間是 150 毫秒,並行的時間多是 100 毫秒。
由於 CPU 在單位時間內處理的請求數是必定的,假設 CPU1 秒內吞吐量是 100 次。則串行方式 1 秒內 CPU 可處理的請求量是 7 次(1000/150)。並行方式處理的請求量是 10 次(1000/100)。
如以上案例描述,傳統的方式系統的性能(併發量,吞吐量,響應時間)很容易達到瓶頸。
三、引入消息隊列,將不是必須的業務邏輯,異步處理。改造後的架構以下:
按照以上約定,用戶的響應時間至關因而註冊信息寫入數據庫的時間,也就是50毫秒。註冊郵件,發送短信寫入消息隊列後,直接返回,所以寫入消息隊列的速度很快,基本能夠忽略,所以用戶的響應時間多是50毫秒。所以架構改變後,系統的吞吐量提升到每秒 20 QPS。比串行提升了 3 倍,比並行提升了 2 倍。
場景說明:用戶下單後,訂單系統須要通知庫存系統。傳統的作法是,訂單系統調用庫存系統的接口。
傳統模式:
傳統模式的缺點:
引入消息隊列:
訂單系統:用戶下單後,訂單系統完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
庫存系統:訂閱下單的消息,採用拉/推的方式,獲取下單信息,庫存系統根據下單信息,進行庫存操做。
假如:在下單時庫存系統不能正常使用。也不影響正常下單,由於下單後,訂單系統寫入消息隊列就再也不關心其餘的後續操做了。實現訂單系統與庫存系統的應用解耦。
場景說明:秒殺活動,通常會由於流量過大,致使流量暴增,應用掛掉。爲解決這個問題,通常須要在應用前端加入消息隊列。
能夠控制活動的人數。
能夠緩解短期內高流量壓垮應用。
用戶的請求,服務器接收後,首先寫入消息隊列。假如消息隊列長度超過最大數量,則直接拋棄用戶請求或跳轉到錯誤頁面。
秒殺業務根據消息隊列中的請求信息,再作後續處理。
該部份內容摘自 https://blog.csdn.net/cws1214/article/details/52922267。
RabbitMQ 採用 Erlang 語言開發,是 AMQP 的開源實現。Erlang 語言由 Ericson 設計,專門爲開發 concurrent 和 distribution 系統的一種語言,在電信領域使用普遍。OTP(Open Telecom Platform)做爲 Erlang 語言的一部分,包含了不少基於 Erlang 開發的中間件/庫/工具,如 mnesia/SASL,極大方便了 Erlang 應用的開發。OTP 就相似於 Python 語言中衆多的 module,用戶藉助這些 module 能夠很方便的開發應用。
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(相對於其餘消息的優先級)、delivery-mode(標識指定消息是否須要持久性存儲)等。
消息的生產者,也是一個向交換機發布消息的客戶端應用程序。
交換器,用來接收生產者發送的消息並將這些消息路由到服務器中的隊列,也是消息到達 Broker 的第一站,根據分發規則,匹配查詢表中的路由鍵,分發消息到隊列中去。經常使用的類型有:direct (point-to-point), topic (publish-subscribe) 和 fanout (multicast)。
消息隊列,用來保存信息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息能夠投入到一個或多個隊列。消息一致在隊列中,等待消費者鏈接到這個隊列將其取走。
綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。路由器和隊列的綁定能夠是多對多的關係。
鏈接,Publisher/Consumer 和 Broker 之間的 TCP 鏈接。斷開鏈接的操做只會在 client 端進行,Broker 不會斷開鏈接,除非出現網絡故障或 Broker 服務出現問題。
信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的 TCP 鏈接內的虛擬鏈接,AMQP 命令都是經過信道發出去的,不論是發佈消息、訂閱隊列仍是接受消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬 TCP 都是很是昂貴的開銷,因此引入了信道的概念以複用一條 TCP 鏈接。
消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證加密環境的獨立服務器域。每一個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost 是 AMQP 概念的基礎,必須在鏈接時指定,RabbitMQ 默認的 vhost 是 '/'。
表示消息隊列服務器實體,即接收和分發消息的應用,RabbitMQ Server 就是 Message Broker。
AMQP 中的消息路由與 JMS 存在一些差異,AMQP 中增長了 Exchange 和 Binding 的角色。生產者須要把消息發佈到 Exchange,最終由 Exchange 轉發到隊列並被消費者接收,而 Binding 就決定了交換器會將消息轉發到哪一個隊列。
Exchange 分發消息時根據類型的不一樣分發策略有區別,目前共有四種類型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由鍵,headers 交換器和 direct 交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三中類型:
Direct Exchange
消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致,交換器就將消息發送到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲"dog",則只轉發 routing key 標記爲 "dog" 的消息,不會轉發 "dog.puppy",也不會轉發"dog.guard"等。它是徹底匹配、單播的模式。
參考【Docker 安裝RabbitMQ】。
一、進入 RabbitMQ 的 web 可視化頁,用 guest 用戶登陸,密碼也爲 guest。
二、新建以下測試隊列:
三、新建以下測試交換器:
四、給新建的 direct 和 fanout 交換器新建以下綁定:
五、給新建的 topic 交換器新建以下綁定:
一、給「張三.msg」這個隊列發送消息:
二、「張三.msg」接收消息:
一、給全部綁定的隊列發送消息:
二、全部隊列都接收到消息:
一、給全部「姓張」的隊列發送消息:
二、全部「姓張」的隊列都接收到消息:
一、使用 maven 新建 SpringBoot 項目,引入 Rabbit 、Web 場景啓動器。
二、配置 RabbitMQ 鏈接信息:
spring.rabbitmq.host=192.168.202.136 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
三、註解配置啓用 RabbitMQ:
package com.springboot.config; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.context.annotation.Configuration; @Configuration @EnableRabbit // 啓用 Rabbit public class MyAmqpConfig { }
四、新建測試 JavaBean:
package com.springboot.bean; import java.io.Serializable; import java.util.Date; public class User implements Serializable { private Integer id; private String name; private Date birthday; private String city; public User() { } public User(Integer id, String name, Date birthday, String city) { this.id = id; this.name = name; this.birthday = birthday; this.city = city; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getBirthday() { return birthday; } public void setBirthday(Date birthday) { this.birthday = birthday; } public String getCity() { return city; } public void setCity(String city) { this.city = city; } @Override public String toString() { return "User{" + "id=" + id + ", name='" + name + '\'' + ", birthday=" + birthday + ", city='" + city + '\'' + '}'; } }
下面經過 RabbitTemplate 來完成上述 RabbitMQ 在可視化界面中的幾個測試操做:
package com.springboot; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitTemplateTests { // org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration 自動配置類中註冊了 RabbitTemplate 的 bean @Autowired private RabbitTemplate rabbitTemplate; @Test public void test1(){ // 經過 direct 交換器給 「張三.msg」 隊列發送消息 // send 方法的 message 參數中須要本身定義消息頭和消息體 // rabbitTemplate.send(exchange,routingkey,message); rabbitTemplate.convertAndSend("my.direct","zhangsan.msg","你好 張三"); } @Test public void test2(){ // 接收 「張三.msg」 隊列的消息 Object o = rabbitTemplate.receiveAndConvert("張三.msg"); System.out.println(o.toString()); /* 你好 張三 */ } @Test public void test3(){ // 經過 fanout 交換器給全部隊列發送消息 rabbitTemplate.convertAndSend("my.fanout", "zhangsan.msg", "你們好"); } @Test public void test4(){ // 全部隊列接收消息 Object msg1 = rabbitTemplate.receiveAndConvert("張三.msg"); System.out.println(msg1.toString()); Object msg2 = rabbitTemplate.receiveAndConvert("張四.msg"); System.out.println(msg2.toString()); Object msg3 = rabbitTemplate.receiveAndConvert("李三.msg"); System.out.println(msg3.toString()); Object msg4 = rabbitTemplate.receiveAndConvert("李四.msg"); System.out.println(msg4.toString()); /* 你們好 你們好 你們好 你們好 */ } @Test public void test5(){ // 經過 topic 交換器給全部「姓張」的隊列發送消息 rabbitTemplate.convertAndSend("my.topic", "zhang.hello", "張先生 你好"); } @Test public void test6(){ // 全部「姓張」的隊列接收消息 Object msg1 = rabbitTemplate.receiveAndConvert("張三.msg"); Object msg2 = rabbitTemplate.receiveAndConvert("張四.msg"); System.out.println(msg1); System.out.println(msg2); /* 張先生 你好 張先生 你好 */ } }
在上述的操做中操做的都是字符串,而經過 RabbitTemplate 是能夠直接操做對象的,RabbitTemplate 內部的 Converter 會自動幫咱們完成對象的序列化與反序列化:
package com.springboot; import com.springboot.bean.User; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.text.ParseException; import java.text.SimpleDateFormat; @RunWith(SpringRunner.class) @SpringBootTest public class RabbitTemplateTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test1() throws ParseException { // 直接發送一個對象 User user = new User(1, "張三", new SimpleDateFormat("yyyy-MM-dd").parse("1998-6-5"), "深圳"); rabbitTemplate.convertAndSend("my.direct","zhangsan.msg",user); } @Test public void test2(){ Object o = rabbitTemplate.receiveAndConvert("張三.msg"); System.out.println(o.getClass()); System.out.println(o); /* class com.springboot.bean.User User{id=1, name='張三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'} */ // 根據輸出結果能夠看到,獲取的消息自動完成了反序列化轉換爲 java 對象 } }
查看 RabbitMQ 服務器中存儲的對象,會發現存儲的值爲 RabbitMQ 以默認消息轉換器 org.springframework.amqp.support.converter.SimpleMessageConverter 序列化後的值,若是咱們須要存儲的消息爲 Json 格式,只須要本身註冊一個 Json 格式消息轉換器到容器便可,而 Spring 已經給咱們提供了這個轉換器:
package com.springboot.config; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @EnableRabbit public class MyAmqpConfig { @Bean public MessageConverter messageConverter(){ Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); return jackson2JsonMessageConverter; } }
此時再次執行上述操做,查看服務器中存儲消息:
消息以轉換爲 Json 格式。
Spring 也爲咱們提供了監聽隊列支持的註解 @RabbitListener,它可以幫咱們很簡便的建立一個監聽服務,只須要標註在一個存放在 IoC 容器中實例的方法上。看以下示例:
一、建立一個服務類,註冊到 IoC 容器,使用 @RabbitListener 註解標註在方法上:
package com.springboot.service; import com.springboot.bean.User; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class UserService { @RabbitListener(queues = {"張三.msg"}) // 監聽指定隊列消息 public void receiveUserMsg(User user) { // 接收自動反序列化後的對象 System.out.println(user); } @RabbitListener(queues = {"李四.msg"}) public void receiveMessage(Message message){ // 接收源消息信息 // 得到消息體 System.out.println(message.getBody()); // 得到消息屬性信息 System.out.println(message.getMessageProperties()); } }
二、啓動程序,運行單元測試中發送 User 對象方法,監聽程序輸出以下:
User{id=1, name='張三', birthday=Fri Jun 05 00:00:00 CST 1998, city='深圳'}
Spring 自動註冊了一個 AmqpAdmin 組件,它的做用相似於數據庫中的 DDL 語句,能夠用來幫咱們定義(建立)交換器、隊列。以下:
package com.springboot; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest public class AmqpAdminTests { @Autowired private AmqpAdmin amqpAdmin; @Test public void testDeclareExchange(){ // 建立一個交換器 Exchange exchange = new DirectExchange("my.directNew"); amqpAdmin.declareExchange(exchange);
} @Test public void testDeclareQueue(){ // 建立 Queue Queue queue = new Queue("myQueue"); amqpAdmin.declareQueue(queue);
} @Test public void testBinding(){ // 建立一個 binding ,綁定交換器與隊列 amqpAdmin.declareBinding(new Binding("myQueue", Binding.DestinationType.QUEUE,"my.directNew","myQueue",null));
}
}