此篇主要記錄spring-cloud stream,對rabbitmq的安裝使用不作累述。java
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud-stream</artifactId> <groupId>com.cherrish</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>stream-receiver</artifactId> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> </dependencies> </project>
application.properties:
web
spring.application.name=customer server.port=7889 spring.rabbitmq.host=192.168.1.17 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.cloud.stream.bindings.input.destination=sink-channel spring.cloud.stream.bindings.output.destination=sink-channel#不指定該輸出通道沒法接收消息
Java 代碼:
spring
/**********************************CustomerApp.java*********************************/ package com.cherrish; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author cherrishccl * @time 2018-08-31 14:53 * @name CustomerApp * @desc: */ @SpringBootApplication public class CustomerApp { private static Logger logger = LoggerFactory.getLogger(CustomerApp.class); public static void main(String[] args) { SpringApplication.run(CustomerApp.class, args); } } /**********************************SinkReceiver.java*********************************/ package com.cherrish; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import java.util.concurrent.atomic.AtomicInteger; /** * @author cherrish * @time 2018-08-31 14:39 * @name SinkReceiver * @desc: */ @EnableBinding(value = Sink.class) public class SinkReceiver { private static final AtomicInteger NUM = new AtomicInteger(0); private static Logger log = LoggerFactory.getLogger(SinkReceiver.class); @StreamListener(Sink.INPUT) public void receive(Message<String> payload) { log.info(NUM.getAndIncrement() + " Received : " + payload.getPayload()); } }
建立stream-sender做爲生產者
pom.xml:
apache
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>cloud-stream</artifactId> <groupId>com.cherrish</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>stream-sender</artifactId> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> </dependencies> </project>
application.properties:
app
spring.application.name=producer server.port=7888 spring.rabbitmq.host=192.168.1.195 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.cloud.stream.bindings.input.destination=sink-channel spring.cloud.stream.bindings.output.destination=sink-channel
Java代碼:maven
/**********************************ProducerApp.java*********************************/ package com.cherrish; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; /** * @author cherrish * @time 2018-08-31 14:52 * @name ProducerApp * @desc: */ @EnableScheduling @SpringBootApplication public class ProducerApp { private static Logger logger = LoggerFactory.getLogger(ProducerApp.class); public static void main(String[] args) { SpringApplication.run(ProducerApp.class, args); } } /**********************************SinkSender.java*********************************/ package com.cherrish; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.atomic.AtomicInteger; /** * @author cherrish * @time 2018-08-30 16:16 * @name SinkSender * @desc: */ @EnableBinding(value = {Source.class}) public class SinkSender { private static Logger log = LoggerFactory.getLogger(SinkSender.class); private final static AtomicInteger NUM = new AtomicInteger(0); @InboundChannelAdapter(value = Source.OUTPUT,poller = @Poller(fixedRate = "3000")) public String timerMessageSource() { String format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); log.info(NUM.getAndIncrement() + " Send message : " + format); return format; } @Autowired Source source; public void send(String message){ source.output().send(org.springframework.integration.support.MessageBuilder.withPayload(message).build()); } } /**********************************ScheduleTimer.java*********************************/ package com.cherrish; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.Date; /** * @author cherrish * @time 2018-09-03 10:58 * @name ScheduleTimer * @desc: */ @Component public class ScheduleTimer { @Autowired SinkSender sender; @Scheduled(fixedRate = 3000) public void send(){ sender.send("定時調用發送消息" + new Date()); } }
---------------------------
父pom.xml:
spring-boot
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.cherrish</groupId> <version>1.0-SNAPSHOT</version> <artifactId>cloud-stream</artifactId> <packaging>pom</packaging> <modules> <module>stream-sender</module> <module>stream-receiver</module> </modules> <properties> <spring-cloud.version>Finchley.RELEASE</spring-cloud.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <java.version>1.8</java.version> <spring-boot.version>2.0.4.RELEASE</spring-boot.version> <skipTests>true</skipTests> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>${spring-boot.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> </project>