Spring Cloud Stream應用與自定義RocketMQ Binder:實現RocketMQ綁定器

前言: 本文做者張天,節選自筆者與其合著的《Spring Cloud微服務架構進階》,即將在八月出版問世。本文將其中Spring Cloud Stream應用與自定義Rocketmq Binder的內容抽取出來,主要介紹實現Spring Cloud Stream 的RocketMQ綁定器。java

Stream的Binder機制

在上一篇中,介紹了Spring Cloud Stream基本的概念及其編程模型。除此以外,Spring Cloud Stream提供了Binder接口來用於和外部消息隊列進行綁定。本文將講述Binder SPI的基本概念,主要組件和實現細節。 Binder SPI經過一系列的接口,工具類和檢測機制提供了與外部消息隊列綁定的綁定器機制。SPI的關鍵點是Binder接口,這個接口負責提供和外部消息隊列進行綁定的具體實現。git

public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
}
複製代碼

一個典型的自定義Binder組件實現應該包括如下幾點:github

  • 一個實現Binder接口的類。
  • 一個Spring的@Configuration類來建立上述類型的實例。
  • 在classpath上一個包含自定義Binder相關配置類的META-INF/spring.binders文件,好比說:
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
複製代碼

Spring Cloud Stream基於Binder SPI的實現來進行channel和消息隊列的綁定任務。不一樣類型的消息隊列中間件實現了不一樣的綁定器Binder。好比說:Spring-Cloud-Stream-Binder-Kafka是針對Kafka的Binder實現,而Spring-Cloud-Stream-Binder-Rabbit則是針對RabbitMQ的Binder實現。spring

Spring Cloud Stream依賴於Spring Boot的自動配置機制來配置Binder。若是一個Binder實如今項目的classpath中被發現,Spring Cloud Stream將會自動使用它。好比說,一個Spring Cloud Stream項目須要綁定RabbitMQ中間件的Binder,在pom文件中加入下面的依賴來輕鬆實現。編程

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
複製代碼

Binder For RocketMQ

Spring Cloud Stream爲接入不一樣的消息隊列提供了一整套的自定義機制,經過爲每一個消息隊裏開發一個Binder來接入該消息隊列。目前官方認定的Binder爲rabbitmq binder和kafka binder。可是開發人員能夠基於Stream Binder的機制來制定本身的Binder。下面咱們就構建一個簡單的RocketMQ的Binder。bash

配置類

須要在resources/META-INF/spring.binders文件中配置有關RocketMQ的Configuration類,該配置類會使用@Import來導入爲RocketMQ制定的RocketMessageChannelBinderConfiguration微信

rocket:\
org.springframework.cloud.stream.binder.rocket.config.RocketServiceAutoConfiguration
複製代碼

RocketMessageChannelBinderConfiguration將會提供兩個極其重要的bean實例,分別爲RocketMessageChannelBinderRocketExchangeQueueProvisionerRocketMessageChannelBinder主要是用於channel和消息隊列的綁定,而RocketExchangeQueueProvisioner則封裝了RocketMQ的相關API,能夠用於建立消息隊列的基礎組件,好比說隊列,交換器等。架構

@Configuration
public class RocketMessageChannelBinderConfiguration {
    @Autowired
    private ConnectionFactory rocketConnectionFactory;
    @Autowired
    private RocketProperties  rocketProperties;
    @Bean
    RocketMessageChannelBinder rocketMessageChannelBinder() throws Exception {
        RocketMessageChannelBinder binder = new RocketMessageChannelBinder(this.rocketConnectionFactory,
                this.rocketProperties, provisioningProvider());
        return binder;
    }
    @Bean
    RocketExchangeQueueProvisioner provisioningProvider() {
        return new RocketExchangeQueueProvisioner(this.rocketConnectionFactory);
    }
}
複製代碼

RocketMessageChannelBinder繼承了抽象類AbstractMessageChannelBinder,並實現了#producerMessageHandler和#createConsumerEndpoint函數。併發

