Spring Cloud Stream是一個構建消息驅動微服務的框架,應用程序經過input(至關於consumer)、output(至關於producer)來與Spring Cloud Stream中Binder交互,而Binder負責與消息中間件交互;所以,咱們只需關注如何與Binder交互便可,而無需關注與具體消息中間件的交互。java
一、添加依賴spring
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> <version>2.1.2.RELEASE</version> </dependency>
二、配置express
provider配置(採用動態路由鍵方式)json
server: port: 7071 spring: cloud: stream: binders: pro: type: rabbit environment: spring: rabbitmq: addresses: localhost port: 5672 username: test password: test virtual-host: test bindings: myOutPut: destination: myOutPut content-type: application/json default-binder: test rabbit: bindings: myOutPut: producer: exchangeType: topic routing-key-expression: headers.routeId
consumer配置app
server: port: 7072 spring: cloud: stream: rabbit: bindings: input: consumer: bindingRoutingKey: routeKey1 acknowledge-mode: manual binders: protest: type: rabbit environment: spring: rabbitmq: addresses: localhost port: 5672 username: test password: test virtual-host: test bindings: input: destination: myOutPut content-type: application/json default-binder: protest group: group-cus1
三、java端
provider框架
public interface MqMessageSource {//自定義通道 String MY_OUT_PUT = "myOutPut"; @Output(MY_OUT_PUT) MessageChannel testOutPut(); }
@EnableBinding(MqMessageSource.class) public class MessageProviderImpl implements IMessageProvider { @Autowired @Output(MqMessageSource.MY_OUT_PUT) private MessageChannel channel; @Override public void send(Company company) { channel.send(MessageBuilder.withPayload(company).setHeader("routeId", company.getTitle()).build()); } }
consumeride
@Component @EnableBinding(Sink.class) public class MessageListener { @StreamListener(Sink.INPUT) public void input(Message<Company> message) throws IOException { Channel channel = (com.rabbitmq.client.Channel)message.getHeaders().get(AmqpHeaders.CHANNEL); Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); channel.basicAck(deliveryTag, false); System.err.println(JSON.toJSONString(message.getPayload())); } }
END微服務