【Spring Boot】19.集成消息

消息服務簡介

  1. 大多應用中,可經過消息服務中間件來提高系統異步通訊、擴展解耦能力 具體而言,主要的應用模式有:
  • 異步處理
  • 應用解耦
  • 流量削峯(例如:京東秒殺)
  1. 消息服務中兩個重要概念
  • 消息代理(message broker)
  • 目的地(destination)

當消息發送者發送消息之後,將由消息代理接管,消息代理保證消息傳遞到指定目的地。java

  1. 消息通訊機制

消息隊列主要有兩種形式的目的地web

  • 隊列(queue):點對點消息通訊(point-to-point)
  • 主題(topic):發佈(publish)/訂閱(subscribe)消息通訊

(1)點對點式redis

  • 消息發送者發送消息,消息代理將其放入一個隊列中,消息接收者從隊列中獲取消息內容,消息讀取後被移出隊列。也就是說,消息一旦被消費,那麼立馬刪除。
  • 消息只有惟一的發送者和接受者,但並非說只能有一個接收者

(2)發佈訂閱式spring

  • 發送者(發佈者)發送消息到主題,多個接收者(訂閱者)監聽(訂閱)這個主題,那麼就會在消息到達時同時收到消息。
  1. 消息服務規範
  • JMS(Java Message Service)JAVA消息服務:
    • 基於JVM消息代理的規範。ActiveMQ、HornetMQ是JMS實現
  • AMQP(Advanced Message Queuing Protocol)
    • 高級消息隊列協議,也是一個消息代理的規範,兼容JMS
    • RabbitMQ是AMQP的實現(高級消息隊列)
JMS AMQP
定義 Java api 網絡線級協議跨語言
是跨平臺
Model 提供兩種消息模型:(1)Peer-2-Peer (2)Pub/sub 提供了五種消息模型:(1)direct exchange (2)fanout exchange (3)topic change (4)headers exchange (5)system exchange本質來說,後四種和JMS的pub/sub模型沒有太大差異,僅是在路由機制上作了更詳細的劃分;
支持消息類型 多種消息類型:TextMessage MapMessage BytesMessage StreamMessage ObjectMessage Message(只有消息頭和屬性) byte[]當實際應用時,有複雜的消息,能夠將消息序列化後發送。
綜合評價 JMS 定義了JAVA API層面的標準;在java體系中,多個client都可以經過JMS進行交互,不須要應用修改代碼,可是其對跨平臺的支持較差; AMQP定義了wire-level層的協議標準;自然具備跨平臺、跨語言特
  1. 無論咱們使用哪一種通訊規範,Spring底層都是支持的:docker

    • spring-jms提供了對JMS的支持
    • spring-rabbit提供了對AMQP的支持
    • 須要ConnectionFactory的實現來鏈接消息代理
    • 提供JmsTemplate、RabbitTemplate來發送消息
    • @JmsListener(JMS)、@RabbitListener(AMQP)註解在方法上監聽消息代理髮布的消息
    • @EnableJms、@EnableRabbit開啓支持
  2. Spring Boot也提供了對應的自動配置類shell

    • JmsAutoConfiguration
    • RabbitAutoConfiguration

2 RabbitMQ

簡介

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue Protocol)的開源實現。json

核心概念

  1. Message 消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組 成,這些屬性包括routing-key(路由鍵:消息發給誰?)、priority(相對於其餘消息的優先權)、delivery-mode(指出該消息可能須要持久性存儲)等。
  2. Publisher 消息的生產者,也是一個向交換器發佈消息的客戶端應用程序。
  3. Exchange 交換器,用來接收生產者發送的消息並將這些消息路由給服務器中的隊列(經過路由鍵)。Exchange有4種類型,不一樣類型的Exchange轉發消息的策略有所區別。
    • direct(默認)
    • fanout
    • topic
    • headers
  4. Queue 消息隊列,用來保存消息直到發送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列裏面,等待消費者鏈接到這個隊列將其取走。
  5. Binding 綁定,用於消息隊列和交換器之間的關聯。一個綁定就是基於路由鍵將交換器和消息隊列鏈接起來的路由規則,因此能夠將交換器理解成一個由綁定構成的路由表。Exchange 和Queue的綁定能夠是多對多的關係。
  6. Connection 網絡鏈接,好比一個TCP鏈接。
  7. Channel 信道,多路複用鏈接中的一條獨立的雙向數據流通道。信道是創建在真實的TCP鏈接內的虛擬鏈接,AMQP命令都是經過信道發出去的,無論是發佈消息、訂閱隊列仍是接收消息,這些動做都是經過信道完成。由於對於操做系統來講創建和銷燬TCP都是很是昂貴的開銷,因此引入了信道的概念,以複用一條TCP鏈接。
  8. Consumer 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
  9. Virtual Host 虛擬主機,表示一批交換器、消息隊列和相關對象。虛擬主機是共享相同的身份認證和加密環境的獨立服務器域。每一個vhost本質上就是一個 mini 版的 RabbitMQ服務器,擁有本身的隊列、交換器、綁定和權限機制。vhost是AMQP概念的基礎,必須在鏈接時指定,RabbitMQ默認的vhost是/
  10. Broker 表示消息隊列服務器實體。