MessageHandler有向消息隊列發送消息的能力,#createProducerMessageHandler函數就是爲了建立MessageHandler對象,來將輸出型Channel的消息發送到消息隊列上。異步

protected MessageHandler createProducerMessageHandler( ProducerDestination destination, ExtendedProducerProperties<RocketProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
    final AmqpOutboundEndpoint endpoint = new AmqpOutboundEndpoint(
            buildRocketTemplate(producerProperties.getExtension(), errorChannel != null));
    return endpoint;
}
複製代碼

MessageProducer可以從消息隊列接收消息,並將該消息發送輸入型Channel。

@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String group, ExtendedConsumerProperties<RocketConsumerProperties> properties) throws Exception {
    SimpleRocketMessageListenerContainer listenerContainer = new SimpleRocketMessageListenerContainer();
    RocketInboundChannelAdapter rocketInboundChannelAdapter = new RocketInboundChannelAdapter(listenerContainer);
    return rocketInboundChannelAdapter;
}
複製代碼

消息接收功能的實現流程

相似於RabbitMQ的Binder,須要實現下面一系列的類來實現從RocketMQ到對應MessageChannel的消息傳遞。

  1. RocketBlockingQueueConsumer.InnerConsumer實現了MessageListenerConcurrently來接收RocketMQ傳遞的消息。
  2. RocketBlockingQueueConsumer將InnerConsumer註冊給RocketMQ的DefaultMQPushConsumer來接收RocketMQ傳遞過來的消息,並存儲在自身的阻塞隊列中。供SimpleRocketMessageListenerContainer獲取。
  3. SimpleRocketMessageListenerContainer,啓動一個線程來不停從RocketBlockingQueueConsumer獲取消息,而後調用RocketInboundChannelAdapter.Listener的回調函數,將消息傳遞給RocketInboundChannelAdapter。
  4. RocketInboundChannelAdapter.Listener供SimpleRocketMessageListenerContainer回調,將消息發送給RocketInboundChannelAdapter。
  5. RocketInboundChannelAdapter,接受SimpleRocketMessageListenerContainer傳遞過來的消息,而後經過MessageTemplate發送給相應的MessageChannel。從而傳遞給由@StreamListener的修飾的函數。

InnerConsumer接收RocketMQ消息

InnerConsumer實現的MessageListenerConcurrently接口是RocketMQ中用於併發接受異步消息的接口,該接口能夠接收到RocketMQ發送過來的異步消息。而InnerConsumer在接受到消息以後,會將消息封裝成RocketDelivery加入到阻塞隊列中。

RocketBlockingQueueConsumer有一個阻塞隊列來存儲RocketMQ傳遞給RocketBlockingQueueConsumer.InnerConsumer的消息,而nextMessage函數能夠從阻塞隊列中拉取一個消息並返回。

AsyncMessageProcessingConsumer獲取消息

SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer是實現了Runnable接口,在run()接口中會無限循環地調用SimpleRocketMessageListenerContainer自己的receiveAndExecute。

@Override
public void run() {
    if (!isActive()) {
        return;
    }
    try {
        //只要consumer的狀態正常,就會一直循環
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            try {
                boolean receivedOk = receiveAndExecute(this.consumer);
            }
            catch (ListenerExecutionFailedException ex) {
                if (ex.getCause() instanceof NoSuchMethodException) {
                    throw new FatalListenerExecutionException("Invalid listener", ex);
                }
            }
            catch (AmqpRejectAndDontRequeueException rejectEx) {
            } catch (Throwable e) {
            }
        }
    } catch (Exception e) {
    }
    finally {
        if (getTransactionManager() != null) {
            ConsumerChannelRegistry.unRegisterConsumerChannel();
        }
    }
    this.start.countDown();
    if (!isActive(this.consumer) || aborted) {
        this.consumer.stop();
    }
    else {
        restart(this.consumer);
    }
}
複製代碼

