RabbitMQ是一個在AMQP基礎上完整的,可複用的企業消息系統。它能夠用於大型軟件系統各個模塊之間的高效通訊,支持高併發,支持可擴展。java
即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標準高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不一樣產品,不一樣的開發語言等條件的限制。Erlang中的實現有 RabbitMQ等。git
基於spring boot的特性鏈接rabbitmq,並做出以下樣例:spring
配置json
發佈方樣例併發
消費方樣例app
引入maven依賴框架
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
編寫config配置類(默認狀況下是不用作任何配置的,這裏有配置是由於,它默認是用的二進制作的消息傳輸,這裏的配置是改成json傳輸)異步
@Configuration public class RabbitMqConfig { @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactoryPlus( SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory, Jackson2JsonMessageConverter jackson2JsonMessageConverter) { rabbitListenerContainerFactory.setMessageConverter(jackson2JsonMessageConverter); return rabbitListenerContainerFactory; } @Bean public Jackson2JsonMessageConverter jackson2JsonMessageConverter(ObjectMapper xssObjectMapper) { return new Jackson2JsonMessageConverter(xssObjectMapper); } }
編寫配置文件xss
spring.rabbitmq.host=192.168.134.100 spring.rabbitmq.port=5672 spring.rabbitmq.username=dev_udf-sample spring.rabbitmq.password=1qazxsw2 spring.rabbitmq.virtual-host=/dev_udf-sample spring.rabbitmq.template.retry.enabled=true #發送方是否重試 spring.rabbitmq.listener.retry.enabled=true #消費方是否重試
定義公共的消息類maven
public class RabbitmqMessage<T> implements Serializable { private static final long serialVersionUID = 1L; //消息ID private String id; ....其餘自定義 }
建立Exchange,這裏使用的是DirectExchange,exchange主要是定義路由規則的,還有其餘不一樣的路由規則實現,如:TopicExchange,他們都繼承至AbstractExchange
@Bean public DirectExchange testExchange() { return new DirectExchange("test_exchange"); }
使用AmqpTemplate發送異步消息(RoutingKey則是指定消息的路由鍵,不一樣的路由鍵可被不一樣的消費方消費)
@Autowired private AmqpTemplate amqpTemplate; //而後調用發送方法發送消息 this.amqpTemplate.convertAndSend("test_exchange", "testRoutingKey", new RabbitmqMessage<String>("test"));
建立消費隊列,死信隊列,以及與exchange的綁定關係
//消費隊列 @Bean public Queue testConsume() { //死信exchange與上面的定義方式同樣 Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange","test_exchange_dlx"); args.put("x-dead-letter-routing-key","testRoutingKey_dlx"); return new Queue("test_consume", true, false, false, args); } //死信消費隊列 @Bean public Queue testConsumeDlx() { return new Queue("test_consume_dlx"); } //消費隊列綁定 @Bean public Binding testConsumeBinding() { return new Binding("test_consume", DestinationType.QUEUE, "test_exchange","testRoutingKey", null); } //死信消費隊列綁定 @Bean public Binding testConsumeDlxBinding() { return new Binding("test_consume_dlx", DestinationType.QUEUE, "test_exchange_dlx","testRoutingKey_dlx", null); }
消費消息
@RabbitListener(queues = "test_consume") public void process(Message<String> message) { log.info(message); }
以上演示了rabbitmq在spring boot中的配置,以及發送方和消費方的樣例,在後續的章節中,會找機會介紹rabbitmq的搭建以及配置.
想得到最快更新,請關注公衆號