前言: 本文做者張天,節選自筆者與其合著的《Spring Cloud微服務架構進階》,即將在八月出版問世。本文將其中Spring Cloud Stream應用與自定義Rocketmq Binder的內容抽取出來,主要介紹實現Spring Cloud Stream 的RocketMQ綁定器。java
在上一篇中,介紹了Spring Cloud Stream基本的概念及其編程模型。除此以外,Spring Cloud Stream提供了Binder接口來用於和外部消息隊列進行綁定。本文將講述Binder SPI的基本概念,主要組件和實現細節。 Binder SPI經過一系列的接口,工具類和檢測機制提供了與外部消息隊列綁定的綁定器機制。SPI的關鍵點是Binder接口,這個接口負責提供和外部消息隊列進行綁定的具體實現。git
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
複製代碼
一個典型的自定義Binder組件實現應該包括如下幾點:github
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
複製代碼
Spring Cloud Stream基於Binder SPI的實現來進行channel和消息隊列的綁定任務。不一樣類型的消息隊列中間件實現了不一樣的綁定器Binder。好比說:Spring-Cloud-Stream-Binder-Kafka是針對Kafka的Binder實現,而Spring-Cloud-Stream-Binder-Rabbit則是針對RabbitMQ的Binder實現。spring
Spring Cloud Stream依賴於Spring Boot的自動配置機制來配置Binder。若是一個Binder實如今項目的classpath中被發現,Spring Cloud Stream將會自動使用它。好比說,一個Spring Cloud Stream項目須要綁定RabbitMQ中間件的Binder,在pom文件中加入下面的依賴來輕鬆實現。編程
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
複製代碼
Spring Cloud Stream爲接入不一樣的消息隊列提供了一整套的自定義機制,經過爲每一個消息隊裏開發一個Binder來接入該消息隊列。目前官方認定的Binder爲rabbitmq binder和kafka binder。可是開發人員能夠基於Stream Binder的機制來制定本身的Binder。下面咱們就構建一個簡單的RocketMQ的Binder。bash
須要在resources/META-INF/spring.binders文件中配置有關RocketMQ的Configuration類,該配置類會使用@Import來導入爲RocketMQ制定的RocketMessageChannelBinderConfiguration
。微信
rocket:\
org.springframework.cloud.stream.binder.rocket.config.RocketServiceAutoConfiguration
複製代碼
RocketMessageChannelBinderConfiguration
將會提供兩個極其重要的bean實例,分別爲RocketMessageChannelBinder
和RocketExchangeQueueProvisioner
。RocketMessageChannelBinder
主要是用於channel和消息隊列的綁定,而RocketExchangeQueueProvisioner
則封裝了RocketMQ的相關API,能夠用於建立消息隊列的基礎組件,好比說隊列,交換器等。架構
@Configuration
public class RocketMessageChannelBinderConfiguration {
@Autowired
private ConnectionFactory rocketConnectionFactory;
@Autowired
private RocketProperties rocketProperties;
@Bean
RocketMessageChannelBinder rocketMessageChannelBinder() throws Exception {
RocketMessageChannelBinder binder = new RocketMessageChannelBinder(this.rocketConnectionFactory,
this.rocketProperties, provisioningProvider());
return binder;
}
@Bean
RocketExchangeQueueProvisioner provisioningProvider() {
return new RocketExchangeQueueProvisioner(this.rocketConnectionFactory);
}
}
複製代碼
RocketMessageChannelBinder
繼承了抽象類AbstractMessageChannelBinder
,並實現了#producerMessageHandler和#createConsumerEndpoint函數。併發
MessageHandler有向消息隊列發送消息的能力,#createProducerMessageHandler函數就是爲了建立MessageHandler對象,來將輸出型Channel的消息發送到消息隊列上。異步
protected MessageHandler createProducerMessageHandler( ProducerDestination destination, ExtendedProducerProperties<RocketProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
buildRocketTemplate(producerProperties.getExtension(), errorChannel != null));
return endpoint;
}
複製代碼
MessageProducer可以從消息隊列接收消息,並將該消息發送輸入型Channel。
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String group, ExtendedConsumerProperties<RocketConsumerProperties> properties) throws Exception {
SimpleRocketMessageListenerContainer listenerContainer = new SimpleRocketMessageListenerContainer();
RocketInboundChannelAdapter rocketInboundChannelAdapter = new RocketInboundChannelAdapter(listenerContainer);
return rocketInboundChannelAdapter;
}
複製代碼
相似於RabbitMQ的Binder,須要實現下面一系列的類來實現從RocketMQ到對應MessageChannel的消息傳遞。
InnerConsumer實現的MessageListenerConcurrently接口是RocketMQ中用於併發接受異步消息的接口,該接口能夠接收到RocketMQ發送過來的異步消息。而InnerConsumer在接受到消息以後,會將消息封裝成RocketDelivery加入到阻塞隊列中。
RocketBlockingQueueConsumer有一個阻塞隊列來存儲RocketMQ傳遞給RocketBlockingQueueConsumer.InnerConsumer的消息,而nextMessage函數能夠從阻塞隊列中拉取一個消息並返回。
SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer是實現了Runnable接口,在run()接口中會無限循環地調用SimpleRocketMessageListenerContainer自己的receiveAndExecute。
@Override
public void run() {
if (!isActive()) {
return;
}
try {
//只要consumer的狀態正常,就會一直循環
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
try {
boolean receivedOk = receiveAndExecute(this.consumer);
}
catch (ListenerExecutionFailedException ex) {
if (ex.getCause() instanceof NoSuchMethodException) {
throw new FatalListenerExecutionException("Invalid listener", ex);
}
}
catch (AmqpRejectAndDontRequeueException rejectEx) {
} catch (Throwable e) {
}
}
} catch (Exception e) {
}
finally {
if (getTransactionManager() != null) {
ConsumerChannelRegistry.unRegisterConsumerChannel();
}
}
this.start.countDown();
if (!isActive(this.consumer) || aborted) {
this.consumer.stop();
}
else {
restart(this.consumer);
}
}
複製代碼
函數#receiveAndExecute最終的做用就是調用RocketBlockingQueueConsumer的nextMessage,而後再將消息調用messageListener.onMessage函數將消息傳遞出去。
SimpleRocketMessageListenerContainer的doStart函數會初始化RocketBlockingQueueConsumer而且啓動SimpleRocketMessageListenerContainer的AsyncMessageProcessingConsumer會無限循環地從RocketBlockingQueueConsumer中獲取RocketMQ傳遞過來的消息。
private void doStart() {
synchronized (this.lifecycleMonitor) {
this.active = true;
this.running = true;
this.lifecycleMonitor.notifyAll();
}
synchronized (this.consumersMonitor) {
if (this.consumers != null) {
throw new IllegalStateException("A stopped container should not have consumers");
}
//初始化Consumer
int newConsumers = initializeConsumers();
if (this.consumers == null) {
return;
}
if (newConsumers <= 0) {
return;
}
Set<SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer> processors =
new HashSet<>();
//對於每一個RocketBlockingQueueConsumer啓動一個
//AsyncMessageProcessingConsumer來執行任務
for (RocketBlockingQueueConsumer consumer : this.consumers) {
SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer
processor = new SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
getTaskExecutor().execute(processor);
}
}
}
複製代碼
RocketInboundChannelAdapter實現了MessageProducer接口。它主要將SimpleRocketMessageListenerContainer傳遞過來的消息通過MessageTemplate傳遞給MessageChannel。
接下來則是RocketInboundChannelAdapter.Listener的實現,它就是RocketBlockingQueueConsumer.nextMessage函數中的messageListener。
public class Listener implements ChannelAwareMessageListener, RetryListener {
public void onMessage(Message message, Channel channel) throws Exception {
try {
this.createAndSend(message, channel);
} catch (RuntimeException var7) {
if (RocketInboundChannelAdapter.this.getErrorChannel() == null) {
throw var7;
}
RocketInboundChannelAdapter.this.getMessagingTemplate().send(RocketInboundChannelAdapter.this.getErrorChannel(), RocketInboundChannelAdapter.this.buildErrorMessage((org.springframework.messaging.Message)null, new ListenerExecutionFailedException("Message conversion failed", var7, message)));
}
}
private void createAndSend(Message message, Channel channel) {
org.springframework.messaging.Message<Object> messagingMessage = this.createMessage(message, channel);
RocketInboundChannelAdapter.this.sendMessage(messagingMessage);
}
private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
Object payload = RocketInboundChannelAdapter.this.messageConverter.fromMessage(message);
org.springframework.messaging.Message<Object> messagingMessage = RocketInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).build();
return messagingMessage;
}
}
複製代碼
RocketProvisioningProvider實現了ProvisioningProvider接口,它有兩個函數:provisionProducerDestination和provisionConsumerDestination,分別用於建立ProducerDestination和ConsumerDestination。RocketProvisioningProvider的實現相似於RabbitProvisioningProvider。只不過在聲明隊列,交換器和綁定時使用了RocketAdmin所實現的RocketMQ的相關API。
本文概要介紹了Spring Cloud Stream的Rocketmq綁定器的實現,限於篇幅不展開具體的代碼講解。讀者感興趣,能夠關注GitHub上的代碼。根據Spring Cloud Stream抽象的接口,咱們能夠自由地實現各類消息隊列的綁定器。
項目GitHub地址:https://github.com/ztelur/spring-cloud-stream-binder-rocket 推薦閱讀:Spring Cloud Stream應用與自定義RocketMQ Binder:編程模型