RabbitMQ與Spring的框架整合之Spring Cloud Stream實戰

一、RabbitMQ與Spring Cloud Stream整合實戰。SpringCloud Stream總體結構核心概念圖,以下所示:java

  圖示解釋:Outputs輸出,即消息的發送端。Inputs輸入,即消息的接收端。Application Core即核心的應用。Binder是協調者的角色。Middleware是消息中間件。
web

二、SpringCloud Stream總體結構核心概念圖,以下所示:spring

  圖示解釋:SpringCloud Stream在RabbitMQ在生產者發送消息以前、消費者接收監聽以後都套了一層插件。插件能夠接收各類各樣的不一樣的消息,也能夠支持消息中間件的替換。
  SpringCloud Stream插件的關鍵點,Barista接口,Barista接口是定義來做爲後面類的參數,這一接口定義來通道類型和通道名稱,通道名稱是做爲配置用,通道類型則決定了app會使用這一通道進行發送消息仍是從中接收消息。
apache

三、使用Spring Cloud Stream很是簡單,只須要使用好這3個註解便可,在實現高性能消息的生產和消費的場景很是適合,可是使用SpringCloudStram框架有一個很是大的問題就是不能實現可靠性的投遞,也就是無法保證消息的100%可靠性,會存在少許消息丟失的問題。這個緣由是由於SpringCloudStream框架爲了和Kafka兼顧因此在實際工做中使用它的目的就是針對高性能的消息通訊的,這點就是在當前版本SpringCloudStream的定位。
springboot

@Output,輸出註解,用於定義發送消息接口。
@Input,輸入註解,用於定義消息的消費者接口。
@StreamListener,用於定義監聽方法的註解。
app

四、建立maven項目rabbitmq-springcloudstream-producer。修改pom.xml配置文件,以下所示:
框架

 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 4     http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6     <groupId>com.bie</groupId>
 7     <artifactId>rabbitmq-springcloudstream-producer</artifactId>
 8     <version>0.0.1-SNAPSHOT</version>
 9 
10     <parent>
11         <groupId>org.springframework.boot</groupId>
12         <artifactId>spring-boot-starter-parent</artifactId>
13         <version>1.5.8.RELEASE</version>
14         <relativePath /> <!-- lookup parent from repository -->
15     </parent>
16 
17     <properties>
18         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
20         <java.version>1.8</java.version>
21     </properties>
22 
23     <dependencies>
24         <dependency>
25             <groupId>org.springframework.boot</groupId>
26             <artifactId>spring-boot-starter-web</artifactId>
27         </dependency>
28         <dependency>
29             <groupId>org.springframework.boot</groupId>
30             <artifactId>spring-boot-starter</artifactId>
31         </dependency>
32         <!-- springboot自動裝配的jar包 -->
33         <dependency>
34             <groupId>org.springframework.boot</groupId>
35             <artifactId>spring-boot-autoconfigure</artifactId>
36         </dependency>
37         <dependency>
38             <groupId>org.springframework.boot</groupId>
39             <artifactId>spring-boot-starter-test</artifactId>
40             <scope>test</scope>
41         </dependency>
42         <!-- 與spring cloud stream相關的jar包 -->
43         <dependency>
44             <groupId>org.springframework.cloud</groupId>
45             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
46             <version>1.3.4.RELEASE</version>
47         </dependency>
48         <!-- springboot監控相關的jar包 -->
49         <dependency>
50             <groupId>org.springframework.boot</groupId>
51             <artifactId>spring-boot-starter-actuator</artifactId>
52         </dependency>
53     </dependencies>
54 
55     <build>
56         <plugins>
57             <plugin>
58                 <groupId>org.springframework.boot</groupId>
59                 <artifactId>spring-boot-maven-plugin</artifactId>
60             </plugin>
61         </plugins>
62     </build>
63 
64 </project>

修改rabbitmq-springcloudstream-producer的application.properties配置文件。以下所示:maven

 1 # 端口號
 2 server.port=8001
 3 # 請問訪問路徑
 4 server.servlet.context-path=/producer
 5 
 6 # 應用的名稱
 7 spring.application.name=producer
 8 # 將交換機和隊列綁定到了通道output_channel上面
 9 # 交換機名稱
