一、RabbitMQ與Spring Cloud Stream整合實戰。SpringCloud Stream總體結構核心概念圖,以下所示:java
圖示解釋:Outputs輸出,即消息的發送端。Inputs輸入,即消息的接收端。Application Core即核心的應用。Binder是協調者的角色。Middleware是消息中間件。
web
二、SpringCloud Stream總體結構核心概念圖,以下所示:spring
圖示解釋:SpringCloud Stream在RabbitMQ在生產者發送消息以前、消費者接收監聽以後都套了一層插件。插件能夠接收各類各樣的不一樣的消息,也能夠支持消息中間件的替換。
SpringCloud Stream插件的關鍵點,Barista接口,Barista接口是定義來做爲後面類的參數,這一接口定義來通道類型和通道名稱,通道名稱是做爲配置用,通道類型則決定了app會使用這一通道進行發送消息仍是從中接收消息。
apache
三、使用Spring Cloud Stream很是簡單,只須要使用好這3個註解便可,在實現高性能消息的生產和消費的場景很是適合,可是使用SpringCloudStram框架有一個很是大的問題就是不能實現可靠性的投遞,也就是無法保證消息的100%可靠性,會存在少許消息丟失的問題。這個緣由是由於SpringCloudStream框架爲了和Kafka兼顧因此在實際工做中使用它的目的就是針對高性能的消息通訊的,這點就是在當前版本SpringCloudStream的定位。
springboot
@Output,輸出註解,用於定義發送消息接口。
@Input,輸入註解,用於定義消息的消費者接口。
@StreamListener,用於定義監聽方法的註解。app
四、建立maven項目rabbitmq-springcloudstream-producer。修改pom.xml配置文件,以下所示:
框架
1 <project xmlns="http://maven.apache.org/POM/4.0.0" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 4 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 <groupId>com.bie</groupId> 7 <artifactId>rabbitmq-springcloudstream-producer</artifactId> 8 <version>0.0.1-SNAPSHOT</version> 9 10 <parent> 11 <groupId>org.springframework.boot</groupId> 12 <artifactId>spring-boot-starter-parent</artifactId> 13 <version>1.5.8.RELEASE</version> 14 <relativePath /> <!-- lookup parent from repository --> 15 </parent> 16 17 <properties> 18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 20 <java.version>1.8</java.version> 21 </properties> 22 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter-web</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter</artifactId> 31 </dependency> 32 <!-- springboot自動裝配的jar包 --> 33 <dependency> 34 <groupId>org.springframework.boot</groupId> 35 <artifactId>spring-boot-autoconfigure</artifactId> 36 </dependency> 37 <dependency> 38 <groupId>org.springframework.boot</groupId> 39 <artifactId>spring-boot-starter-test</artifactId> 40 <scope>test</scope> 41 </dependency> 42 <!-- 與spring cloud stream相關的jar包 --> 43 <dependency> 44 <groupId>org.springframework.cloud</groupId> 45 <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 46 <version>1.3.4.RELEASE</version> 47 </dependency> 48 <!-- springboot監控相關的jar包 --> 49 <dependency> 50 <groupId>org.springframework.boot</groupId> 51 <artifactId>spring-boot-starter-actuator</artifactId> 52 </dependency> 53 </dependencies> 54 55 <build> 56 <plugins> 57 <plugin> 58 <groupId>org.springframework.boot</groupId> 59 <artifactId>spring-boot-maven-plugin</artifactId> 60 </plugin> 61 </plugins> 62 </build> 63 64 </project>
修改rabbitmq-springcloudstream-producer的application.properties配置文件。以下所示:maven
1 # 端口號 2 server.port=8001 3 # 請問訪問路徑 4 server.servlet.context-path=/producer 5 6 # 應用的名稱 7 spring.application.name=producer 8 # 將交換機和隊列綁定到了通道output_channel上面 9 # 交換機名稱 10 spring.cloud.stream.bindings.output_channel.destination=exchange-3 11 # 隊列名稱 12 spring.cloud.stream.bindings.output_channel.group=queue-3 13 # 對集羣環境進行配置 14 spring.cloud.stream.bindings.output_channel.binder=rabbit_cluster 15 16 # rabbit_cluster對應上面的spring.cloud.stream.bindings.output_channel.binder的值。名稱能夠自定義 17 spring.cloud.stream.binders.rabbit_cluster.type=rabbit 18 # 使用的環境是rabbit 19 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.110.133:5672 20 # 帳號 21 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest 22 # 密碼 23 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest 24 # 虛擬主機 25 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
建立啓動類,以下所示:spring-boot
1 package com.bie; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 /** 7 * 8 * @author biehl 9 * 10 */ 11 @SpringBootApplication 12 public class SpringCloudStreamRabbitMQProducerApplication { 13 14 public static void main(String[] args) { 15 SpringApplication.run(SpringCloudStreamRabbitMQProducerApplication.class, args); 16 } 17 }
建立Barista接口,用於建立輸出通道,將消息和輸出通道進行綁定,以下所示:性能
1 package com.bie.stream; 2 3 import org.springframework.cloud.stream.annotation.Output; 4 import org.springframework.messaging.MessageChannel; 5 6 /** 7 * 這裏的Barista接口是定義來做爲後面類的參數, 8 * 9 * 這一接口定義來通道類型和通道名稱。 10 * 11 * 通道名稱是做爲配置用, 12 * 13 * 通道類型則決定了app會使用這一通道進行發送消息仍是從中接收消息。 14 */ 15 public interface Barista { 16 17 // String INPUT_CHANNEL = "input_channel"; // 輸入通道 18 String OUTPUT_CHANNEL = "output_channel"; // 輸出通道 19 20 // 註解@Input聲明瞭它是一個輸入類型的通道, 21 // 名字是Barista.INPUT_CHANNEL,也就是position3的input_channel。 22 // 這一名字與上述配置app2的配置文件中position1應該一致, 23 // 代表注入了一個名字叫作input_channel的通道,它的類型是input, 24 // 訂閱的主題是position2處聲明的mydest這個主題。 25 // @Input(Barista.INPUT_CHANNEL) 26 // SubscribableChannel loginput(); 27 28 // 註解@Output聲明瞭它是一個輸出類型的通道, 29 // 名字是output_channel。這一名字與app1中通道名一致, 30 // 代表注入了一個名字爲output_channel的通道,類型是output,發佈的主題名爲mydest。 31 @Output(Barista.OUTPUT_CHANNEL) // 輸出通道 32 MessageChannel logoutput(); 33 34 // String INPUT_BASE = "queue-1"; 35 // String OUTPUT_BASE = "queue-1"; 36 // @Input(Barista.INPUT_BASE) 37 // SubscribableChannel input1(); 38 // MessageChannel output1(); 39 40 }
建立生產者類,以下所示:
1 package com.bie.stream; 2 3 import java.util.Map; 4 5 import org.springframework.beans.factory.annotation.Autowired; 6 import org.springframework.cloud.stream.annotation.EnableBinding; 7 import org.springframework.messaging.Message; 8 import org.springframework.messaging.MessageHeaders; 9 import org.springframework.messaging.support.MessageBuilder; 10 import org.springframework.stereotype.Service; 11 12 /** 13 * 14 * @author biehl 15 * 16 */ 17 @EnableBinding(Barista.class) // 啓動該綁定 18 @Service // 注入到Spring容器中 19 public class RabbitmqProducer { 20 21 @Autowired // 將barista注入到spirng容器中 22 private Barista barista; 23 24 // 發送消息 25 public String sendMessage(Object message, Map<String, Object> properties) throws Exception { 26 try { 27 // 設置消息頭 28 MessageHeaders mhs = new MessageHeaders(properties); 29 // 建立消息,使用消息和消息頭建立消息。 30 Message msg = MessageBuilder.createMessage(message, mhs); 31 // 調用barista進行消息的發送。 32 boolean sendStatus = barista.logoutput().send(msg); 33 // 打印查看消息發送的狀況 34 System.out.println("========================sending========================"); 35 System.out.println("發送數據:" + message + ",sendStatus: " + sendStatus); 36 } catch (Exception e) { 37 System.out.println("========================error========================"); 38 e.printStackTrace(); 39 throw new RuntimeException(e.getMessage()); 40 } 41 return null; 42 } 43 44 }
五、建立消費者rabbitmq-springcloudstream-consumer。修改配置文件,以下所示:
1 <project xmlns="http://maven.apache.org/POM/4.0.0" 2 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 4 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 <groupId>com.bie</groupId> 7 <artifactId>rabbitmq-springcloudstream-consumer</artifactId> 8 <version>0.0.1-SNAPSHOT</version> 9 10 <parent> 11 <groupId>org.springframework.boot</groupId> 12 <artifactId>spring-boot-starter-parent</artifactId> 13 <version>1.5.8.RELEASE</version> 14 <relativePath /> <!-- lookup parent from repository --> 15 </parent> 16 17 <properties> 18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 20 <java.version>1.8</java.version> 21 </properties> 22 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter-web</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter</artifactId> 31 </dependency> 32 <dependency> 33 <groupId>org.springframework.boot</groupId> 34 <artifactId>spring-boot-starter-test</artifactId> 35 <scope>test</scope> 36 </dependency> 37 <!-- 與spring cloud stream相關的jar包 --> 38 <dependency> 39 <groupId>org.springframework.cloud</groupId> 40 <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 41 <version>1.3.4.RELEASE</version> 42 </dependency> 43 <!-- springboot監控相關的jar包 --> 44 <dependency> 45 <groupId>org.springframework.boot</groupId> 46 <artifactId>spring-boot-starter-actuator</artifactId> 47 </dependency> 48 </dependencies> 49 50 <build> 51 <plugins> 52 <plugin> 53 <groupId>org.springframework.boot</groupId> 54 <artifactId>spring-boot-maven-plugin</artifactId> 55 </plugin> 56 </plugins> 57 </build> 58 59 </project>
修改rabbitmq-springcloudstream-consumer的application.properties配置文件。以下所示:
1 # 端口號 2 server.port=8002 3 # 訪問路徑 4 server.context-path=/consumer 5 6 # 應用的名稱 7 spring.application.name=consumer 8 # 交換機名稱 9 spring.cloud.stream.bindings.input_channel.destination=exchange-3 10 # 隊列名稱 11 spring.cloud.stream.bindings.input_channel.group=queue-3 12 # 對集羣環境進行配置 13 spring.cloud.stream.bindings.input_channel.binder=rabbit_cluster 14 # 設置默認監聽數 15 spring.cloud.stream.bindings.input_channel.consumer.concurrency=1 16 # 是否支持從新放回隊列 17 spring.cloud.stream.rabbit.bindings.input_channel.consumer.requeue-rejected=false 18 # 接收模式是手工接收 19 spring.cloud.stream.rabbit.bindings.input_channel.consumer.acknowledge-mode=MANUAL 20 # 服務斷開,3秒鐘重連 21 spring.cloud.stream.rabbit.bindings.input_channel.consumer.recovery-interval=3000 22 # 是否啓用持久化訂閱 23 spring.cloud.stream.rabbit.bindings.input_channel.consumer.durable-subscription=true 24 # 最大的監聽數 25 spring.cloud.stream.rabbit.bindings.input_channel.consumer.max-concurrency=5 26 27 # rabbit_cluster對應上面的spring.cloud.stream.bindings.output_channel.binder的值。名稱能夠自定義 28 # 使用的環境是rabbit 29 spring.cloud.stream.binders.rabbit_cluster.type=rabbit 30 # 訪問地址和端口號 31 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.addresses=192.168.110.133:5672 32 # 帳號 33 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.username=guest 34 # 密碼 35 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.password=guest 36 # 虛擬主機 37 spring.cloud.stream.binders.rabbit_cluster.environment.spring.rabbitmq.virtual-host=/
建立啓動類,以下所示:
1 package com.bie; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 /** 7 * 8 * @author biehl 9 * 10 */ 11 @SpringBootApplication 12 public class SpringCloudStreamRabbitMQConsumerApplication { 13 14 public static void main(String[] args) { 15 SpringApplication.run(SpringCloudStreamRabbitMQConsumerApplication.class, args); 16 } 17 18 }
建立Barista接口,用於建立輸入通道,將消息和輸入通道進行綁定,以下所示:
1 package com.bie.stream; 2 3 import org.springframework.cloud.stream.annotation.Input; 4 import org.springframework.messaging.SubscribableChannel; 5 6 /** 7 * 這裏的Barista接口是定義來做爲後面類的參數, 8 * 9 * 這一接口定義來通道類型和通道名稱。 10 * 11 * 通道名稱是做爲配置用, 12 * 13 * 通道類型則決定了app會使用這一通道進行發送消息仍是從中接收消息。 14 * 15 * @author biehl 16 * 17 */ 18 19 public interface Barista { 20 21 // 輸入管道,用於接收消息 22 String INPUT_CHANNEL = "input_channel"; 23 24 // 註解@Input聲明瞭它是一個輸入類型的通道, 25 // 名字是Barista.INPUT_CHANNEL, 26 // 也就是position3的input_channel。 27 // 這一名字與上述配置app2的配置文件中position1應該一致, 28 // 代表注入了一個名字叫作input_channel的通道,它的類型是input, 29 // 訂閱的主題是position2處聲明的mydest這個主題。 30 @Input(Barista.INPUT_CHANNEL) 31 SubscribableChannel loginput(); 32 33 }
建立消費者,以下所示:
1 package com.bie.stream; 2 3 import org.springframework.amqp.support.AmqpHeaders; 4 import org.springframework.cloud.stream.annotation.EnableBinding; 5 import org.springframework.cloud.stream.annotation.StreamListener; 6 import org.springframework.messaging.Message; 7 import org.springframework.stereotype.Service; 8 9 import com.rabbitmq.client.Channel; 10 11 @EnableBinding(Barista.class) // 啓動該綁定 12 @Service // 注入到Spring容器中 13 public class RabbitmqConsumer { 14 15 /** 16 * @StreamListener接收RabbitMQ的消息。指定了輸入管道。 17 * 18 * 19 * 接收消息 20 * @param message 21 * @throws Exception 22 */ 23 @StreamListener(Barista.INPUT_CHANNEL) 24 public void consumer(Message message) throws Exception { 25 // 獲取到具體的channel 26 Channel channel = (com.rabbitmq.client.Channel) message.getHeaders().get(AmqpHeaders.CHANNEL); 27 // 獲取到deliveryTag 28 Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); 29 // 消費消息 30 System.out.println("Input Stream 1 接受數據:" + message); 31 System.out.println("消費完畢------------"); 32 // 手動ack確認機制,false表明了1條一條進行消息簽收 33 channel.basicAck(deliveryTag, false); 34 } 35 36 }
啓動消費者進行消息的監聽,在生產者項目編寫測試類進行代碼測試,發送消息,看生產者是否能夠接收到消息並進行消費處理。
1 package com.bie; 2 3 import java.util.Date; 4 import java.util.HashMap; 5 import java.util.Map; 6 7 import org.apache.http.client.utils.DateUtils; 8 import org.junit.Test; 9 import org.junit.runner.RunWith; 10 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.boot.test.context.SpringBootTest; 12 import org.springframework.test.context.junit4.SpringRunner; 13 14 import com.bie.stream.RabbitmqProducer; 15 16 @RunWith(SpringRunner.class) 17 @SpringBootTest 18 public class ApplicationTests { 19 20 @Autowired 21 private RabbitmqProducer rabbitmqProducer; 22 23 @Test 24 public void sendMessageTest1() throws InterruptedException { 25 for (int i = 0; i < 1; i++) { 26 try { 27 Map<String, Object> properties = new HashMap<String, Object>(); 28 properties.put("serial_number", "12345"); 29 properties.put("bank_number", "abc"); 30 properties.put("plat_send_time", DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss.SSS")); 31 rabbitmqProducer.sendMessage("Hello, I am amqp sender num :" + i, properties); 32 } catch (Exception e) { 33 System.out.println("================================error================================"); 34 e.printStackTrace(); 35 } 36 } 37 Thread.sleep(50000); 38 } 39 40 }
運行效果以下所示:
做者:別先生
博客園:https://www.cnblogs.com/biehongli/
若是您想及時獲得我的撰寫文章以及著做的消息推送,能夠掃描上方二維碼,關注我的公衆號哦。