<!-- Place this fragment directly under <project> in pom.xml. -->
<dependencies>
    <!-- Spring Boot web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.1.0.RELEASE</version>
    </dependency>
    <!-- Spring Cloud Stream binder for RocketMQ -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <version>0.2.2.BUILD-SNAPSHOT</version>
    </dependency>
</dependencies>

<!-- Spring Cloud BOM. Maven only honours scope "import" for pom-type entries
     declared inside dependencyManagement, so the BOM is declared here rather
     than as an ordinary dependency. -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.M3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
若是找不到,則添加以下配置
<!-- Snapshot repository needed to resolve the BUILD-SNAPSHOT artifacts. -->
<repositories>
    <repository>
        <id>spring-snapshot</id>
        <name>Spring Snapshot Repository</name>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
<!-- If you use a private Nexus server, pay attention to the repository type
     when adding this URL as a proxy repository: proxying of snapshots must be
     allowed. -->
spring:
  cloud:
    stream:
      default-binder: rocketmq
      rocketmq:
        binder:
          # RocketMQ name server address
          name-server: 192.168.0.78:9876
        bindings:
          # Custom binding name; corresponds to spring.cloud.stream.bindings.output1
          output1:
            producer:
              group: test-group-user-ouput1
              sync: true
      # Binding: covers both Input Binding and Output Binding.
      # A binding is the bridge between the message middleware and the
      # Provider / Consumer offered by the application. Developers only use the
      # application's Provider or Consumer to produce or consume data and are
      # shielded from the underlying message middleware.
      bindings:
        # Custom binding name
        output1:
          destination: test-topic-user  # topic (first-level category)
          content-type: application/json
# Binder selection and RocketMQ name server address
spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.78:9876

# RocketMQ-specific producer settings for the output1 binding
spring.cloud.stream.rocketmq.bindings.output1.producer.group=test-group-user-ouput1
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true

# Generic binding settings for output1
spring.cloud.stream.bindings.output1.destination=test-topic-user
spring.cloud.stream.bindings.output1.content-type=application/json
定義 name 爲 output1 的 Output Binding
/**
 * Binding interface that declares the output binding named {@code output1}.
 */
public interface MySource {

    /**
     * Returns the message channel declared under the name {@code output1}.
     */
    @Output("output1")
    MessageChannel output1();
}
在 Application 中添加註解
@SpringBootApplication @EnableBinding({ MySource.class }) //MySource爲上面定義Binding的接口 public class RocketMQConsumerApplication { ... }
注入定義 Binding 的接口
// Injected instance of the binding interface.
@Autowired
private MySource source;
String msg = ...;
// Wrap the payload in a message and send it through the output1 channel.
source.output1()
        .send(MessageBuilder.withPayload(msg).build());
Object msg = ...; String tag = "test-tag";//tag爲二級分類,topic爲一級分類,能夠根據兩個類別進行訂閱 Message message = MessageBuilder.createMessage(msg, new MessageHeaders(Stream.of(tag).collect(Collectors.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString)))); source.output1().send(message);
Object msg = ...;
String tag = "test-tag";
// Build the message with the tag header and an explicit JSON content type,
// then send it through the output1 channel.
Message<Object> message = MessageBuilder.withPayload(msg)
        .setHeader(MessageConst.PROPERTY_TAGS, tag)
        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
        .build();
source.output1().send(message);
// TODO
spring:
  cloud:
    stream:
      default-binder: rocketmq
      rocketmq:
        binder:
          name-server: 192.168.1.179:9876
        bindings:
          # Custom binding name; corresponds to spring.cloud.stream.bindings.input1
          input1:
            consumer:
              tags: test-tag1  # subscribed tag (second-level category)
              orderly: false   # whether to consume in order
      bindings:
        # Custom binding name
        input1:
          destination: test-topic-user  # subscribed topic (first-level category)
          content-type: application/json
          group: test-input-group1  # group
          consumer:
            concurrency: 20
            maxAttempts: 1
定義 name 爲 input1 的 Input Binding
/**
 * Binding interface that declares the input binding named {@code input1}.
 */
public interface MySource {

    /**
     * Returns the subscribable channel declared under the name {@code input1}.
     */
    @Input("input1")
    SubscribableChannel input1();
}
在 Application 中添加註解
@SpringBootApplication @EnableBinding({ MySource.class }) //MySource爲上面定義Binding的接口 public class RocketMQConsumerApplication { ... }
/** * 消費者 */ @Service public class ReceiveService { // 接受字符串 // @StreamListener("input1") // public void receiveInput1( String receiveMsg) { // System.out.println("input1 receive: " + receiveMsg); // } /** * 接受對象 */ @StreamListener("input1") public void receiveInput3(@Payload List<BranchInfoEntity> list ) { System.out.println("input3 receive: " + JSON.toJSONString(list)); } }