(十七) 整合spring cloud雲架構 -消息驅動 Spring Cloud Stream

在使用spring cloud雲架構的時候,咱們不得不使用Spring cloud Stream,由於消息中間件的使用在項目中無處不在,咱們公司後面作了娛樂方面的APP,在使用spring cloud作架構的時候,其中消息的異步通知,業務的異步處理都須要使用消息中間件機制。spring cloud的官方給出的集成建議(使用rabbit mq和kafka),我看了一下源碼和配置,只要把rabbit mq集成,kafka只是換了一個pom配置jar包而已,閒話少說,咱們就直接進入配置實施:html

1. 簡介:java

Spring cloud Stream 數據流操做開發包,封裝了與Redis,Rabbit、Kafka等發送接收消息。web

2. 使用工具:spring

rabbit,具體的下載和安裝細節我這裏不作太多講解,網上的實例太多了json

3. 建立commonservice-mq-producer消息的發送者項目,在pom裏面配置stream-rabbit的依賴架構

<span style="font-size: 16px;"><!-- 引入MQ消息驅動的微服務包,引入stream只須要進行配置化便可,是對rabbit、kafka很好的封裝 -->  
2.<dependency>  
3.    <groupId>org.springframework.cloud</groupId>  
4.    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>  
5.</dependency></span>

 4. 在yml文件裏面配置rabbit mqapp

<span style="font-size: 16px;">server:  
2.  port: 5666  
3.spring:  
4.  application:  
5.    name: commonservice-mq-producer  
6.  profiles:   
7.    active: dev  
8.  cloud:  
9.    config:  
10.      discovery:   
11.        enabled: true  
12.        service-id: commonservice-config-server  
13.  <span style="color: rgb(255, 0, 0);"># rabbitmq和kafka都有相關配置的默認值,若是修改,能夠再次進行配置  
14.    stream:  
15.      bindings:  
16.        mqScoreOutput:   
17.          destination: honghu_exchange  
18.          contentType: application/json  
19.            
20.  rabbitmq:  
21.     host: localhost  
22.     port: 5672  
23.     username: honghu  
24.     password: honghu</span>  
25.eureka:   
26.  client:  
27.    service-url:  
28.      defaultZone: http://honghu:123456@localhost:8761/eureka  
29.  instance:  
30.    prefer-ip-address: true</span>

5. 定義接口ProducerService框架

<span style="font-size: 16px;">package com.honghu.cloud.producer;  
2.  
3.import org.springframework.cloud.stream.annotation.Output;  
4.import org.springframework.messaging.SubscribableChannel;  
5.  
6.public interface ProducerService {  
7.      
8.    String SCORE_OUPUT = "mqScoreOutput";  
9.      
10.    @Output(ProducerService.SCORE_OUPUT)  
11.    SubscribableChannel sendMessage();  
12.}</span>

6. 定義綁定異步

<span style="font-size: 16px;">package com.honghu.cloud.producer;  
2.  
3.import org.springframework.cloud.stream.annotation.EnableBinding;  
4.  
5.@EnableBinding(ProducerService.class)  
6.public class SendServerConfig {  
7.  
8.}</span>

7. 定義發送消息業務ProducerController微服務

<span style="font-size: 16px;">package com.honghu.cloud.controller;  
2.  
3.  
4.import org.springframework.beans.factory.annotation.Autowired;  
5.import org.springframework.integration.support.MessageBuilder;  
6.import org.springframework.messaging.Message;  
7.import org.springframework.web.bind.annotation.PathVariable;  
8.import org.springframework.web.bind.annotation.RequestBody;  
9.import org.springframework.web.bind.annotation.RequestMapping;  
10.import org.springframework.web.bind.annotation.RequestMethod;  
11.import org.springframework.web.bind.annotation.RestController;  
12.  
13.import com.honghu.cloud.common.code.ResponseCode;  
14.import com.honghu.cloud.common.code.ResponseVO;  
15.import com.honghu.cloud.entity.User;  
16.import com.honghu.cloud.producer.ProducerService;  
17.  
18.import net.sf.json.JSONObject;  
19.  
20.@RestController  
21.@RequestMapping(value = "producer")  
22.public class ProducerController {  
23.      
24.    @Autowired  
25.    private ProducerService producerService;  
26.      
27.      
28.    /** 
29.     * 經過get方式發送</span>對象<span style="font-size: 16px;"> 
30.     * @param name 路徑參數 
31.     * @return 成功|失敗 
32.     */  
33.    @RequestMapping(value = "/sendObj", method = RequestMethod.GET)  
34.    public ResponseVO sendObj() {  
35.        User user = new User(1, "hello User");  
36.        <span style="color: rgb(255, 0, 0);">Message<User> msg = MessageBuilder.withPayload(user).build();</span>  
37.        boolean result = producerService.sendMessage().send(msg);  
38.        if(result){  
39.            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);  
40.        }  
41.        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);  
42.    }  
43.      
44.      
45.    /** 
46.     * 經過get方式發送字符串消息 
47.     * @param name 路徑參數 
48.     * @return 成功|失敗 
49.     */  
50.    @RequestMapping(value = "/send/{name}", method = RequestMethod.GET)  
51.    public ResponseVO send(@PathVariable(value = "name", required = true) String name) {  
52.        Message msg = MessageBuilder.withPayload(name.getBytes()).build();  
53.        boolean result = producerService.sendMessage().send(msg);  
54.        if(result){  
55.            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);  
56.        }  
57.        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);  
58.    }  
59.      
60.    /** 
61.     * 經過post方式發送</span>json對象<span style="font-size: 16px;"> 
62.     * @param name 路徑參數 
63.     * @return 成功|失敗 
64.     */  
65.    @RequestMapping(value = "/sendJsonObj", method = RequestMethod.POST)  
66.    public ResponseVO sendJsonObj(@RequestBody JSONObject jsonObj) {  
67.        Message<JSONObject> msg = MessageBuilder.withPayload(jsonObj).build();  
68.        boolean result = producerService.sendMessage().send(msg);  
69.        if(result){  
70.            return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_SUCCESS, false);  
71.        }  
72.        return ResponseCode.buildEnumResponseVO(ResponseCode.RESPONSE_CODE_FAILURE, false);  
73.    }  
74.}  
75.</span>

