當消息發送者發送消息之後,將由消息代理接管,消息代理保證消息傳遞到指定目的地。java
消息隊列主要有兩種形式的目的地web
(1)點對點式redis
(2)發佈訂閱式spring
JMS | AMQP | |
定義 | Java api | 網絡線級協議跨語言 |
是跨平臺 | 否 | 是 |
Model | 提供兩種消息模型:(1)Peer-2-Peer (2)Pub/sub | 提供了五種消息模型:(1)direct exchange (2)fanout exchange (3)topic change (4)headers exchange (5)system exchange本質來說,後四種和JMS的pub/sub模型沒有太大差異,僅是在路由機制上作了更詳細的劃分; |
支持消息類型 | 多種消息類型:TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message(只有消息頭和屬性) | byte[]當實際應用時,有複雜的消息,能夠將消息序列化後發送。 |
綜合評價 | JMS 定義了JAVA API層面的標準;在java體系中,多個client都可以經過JMS進行交互,不須要應用修改代碼,可是其對跨平臺的支持較差; | AMQP定義了wire-level層的協議標準;自然具備跨平臺、跨語言特 |
無論咱們使用哪一種通訊規範,Spring底層都是支持的:docker
Spring Boot也提供了對應的自動配置類shell
RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。json
/
。以上核心的關係圖能夠以下所示: api
direct 消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式。—— 有選擇性的單播
。 springboot
fanout(分列) 每一個發到fanout類型交換器的消息都會分到全部綁定的隊列上去。fanout交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout類型轉發消息是最快的。—— 廣播模式
。 服務器
topic topic交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「*」。#匹配0個或多個單詞,*匹配一個單詞。—— 有選擇性的選擇廣播
。
咱們前面的章節已經講過關於RabbitMQ的docker安裝方法。
# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
其中5678是RabbitMQ服務的服務端口,而15672是web管理網站的服務端口。
有management標誌的docker鏡像有web管理界面,推薦安裝這種版本的。
登陸ipaddress:15672,因爲這裏沒有指定登陸帳戶和密碼,則使用默認的guest:guest。
接下來咱們將根據下圖的模式搭建一個基本的測試環境:
exchange.交換器類型
注意在Type選項卡選擇對應的類型以及其餘選項默認便可。(例如持久化)
進入管理界面的消息隊列(Queues)管理項,添加圖示的4個消息隊列
名稱對應填寫,其餘的保持默認設置。
飲水思源。原教程這裏的命名是帶有教程做者的本身的相關信息的,這裏也幫忙廣告一下,joyblack是個人英文名,教程的原名是guigu,是教程網站尚硅谷的拼音;shangrong是我目前所在的公司名尚融,原名是guilixueyuan,即穀粒學院,這個我不太清楚,應該也是做者的教育機構的相關學院名稱。
英文學習
idle
(空閒的; 無心義的; 懶惰的; 無根據的;)
接下來咱們爲咱們建立的交換器綁定剛剛建立的隊列,其規則能夠參考咱們的參考圖(實際上是全綁定)。
點擊exchanges選項卡,分別選擇咱們建立的每個交換器exchange.xxx
,進入詳細配置界面,點擊詳細管理界面中的bindings
,添加綁定:
(1) exchange.direct
(2) exchange.fanout
(3) exchange.topic
這裏咱們須要注意一下,遵循圖中所示,joyblack.#
綁定前3個queue,而*.news
綁定以news結尾的兩個queue,所以咱們應該要建立5個綁定規則在exchange.topic交換器中:
joyblack.#
或者*.news
。這五個綁定規則大體是(to queue -> Routing key)
joyblack->joyblack.# joyblack.news->*.news joyblack.news->joyblack.# joyblack.users->joyblack.# shangrong.news->*.news
這樣,咱們基本的環境就搭建好了,接下來,咱們從該管理網站上進行一些簡單的消息測試。
咱們先對交換器exchange.direct中發送一條消息,點擊exchange選項卡,點擊exchange.direct進入詳細信息頁面。
往下拉,點擊publish message選項卡,配置發送消息:
點擊publish message按鈕發送消息。
前面提到,exchange.direct類型的交換器只會進行精確匹配,也就是點對點的單播模式。所以咱們能夠肯定,只有綁定規則中Routing key爲joyblack(咱們剛剛發送消息時指定的Rongting key)匹配規則,綁定規則中指定的消息隊列(joyblack)會收到此消息,其餘的綁定規則中對應的queue不符合此規則,也就不會收到此消息。
接下里咱們來驗證結果。點擊Queues->joyblack->Get Message->點擊GetMessage按鈕,就能夠查看到咱們剛剛發送的消息"this is message 1"
,同時留意一下其餘的queue,他們是沒有接收到任何消息的,點擊get message會獲得提示"Queue is empty"
。
"Message published, but not routed. "
,由於direct必須指定一個Routing key以進行精確匹配,否則沒有任何路由規則能夠選擇),發送的消息爲"this is message 2"
。前面提到,exchange.fanout類型的交換器不進行任何匹配,會將其接收的消息推到本身所綁定的全部消息隊列,也就是多對多的廣播模式(這也是咱們以前提到的,沒有必要指定Routing key的緣由)。所以咱們能夠肯定,全部的queue應該都會接收到此消息。
點擊queue選項卡,能夠看到列表中每個隊列都新收到了一條消息。分別進入每個queue中,能夠發現,點擊Get Message選項卡中的Get Message按鈕都可以收到消息"this is message 2"
(注意,若是你想接收到更多的消息,在點擊Get Message以前設置messages的數量爲2,不然只能收到最舊的一條消息)。
joyblack.news
(若是不指定會收到警告"Message published, but not routed. "
,由於topic類型的交換器必須指定一個Routing key以進行模糊匹配,否則沒有任何路由規則能夠選擇),發送的消息爲"this is message 3"
。前面提到,exchange.topic類型的交換器會進行模糊匹配。所以咱們能夠肯定,只有綁定規則中Routing key匹配模式符合joyblack.#
和*.news
的queue都會收到消息,查詢咱們綁定規則會發現,咱們綁定的四個queue都會收到消息。
點擊queue選項卡,能夠看到列表中每個隊列都新收到了一條消息(total + 1 )。分別進入每個queue中,能夠發現,點擊Get Message選項卡中的Get Message按鈕都可以收到消息"this is message 3"
(注意,若是你想接收到更多的消息,在點擊Get Message以前設置messages的數量爲3)。
爲了測試匹配規則,咱們能夠嘗試指定不一樣的其餘的Routing key來測試某些Queue的綁定規則不適用的狀況,看看是否符合預期(例如Rongting key指定爲joyblack.hello,顯然只有前3個隊列能夠收到消息,進入Queue中發現前3的total + 1)。
經過上面的學習,咱們差很少了解了MQ的運行模式。接下來的內容,咱們將會在springboot中整合RabbitMQ。
模塊選擇: web RabbitMQ(位於integrationx選項卡中),查看pom文件咱們能夠發現,引入RabbitMQ組件只需引入其場景啓動器便可:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
注意,我用的仍是spring boot 2.x版本。
進入啓動器的內部pom文件,能夠查看其引入的一些基本組件信息:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>2.1.1.RELEASE</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>5.1.3.RELEASE</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.1.2.RELEASE</version> <scope>compile</scope> <exclusions> <exclusion> <artifactId>http-client</artifactId> <groupId>com.rabbitmq</groupId> </exclusion> </exclusions> </dependency> </dependencies>
直接看RabbitAutoConfiguration,:
... @Configuration @ConditionalOnClass({ RabbitTemplate.class, Channel.class }) @EnableConfigurationProperties(RabbitProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public class RabbitAutoConfiguration { @Configuration @ConditionalOnMissingBean(ConnectionFactory.class) protected static class RabbitConnectionFactoryCreator { @Bean public CachingConnectionFactory rabbitConnectionFactory( RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception { PropertyMapper map = PropertyMapper.get(); CachingConnectionFactory factory = new CachingConnectionFactory( getRabbitConnectionFactoryBean(properties).getObject()); map.from(properties::determineAddresses).to(factory::setAddresses); map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms); map.from(properties::isPublisherReturns).to(factory::setPublisherReturns); RabbitProperties.Cache.Channel channel = properties.getCache().getChannel(); map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize); map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis) .to(factory::setChannelCheckoutTimeout); RabbitProperties.Cache.Connection connection = properties.getCache() .getConnection(); map.from(connection::getMode).whenNonNull().to(factory::setCacheMode); map.from(connection::getSize).whenNonNull() .to(factory::setConnectionCacheSize); map.from(connectionNameStrategy::getIfUnique).whenNonNull() .to(factory::setConnectionNameStrategy); return factory; } private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean( RabbitProperties properties) throws Exception { PropertyMapper map = PropertyMapper.get(); RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean(); map.from(properties::determineHost).whenNonNull().to(factory::setHost); map.from(properties::determinePort).to(factory::setPort); map.from(properties::determineUsername).whenNonNull() .to(factory::setUsername); map.from(properties::determinePassword).whenNonNull() .to(factory::setPassword); map.from(properties::determineVirtualHost).whenNonNull() .to(factory::setVirtualHost); map.from(properties::getRequestedHeartbeat).whenNonNull() .asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat); RabbitProperties.Ssl ssl = properties.getSsl(); if (ssl.isEnabled()) { factory.setUseSSL(true); map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm); map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType); map.from(ssl::getKeyStore).to(factory::setKeyStore); map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase); map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType); map.from(ssl::getTrustStore).to(factory::setTrustStore); map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase); map.from(ssl::isValidateServerCertificate).to((validate) -> factory .setSkipServerCertificateValidation(!validate)); map.from(ssl::getVerifyHostname) .to(factory::setEnableHostnameVerification); } map.from(properties::getConnectionTimeout).whenNonNull() .asInt(Duration::toMillis).to(factory::setConnectionTimeout); factory.afterPropertiesSet(); return factory; } } @Configuration @Import(RabbitConnectionFactoryCreator.class) protected static class RabbitTemplateConfiguration { private final RabbitProperties properties; private final ObjectProvider<MessageConverter> messageConverter; private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers; public RabbitTemplateConfiguration(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { this.properties = properties; this.messageConverter = messageConverter; this.retryTemplateCustomizers = retryTemplateCustomizers; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { PropertyMapper map = PropertyMapper.get(); RabbitTemplate template = new RabbitTemplate(connectionFactory); MessageConverter messageConverter = this.messageConverter.getIfUnique(); if (messageConverter != null) { template.setMessageConverter(messageConverter); } template.setMandatory(determineMandatoryFlag()); RabbitProperties.Template properties = this.properties.getTemplate(); if (properties.getRetry().isEnabled()) { template.setRetryTemplate(new RetryTemplateFactory( this.retryTemplateCustomizers.orderedStream() .collect(Collectors.toList())).createRetryTemplate( properties.getRetry(), RabbitRetryTemplateCustomizer.Target.SENDER)); } map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis) .to(template::setReceiveTimeout); map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis) .to(template::setReplyTimeout); map.from(properties::getExchange).to(template::setExchange); map.from(properties::getRoutingKey).to(template::setRoutingKey); map.from(properties::getDefaultReceiveQueue).whenNonNull() .to(template::setDefaultReceiveQueue); return template; } private boolean determineMandatoryFlag() { Boolean mandatory = this.properties.getTemplate().getMandatory(); return (mandatory != null) ? mandatory : this.properties.isPublisherReturns(); } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } @Configuration @ConditionalOnClass(RabbitMessagingTemplate.class) @ConditionalOnMissingBean(RabbitMessagingTemplate.class) @Import(RabbitTemplateConfiguration.class) protected static class MessagingTemplateConfiguration { @Bean @ConditionalOnSingleCandidate(RabbitTemplate.class) public RabbitMessagingTemplate rabbitMessagingTemplate( RabbitTemplate rabbitTemplate) { return new RabbitMessagingTemplate(rabbitTemplate); } } }
主要留意其中爲咱們提供的RabbitTemplate
以及AmqpAdmin
兩個組件便可,他們一個用於咱們常規操做MQ以及操做管理MQ。
同時查看封裝的Rabbit的配置屬性類(代碼量太大就不列出了).
@ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { /** * RabbitMQ host. */ private String host = "localhost"; /** * RabbitMQ port. */ private int port = 5672; /** * Login user to authenticate to the broker. */ private String username = "guest"; /** * Login to authenticate against the broker. */ private String password = "guest"; ...
由此能夠總結出咱們能夠配置的屬性以及Rabbit自動配置類爲咱們提供的可用組件等。
經過配置文件配置MQ配置.
spring: rabbitmq: addresses: 10.21.1.47 port: 5672 username: guest password: guest virtual-host: /
其中,除了addresss屬性,其餘的均可以不用配置,默認就是咱們指定的這些值。例如virtual-host
,查看源碼的取值處:
public void setVirtualHost(String virtualHost) { this.virtualHost = "".equals(virtualHost) ? "/" : virtualHost; }
這裏2.x版本和1.x版本的配置些許不一樣,請留意一下。
package com.zhaoyi.bweb; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.HashMap; @RunWith(SpringRunner.class) @SpringBootTest public class BwebApplicationTests { @Autowired RabbitTemplate rabbitTemplate; // test the direct send. @Test public void directSendTest() { // send mode // send object HashMap<String, Object> data = new HashMap<>(); data.put("result", true); data.put("data", "it is a direct message"); data.put("array", Arrays.asList("array1","array2")); rabbitTemplate.convertAndSend("exchange.direct", "joyblack", "This is a direct message test."); rabbitTemplate.convertAndSend("exchange.direct", "joyblack", data); } }
發送消息的方式有不少方法可使用,每一種類的方法又有不少重載方法,能夠按本身的需求去選擇具體的某一種,這裏我選擇的是convertAndSend
的重載方法
public void convertAndSend(String exchange, String routingKey, Object object)
即提供交換器、路由鍵以及發送的數據,並且使用默認的序列化器將數據進行序列化。
咱們往exchange.direct交換器發送了一條路由鍵爲joybalck
的消息,消息內容爲"This is a direct message test."
,查看joyblack消息隊列,咱們能夠獲取到查詢到剛發送的消息,若是您發送的是一個HashMap等複雜類型,消息的內容通常會是序列化的數據,這裏我發送的是字符串,能夠看到原來的樣子,而另外一個HashMap類型的數據在查看的時候則序列化(content_type: application/x-java-serialized-object
)。
接下來咱們測試接收消息隊列中的數據:
@Test public void receiveTest() { Object data = rabbitTemplate.receiveAndConvert("joyblack"); System.out.println(data.getClass()); System.out.println(data); }
咱們能夠看出,每次讀取一條隊列信息,獲取方式是FIFO,所以能夠將咱們以前設置在joyblack
隊列中的消息一條條的取出來。
消息被讀取以後就會從隊列中移除。
receiveAndConvert以及receive方法等也一樣具備不少重載方法。
若消息隊列中沒有消息則獲取到的結果是空,注意空指針異常,不過您若是用java8不少新特性以後,能夠忽略這一點。
咱們若是發送複雜對象,則會被默認的SimpleConvert對象序列化成難以閱讀的信息,同之前redis組件同樣,咱們但願定製本身的消息轉換器,經過觀察源碼,咱們能夠發現,能夠經過自定義一個類型爲org.springframework.amqp.support.converter.MessageConverter
的消息轉換器:
package com.zhaoyi.bweb.config; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration class MyAmqpConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
咱們使用Jackson2JsonMessageConverter
做爲消息轉化器。
接下來,再發送一次咱們以前發送過的HashMap類型的數據,就能夠在RabbitMQ管理網站看到數據已經再也不是序列化的,而是json字符串:
{"result":true,"data":"it is a direct message","array":["array1","array2"]}
同時留意消息頭的變化。
咱們還能夠自定義一個對象,例如User,來測試一下MQ對數據的存儲:
// test the direct send. @Test public void directSend2Test() { User user = new User(); user.setId(1); user.setLoginName("joyblack"); user.setUserName("黑傑克"); rabbitTemplate.convertAndSend("exchange.direct", "joyblack", user); }
用戶實體類就由您本身去定義了。
{"id":1,"loginName":"joyblack","userName":"黑傑克"}
接受的數據會爲咱們轉化爲User類型的對象,再也不作多演示。
測試廣播咱們就使用交換器exchange.fanout便可,同時無需指定路由鍵。例如:
rabbitTemplate.convertAndSend("exchange.fanout", "", new Book("西遊記","吳承恩","今年下半年...."));
convertAndSend
並無提供convertAndSend(String exchangeName, Object data)
這樣的重載方法,只有convertAndSend(String routingKey, Object data)
。也就是說,咱們仍是得提供第二個參數即routingKey,設置爲空串或者其餘的都行(雖然值沒有意義,但仍是得提供,這個估計之後API會更改)。