10 spring.cloud.stream.bindings.output_channel.destination=exchange-3
11 # 隊列名稱
12 spring.cloud.stream.bindings.output_channel.group=queue-3
13 # 對集羣環境進行配置
14 spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster
15 
16 # rabbit_cluster對應上面的spring.cloud.stream.bindings.output_channel.binder的值。名稱能夠自定義
17 spring.cloud.stream.binders.rabbit_cluster.type=rabbit
18 # 使用的環境是rabbit
19 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.110.133:5672
20 # 帳號
21 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
22 # 密碼
23 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
24 # 虛擬主機
25 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/

建立啓動類,以下所示:spring-boot

 1 package com.bie;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 /**
 7  * 
 8  * @author biehl
 9  *
10  */
11 @SpringBootApplication
12 public class SpringCloudStreamRabbitMQProducerApplication {
13 
14     public static void main(String[] args) {
15         SpringApplication.run(SpringCloudStreamRabbitMQProducerApplication.class, args);
16     }
17 }

建立Barista接口,用於建立輸出通道,將消息和輸出通道進行綁定,以下所示:性能

 1 package com.bie.stream;
 2 
 3 import org.springframework.cloud.stream.annotation.Output;
 4 import org.springframework.messaging.MessageChannel;
 5 
 6 /**
 7  * 這裏的Barista接口是定義來做爲後面類的參數,
 8  * 
 9  * 這一接口定義來通道類型和通道名稱。
10  * 
11  * 通道名稱是做爲配置用,
12  * 
13  * 通道類型則決定了app會使用這一通道進行發送消息仍是從中接收消息。
14  */
15 public interface Barista {
16 
17     // String INPUT_CHANNEL = "input_channel"; // 輸入通道
18     String OUTPUT_CHANNEL = "output_channel"; // 輸出通道
19 
20     // 註解@Input聲明瞭它是一個輸入類型的通道,
21     // 名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。
22     // 這一名字與上述配置app2的配置文件中position1應該一致,
23     // 代表注入了一個名字叫作input_channel的通道,它的類型是input,
24     // 訂閱的主題是position2處聲明的mydest這個主題。
25     // @Input(Barista.INPUT_CHANNEL)
26     // SubscribableChannel loginput();
27 
28     // 註解@Output聲明瞭它是一個輸出類型的通道,
29     // 名字是output_channel。這一名字與app1中通道名一致,
30     // 代表注入了一個名字爲output_channel的通道,類型是output,發佈的主題名爲mydest。
31     @Output(Barista.OUTPUT_CHANNEL) // 輸出通道
32     MessageChannel logoutput();
33 
34     // String INPUT_BASE = "queue-1";
35     // String OUTPUT_BASE = "queue-1";
36     // @Input(Barista.INPUT_BASE)
37     // SubscribableChannel input1();
38     // MessageChannel output1();
39 
40 }

建立生產者類,以下所示:

 1 package com.bie.stream;
 2 
 3 import java.util.Map;
 4 
 5 import org.springframework.beans.factory.annotation.Autowired;
 6 import org.springframework.cloud.stream.annotation.EnableBinding;
 7 import org.springframework.messaging.Message;
 8 import org.springframework.messaging.MessageHeaders;
 9 import org.springframework.messaging.support.MessageBuilder;
10 import org.springframework.stereotype.Service;
11 
12 /**
13  * 
14  * @author biehl
15  *
16  */
17 @EnableBinding(Barista.class) // 啓動該綁定
18 @Service // 注入到Spring容器中
19 public class RabbitmqProducer {
20 
21     @Autowired // 將barista注入到spirng容器中
22     private Barista barista;
23 
24     // 發送消息
25     public String sendMessage(Object message, Map<String, Object> properties) throws Exception {
26         try {
27             // 設置消息頭
28             MessageHeaders mhs = new MessageHeaders(properties);
29             // 建立消息,使用消息和消息頭建立消息。
30             Message msg = MessageBuilder.createMessage(message, mhs);
31             // 調用barista進行消息的發送。
32             boolean sendStatus = barista.logoutput().send(msg);
33             // 打印查看消息發送的狀況
34             System.out.println("========================sending========================");
35             System.out.println("發送數據:" + message + ",sendStatus: " + sendStatus);
36         } catch (Exception e) {
37             System.out.println("========================error========================");
38             e.printStackTrace();
39             throw new RuntimeException(e.getMessage());
40         }
41         return null;
42     }
43 
44 }