8. 建立commonservice-mq-consumer1消息的消費者項目,在pom裏面配置stream-rabbit的依賴

<!-- 引入MQ消息驅動的微服務包,引入stream只須要進行配置化便可,是對rabbit、kafka很好的封裝 -->  
2.<dependency>  
3.    <groupId>org.springframework.cloud</groupId>  
4.    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>  
5.</dependency>

 9. 在yml文件中配置:

server:  
2.  port: 5111  
3.spring:  
4.  application:  
5.    name: commonservice-mq-consumer1  
6.  profiles:   
7.    active: dev  
8.  cloud:  
9.    config:  
10.      discovery:   
11.        enabled: true  
12.        service-id: commonservice-config-server  
13.          
14.    <span style="color: rgb(255, 0, 0);">stream:  
15.      bindings:  
16.        mqScoreInput:  
17.          group: honghu_queue  
18.          destination: honghu_exchange  
19.          contentType: application/json  
20.            
21.  rabbitmq:  
22.     host: localhost  
23.     port: 5672  
24.     username: honghu  
25.     password: honghu</span>  
26.eureka:   
27.  client:  
28.    service-url:  
29.      defaultZone: http://honghu:123456@localhost:8761/eureka  
30.  instance:  
31.    prefer-ip-address: true

10. 定義接口ConsumerService

package com.honghu.cloud.consumer;  
2.  
3.import org.springframework.cloud.stream.annotation.Input;  
4.import org.springframework.messaging.SubscribableChannel;  
5.  
6.public interface ConsumerService {  
7.      
8.    <span style="color: rgb(255, 0, 0);">String SCORE_INPUT = "mqScoreInput";  
9.  
10.    @Input(ConsumerService.SCORE_INPUT)  
11.    SubscribableChannel sendMessage();</span>  
12.  
13.}

11. 定義啓動類和消息消費

package com.honghu.cloud;  
2.  
3.import org.springframework.boot.SpringApplication;  
4.import org.springframework.boot.autoconfigure.SpringBootApplication;  
5.import org.springframework.cloud.netflix.eureka.EnableEurekaClient;  
6.import org.springframework.cloud.stream.annotation.EnableBinding;  
7.import org.springframework.cloud.stream.annotation.StreamListener;  
8.  
9.import com.honghu.cloud.consumer.ConsumerService;  
10.import com.honghu.cloud.entity.User;  
11.  
12.@EnableEurekaClient  
13.@SpringBootApplication  
14.@EnableBinding(ConsumerService.class) //能夠綁定多個接口  
15.public class ConsumerApplication {  
16.      
17.    public static void main(String[] args) {  
18.        SpringApplication.run(ConsumerApplication.class, args);  
19.    }  
20.      
21.    <span style="color: rgb(255, 0, 0);">@StreamListener(ConsumerService.SCORE_INPUT)  
22.    public void onMessage(Object obj) {  
23.        System.out.println("消費者1,接收到的消息:" + obj);  
24.    }</span>  
25.  
26.}

12. 分別啓動commonservice-mq-producer、commonservice-mq-consumer1

13. 經過postman來驗證消息的發送和接收



 

 

 

 

 

能夠看到接收到了消息,下一章咱們介紹mq的集羣方案。

到此,整個消息中心方案集成完畢

歡迎你們和我一塊兒學習spring cloud構建微服務雲架構,我這邊會將近期研發的spring cloud微服務雲架構的搭建過程和精髓記錄下來,幫助更多有興趣研發spring cloud框架的朋友,你們來一塊兒探討spring cloud架構的搭建過程及如何運用於企業項目。完整的資料和源碼

相關文章
相關標籤/搜索