spring-cloud stream + rabbitmq 小記

此篇主要記錄spring-cloud stream,對rabbitmq的安裝使用不作累述。java

  1. 建立stream-receiver做爲消費者
    pom.xml:
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud-stream</artifactId>
            <groupId>com.cherrish</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>stream-receiver</artifactId>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>
    
    </project>

    application.properties:
     web

    spring.application.name=customer
    server.port=7889
    
    spring.rabbitmq.host=192.168.1.17
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    
    spring.cloud.stream.bindings.input.destination=sink-channel
    spring.cloud.stream.bindings.output.destination=sink-channel#不指定該輸出通道沒法接收消息

    Java 代碼:
     spring

    /**********************************CustomerApp.java*********************************/
    package com.cherrish;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    /**
     * @author cherrishccl
     * @time 2018-08-31 14:53
     * @name CustomerApp
     * @desc:
     */
    @SpringBootApplication
    public class CustomerApp {
        private static Logger logger = LoggerFactory.getLogger(CustomerApp.class);
        public static void main(String[] args) {
            SpringApplication.run(CustomerApp.class, args);
        }
    
    }
    /**********************************SinkReceiver.java*********************************/
    package com.cherrish;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.messaging.Message;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    
    /**
     * @author cherrish
     * @time 2018-08-31 14:39
     * @name SinkReceiver
     * @desc:
     */
    @EnableBinding(value = Sink.class)
    public class SinkReceiver {
        private static final AtomicInteger NUM = new AtomicInteger(0);
        private static Logger log = LoggerFactory.getLogger(SinkReceiver.class);
        @StreamListener(Sink.INPUT)
        public void receive(Message<String> payload) {
            log.info(NUM.getAndIncrement() + " Received : " + payload.getPayload());
        }
    }

     

  2. 建立stream-sender做爲生產者
    pom.xml:
     apache

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cloud-stream</artifactId>
            <groupId>com.cherrish</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>stream-sender</artifactId>
    
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            </dependency>
        </dependencies>
    
    </project>

    application.properties:
     app

    spring.application.name=producer
    server.port=7888
    
    spring.rabbitmq.host=192.168.1.195
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=admin
    spring.rabbitmq.password=admin
    
    spring.cloud.stream.bindings.input.destination=sink-channel
    spring.cloud.stream.bindings.output.destination=sink-channel


    Java代碼:maven

    /**********************************ProducerApp.java*********************************/
    package com.cherrish;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    /**
     * @author cherrish
     * @time 2018-08-31 14:52
     * @name ProducerApp
     * @desc:
     */
    @EnableScheduling
    @SpringBootApplication
    public class ProducerApp {
        private static Logger logger = LoggerFactory.getLogger(ProducerApp.class);
        public static void main(String[] args) {
            SpringApplication.run(ProducerApp.class, args);
        }
    }
    /**********************************SinkSender.java*********************************/
    package com.cherrish;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.integration.annotation.InboundChannelAdapter;
    import org.springframework.integration.annotation.Poller;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author cherrish
     * @time 2018-08-30 16:16
     * @name SinkSender
     * @desc:
     */
    @EnableBinding(value = {Source.class})
    public class SinkSender {
        private static Logger log = LoggerFactory.getLogger(SinkSender.class);
        private final static AtomicInteger NUM = new AtomicInteger(0);
        @InboundChannelAdapter(value = Source.OUTPUT,poller = @Poller(fixedRate = "3000"))
        public String timerMessageSource() {
            String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            log.info(NUM.getAndIncrement() + " Send message : " + format);
            return format;
        }
    
        @Autowired
        Source source;
        public void send(String message){
            source.output().send(org.springframework.integration.support.MessageBuilder.withPayload(message).build());
        }
    }
    /**********************************ScheduleTimer.java*********************************/
    
    package com.cherrish;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    
    /**
     * @author cherrish
     * @time 2018-09-03 10:58
     * @name ScheduleTimer
     * @desc:
     */
    @Component
    public class ScheduleTimer {
        @Autowired
        SinkSender sender;
        @Scheduled(fixedRate = 3000)
        public void send(){
            sender.send("定時調用發送消息" + new Date());
        }
    }

    ---------------------------
    父pom.xml:
     spring-boot

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
           <groupId>com.cherrish</groupId>
            <version>1.0-SNAPSHOT</version>
        <artifactId>cloud-stream</artifactId>
        <packaging>pom</packaging>
        <modules>
            <module>stream-sender</module>
            <module>stream-receiver</module>
        </modules>
    
       <properties>
            <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <java.version>1.8</java.version>
            <spring-boot.version>2.0.4.RELEASE</spring-boot.version>
            <skipTests>true</skipTests>
        </properties>
    
       <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
        <dependencyManagement>
            <dependencies>
                <dependency>
                    <groupId>org.springframework.cloud</groupId>
                    <artifactId>spring-cloud-dependencies</artifactId>
                    <version>${spring-cloud.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
                <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-dependencies</artifactId>
                    <version>${spring-boot.version}</version>
                    <type>pom</type>
                    <scope>import</scope>
                </dependency>
            </dependencies>
        </dependencyManagement>
    
    </project>
相關文章
相關標籤/搜索