【本人禿頂程序員】使用Spring Cloud Stream和RabbitMQ實現事件驅動的微服務

←←←←←←←←←←←← 快!點關注java

讓咱們展現如何使用Spring Cloud Stream來設計事件驅動的微服務。首先,Spring Cloud Stream首先有什麼好處?由於Spring AMPQ提供了訪問AMPQ工件所需的一切。若是您不熟悉Spring AMPQ,請查看此repo,其中包含許多有用的示例。那麼爲何要使用Spring Cloud Stream ......?git

Spring Cloud Stream概念

  • Spring Cloud Stream經過Binder概念將使用過的消息代理與Spring Integration消息通道綁定在一塊兒。支持RabbitMQ和Kafka。
  • Spring Cloud Stream將基礎架構配置從代碼中分離爲屬性文件。這意味着即便您更改了底層代理,您的Spring Integration代碼也將是相同的!

示例中的Spring Cloud Stream概念(RabbitMQ)

讓咱們有一個名爲streamInput的交換,它有兩個隊列streamInput.cities和streamInput.persons。如今讓咱們將這兩個隊列插入兩個消息通道citiesChannel和personsChannel來消費來自它的傳入消息。使用Spring AMPQ,您須要建立SimpleMessageListenerContainer並在代碼中鏈接基礎結構。但這有不少樣板代碼。使用Spring Cloud Stream,您能夠將AMPQ配置分離到屬性文件:程序員

spring.cloud.stream.bindings.citiesChannel.destination=streamInput  
spring.cloud.stream.bindings.citiesChannel.group=cities  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities

spring.cloud.stream.bindings.personsChannel.destination=streamInput  
spring.cloud.stream.bindings.personsChannel.group=persons  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons

配置詳細信息

在類路徑上使用RabbitMQ Binder,每一個目標都映射到TopicExchange。在示例中,我建立了名爲streamInput的TopicExchange, 並將其附加到兩個消息通道citiesChannel和personsChannel。spring

spring.cloud.stream.bindings.citiesChannel.destination = streamInput   
spring.cloud.stream.bindings.personsChannel.destination = streamInput

如今您須要瞭解RabbitMQ綁定器的靈感來自Kafka,隊列的消費者被分組到消費者組中,其中只有一個消費者將得到消息。這是有道理的,由於您能夠輕鬆擴展消費者。服務器

所以,讓咱們建立兩個隊列streamInput.persons和streamInput.cities並將它們附加到streamInput TopicExchange和提到的消息通道架構

# This will create queue "streamInput.cities" connected to message channel citiesChannel where input messages will land.  
spring.cloud.stream.bindings.citiesChannel.group=cities 

# Durable subscription, of course.  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.durableSubscription=true 

# AMPQ binding to exchange (previous spring.cloud.stream.bindings.<channel name>.destination settings).  
# Only messages with routingKey = 'cities' will land here.  
spring.cloud.stream.rabbit.bindings.citiesChannel.consumer.bindingRoutingKey=cities 

spring.cloud.stream.bindings.personsChannel.group=persons  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.durableSubscription=true  
spring.cloud.stream.rabbit.bindings.personsChannel.consumer.bindingRoutingKey=persons

鏈接屬性到Spring Integration

好的,到目前爲止我建立了兩個隊列。StreamInput.cities綁定到citiesChannel。StreamInput.persons綁定到peopleChannel。less

<destination>.<group>是Spring Cloud Stream約定的隊列命名,如今讓咱們將它鏈接到Spring Integration:微服務

package com.example.spring.cloud.configuration;

import org.springframework.cloud.stream.annotation.Input;  
import org.springframework.messaging.SubscribableChannel;

/**  
 \* Created by tomask79 on 30.03.17.  
 */  
public interface SinkRabbitAPI {

    String INPUT_CITIES = "citiesChannel";

    String INPUT_PERSONS = "personsChannel";

    @Input(SinkRabbitAPI.INPUT_CITIES)  
    SubscribableChannel citiesChannel();

    @Input(SinkRabbitAPI.INPUT_PERSONS)  
    SubscribableChannel personsChannel();  
}

Spring Boot啓動時加載這個屬性測試

