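Spring Cloud Study Notes (7): Stream, Message-Driven Microservices

A beginner's series on getting to know Spring Cloud...

My company has moved to microservices too, and if I don't start learning now I'm done for, so let's go step into some pitfalls...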

Versions:

  spring-boot:  2.0

  spring-cloud: Finchley.SR1

Existing projects:

  [eureka-server]              # service registry           port 8761

  [eureka-client-one]       #  microservice 1             port 8501

  [eureka-client-two]       #  microservice 2             port 8502

  [eureka-client-turbine] #  circuit-breaker monitoring port 8503

  [eureka-client-zuul]      #  gateway service            port 8601

  [eureka-client-sleuth]      #  distributed tracing      port 8602

If a diagram can say it, skip the chatter.

  

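  spring-cloud-stream supports RabbitMQ and Kafka as the underlying messaging system; RabbitMQ is used here.

  Roughly, think of it like this: microservices A, B, C and D (the hungry customers) keep sending messages (pizza orders) to RabbitMQ (the food-delivery platform, Ele.me). Microservice F (the pizza shop) listens on RabbitMQ the whole time. As soon as it receives a message from A, B, C or D, it boxes up a pizza and delivers it to their door, using the address carried in the message.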
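
  To make the analogy concrete, here is a minimal plain-JDK sketch. It is only an illustration: the in-memory queue stands in for RabbitMQ, and the names PizzaOrderSketch, Order and run, as well as the addresses, are made up for this example and are not part of Spring Cloud Stream or RabbitMQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Plain-JDK sketch of the pizza analogy. The queue is only a stand-in for
// RabbitMQ; none of this is the RabbitMQ or Spring Cloud Stream API.
class PizzaOrderSketch {

    // A message that carries its own delivery address.
    static final class Order {
        final String customer;
        final String address;

        Order(String customer, String address) {
            this.customer = customer;
            this.address = address;
        }
    }

    // Customers publish their orders, then the seller drains the broker
    // and returns one delivery note per order, in arrival order.
    static List<String> run(List<Order> orders) {
        Queue<Order> broker = new ConcurrentLinkedQueue<>();

        // Producers (services A, B, C, D) only talk to the broker.
        for (Order order : orders) {
            broker.offer(order);
        }

        // The consumer (service F) takes the address from each message.
        List<String> deliveries = new ArrayList<>();
        Order next;
        while ((next = broker.poll()) != null) {
            deliveries.add("pizza for " + next.customer + " -> " + next.address);
        }
        return deliveries;
    }

    public static void main(String[] args) {
        List<Order> orders = new ArrayList<>();
        orders.add(new Order("A", "1 Example Street")); // made-up addresses
        orders.add(new Order("B", "2 Example Street"));
        for (String delivery : run(orders)) {
            System.out.println(delivery);
        }
    }
}
```

  The point of the sketch is that the producers never reference the consumer directly: both sides only know the broker, and everything the consumer needs travels inside the message.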

 

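0. Install the RabbitMQ server

  This example uses stream-rabbitmq, which is built on top of RabbitMQ, so a RabbitMQ server has to be installed on the machine first.

  Installing RabbitMQ on Windows:

    1. Install the Erlang runtime

    2. Install RabbitMQ-server

    3. Enable the plugin (presumably rabbitmq_management, which provides the web console used later to inspect Exchanges)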

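1. Producing and receiving messages with stream

  To keep things short, the producer and the receiver live in the same microservice here instead of two separate ones.

  Create [eureka-client-stream]

  Add the dependencies spring-cloud-stream and spring-cloud-starter-stream-rabbit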

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.renzku</groupId>
    <artifactId>eureka-client-stream</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>eureka-client-stream</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.SR1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <!-- or '*-stream-kafka' -->
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

  Configuration file   application.yml

server:
  port: 8603

spring:
  application:
    name: eureka-client-stream
  cloud:
    stream:
      bindings:
        input:        # receiving side
          destination:  eureka-client-stream-des    # exchange name
        output:     # producing side
          destination:  eureka-client-stream-des    # input and output can talk as long as they share a destination
  rabbitmq:   # RabbitMQ server connection details
    host: localhost 
    port: 5672
    username: guest
    password: guest

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

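  Main class   EurekaClientStreamApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class EurekaClientStreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaClientStreamApplication.class, args);
    }
}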

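  Message producer class   HelloStreamSource.java

import javax.annotation.Resource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
public class HelloStreamSource {

    // Injected by name: "output" is the channel declared by Source
    @Resource
    private MessageChannel output;

    public void sendTestData() {
        String s = "source msg";
        this.output.send(MessageBuilder.withPayload(s).build());  // send the message
    }
}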

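  Message receiver class   HelloStreamSink.java

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class HelloStreamSink {

    // Called for every message arriving on the "input" channel
    @StreamListener(Sink.INPUT)
    public void input(String s) {
        System.out.println("input:" + s);
    }
}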

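  REST service  HelloStream.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloStream {

    @Autowired
    private HelloStreamSource helloStreamSource;

    @RequestMapping("/stream")
    public String helloStream() {
        // send the message
        helloStreamSource.sendTestData();
        return "hello stream";
    }
}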

  Directory structure

  

  Start [eureka-client-stream]

  Visit http://localhost:8603/stream. In RabbitMQ, eureka-client-stream-des now shows up under Exchanges.

  

  The console also prints the received message (with the code above, a line reading input:source msg):

  

 

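2. Custom message channels

  Create the channel interface   CustomProcessor.java

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

/**
 * Custom message channels
 */
public interface CustomProcessor {
    String INPUT = "customInput";
    String OUTPUT = "customOutput";

    // Input channels are conventionally typed as SubscribableChannel, as in Sink
    @Input(CustomProcessor.INPUT)
    SubscribableChannel customInput();

    @Output(CustomProcessor.OUTPUT)
    MessageChannel customOutput();
}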

  Configuration file  application.yml

server:
  port: 8603

spring:
  application:
    name: eureka-client-stream
  cloud:
    stream:
      bindings:
        input:        # receiving side
          destination:  eureka-client-stream-des    # exchange name
        output:     # producing side
          destination:  eureka-client-stream-des    # input and output can talk as long as they share a destination
        customInput:
          destination:  eureka-client-stream-cust-des
        customOutput:
          destination:  eureka-client-stream-cust-des  # if this reused the destination of output above, the input above would also receive messages sent here
  rabbitmq:   # RabbitMQ server connection details
    host: localhost 
    port: 5672
    username: guest
    password: guest

eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
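
  The destination comments in the config can be pictured with a small plain-JDK sketch. It only imitates the idea of routing by destination name. It does not use Spring Cloud Stream or RabbitMQ, and DestinationRoutingSketch, bindInput, send and demo are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-JDK imitation of routing by destination name. Not the real binder.
class DestinationRoutingSketch {

    // destination name -> listeners bound to it
    private final Map<String, List<Consumer<String>>> exchanges = new HashMap<>();

    // In spirit: bind an input channel to a destination.
    void bindInput(String destination, Consumer<String> listener) {
        exchanges.computeIfAbsent(destination, d -> new ArrayList<>()).add(listener);
    }

    // In spirit: send on an output channel bound to a destination.
    void send(String destination, String payload) {
        List<Consumer<String>> listeners =
                exchanges.getOrDefault(destination, Collections.<Consumer<String>>emptyList());
        for (Consumer<String> listener : listeners) {
            listener.accept(payload);
        }
    }

    // Binds the two inputs from application.yml, then sends one message
    // as if customOutput were bound to the given destination.
    static List<String> demo(String customOutputDestination) {
        DestinationRoutingSketch broker = new DestinationRoutingSketch();
        List<String> log = new ArrayList<>();
        broker.bindInput("eureka-client-stream-des", s -> log.add("input:" + s));
        broker.bindInput("eureka-client-stream-cust-des", s -> log.add("custom input:" + s));
        broker.send(customOutputDestination, "custom source msg");
        return log;
    }

    public static void main(String[] args) {
        // Separate destination: only the custom listener fires.
        System.out.println(demo("eureka-client-stream-cust-des")); // [custom input:custom source msg]
        // Reusing the first destination: the default input gets the message.
        System.out.println(demo("eureka-client-stream-des"));      // [input:custom source msg]
    }
}
```

  In other words, the channel names (input, customInput, ...) are local to the application, while the destination is what decides who actually hears a message.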

 

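  Create the message producer class  HelloStreamCustomProcessorSource.java

import javax.annotation.Resource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(CustomProcessor.class)
public class HelloStreamCustomProcessorSource {

    // Injected by name: "customOutput" is declared in CustomProcessor
    @Resource
    private MessageChannel customOutput;

    public void sendTestData() {
        String s = "custom source msg";
        this.customOutput.send(MessageBuilder.withPayload(s).build());  // send the message
    }
}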

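  Create the message receiver class   HelloStreamCustomProcessorSink.java

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(CustomProcessor.class)
public class HelloStreamCustomProcessorSink {

    // Called for every message arriving on the "customInput" channel
    @StreamListener(CustomProcessor.INPUT)
    public void input(String s) {
        System.out.println("custom input:" + s);
    }
}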

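  REST service   HelloStream.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloStream {

    @Autowired
    private HelloStreamSource helloStreamSource;

    @Autowired
    private HelloStreamCustomProcessorSource helloStreamCustomProcessorSource;

    @RequestMapping("/stream")
    public String helloStream() {
        // send on the default output channel
        helloStreamSource.sendTestData();
        return "hello stream";
    }

    @RequestMapping("/stream/cust")
    public String helloStreamCust() {
        // send on the custom output channel
        helloStreamCustomProcessorSource.sendTestData();
        return "hello stream cust";
    }
}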

  Start the service and visit http://localhost:8603/stream/cust. In RabbitMQ, eureka-client-stream-cust-des now shows up under Exchanges.

  

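  The console also prints the message received on the new channel (with the code above, a line reading custom input:custom source msg):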

  

 

This mechanism can be combined with Spring Cloud Config to refresh the configuration of running microservices.

For reference, take a look at spring-cloud-bus.jar!\org\springframework\cloud\bus\SpringCloudBusClient.java
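
As a rough picture of that idea, here is a plain-JDK sketch in which one refresh message is fanned out to every subscribed service, and each service then re-reads its settings from a shared store. This is an assumption-laden illustration, not how Spring Cloud Bus or Spring Cloud Config is implemented: ConfigRefreshSketch, Service, Bus, publishRefresh and the greeting property are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK illustration of "broadcast a refresh, every service reloads".
// Hypothetical names throughout; not the Spring Cloud Bus API.
class ConfigRefreshSketch {

    // A service that caches one property until it is told to refresh.
    static final class Service {
        final String name;
        private final Map<String, String> configStore;
        private String greeting;

        Service(String name, Map<String, String> configStore) {
            this.name = name;
            this.configStore = configStore;
            refresh(); // initial load
        }

        void refresh() {
            greeting = configStore.get("greeting");
        }

        String greeting() {
            return greeting;
        }
    }

    // Stand-in for the message bus: delivers one refresh event to all subscribers.
    static final class Bus {
        private final List<Service> subscribers = new ArrayList<>();

        void subscribe(Service service) {
            subscribers.add(service);
        }

        void publishRefresh() {
            for (Service service : subscribers) {
                service.refresh();
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>(); // stand-in for the config server
        store.put("greeting", "hello");

        Bus bus = new Bus();
        Service one = new Service("eureka-client-one", store);
        Service two = new Service("eureka-client-two", store);
        bus.subscribe(one);
        bus.subscribe(two);

        store.put("greeting", "hello again");
        System.out.println(one.greeting()); // still the cached value: hello

        bus.publishRefresh();
        System.out.println(one.greeting() + " / " + two.greeting()); // hello again / hello again
    }
}
```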