五、建立消費者rabbitmq-springcloudstream-consumer。修改配置文件,以下所示:

 1 <project xmlns="http://maven.apache.org/POM/4.0.0"
 2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
 4     http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6     <groupId>com.bie</groupId>
 7     <artifactId>rabbitmq-springcloudstream-consumer</artifactId>
 8     <version>0.0.1-SNAPSHOT</version>
 9 
10     <parent>
11         <groupId>org.springframework.boot</groupId>
12         <artifactId>spring-boot-starter-parent</artifactId>
13         <version>1.5.8.RELEASE</version>
14         <relativePath /> <!-- lookup parent from repository -->
15     </parent>
16 
17     <properties>
18         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
19         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
20         <java.version>1.8</java.version>
21     </properties>
22 
23     <dependencies>
24         <dependency>
25             <groupId>org.springframework.boot</groupId>
26             <artifactId>spring-boot-starter-web</artifactId>
27         </dependency>
28         <dependency>
29             <groupId>org.springframework.boot</groupId>
30             <artifactId>spring-boot-starter</artifactId>
31         </dependency>
32         <dependency>
33             <groupId>org.springframework.boot</groupId>
34             <artifactId>spring-boot-starter-test</artifactId>
35             <scope>test</scope>
36         </dependency>
37         <!-- 與spring cloud stream相關的jar包 -->
38         <dependency>
39             <groupId>org.springframework.cloud</groupId>
40             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
41             <version>1.3.4.RELEASE</version>
42         </dependency>
43         <!-- springboot監控相關的jar包 -->
44         <dependency>
45             <groupId>org.springframework.boot</groupId>
46             <artifactId>spring-boot-starter-actuator</artifactId>
47         </dependency>
48     </dependencies>
49 
50     <build>
51         <plugins>
52             <plugin>
53                 <groupId>org.springframework.boot</groupId>
54                 <artifactId>spring-boot-maven-plugin</artifactId>
55             </plugin>
56         </plugins>
57     </build>
58 
59 </project>

修改rabbitmq-springcloudstream-consumer的application.properties配置文件。以下所示:

 1 # 端口號
 2 server.port=8002
 3 # 訪問路徑
 4 server.context-path=/consumer
 5 
 6 # 應用的名稱
 7 spring.application.name=consumer
 8 # 交換機名稱
 9 spring.cloud.stream.bindings.input_channel.destination=exchange-3
10 # 隊列名稱
11 spring.cloud.stream.bindings.input_channel.group=queue-3
12 # 對集羣環境進行配置
13 spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster
14 # 設置默認監聽數
15 spring.cloud.stream.bindings.input_channel.consumer.concurrency=1
16 # 是否支持從新放回隊列
17 spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false
18 # 接收模式是手工接收
19 spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL
20 # 服務斷開,3秒鐘重連
21 spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000
22 # 是否啓用持久化訂閱
23 spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true
24 # 最大的監聽數
25 spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5
26 
27 # rabbit_cluster對應上面的spring.cloud.stream.bindings.output_channel.binder的值。名稱能夠自定義
28 # 使用的環境是rabbit
29 spring.cloud.stream.binders.rabbit_cluster.type=rabbit
30 # 訪問地址和端口號
31 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.110.133:5672
32 # 帳號
33 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest
34 # 密碼
35 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest
36 # 虛擬主機
37 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/

建立啓動類,以下所示:

 1 package com.bie;
 2 
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5 
 6 /**
 7  * 
 8  * @author biehl
 9  *
10  */
11 @SpringBootApplication
12 public class SpringCloudStreamRabbitMQConsumerApplication {
13 
14     public static void main(String[] args) {
15         SpringApplication.run(SpringCloudStreamRabbitMQConsumerApplication.class, args);
16     }
17     
18 }

