springCloud stream message 開發 單元測試

1.maven依賴
2.發送消息及單元測試

2.1發送消息 使用默認提供的Source類,提供一個"Output" MessageChannel
2.2測試消息發送 使用testsupport類MessageCollector拉取消息
2.3配置內容

3.消息接收和轉發及單元測試

3.1 消息處理 含接收和轉發 EnableBinding Processor.class
3.2測試類
3.3配置

4.消息接收處理及參數驗證測試

4.1 消息接收處理
4.2測試驗證接收入參
4.3配置

5.自定義通道發送接收

5.1自定義消息發送接收通道 MySource MySink
5.2配置通道
5.3消息發送使用
5.4消息接收使用

1.maven依賴

<!-- Dependencies for the Spring Cloud Stream (RabbitMQ binder) examples in this article. -->
<!-- NOTE(review): no <version> elements are declared; presumably they are managed by a parent POM/BOM - confirm. -->
<dependencies>
<!-- Spring Boot Actuator starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Spring Boot AMQP starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring Cloud Stream core -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<!-- RabbitMQ binder for Spring Cloud Stream -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

<!-- Lombok, declared optional -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!-- Test-scoped: Spring Boot test starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- Test-scoped: Spring Rabbit test utilities -->
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>
<!-- Test-scoped: Spring Cloud Stream test support (the "testsupport" MessageCollector used by the tests in this article) -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>
</dependencies>

2.1發送消息
使用默認提供的Source類,提供一個"Output" MessageChannel

/**
 * Scheduled publisher: builds a {@code UsageDetail} with random values and
 * sends it to the {@code output} channel of the bound {@code Source}, with a
 * fixed delay of 1000 between runs.
 */
@EnableScheduling
@EnableBinding(Source.class)
public class UsageDetailSender {

    @Autowired
    private Source source;

    /** Candidate user ids; one is picked at random for every event. */
    private final String[] users = {"user1", "user2", "user3", "user4", "user5"};

    /** Single generator reused by every run instead of allocating three per event. */
    private final Random random = new Random();

    /**
     * Creates one usage event (random user, duration in [0, 300), data in [0, 700))
     * and publishes it on the output channel.
     */
    @Scheduled(fixedDelay = 1000)
    public void sendEvents() {
        UsageDetail usageDetail = new UsageDetail();

        // Bound the index by the array length so that editing `users` can neither
        // go out of range nor leave entries unreachable.
        usageDetail.setUserId(this.users[this.random.nextInt(this.users.length)]);
        usageDetail.setDuration(this.random.nextInt(300));
        usageDetail.setData(this.random.nextInt(700));
        this.source.output()
            .send(MessageBuilder.withPayload(usageDetail).build());
    }
}

2.2測試消息發送
使用testsupport類MessageCollector拉取消息

/**
 * Verifies that the application context starts and that a message with the
 * expected fields shows up on the {@code Source} output channel.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsageDetailSenderApplicationTests {

    @Autowired
    private MessageCollector messageCollector;

    @Autowired
    private Source source;

    /** Passes as long as the Spring context can be created. */
    @Test
    public void contextLoads() {
    }

    /**
     * Polls the output channel for up to one second and checks that the
     * payload text mentions every expected field name.
     */
    @Test
    public void testUsageDetailSender() throws Exception {
        Message<?> message = this.messageCollector.forChannel(
                this.source.output())
                .poll(1, TimeUnit.SECONDS);
        // poll() yields null on timeout; fail with a readable reason instead of an NPE below.
        assertTrue("no message was published on the output channel within 1 second",
                message != null);

        String usageDetailJSON = message.getPayload().toString();
        assertTrue(usageDetailJSON.contains("userId"));
        assertTrue(usageDetailJSON.contains("duration"));
        assertTrue(usageDetailJSON.contains("data"));
    }
}

2.3配置內容

