Spring Boot 參考指南(消息傳遞)

32. 消息傳遞

Spring框架爲與消息傳遞系統集成提供了普遍的支持,從使用JmsTemplate簡化的JMS API到使用完整的基礎設施異步接收消息,Spring AMQP爲高級消息隊列協議提供了相似的特性集。Spring Boot還爲RabbitTemplate和RabbitMQ提供自動配置選項,Spring WebSocket原生包括對STOMP消息的支持,Spring Boot經過啓動器和少許的自動配置支持這一點,Spring Boot還支持Apache Kafka。html

32.1 JMS

javax.jms.ConnectionFactory接口提供了建立javax.jms.Connection與JMS代理交互的標準方法,儘管Spring須要一個ConnectionFactory來處理JMS,你一般不須要本身直接使用它,而是能夠依賴於更高級別的消息傳遞抽象。(詳見Spring Framework參考文檔的相關部分。)Spring Boot還自動配置發送和接收消息所需的基礎設施。java

32.1.1 ActiveMQ支持

ActiveMQ在類路徑上可用時,Spring Boot還能夠配置ConnectionFactory,若是代理存在,則會自動啓動和配置嵌入式代理(只要沒有經過配置指定代理URL)。git

若是你使用 spring-boot-starter-activemq,那麼將提供鏈接或嵌入ActiveMQ實例所需的依賴項,與JMS集成的Spring基礎設施也是同樣。

ActiveMQ配置由在spring.activemq.*中的外部配置屬性控制,例如,你能夠在application.properties中聲明如下部分:github

spring.activemq.broker-url=tcp://192.168.1.210:9876
spring.activemq.user=admin
spring.activemq.password=secret

你還能夠經過向org.apache.activemq:activemq-pool添加一個依賴項來共享JMS資源並相應地配置PooledConnectionFactory,以下例所示:spring

spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=50
有關更多支持的選項,請參見 ActiveMQProperties,你還能夠註冊一個任意數量的實現 activemqconnectionfactorycustomzer的bean進行更高級的自定義。

默認狀況下,ActiveMQ建立一個目的地,若是它還不存在,那麼目的地將根據它們提供的名稱解析。apache

32.1.2 Artemis支持

Spring Boot能夠自動配置ConnectionFactory,當它檢測到在類路徑上可用的Artemis時,若是代理存在,則自動啓動和配置嵌入式代理(除非模式屬性已顯式設置),所支持的模式是embedded(要明確地說明須要一個嵌入式代理,若是代理在類路徑上不可用,就會出現錯誤)和native(使用netty傳輸協議鏈接到代理),在配置後者時,Spring Boot使用默認設置配置鏈接到運行在本地機器上的代理的ConnectionFactoryjson

若是你使用 spring-boot-starter-artemis,則提供了鏈接到現有的Artemis實例的必要依賴項,以及與JMS集成的Spring基礎設施,添加 org.apache.activemq:artemis-jms-server到你的應用程序以容許你使用嵌入式模式。

Artemis配置由spring.artemis.*的外部配置屬性控制,例如,你能夠在application.properties中聲明如下部分:bootstrap

spring.artemis.mode=native
spring.artemis.host=192.168.1.210
spring.artemis.port=9876
spring.artemis.user=admin
spring.artemis.password=secret

在嵌入代理時,你能夠選擇是否啓用持久性,並列出應該可用的目的地,能夠將它們指定爲逗號分隔的列表,以使用默認選項建立它們,或者能夠定義類型爲org.apache.activemq.artemis.jms.server.config.JMSQueueConfigurationorg.apache.activemq.artemis.jms.server.config.TopicConfiguration的bean,分別用於高級隊列和主題配置。segmentfault

有關更多受支持的選項,請參閱ArtemisPropertiesapi

不涉及JNDI查找,目的地根據它們的名稱進行解析,使用Artemis配置中的name屬性或經過配置提供的名稱。

32.1.3 使用JNDI ConnectionFactory

若是你正在應用服務器中運行應用程序,Spring Boot試圖使用JNDI定位JMS ConnectionFactory,默認狀況下,檢查java:/JmsXAjava:/XAConnectionFactory位置,若是須要指定替代位置,可使用spring.jms.jndi-name屬性,以下例所示:

spring.jms.jndi-name=java:/MyConnectionFactory

32.1.4 發送消息

Spring的JmsTemplate是自動配置的,你能夠將其自動鏈接到你本身的bean中,以下面的示例所示:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final JmsTemplate jmsTemplate;

    @Autowired
    public MyBean(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    // ...

}
JmsMessagingTemplate能夠以相似的方式注入,若是定義了 DestinationResolverMessageConverter bean,則它將自動關聯到自動配置的 JmsTemplate