以上核心的關係圖能夠以下所示: 關係圖api

運行機制

  1. AMQP中的消息路由
  • AMQP中消息的路由過程和Java開發者熟悉的JMS存在一些差異,AMQP中增長了Exchange和Binding的角色。生產者把消息發佈到Exchange上,消息最終到達隊列並被消費者接收,而Binding決定交換器的消息應該發送到那個隊列。 3
  • Exchange分發消息時根據類型的不一樣分發策略有區別目前共四種類型:direct、fanout、topic、headers。headers匹配AMQP消息的header而不是路由鍵,headers交換器和direct交換器徹底一致,但性能差不少,目前幾乎用不到了,因此直接看另外三種類型:
    • direct 消息中的路由鍵(routing key)若是和 Binding 中的 binding key 一致, 交換器就將消息發到對應的隊列中。路由鍵與隊列名徹底匹配,若是一個隊列綁定到交換機要求路由鍵爲「dog」,則只轉發 routing key 標記爲「dog」的消息,不會轉發「dog.puppy」,也不會轉發「dog.guard」等等。它是徹底匹配、單播的模式。—— 有選擇性的單播4springboot

    • fanout(分列) 每一個發到fanout類型交換器的消息都會分到全部綁定的隊列上去。fanout交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每一個發送到交換器的消息都會被轉發到與該交換器綁定的全部隊列上。很像子網廣播,每臺子網內的主機都得到了一份複製的消息。fanout類型轉發消息是最快的。—— 廣播模式5服務器

    • topic topic交換器經過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列須要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分紅單詞,這些單詞之間用點隔開。它一樣也會識別兩個通配符:符號「#」和符號「*」。#匹配0個或多個單詞,*匹配一個單詞。—— 有選擇性的選擇廣播6

RabbitMQ安裝與簡單測試

咱們前面的章節已經講過關於RabbitMQ的docker安裝方法。

# docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

其中5678是RabbitMQ服務的服務端口,而15672是web管理網站的服務端口。

有management標誌的docker鏡像有web管理界面,推薦安裝這種版本的。

登陸ipaddress:15672,因爲這裏沒有指定登陸帳戶和密碼,則使用默認的guest:guest。

基本環境搭建

接下來咱們將根據下圖的模式搭建一個基本的測試環境: 基本環境搭建示意圖

  1. 建立交換器 進入管理界面的交換器(exchanges)管理項,添加圖示3種類型的交換器(header類型的已經不經常使用了,咱們只測試其餘3種經常使用類型),名稱都爲:exchange.交換器類型
  • direct
  • fanout
  • topic

注意在Type選項卡選擇對應的類型以及其餘選項默認便可。(例如持久化)

  1. 建立消息隊列

進入管理界面的消息隊列(Queues)管理項,添加圖示的4個消息隊列

  • joyblack
  • joyblack.news
  • joyblack.users
  • shangrong.news

名稱對應填寫,其餘的保持默認設置。

飲水思源。原教程這裏的命名是帶有教程做者的本身的相關信息的,這裏也幫忙廣告一下,joyblack是個人英文名,教程的原名是guigu,是教程網站尚硅谷的拼音;shangrong是我目前所在的公司名尚融,原名是guilixueyuan,即穀粒學院,這個我不太清楚,應該也是做者的教育機構的相關學院名稱。

英文學習 idle(空閒的; 無心義的; 懶惰的; 無根據的;)

  1. 給交換器綁定隊列