# Destination that the `output` binding publishes to.
spring.cloud.stream.bindings.output.destination=usage-detail
# Required consumer group for this producer; it is the same name as the input group configured in section 3.3.
spring.cloud.stream.bindings.output.producer.requiredGroups=usage-cost-consumer

3.1 消息處理 含接收和轉發 EnableBinding Processor.class

/**
 * Processor that turns each incoming {@code UsageDetail} into a
 * {@code UsageCostDetail} and forwards it to the output channel.
 */
@EnableBinding(Processor.class)
public class UsageCostProcessor {

    /** Multiplier applied to the usage duration. */
    private double ratePerSecond = 0.1;

    /** Multiplier applied to the usage data volume. */
    private double ratePerMB = 0.05;

    /**
     * Copies the user id and prices duration and data with the configured rates.
     *
     * @param usageDetail usage record received on {@code Processor.INPUT}
     * @return the priced record, sent to {@code Processor.OUTPUT}
     */
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public UsageCostDetail processUsageCost(UsageDetail usageDetail) {
        final UsageCostDetail priced = new UsageCostDetail();
        priced.setUserId(usageDetail.getUserId());

        final double callCost = usageDetail.getDuration() * this.ratePerSecond;
        priced.setCallCost(callCost);

        final double dataCost = usageDetail.getData() * this.ratePerMB;
        priced.setDataCost(dataCost);

        return priced;
    }
}

3.2測試類

/**
 * Sends a JSON usage record into the processor input and checks the JSON
 * cost record that comes out of the processor output.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class UsageCostProcessorRabbitApplicationTests {

    @Autowired
    private Processor processor;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void testUsageCostProcessor() throws Exception {
        this.processor.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"duration\":101,\"data\":502}").build());

        Message<?> message = this.messageCollector.forChannel(this.processor.output()).poll(1, TimeUnit.SECONDS);
        // poll() yields null on timeout; fail with a readable reason instead of an NPE below.
        assertTrue("no message arrived on the processor output within 1 second", message != null);

        assertTrue(message.getPayload().toString().equals("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}"));
    }
}

3.3配置

# Input side: consume from `usage-detail` as consumer group `usage-cost-consumer`.
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.input.group=usage-cost-consumer
# Output side: publish to `usage-cost`; `logger` is the required consumer group (same name as the input group in section 4.3).
spring.cloud.stream.bindings.output.destination=usage-cost
spring.cloud.stream.bindings.output.producer.requiredGroups=logger

4.1 消息接收處理

/**
 * Sink-side consumer that logs every {@code UsageCostDetail} received on the
 * {@code Sink.INPUT} channel.
 */
@EnableBinding(Sink.class)
public class UsageCostLogger {

    // Logger category is this class itself, so log lines are attributed to the component that emits them.
    private static final Logger logger = LoggerFactory.getLogger(UsageCostLogger.class);

    /**
     * Logs the received usage cost at INFO level.
     *
     * @param usageCostDetail payload of the incoming message
     */
    @StreamListener(Sink.INPUT)
    public void process(UsageCostDetail usageCostDetail) {
        // Parameterized form: a null payload is logged as "null" instead of throwing from toString().
        logger.info("{}", usageCostDetail);
    }
}

4.2測試驗證接收入參

/**
 * Sends a JSON cost record into the sink input and verifies the argument
 * that {@code UsageCostLogger.process} is invoked with.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class UsageCostLoggerRabbitApplicationTests {

    @Autowired
    protected Sink sink;

    @Autowired
    protected UsageCostLogger usageCostLogger;

    @Test
    public void testUsageCostLogger() throws Exception {
        ArgumentCaptor<UsageCostDetail> captor = ArgumentCaptor.forClass(UsageCostDetail.class);
        this.sink.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}").build());

        // Capture the argument passed to usageCostLogger.process.
        verify(this.usageCostLogger).process(captor.capture());

        // Check the captured argument field by field. Comparing the JSON string with the
        // captured UsageCostDetail object itself can never be equal, so that assertion always failed.
        // NOTE(review): assumes UsageCostDetail exposes getUserId/getCallCost/getDataCost
        // matching the setters used by the processor - confirm.
        UsageCostDetail received = captor.getValue();
        assertEquals("user3", received.getUserId());
        assertEquals(10.100000000000001, received.getCallCost(), 0.0);
        assertEquals(25.1, received.getDataCost(), 0.0);
    }

    /** Test-only configuration that swaps the real logger bean for a Mockito spy. */
    @EnableAutoConfiguration
    @EnableBinding(Sink.class)
    static class TestConfig {

        // Override `UsageCostLogger` bean for spying.
        @Bean
        @Primary
        public UsageCostLogger usageCostLogger() {
            return spy(new UsageCostLogger());
        }
    }
}