建立Barista接口,用於建立輸入通道,將消息和輸入通道進行綁定,以下所示:

 1 package com.bie.stream;
 2 
 3 import org.springframework.cloud.stream.annotation.Input;
 4 import org.springframework.messaging.SubscribableChannel;
 5 
 6 /**
 7  * 這裏的Barista接口是定義來做爲後面類的參數,
 8  * 
 9  * 這一接口定義來通道類型和通道名稱。
10  * 
11  * 通道名稱是做爲配置用,
12  * 
13  * 通道類型則決定了app會使用這一通道進行發送消息仍是從中接收消息。
14  * 
15  * @author biehl
16  *
17  */
18 
19 public interface Barista {
20 
21     // 輸入管道,用於接收消息
22     String INPUT_CHANNEL = "input_channel";
23 
24     // 註解@Input聲明瞭它是一個輸入類型的通道,
25     // 名字是Barista.INPUT_CHANNEL,
26     // 也就是position3的input_channel。
27     // 這一名字與上述配置app2的配置文件中position1應該一致,
28     // 代表注入了一個名字叫作input_channel的通道,它的類型是input,
29     // 訂閱的主題是position2處聲明的mydest這個主題。
30     @Input(Barista.INPUT_CHANNEL)
31     SubscribableChannel loginput();
32 
33 }

建立消費者,以下所示:

 1 package com.bie.stream;
 2 
 3 import org.springframework.amqp.support.AmqpHeaders;
 4 import org.springframework.cloud.stream.annotation.EnableBinding;
 5 import org.springframework.cloud.stream.annotation.StreamListener;
 6 import org.springframework.messaging.Message;
 7 import org.springframework.stereotype.Service;
 8 
 9 import com.rabbitmq.client.Channel;
10 
11 @EnableBinding(Barista.class) // 啓動該綁定
12 @Service // 注入到Spring容器中
13 public class RabbitmqConsumer {
14 
15     /**
16      * @StreamListener接收RabbitMQ的消息。指定了輸入管道。
17      * 
18      * 
19      *                                         接收消息
20      * @param message
21      * @throws Exception
22      */
23     @StreamListener(Barista.INPUT_CHANNEL)
24     public void consumer(Message message) throws Exception {
25         // 獲取到具體的channel
26         Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
27         // 獲取到deliveryTag
28         Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
29         // 消費消息
30         System.out.println("Input Stream 1 接受數據:" + message);
31         System.out.println("消費完畢------------");
32         // 手動ack確認機制,false表明了1條一條進行消息簽收
33         channel.basicAck(deliveryTag, false);
34     }
35 
36 }

啓動消費者進行消息的監聽,在生產者項目編寫測試類進行代碼測試,發送消息,看生產者是否能夠接收到消息並進行消費處理。

 1 package com.bie;
 2 
 3 import java.util.Date;
 4 import java.util.HashMap;
 5 import java.util.Map;
 6 
 7 import org.apache.http.client.utils.DateUtils;
 8 import org.junit.Test;
 9 import org.junit.runner.RunWith;
10 import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.boot.test.context.SpringBootTest;
12 import org.springframework.test.context.junit4.SpringRunner;
13 
14 import com.bie.stream.RabbitmqProducer;
15 
16 @RunWith(SpringRunner.class)
17 @SpringBootTest
18 public class ApplicationTests {
19 
20     @Autowired
21     private RabbitmqProducer rabbitmqProducer;
22 
23     @Test
24     public void sendMessageTest1() throws InterruptedException {
25         for (int i = 0; i < 1; i++) {
26             try {
27                 Map<String, Object> properties = new HashMap<String, Object>();
28                 properties.put("serial_number", "12345");
29                 properties.put("bank_number", "abc");
30                 properties.put("plat_send_time", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
31                 rabbitmqProducer.sendMessage("Hello, I am amqp sender num :" + i, properties);
32             } catch (Exception e) {
33                 System.out.println("================================error================================");
34                 e.printStackTrace();
35             }
36         }
37         Thread.sleep(50000);
38     }
39 
40 }

運行效果以下所示:

 

做者:別先生

博客園:https://www.cnblogs.com/biehongli/

若是您想及時獲得我的撰寫文章以及著做的消息推送,能夠掃描上方二維碼,關注我的公衆號哦。

相關文章
相關標籤/搜索