接下來咱們爲咱們建立的交換器綁定剛剛建立的隊列,其規則能夠參考咱們的參考圖(實際上是全綁定)。

點擊exchanges選項卡,分別選擇咱們建立的每個交換器exchange.xxx,進入詳細配置界面,點擊詳細管理界面中的bindings,添加綁定:

(1) exchange.direct

  • 選擇 to queue: 每個queue的名字,例如joyblack;
  • Routing key:和綁定的queue的名字保持一致,例如若是上面填寫的隊列是joyblack,此處就填寫joyblack;
  • Arguments 不填,保持默認

(2) exchange.fanout

  • 選擇 to queue: 每個queue的名字,例如joyblack;
  • Routing key:和綁定的queue的名字保持一致,例如若是上面填寫的隊列是joyblack,此處就填寫joyblack;聰明的你還記得fanout的特色嗎?若是記得的話你就會發現這個選項沒有填寫的必要。
  • Arguments 不填,保持默認

(3) exchange.topic

這裏咱們須要注意一下,遵循圖中所示,joyblack.#綁定前3個queue,而*.news綁定以news結尾的兩個queue,所以咱們應該要建立5個綁定規則在exchange.topic交換器中:

  • 選擇 to queue: 分別前3個以及第二第四個;
  • Routing key:joyblack.#或者*.news
  • Arguments 不填,保持默認

這五個綁定規則大體是(to queue -> Routing key)joyblack->joyblack.# joyblack.news->*.news joyblack.news->joyblack.# joyblack.users->joyblack.# shangrong.news->*.news

這樣,咱們基本的環境就搭建好了,接下來,咱們從該管理網站上進行一些簡單的消息測試。

環境內部消息測試

  1. 針對exchange.direct發送消息

咱們先對交換器exchange.direct中發送一條消息,點擊exchange選項卡,點擊exchange.direct進入詳細信息頁面。

往下拉,點擊publish message選項卡,配置發送消息:

  • Routing key: joyblack
  • payload: this is message 1.
  • 其餘配置內容保持默認。

點擊publish message按鈕發送消息。

前面提到,exchange.direct類型的交換器只會進行精確匹配,也就是點對點的單播模式。所以咱們能夠肯定,只有綁定規則中Routing key爲joyblack(咱們剛剛發送消息時指定的Rongting key)匹配規則,綁定規則中指定的消息隊列(joyblack)會收到此消息,其餘的綁定規則中對應的queue不符合此規則,也就不會收到此消息。

接下里咱們來驗證結果。點擊Queues->joyblack->Get Message->點擊GetMessage按鈕,就能夠查看到咱們剛剛發送的消息"this is message 1",同時留意一下其餘的queue,他們是沒有接收到任何消息的,點擊get message會獲得提示"Queue is empty"

  1. 針對exchange.fanout發送消息 進入exchange.fanout詳細界面,發送消息的流程和exchange.direct如出一轍,不過無需指定Routing key(若是你在exchange.direct中這樣作會收到警告"Message published, but not routed. ",由於direct必須指定一個Routing key以進行精確匹配,否則沒有任何路由規則能夠選擇),發送的消息爲"this is message 2"

前面提到,exchange.fanout類型的交換器不進行任何匹配,會將其接收的消息推到本身所綁定的全部消息隊列,也就是多對多的廣播模式(這也是咱們以前提到的,沒有必要指定Routing key的緣由)。所以咱們能夠肯定,全部的queue應該都會接收到此消息。

點擊queue選項卡,能夠看到列表中每個隊列都新收到了一條消息。分別進入每個queue中,能夠發現,點擊Get Message選項卡中的Get Message按鈕都可以收到消息"this is message 2"(注意,若是你想接收到更多的消息,在點擊Get Message以前設置messages的數量爲2,不然只能收到最舊的一條消息)。

  1. 針對exchange.topic發送消息 進入exchange.topic詳細界面,發送消息的流程和exchange.direct如出一轍,指定Routing key爲joyblack.news(若是不指定會收到警告"Message published, but not routed. ",由於topic類型的交換器必須指定一個Routing key以進行模糊匹配,否則沒有任何路由規則能夠選擇),發送的消息爲"this is message 3"