函數#receiveAndExecute最終的做用就是調用RocketBlockingQueueConsumer的nextMessage,而後再將消息調用messageListener.onMessage函數將消息傳遞出去。

初始化RocketBlockingQueueConsumer和AsyncMessageProcessingConsumer

SimpleRocketMessageListenerContainer的doStart函數會初始化RocketBlockingQueueConsumer而且啓動SimpleRocketMessageListenerContainer的AsyncMessageProcessingConsumer會無限循環地從RocketBlockingQueueConsumer中獲取RocketMQ傳遞過來的消息。

private void doStart() {
    synchronized (this.lifecycleMonitor) {
        this.active = true;
        this.running = true;
        this.lifecycleMonitor.notifyAll();
    }
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        //初始化Consumer
        int newConsumers = initializeConsumers();
        if (this.consumers == null) {
            return;
        }
        if (newConsumers <= 0) {
            return;
        }
        Set<SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer> processors =
                new HashSet<>();
        //對於每一個RocketBlockingQueueConsumer啓動一個
        //AsyncMessageProcessingConsumer來執行任務
        for (RocketBlockingQueueConsumer consumer : this.consumers) {
            SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer
                    processor = new SimpleRocketMessageListenerContainer.AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            getTaskExecutor().execute(processor);
        }
    }
}
複製代碼

發送消息給MessageChannel

RocketInboundChannelAdapter實現了MessageProducer接口。它主要將SimpleRocketMessageListenerContainer傳遞過來的消息通過MessageTemplate傳遞給MessageChannel。

接下來則是RocketInboundChannelAdapter.Listener的實現,它就是RocketBlockingQueueConsumer.nextMessage函數中的messageListener。

public class Listener implements ChannelAwareMessageListener, RetryListener {
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            this.createAndSend(message, channel);
        } catch (RuntimeException var7) {
            if (RocketInboundChannelAdapter.this.getErrorChannel() == null) {
                throw var7;
            }
       RocketInboundChannelAdapter.this.getMessagingTemplate().send(RocketInboundChannelAdapter.this.getErrorChannel(), RocketInboundChannelAdapter.this.buildErrorMessage((org.springframework.messaging.Message)null, new ListenerExecutionFailedException("Message conversion failed", var7, message)));
        }
    }
    private void createAndSend(Message message, Channel channel) {
        org.springframework.messaging.Message<Object> messagingMessage = this.createMessage(message, channel);
        RocketInboundChannelAdapter.this.sendMessage(messagingMessage);
    }
    private org.springframework.messaging.Message<Object> createMessage(Message message, Channel channel) {
        Object payload = RocketInboundChannelAdapter.this.messageConverter.fromMessage(message);
        org.springframework.messaging.Message<Object> messagingMessage = RocketInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).build();
        return messagingMessage;
    }
}
複製代碼

RocketMQ的管理器

RocketProvisioningProvider實現了ProvisioningProvider接口,它有兩個函數:provisionProducerDestination和provisionConsumerDestination,分別用於建立ProducerDestination和ConsumerDestination。RocketProvisioningProvider的實現相似於RabbitProvisioningProvider。只不過在聲明隊列,交換器和綁定時使用了RocketAdmin所實現的RocketMQ的相關API。

總結

本文概要介紹了Spring Cloud Stream的Rocketmq綁定器的實現,限於篇幅不展開具體的代碼講解。讀者感興趣,能夠關注GitHub上的代碼。根據Spring Cloud Stream抽象的接口,咱們能夠自由地實現各類消息隊列的綁定器。

項目GitHub地址:https://github.com/ztelur/spring-cloud-stream-binder-rocket 推薦閱讀:Spring Cloud Stream應用與自定義RocketMQ Binder:編程模型

訂閱最新文章,歡迎關注個人公衆號

微信公衆號
相關文章
相關標籤/搜索