32.1.5 接收消息

當出現JMS基礎設施時,可使用@JmsListener註解任何bean,以建立監聽器端點,若是沒有定義JmsListenerContainerFactory,則會自動配置默認工廠,若是定義了DestinationResolverMessageConverter bean,則它將自動關聯到默認工廠。

默認狀況下,默認工廠是事務性的,若是你運行的基礎設施中存在JtaTransactionManager,那麼它默認與偵聽器容器相關聯,若是沒有,則啓用sessionTransacted標誌。在後一個場景中,你能夠經過在監聽器方法(或委託)上添加@Transactional,將本地數據存儲事務與接收消息的處理相關聯,這確保在本地事務完成以後,傳入消息獲得確認,這還包括髮送在相同JMS會話上執行的響應消息。

如下組件在someQueue目的地上建立監聽器端點:

@Component
public class MyBean {

    @JmsListener(destination = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
有關更多細節,請參見 @EnableJms的Javadoc

若是你須要建立更多的JmsListenerContainerFactory實例,或者但願重寫默認的實例,Spring Boot提供了一個DefaultJmsListenerContainerFactoryConfigurer,你可使用它來初始化一個DefaultJmsListenerContainerFactory,其設置與自動配置的工廠相同。

例如,下面的示例公開了另外一個使用特定MessageConverter的工廠:

@Configuration
static class JmsConfiguration {

    @Bean
    public DefaultJmsListenerContainerFactory myFactory(
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory());
        factory.setMessageConverter(myMessageConverter());
        return factory;
    }

}

而後你能夠在任何@JmsListener註解的方法中使用工廠,以下所示:

32.2 AMQP

高級消息隊列協議(AMQP)是面向消息的中間件的一種平臺無關的、有線級別的協議。Spring AMQP項目將核心Spring概念應用於基於AMQP的消息傳遞解決方案的開發,Spring Boot爲經過RabbitMQ使用AMQP提供了一些方便,包括spring-boot-starter-amqp「啓動器」。

32.2.1 RabbitMQ支持

RabbitMQ是一個輕量級的、可靠的、可伸縮的、可移植的消息代理,基於AMQP協議,Spring使用RabbitMQ經過AMQP協議進行通訊。

RabbitMQ配置由spring.rabbitmq.*的外部配置屬性控制,例如,你能夠在application.properties中聲明如下部分:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret

若是上下文中存在ConnectionNameStrategy bean,那麼它將自動用於命名由自動配置的ConnectionFactory建立的鏈接。有關更多受支持的選項,請參閱RabbitProperties

有關詳細信息,請參閱 RabbitMQ使用的協議AMQP

32.2.2 發送消息

Spring的AmqpTemplateAmqpAdmin是自動配置的,你能夠將它們自動鏈接到你本身的bean中,以下面的示例所示:

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

    private final AmqpAdmin amqpAdmin;
    private final AmqpTemplate amqpTemplate;

    @Autowired
    public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
        this.amqpAdmin = amqpAdmin;
        this.amqpTemplate = amqpTemplate;
    }

    // ...

}
RabbitMessagingTemplate能夠以相似的方式注入,若是定義了 MessageConverter bean,它會自動關聯到自動配置的 AmqpTemplate

若是有必要,任何定義爲bean的org.springframework.amqp.core.Queue自動用於在RabbitMQ實例上聲明相應的隊列。

重試操做,能夠對AmqpTemplate啓用重試(例如,若是代理鏈接丟失了),默認狀況下禁用重試。

32.2.3 接收消息

當Rabbit基礎設施存在時,可使用@RabbitListener對任何bean進行註解,以建立監聽器端點,若是沒有定義RabbitListenerContainerFactory,則會自動配置默認的SimpleRabbitListenerContainerFactory,你可使用spring.rabbitmq.listener.type屬性切換到直接容器。若是定義了MessageConverterMessageRecoverer bean,則它將自動與默認工廠相關聯。

如下示例組件在someQueue隊列上建立監聽器端點:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue")
    public void processMessage(String content) {
        // ...
    }

}
有關更多細節,請參見 @EnableRabbit的Javadoc

若是你須要建立更多的RabbitListenerContainerFactory實例,或者你想要覆蓋缺省值,Spring Boot提供了一個SimpleRabbitListenerContainerFactoryConfigurer和一個DirectRabbitListenerContainerFactoryConfigurer,你可使用它們初始化一個SimpleRabbitListenerContainerFactory,以及一個DirectRabbitListenerContainerFactory,其設置與自動配置使用的工廠相同。

