1.maven依賴
2.發送消息及單元測試web
2.1發送消息 使用默認提供的Source類,提供一個"Output" MessageChannel 2.2測試消息發送 使用testsupport類MessageCollector拉取消息 2.3配置內容
3.消息接收和轉發及單元測試spring
3.1 消息處理 含接收和轉發 EnableBinding Processor.class 3.2測試類 3.3配置
4.消息接收處理及參數驗證測試json
4.1 消息接受處理 4.2測試驗證接收入參 4.3配置
5.自定義通道發送接收app
5.1自定義消息發送接收通道 MySource MySink 5.2配置通道 5.3消息發送使用 5.4消息接收使用
1.maven依賴dom
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> </dependencies>
2.1發送消息
使用默認提供的Source類,提供一個"Output" MessageChannelmaven
/*** * 定時任務發送消息 * */ @EnableScheduling @EnableBinding(Source.class) public class UsageDetailSender { @Autowired private Source source; private String[] users = {"user1", "user2", "user3", "user4", "user5"}; @Scheduled(fixedDelay = 1000) public void sendEvents() { UsageDetail usageDetail = new UsageDetail(); usageDetail.setUserId(this.users[new Random().nextInt(5)]); usageDetail.setDuration(new Random().nextInt(300)); usageDetail.setData(new Random().nextInt(700)); this.source.output() .send(MessageBuilder.withPayload(usageDetail).build()); } }
2.2測試消息發送
使用testsupport類MessageCollector拉取消息ide
@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class UsageDetailSenderApplicationTests { @Autowired private MessageCollector messageCollector; @Autowired private Source source; @Test public void contextLoads() { } @Test public void testUsageDetailSender() throws Exception { Message message = this.messageCollector.forChannel( this.source.output()) .poll(1, TimeUnit.SECONDS); String usageDetailJSON = message.getPayload().toString(); assertTrue(usageDetailJSON.contains("userId")); assertTrue(usageDetailJSON.contains("duration")); assertTrue(usageDetailJSON.contains("data")); } }
2.3配置內容spring-boot
spring.cloud.stream.bindings.output.destination=usage-detail spring.cloud.stream.bindings.output.producer.requiredGroups=usage-cost-consumer
3.1 消息處理 EnableBinding Processor.class單元測試
@EnableBinding(Processor.class) public class UsageCostProcessor { private double ratePerSecond = 0.1; private double ratePerMB = 0.05; @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public UsageCostDetail processUsageCost(UsageDetail usageDetail) { UsageCostDetail usageCostDetail = new UsageCostDetail(); usageCostDetail.setUserId(usageDetail.getUserId()); usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond); usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB); return usageCostDetail; } }
3.2測試類測試
@RunWith(SpringRunner.class) @SpringBootTest public class UsageCostProcessorRabbitApplicationTests { @Autowired private Processor processor; @Autowired private MessageCollector messageCollector; @Test public void testUsageCostProcessor() throws Exception { this.processor.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"duration\":101,\"data\":502}").build()); Message message = this.messageCollector.forChannel(this.processor.output()).poll(1, TimeUnit.SECONDS); assertTrue(message.getPayload().toString().equals("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}")); } }
3.3配置
spring.cloud.stream.bindings.input.destination=usage-detail spring.cloud.stream.bindings.input.group=usage-cost-consumer spring.cloud.stream.bindings.output.destination=usage-cost spring.cloud.stream.bindings.output.producer.requiredGroups=logger
4.1 消息接受處理
@EnableBinding(Sink.class) public class UsageCostLogger { private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerRabbitApplication.class); @StreamListener(Sink.INPUT) public void process(UsageCostDetail usageCostDetail) { logger.info(usageCostDetail.toString()); } }
4.2測試驗證接收入參
@RunWith(SpringRunner.class) @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class UsageCostLoggerRabbitApplicationTests { @Autowired protected Sink sink; @Autowired protected UsageCostLogger usageCostLogger; @Test public void testUsageCostLogger() throws Exception { ArgumentCaptor<UsageCostDetail> captor = ArgumentCaptor.forClass(UsageCostDetail.class); this.sink.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}").build()); //捕獲usageCostLogger.process的入參 verify(this.usageCostLogger).process(captor.capture()); //校驗入參值 assertEquals("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}", captor.getValue()); } @EnableAutoConfiguration @EnableBinding(Sink.class) static class TestConfig { // Override `UsageCostLogger` bean for spying. @Bean @Primary public UsageCostLogger usageCostLogger() { return spy(new UsageCostLogger()); } } }
4.3配置
spring.cloud.stream.bindings.input.destination=usage-cost spring.cloud.stream.bindings.input.group=logger
5.1自定義消息發送接收通道
public interface MySource { String OC_BILL_OUTPUT = "oc-bill-output"; /** * 發佈對帳單通知 * @return 消息體 */ @Output(OC_BILL_OUTPUT) MessageChannel paymentBill(); }
public interface MySink { String OC_BILL_INPUT = "oc-bill-input"; /** * 對帳消息接收 * @return 消息體 */ @Input(OC_BILL_INPUT) SubscribableChannel paymentBill(); }
5.2配置通道
spring.cloud.stream.bindings.oc-bill-input.destination=payment-bill spring.cloud.stream.bindings.oc-bill-input.content-type=application/json spring.cloud.stream.bindings.oc-bill-input.group=pay-admin spring.cloud.stream.bindings.oc-bill-output.destination=pay-payment-bill spring.cloud.stream.bindings.oc-bill-output.content-type=application/json spring.cloud.stream.bindings.oc-bill-output.group=pay-admin
5.3消息發送使用
@RestController public class MyController { @Autowired private MySource mySource; /**發送對帳通知*/ @PostMapping(value = "/send") public ResponseResult send(@RequestBody BillVO billVO) { Message<BillVO> message = MessageBuilder.withPayload(billVO).build(); mySource.paymentBill().send(message); return ResponseResult.ofSuccess(); } }
5.4消息接收使用
@StreamListener(MySink.OC_BILL_INPUT) public void check(@Payload BillVO billVO){ process(billVO); }