Spring Cloud Stream消息驅動@SendTo和消息降級

參考程序員DD大佬的文章,本身新建demo學習學習,因爲須要消息回執,看到了@SendTo這個註解可以實現,下面開始學習demo,新建兩個項目 cloud-stream-consumer 消費端 和 cloud-stream-consumer 生產端程序員

public interface StreamReceive {

    @Input("MQRece")
    SubscribableChannel mqReceive();
}

添加一個 StreamReceive 接口,定義@input通道spring

@Component
@Slf4j
public class ReceiveListener {

    @StreamListener("MQRece")
    public byte[] receive(byte[] bytes){
        log.info("接受消息:"+new String(bytes));
        return "ok".getBytes();
    }

}

添加消息監聽,接受消息定義爲 byte[] segmentfault

添加 application.properties 配置文件信息app

spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.211.11:9876
spring.cloud.stream.bindings.MQRece.destination=message-topic
spring.cloud.stream.bindings.MQRece.group=rece-group
server.port=19999

爲MQRece通道添加主題 message-topic ,組名 rece-group 學習

到此Stream 客戶端消費就完成了,本節須要把@SendTo註解用起來,須要新建一個MessageChannel進行產生消息測試

public interface MsgBackPush {
    
    @Output("back-push")
    MessageChannel backPush();
}

而後在 ReceiveListener 添加@SendToui

@Component
@Slf4j
public class ReceiveListener {

    @StreamListener("MQRece")
    @SendTo("back-push")
    public byte[] receive(byte[] bytes){
        log.info("接受消息:"+new String(bytes));
        return "ok".getBytes();
    }

}

新增通道配置 application.properties spa

spring.cloud.stream.bindings.back-push.destination=back-topic
spring.cloud.stream.bindings.back-push.group=back-group

SpringBoot啓動類記得添加 @EnableBinding(value = {StreamReceive.class,MsgBackPush.class}) 3d

@SpringBootApplication
@EnableBinding(value = {StreamReceive.class,MsgBackPush.class})
public class CloudStreamConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamConsumerApplication.class, args);
    }

}

到此,cloud-stream-consumer這個demo就完成了日誌

接下來看看 cloud-stream-producer

public interface StreamPush {

    @Output("MQPush")
    MessageChannel mqPush();
}

定義一個通道名爲 MQPush ,進行消息生產

public interface ProducerReceive {


    @Input("producer-receive")
    SubscribableChannel producerReceive();
}

定義一個通道名爲 producer-receive ,進行回執消息的消費

@Component
@Slf4j
public class ProducerListener {
    @StreamListener("producer-receive")
    public void producerReceive(byte[] bytes){
        log.info("come back message:"+new String(bytes));
    }
}

具體回執消息處理邏輯,再來看看 application.properties

spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.214.191:9876
spring.cloud.stream.bindings.MQPush.destination=message-topic
spring.cloud.stream.bindings.MQPush.group=push-group


spring.cloud.stream.bindings.producer-receive.destination=back-topic
spring.cloud.stream.bindings.producer-receive.group=back-group


server.port=20000

爲通道設置topic和group,新建一個Http接口測試一下成果

@SpringBootApplication
@EnableBinding(value = {StreamPush.class,ProducerReceive.class})
@RestController
public class CloudStreamProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamProducerApplication.class, args);
    }

    @Autowired
    private StreamPush streamPush;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        streamPush.mqPush().send(MessageBuilder.withPayload("message body".getBytes()).build());
        return "ok";
    }
}

訪問 http://localhost:20000/sendMessage ,結果圖以下

cloud-stream-consumer日誌輸出

file

cloud-stream-producer日誌輸出

file

學習@ServiceActivator這個註解,上面的項目 cloud-stream-consumer ReceiveListener類中添加

@Component
@Slf4j
public class ReceiveListener {

    @StreamListener("MQRece")
    @SendTo("back-push")
    public byte[] receive(byte[] bytes){
        log.info("接受消息:"+new String(bytes));
                // 拋出異常
        if(1==1){
            throw new RuntimeException("Message consumer failed!");
        }
        return "ok".getBytes();
    }

    @Autowired
    private MsgBackPush msgBackPush;

    @ServiceActivator(inputChannel = "message-topic.rece-group.errors")
    public void error(Message<?> message){
        log.info("消費者消費消息失敗:"+message);
        msgBackPush.backPush().send(MessageBuilder.withPayload("消息消費失敗".getBytes()).build());
    }
}

經過使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某個通道的錯誤處理映射。其中,inputChannel的配置中對應關係以下:

  • message-topic:消息通道對應的目標(destination,即:spring.cloud.stream.bindings.MQRece.destination的配置)
  • rece-group:消息通道對應的消費組(group,即:spring.cloud.stream.bindings.MQRece.group的配置)

訪問 http://localhost:20000/sendMessage ,結果圖以下

cloud-stream-consumer日誌輸出

file

cloud-stream-producer日誌輸出

file

我的聯繫方式QQ:944484545,歡迎你們的加入,分享學習是一件開心事
相關文章
相關標籤/搜索