spring cloud boot RocketMQ 引入指南

  • MAVEN 配置
  • 生產者配置
    • application.yml配置
    • application.properties配置
    • 定義Output Binding
    • 開啓Binding(開啓MQ)
    • 發送消息
  • 消費者配置
    • application.yml配置
    • 定義Input Binding
    • 開啓Binding(開啓MQ)
    • 接受消息

 

MAVEN 配置

<!-- spring-boot -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>2.1.0.RELEASE</version>
            </dependency>

<!-- spring-cloud-rocketmq -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    <version>0.2.2.BUILD-SNAPSHOT</version>
</dependency>

<!-- spring-cloud -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.M3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

若是找不到,則添加一下配置web

<repositories>
    <repository>
        <id>spring-snapshot</id>
        <name>Spring Snapshot Repository</name>
        <url>https://repo.spring.io/snapshot</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>

#若是使用私服nexus的話,再添加這個代理倉庫時,注意倉庫類型:snapshot要容許代理

生產者配置

application.yml配置

spring:
    cloud:
        stream:
            default-binder: rocketmq
            rocketmq:
                binder:
                    #rocketmq地址
                    name-server: 192.168.0.78:9876
                bindings:
                    #自定義的名稱 對應spring.cloud.stream.bindings.output1
                    output1:
                        producer:
                            group: test-group-user-ouput1
                            sync: true
            #Binding: 包括 Input Binding 和 Output Binding。
            #Binding 在消息中間件與應用程序提供的 Provider 和 Consumer 之間提供了一個橋樑,
            #實現了開發者只需使用應用程序的 Provider 或 Consumer 生產或消費數據便可,屏蔽了開發者與底層消息中間件的接觸。
            bindings:
                #自定義的名稱
                output1:
                    destination: test-topic-user   # topic(一級分類)
                    content-type: application/json

application.properties配置

spring.cloud.stream.default-binder=rocketmq
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.78:9876
spring.cloud.stream.rocketmq.bindings.output1.producer.group=test-group-user-ouput1
spring.cloud.stream.rocketmq.bindings.output1.producer.sync=true
spring.cloud.stream.bindings.output1.destination=test-topic-user
spring.cloud.stream.bindings.output1.content-type=application/json

定義Output Binding

定義name爲output1的Output Bindingspring

public interface MySource {
    @Output("output1")
    MessageChannel output1();
}

開啓Binding(開啓MQ)

在Application 中添加註解json

@SpringBootApplication
@EnableBinding({ MySource.class }) //MySource爲上面定義Binding的接口
public class RocketMQConsumerApplication {
    ...
}

發送消息

注入 定義Bingding的 接口app

@Autowired
private MySource source;
  • 直接發送消息,沒有tag和key
String msg = ...;
source.output1().send(MessageBuilder.withPayload(msg).build());
  • 攜帶tag發送消息
Object msg = ...;
String tag = "test-tag";//tag爲二級分類,topic爲一級分類,能夠根據兩個類別進行訂閱
Message message = MessageBuilder.createMessage(msg,
				new MessageHeaders(Stream.of(tag).collect(Collectors.toMap(str -> MessageConst.PROPERTY_TAGS, String::toString))));
source.output1().send(message);
  • 發送對象
Object msg = ...;
String tag = "test-tag"
Message message = MessageBuilder.withPayload(msg)
				.setHeader(MessageConst.PROPERTY_TAGS, tag)
				.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
				.build();
source.output1().send(message);
  • 發送事務消息
// TODO

消費者配置

application.yml配置

spring:
    cloud:
        stream:
            default-binder: rocketmq
            rocketmq:
                binder:
                    name-server: 192.168.1.179:9876
                bindings:
                    #自定義的名稱 對應spring.cloud.stream.bindings.input1
                    
                    input1:
                        consumer:
                            tags: test-tag1 # 訂閱的tag,二級分類
                            orderly: false  # 是否按順序消費
            bindings:
                #自定義的名稱
                input1:
                    destination: test-topic-user # 訂閱的topic ,一級分類
                    content-type: application/json
                    group: test-input-group1    #group
                    consumer:
                        concurrency: 20
                        maxAttempts: 1

定義Input Binding

定義name爲input1的Input Bindingide

public interface MySource {
    @Input("input1")
    SubscribableChannel input1();
}

開啓Binding(開啓MQ)

在Application 中添加註解spring-boot

@SpringBootApplication
@EnableBinding({ MySource.class }) //MySource爲上面定義Binding的接口
public class RocketMQConsumerApplication {
    ...
}

接受消息

/**
 * 消費者
 */
@Service
public class ReceiveService {
// 接受字符串
//	@StreamListener("input1")
//	public void receiveInput1( String receiveMsg) {
//		System.out.println("input1 receive: " + receiveMsg);
//	}

    /**
     * 接受對象
     */
	@StreamListener("input1")
	public void receiveInput3(@Payload List<BranchInfoEntity> list ) {
		System.out.println("input3 receive: " + JSON.toJSONString(list));
	}

}
相關文章
相關標籤/搜索