前面提到,exchange.topic類型的交換器會進行模糊匹配。所以咱們能夠肯定,只有綁定規則中Routing key匹配模式符合joyblack.#*.news的queue都會收到消息,查詢咱們綁定規則會發現,咱們綁定的四個queue都會收到消息。

點擊queue選項卡,能夠看到列表中每個隊列都新收到了一條消息(total + 1 )。分別進入每個queue中,能夠發現,點擊Get Message選項卡中的Get Message按鈕都可以收到消息"this is message 3"(注意,若是你想接收到更多的消息,在點擊Get Message以前設置messages的數量爲3)。

爲了測試匹配規則,咱們能夠嘗試指定不一樣的其餘的Routing key來測試某些Queue的綁定規則不適用的狀況,看看是否符合預期(例如Rongting key指定爲joyblack.hello,顯然只有前3個隊列能夠收到消息,進入Queue中發現前3的total + 1)。

經過上面的學習,咱們差很少了解了MQ的運行模式。接下來的內容,咱們將會在springboot中整合RabbitMQ。

RabbitMQ整合

建立工程

模塊選擇: web RabbitMQ(位於integrationx選項卡中),查看pom文件咱們能夠發現,引入RabbitMQ組件只需引入其場景啓動器便可:

pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

注意,我用的仍是spring boot 2.x版本。

進入啓動器的內部pom文件,能夠查看其引入的一些基本組件信息:

amqp.pom
<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
      <version>2.1.1.RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-messaging</artifactId>
      <version>5.1.3.RELEASE</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>2.1.2.RELEASE</version>
      <scope>compile</scope>
      <exclusions>
        <exclusion>
          <artifactId>http-client</artifactId>
          <groupId>com.rabbitmq</groupId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>

自動配置原理

直接看RabbitAutoConfiguration,:

org.springframework.boot.autoconfigure.amqp.RabbitProperties
...
@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {

	@Configuration
	@ConditionalOnMissingBean(ConnectionFactory.class)
	protected static class RabbitConnectionFactoryCreator {

		@Bean
		public CachingConnectionFactory rabbitConnectionFactory(
				RabbitProperties properties,
				ObjectProvider<ConnectionNameStrategy> connectionNameStrategy)
				throws Exception {
			PropertyMapper map = PropertyMapper.get();
			CachingConnectionFactory factory = new CachingConnectionFactory(
					getRabbitConnectionFactoryBean(properties).getObject());
			map.from(properties::determineAddresses).to(factory::setAddresses);
			map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms);
			map.from(properties::isPublisherReturns).to(factory::setPublisherReturns);
			RabbitProperties.Cache.Channel channel = properties.getCache().getChannel();
			map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize);
			map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis)
					.to(factory::setChannelCheckoutTimeout);
			RabbitProperties.Cache.Connection connection = properties.getCache()
					.getConnection();
			map.from(connection::getMode).whenNonNull().to(factory::setCacheMode);
			map.from(connection::getSize).whenNonNull()
					.to(factory::setConnectionCacheSize);
			map.from(connectionNameStrategy::getIfUnique).whenNonNull()
					.to(factory::setConnectionNameStrategy);
			return factory;
		}

		private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(
				RabbitProperties properties) throws Exception {
			PropertyMapper map = PropertyMapper.get();
			RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
			map.from(properties::determineHost).whenNonNull().to(factory::setHost);
			map.from(properties::determinePort).to(factory::setPort);
			map.from(properties::determineUsername).whenNonNull()
					.to(factory::setUsername);
			map.from(properties::determinePassword).whenNonNull()
					.to(factory::setPassword);
			map.from(properties::determineVirtualHost).whenNonNull()
					.to(factory::setVirtualHost);
			map.from(properties::getRequestedHeartbeat).whenNonNull()
					.asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat);
			RabbitProperties.Ssl ssl = properties.getSsl();
			if (ssl.isEnabled()) {
				factory.setUseSSL(true);
				map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm);
				map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType);
				map.from(ssl::getKeyStore).to(factory::setKeyStore);
				map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase);
				map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType);
				map.from(ssl::getTrustStore).to(factory::setTrustStore);
				map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase);
				map.from(ssl::isValidateServerCertificate).to((validate) -> factory
						.setSkipServerCertificateValidation(!validate));
				map.from(ssl::getVerifyHostname)
						.to(factory::setEnableHostnameVerification);
			}
			map.from(properties::getConnectionTimeout).whenNonNull()
					.asInt(Duration::toMillis).to(factory::setConnectionTimeout);
			factory.afterPropertiesSet();
			return factory;
		}

	}

	@Configuration
	@Import(RabbitConnectionFactoryCreator.class)
	protected static class RabbitTemplateConfiguration {

		private final RabbitProperties properties;

		private final ObjectProvider<MessageConverter> messageConverter;

		private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers;

		public RabbitTemplateConfiguration(RabbitProperties properties,
				ObjectProvider<MessageConverter> messageConverter,
				ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) {
			this.properties = properties;
			this.messageConverter = messageConverter;
			this.retryTemplateCustomizers = retryTemplateCustomizers;
		}

		@Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnMissingBean
		public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
			PropertyMapper map = PropertyMapper.get();
			RabbitTemplate template = new RabbitTemplate(connectionFactory);
			MessageConverter messageConverter = this.messageConverter.getIfUnique();
			if (messageConverter != null) {
				template.setMessageConverter(messageConverter);
			}
			template.setMandatory(determineMandatoryFlag());
			RabbitProperties.Template properties = this.properties.getTemplate();
			if (properties.getRetry().isEnabled()) {
				template.setRetryTemplate(new RetryTemplateFactory(
						this.retryTemplateCustomizers.orderedStream()
								.collect(Collectors.toList())).createRetryTemplate(
										properties.getRetry(),
										RabbitRetryTemplateCustomizer.Target.SENDER));
			}
			map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis)
					.to(template::setReceiveTimeout);
			map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis)
					.to(template::setReplyTimeout);
			map.from(properties::getExchange).to(template::setExchange);
			map.from(properties::getRoutingKey).to(template::setRoutingKey);
			map.from(properties::getDefaultReceiveQueue).whenNonNull()
					.to(template::setDefaultReceiveQueue);
			return template;
		}

		private boolean determineMandatoryFlag() {
			Boolean mandatory = this.properties.getTemplate().getMandatory();
			return (mandatory != null) ? mandatory : this.properties.isPublisherReturns();
		}

		@Bean
		@ConditionalOnSingleCandidate(ConnectionFactory.class)
		@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
		@ConditionalOnMissingBean
		public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
			return new RabbitAdmin(connectionFactory);
		}

	}

	@Configuration
	@ConditionalOnClass(RabbitMessagingTemplate.class)
	@ConditionalOnMissingBean(RabbitMessagingTemplate.class)
	@Import(RabbitTemplateConfiguration.class)
	protected static class MessagingTemplateConfiguration {

		@Bean
		@ConditionalOnSingleCandidate(RabbitTemplate.class)
		public RabbitMessagingTemplate rabbitMessagingTemplate(
				RabbitTemplate rabbitTemplate) {
			return new RabbitMessagingTemplate(rabbitTemplate);
		}

	}

}

主要留意其中爲咱們提供的RabbitTemplate以及AmqpAdmin兩個組件便可,他們一個用於咱們常規操做MQ以及操做管理MQ。

同時查看封裝的Rabbit的配置屬性類(代碼量太大就不列出了).

org.springframework.boot.autoconfigure.amqp.RabbitProperties
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitProperties {

	/**
	 * RabbitMQ host.
	 */
	private String host = "localhost";

	/**
	 * RabbitMQ port.
	 */
	private int port = 5672;

	/**
	 * Login user to authenticate to the broker.
	 */
	private String username = "guest";

	/**
	 * Login to authenticate against the broker.
	 */
	private String password = "guest";
...

由此能夠總結出咱們能夠配置的屬性以及Rabbit自動配置類爲咱們提供的可用組件等。

配置Rabbit

經過配置文件配置MQ配置.

application.yml
spring:
  rabbitmq:
    addresses: 10.21.1.47
    port: 5672
    username: guest
    password: guest
    virtual-host: /

其中,除了addresss屬性,其餘的均可以不用配置,默認就是咱們指定的這些值。例如virtual-host,查看源碼的取值處:

org.springframework.boot.autoconfigure.amqp.RabbitProperties
public void setVirtualHost(String virtualHost) {
		this.virtualHost = "".equals(virtualHost) ? "/" : virtualHost;
	}

這裏2.x版本和1.x版本的配置些許不一樣,請留意一下。

測試操做RabbitMQ

測試單播

test/BwebApplicationTests.class
package com.zhaoyi.bweb;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;

@RunWith(SpringRunner.class)
@SpringBootTest
public class BwebApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    // test the direct send.
    @Test
    public void directSendTest() {
        // send mode
        // send object
        HashMap<String, Object> data = new HashMap<>();
        data.put("result", true);
        data.put("data", "it is a direct message");
        data.put("array", Arrays.asList("array1","array2"));
        rabbitTemplate.convertAndSend("exchange.direct", "joyblack", "This is a direct message test.");
        rabbitTemplate.convertAndSend("exchange.direct", "joyblack", data);
    }

}

