參考程序員DD大佬的文章,本身新建demo學習學習,因爲須要消息回執,看到了@SendTo這個註解可以實現,下面開始學習demo,新建兩個項目 cloud-stream-consumer
消費端 和 cloud-stream-consumer
生產端程序員
public interface StreamReceive { @Input("MQRece") SubscribableChannel mqReceive(); }
添加一個 StreamReceive
接口,定義@input
通道spring
@Component @Slf4j public class ReceiveListener { @StreamListener("MQRece") public byte[] receive(byte[] bytes){ log.info("接受消息:"+new String(bytes)); return "ok".getBytes(); } }
添加消息監聽,接受消息定義爲 byte[]
segmentfault
添加 application.properties
配置文件信息app
spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.211.11:9876 spring.cloud.stream.bindings.MQRece.destination=message-topic spring.cloud.stream.bindings.MQRece.group=rece-group server.port=19999
爲MQRece通道添加主題 message-topic
,組名 rece-group
學習
到此Stream 客戶端消費就完成了,本節須要把@SendTo註解用起來,須要新建一個MessageChannel進行產生消息測試
public interface MsgBackPush { @Output("back-push") MessageChannel backPush(); }
而後在 ReceiveListener
添加@SendToui
@Component @Slf4j public class ReceiveListener { @StreamListener("MQRece") @SendTo("back-push") public byte[] receive(byte[] bytes){ log.info("接受消息:"+new String(bytes)); return "ok".getBytes(); } }
新增通道配置 application.properties
spa
spring.cloud.stream.bindings.back-push.destination=back-topic spring.cloud.stream.bindings.back-push.group=back-group
SpringBoot啓動類記得添加 @EnableBinding(value = {StreamReceive.class,MsgBackPush.class})
3d
@SpringBootApplication @EnableBinding(value = {StreamReceive.class,MsgBackPush.class}) public class CloudStreamConsumerApplication { public static void main(String[] args) { SpringApplication.run(CloudStreamConsumerApplication.class, args); } }
到此,cloud-stream-consumer這個demo就完成了日誌
接下來看看 cloud-stream-producer
public interface StreamPush { @Output("MQPush") MessageChannel mqPush(); }
定義一個通道名爲 MQPush
,進行消息生產
public interface ProducerReceive { @Input("producer-receive") SubscribableChannel producerReceive(); }
定義一個通道名爲 producer-receive
,進行回執消息的消費
@Component @Slf4j public class ProducerListener { @StreamListener("producer-receive") public void producerReceive(byte[] bytes){ log.info("come back message:"+new String(bytes)); } }
具體回執消息處理邏輯,再來看看 application.properties
spring.cloud.stream.rocketmq.binder.namesrv-addr= 192.168.214.191:9876 spring.cloud.stream.bindings.MQPush.destination=message-topic spring.cloud.stream.bindings.MQPush.group=push-group spring.cloud.stream.bindings.producer-receive.destination=back-topic spring.cloud.stream.bindings.producer-receive.group=back-group server.port=20000
爲通道設置topic和group,新建一個Http接口測試一下成果
@SpringBootApplication @EnableBinding(value = {StreamPush.class,ProducerReceive.class}) @RestController public class CloudStreamProducerApplication { public static void main(String[] args) { SpringApplication.run(CloudStreamProducerApplication.class, args); } @Autowired private StreamPush streamPush; @GetMapping("/sendMessage") public String sendMessage(){ streamPush.mqPush().send(MessageBuilder.withPayload("message body".getBytes()).build()); return "ok"; } }
訪問 http://localhost:20000/sendMessage
,結果圖以下
cloud-stream-consumer日誌輸出
cloud-stream-producer日誌輸出
學習@ServiceActivator這個註解,上面的項目 cloud-stream-consumer
ReceiveListener類中添加
@Component @Slf4j public class ReceiveListener { @StreamListener("MQRece") @SendTo("back-push") public byte[] receive(byte[] bytes){ log.info("接受消息:"+new String(bytes)); // 拋出異常 if(1==1){ throw new RuntimeException("Message consumer failed!"); } return "ok".getBytes(); } @Autowired private MsgBackPush msgBackPush; @ServiceActivator(inputChannel = "message-topic.rece-group.errors") public void error(Message<?> message){ log.info("消費者消費消息失敗:"+message); msgBackPush.backPush().send(MessageBuilder.withPayload("消息消費失敗".getBytes()).build()); } }
經過使用@ServiceActivator(inputChannel = "test-topic.stream-exception-handler.errors")指定了某個通道的錯誤處理映射。其中,inputChannel的配置中對應關係以下:
訪問 http://localhost:20000/sendMessage
,結果圖以下
cloud-stream-consumer日誌輸出
cloud-stream-producer日誌輸出