4.3配置

# Consume from `usage-cost` as consumer group `logger`.
spring.cloud.stream.bindings.input.destination=usage-cost
spring.cloud.stream.bindings.input.group=logger

5.1自定義消息發送接收通道

/** Declares the custom output channel used to publish bill-reconciliation notifications. */
public interface MySource {

    /** Binding name of the output channel, used by {@code @Output} below. */
    String OC_BILL_OUTPUT = "oc-bill-output";

    /**
     * Channel for publishing bill-reconciliation notifications.
     * @return the message channel declared as output {@code oc-bill-output}
     */
    @Output(OC_BILL_OUTPUT)
    MessageChannel paymentBill();
}
/** Declares the custom input channel on which bill-reconciliation messages are received. */
public interface MySink {
    /** Binding name of the input channel, used by {@code @Input} below. */
    String OC_BILL_INPUT = "oc-bill-input";

    /**
     * Channel for receiving bill-reconciliation messages.
     * @return the subscribable channel declared as input {@code oc-bill-input}
     */
    @Input(OC_BILL_INPUT)
    SubscribableChannel paymentBill();
}

5.2配置通道

# Input binding `oc-bill-input` (MySink): consume JSON from `payment-bill` as consumer group `pay-admin`.
spring.cloud.stream.bindings.oc-bill-input.destination=payment-bill
spring.cloud.stream.bindings.oc-bill-input.content-type=application/json
spring.cloud.stream.bindings.oc-bill-input.group=pay-admin

# Output binding `oc-bill-output` (MySource): publish JSON to `pay-payment-bill`.
# NOTE(review): the output destination (`pay-payment-bill`) differs from the input destination (`payment-bill`) - confirm this is intended.
spring.cloud.stream.bindings.oc-bill-output.destination=pay-payment-bill
spring.cloud.stream.bindings.oc-bill-output.content-type=application/json
# NOTE(review): `group` is set on an output binding here, while the other sections use `producer.requiredGroups` for producers - confirm which one is intended.
spring.cloud.stream.bindings.oc-bill-output.group=pay-admin

5.3消息發送使用

/** REST entry point that publishes bill-reconciliation notifications through {@code MySource}. */
@RestController
public class MyController {

    @Autowired
    private MySource mySource;

    /**
     * Wraps the posted bill in a message and sends it on the {@code paymentBill} channel.
     *
     * @param billVO bill read from the request body
     * @return the result of {@code ResponseResult.ofSuccess()}
     */
    @PostMapping(value = "/send")
    public ResponseResult send(@RequestBody BillVO billVO) {
        final Message<BillVO> billNotification = MessageBuilder
                .withPayload(billVO)
                .build();
        mySource.paymentBill().send(billNotification);
        return ResponseResult.ofSuccess();
    }
}

5.4消息接收使用

/**
 * Listener for the {@code MySink.OC_BILL_INPUT} binding; hands each received bill to {@code process}.
 * NOTE(review): {@code process} is defined outside this snippet - confirm what it does.
 *
 * @param billVO payload of the incoming message
 */
@StreamListener(MySink.OC_BILL_INPUT)
public void check(@Payload BillVO billVO){

    process(billVO);
}
相關文章
相關標籤/搜索