package com.example.spring.cloud;

import com.example.spring.cloud.configuration.SinkRabbitAPI;  
import com.example.spring.cloud.configuration.SourceRabbitAPI;  
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;  
import org.springframework.cloud.stream.annotation.EnableBinding;  
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication  
@EnableBinding({SinkRabbitAPI.class})  
public class StreamingApplication {

    public static void main(String\[\] args) {  
        SpringApplication.run(StreamingApplication.class, args);  
    }  
}

在此以後,咱們能夠建立消費者從綁定的消息通道中的隊列接收消息:ui

import com.example.spring.cloud.configuration.SinkRabbitAPI;  
import com.example.spring.cloud.configuration.SourceRabbitAPI;  
import org.springframework.cloud.stream.annotation.StreamListener;  
import org.springframework.integration.support.MessageBuilder;  
import org.springframework.messaging.MessageChannel;  
import org.springframework.messaging.handler.annotation.SendTo;  
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**  
 \* Created by tomask79 on 30.03.17.  
 */  
@Service  
public class ProcessingAMPQEndpoint {

    @StreamListener(SinkRabbitAPI.INPUT_CITIES)  
    public void processCity(final String city) {  
        System.out.println("Trying to process input city: "+city);  
    }

    @StreamListener(SinkRabbitAPI.INPUT_PERSONS)  
    public void processPersons(final String person) {  
        System.out.println("Trying to process input person: "+person);  
    }  
}

RabbitMQ綁定器和代理配置

Spring Cloud Stream如何知道在哪裏尋找消息中間件?若是在類路徑中找到RabbitMQ綁定器,則使用默認RabbitMQ主機(localhost)和端口(5672)鏈接到RabbitMQ服務器。若是您的消息中間件配置在不一樣端口,則須要配置屬性:

spring:  
  cloud:  
    stream:  
      bindings:  
        ...  
      binders:  
          rabbitbinder:  
            type: rabbit  
            environment:  
              spring:  
                rabbitmq:  
                  host: rabbitmq  
                  port: 5672  
                  username: XXX  
                  password: XXX

測試消息消費

  • 安裝並運行RabbitMQ代理
  • rabbitmq.git
  • mvn clean install
  • java -jar target / streaming-0.0.1-SNAPSHOT.jar
  • 如今使用路由鍵'cities'或'persons'在streamInput Exchange上發佈消息...輸出應該是:
Started StreamingApplication in 6.513 seconds (JVM running for 6.92)   
Trying to process input city: sdjfjljksdflkjsdflkjsdfsfd  
Trying to process input person: sdjfjljksdflkjsdflkjsdfsfd

使用Spring Cloud Stream從新傳遞消息

您一般但願在進入DLX交換以前再次嘗試接收消息。首先,讓咱們配置Spring Cloud Stream嘗試從新發送失敗消息的次數:

spring.cloud.stream.bindings.personsChannel.consumer.maxAttempts = 6

這意味着若是從streamInput.persons隊列接收的消息出錯,那麼Spring Cloud Stream將嘗試從新發送六次。讓咱們試試,首先讓咱們修改接收端點以模擬接收崩潰:

 @StreamListener(SinkRabbitAPI.INPUT_PERSONS)  
    public void processPersons(final String person) {  
        System.out.println("Trying to process input person: "+person);  
        throw new RuntimeException();  
    }

若是我如今嘗試使用人員路由鍵將某些內容發佈到streamInput交換中,那麼這應該是輸出:

Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
Trying to process input person: sfsdfsdfsd  
 Retry Policy Exhausted  
        at org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer.recover  
(RejectAndDontRequeueRecoverer.java:45) ~\[spring-rabbit-1.7.0.RELEASE.jar! /:na\]  
        at org.springframework.amqp.rabbit.config.StatelessRetryOperationsInterc       

建議將Spring Cloud Stream 用於事件驅動的MicroServices,由於它能夠節省時間,並且您不須要爲Java中的AMPQ基礎架構編寫樣板代碼。

寫在最後:

禿頂程序員的不易,看到這裏,點了關注吧! 點關注,不迷路,持續更新!!!

相關文章
相關標籤/搜索