選擇哪一種容器類型並不重要,這兩個bean經過自動配置公開。

例如,下面的configuration類公開另外一個使用特定MessageConverter的工廠:

@Configuration
static class RabbitConfiguration {

    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer) {
        SimpleRabbitListenerContainerFactory factory =
                new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(myMessageConverter());
        return factory;
    }

}

而後你能夠在任何@RabbitListener註解的方法中使用工廠,以下所示:

@Component
public class MyBean {

    @RabbitListener(queues = "someQueue", containerFactory="myFactory")
    public void processMessage(String content) {
        // ...
    }

}

你能夠啓用重試來處理監聽器拋出異常的狀況,默認狀況下,使用RejectAndDontRequeueRecoverer,可是能夠定義本身的MessageRecoverer,當重試結束時,消息將被拒絕,若是將代理配置爲這樣作,則消息將被刪除或路由到死信交換,默認狀況下,重試被禁用。

重要
默認狀況下,若是重試被禁用而且監聽器拋出異常,該遞送被無限期地重試,你能夠用兩種方式修改此行爲:將 defaultRequeueRejected屬性設置爲 false,以便嘗試零重複發送,或者拋出 AmqpRejectAndDontRequeueException來通知消息應該被拒絕,後者是在啓用重試並達到最大提交嘗試次數時使用的機制。

32.3 Apache Kafka支持

經過提供spring-kafka項目的自動配置來支持Apache Kafka

Kafka配置由spring.kafka.*中的外部配置屬性控制,例如,你能夠在application.properties中聲明如下部分:

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
要在啓動時建立主題,請添加一個類型 NewTopic的bean,若是主題已經存在,則忽略bean。

有關更多受支持的選項,請參閱KafkaProperties

32.3.1 發送消息

Spring的KafkaTemplate是自動配置的,你能夠直接在你本身的bean中自動鏈接它,以下面的示例所示:

@Component
public class MyBean {

    private final KafkaTemplate kafkaTemplate;

    @Autowired
    public MyBean(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // ...

}
若是定義了一個 RecordMessageConverter bean,它將自動關聯到自動配置的 KafkaTemplate

32.3.2 接收消息

當存在Apache Kafka基礎設施時,任何bean均可以使用@KafkaListener進行註解,以建立監聽器端點,若是沒有定義KafkaListenerContainerFactory,默認設置爲使用spring.kafka.listener.*中定義的鍵,此外,若是定義了一個RecordMessageConverter bean,它將自動關聯到默認的工廠。

@Component
public class MyBean {

    @KafkaListener(topics = "someTopic")
    public void processMessage(String content) {
        // ...
    }

}

32.3.3 附加的Kafka屬性

自動配置所支持的屬性顯示在附錄A中,通用的應用程序屬性。注意,在大多數狀況下,這些屬性(連字符或駝峯式大小寫)直接映射到Apache Kafka *屬性,有關詳細信息,請參閱Apache Kafka文檔。

前幾個屬性同時適用於生產者和消費者,可是若是你但願對每一個屬性使用不一樣的值,能夠在生產者或消費者級別指定,Apache Kafka設計具備高、中或低重要性的屬性,Spring Boot自動配置支持全部重要屬性、一些選定的中屬性和低屬性,以及任何沒有默認值的屬性。

Kafka支持的屬性中只有一部分是能夠經過KafkaProperties類得到的,若是你但願爲生產者或消費者配置不受直接支持的其餘屬性,請使用如下屬性:

spring.kafka.properties.prop.one=first
spring.kafka.admin.properties.prop.two=second
spring.kafka.consumer.properties.prop.three=third
spring.kafka.producer.properties.prop.four=fourth

這將設置通用的prop.oneKafka屬性爲first(適用於生產者、消費者和管理員),prop.two 管理員屬性爲secondprop.three消費者屬性爲third而且prop.four生產者屬性爲fourth

你還能夠配置Spring Kafka JsonDeserializer,以下所示:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Invoice
spring.kafka.consumer.properties.spring.json.trusted.packages=com.example,org.acme

一樣,能夠禁用JsonSerializer在header中發送類型信息的默認行爲:

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.add.type.headers=false
以這種方式設置的屬性將覆蓋Spring Boot顯式支持的任何配置項。

下一篇:使用RestTemplate調用REST服務

相關文章
相關標籤/搜索