發送消息的方式有不少方法可使用,每一種類的方法又有不少重載方法,能夠按本身的需求去選擇具體的某一種,這裏我選擇的是convertAndSend的重載方法

public void convertAndSend(String exchange, String routingKey, Object object)

即提供交換器、路由鍵以及發送的數據,並且使用默認的序列化器將數據進行序列化。

咱們往exchange.direct交換器發送了一條路由鍵爲joybalck的消息,消息內容爲"This is a direct message test.",查看joyblack消息隊列,咱們能夠獲取到查詢到剛發送的消息,若是您發送的是一個HashMap等複雜類型,消息的內容通常會是序列化的數據,這裏我發送的是字符串,能夠看到原來的樣子,而另外一個HashMap類型的數據在查看的時候則序列化(content_type: application/x-java-serialized-object )。

接下來咱們測試接收消息隊列中的數據:

test/BwebApplicationTests.class
@Test
    public void receiveTest() {
        Object data = rabbitTemplate.receiveAndConvert("joyblack");
        System.out.println(data.getClass());
        System.out.println(data);
    }

咱們能夠看出,每次讀取一條隊列信息,獲取方式是FIFO,所以能夠將咱們以前設置在joyblack隊列中的消息一條條的取出來。

消息被讀取以後就會從隊列中移除。

receiveAndConvert以及receive方法等也一樣具備不少重載方法。

若消息隊列中沒有消息則獲取到的結果是空,注意空指針異常,不過您若是用java8不少新特性以後,能夠忽略這一點。

裝配消息轉化器

咱們若是發送複雜對象,則會被默認的SimpleConvert對象序列化成難以閱讀的信息,同之前redis組件同樣,咱們但願定製本身的消息轉換器,經過觀察源碼,咱們能夠發現,能夠經過自定義一個類型爲org.springframework.amqp.support.converter.MessageConverter的消息轉換器:

config/MyRmqpConfig.class
package com.zhaoyi.bweb.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class MyAmqpConfig {
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

咱們使用Jackson2JsonMessageConverter做爲消息轉化器。

接下來,再發送一次咱們以前發送過的HashMap類型的數據,就能夠在RabbitMQ管理網站看到數據已經再也不是序列化的,而是json字符串:

RabbitMQ->Queues->joyblack->getMessage
{"result":true,"data":"it is a direct message","array":["array1","array2"]}

同時留意消息頭的變化。

咱們還能夠自定義一個對象,例如User,來測試一下MQ對數據的存儲:

test/BwebApplicationTests.class
// test the direct send.
    @Test
    public void directSend2Test() {
        User user = new User();
        user.setId(1);
        user.setLoginName("joyblack");
        user.setUserName("黑傑克");
        rabbitTemplate.convertAndSend("exchange.direct", "joyblack", user);
    }

用戶實體類就由您本身去定義了。

{"id":1,"loginName":"joyblack","userName":"黑傑克"}

接受的數據會爲咱們轉化爲User類型的對象,再也不作多演示。

測試廣播

測試廣播咱們就使用交換器exchange.fanout便可,同時無需指定路由鍵。例如:

test/BwebApplicationTests.class
rabbitTemplate.convertAndSend("exchange.fanout", "", new Book("西遊記","吳承恩","今年下半年...."));

convertAndSend並無提供convertAndSend(String exchangeName, Object data)這樣的重載方法,只有convertAndSend(String routingKey, Object data)。也就是說,咱們仍是得提供第二個參數即routingKey,設置爲空串或者其餘的都行(雖然值沒有意義,但仍是得提供,這個估計之後API會更改)。

相關文章
相關標籤/搜索