1.依賴 SpringBoot 2.1.6.RELEASE 版本java
<!--rabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.配置信息spring
#設置端口 server.port=80 #安裝的RabbitMq的服務器IP spring.rabbitmq.host=192.168.***.** #安裝的RabbitMq的服務器端口 spring.rabbitmq.port=5672 #安裝的RabbitMq的用戶名 spring.rabbitmq.username=xxx #安裝的RabbitMq的密碼 spring.rabbitmq.password=xxx #消息確認機制 spring.rabbitmq.publisher-confirms=true #與消息確認機制聯合使用,保證可以收到回調 spring.rabbitmq.publisher-returns=true #消息確認模式 MANUAL:手動確認 NONE:不確認 AUTO:自動確認 spring.rabbitmq.listener.simple.acknowledge-mode=auto #消費者 spring.rabbitmq.listener.simple.concurrency=10 spring.rabbitmq.listener.simple.max-concurrency=10 #發佈後重試 spring.rabbitmq.listener.simple.retry.enabled=true spring.rabbitmq.listener.simple.retry.initial-interval=5000 spring.rabbitmq.listener.simple.retry.max-attempts=5 #每隔多久進行重試 spring.rabbitmq.template.retry.multiplier=1.0 #消費失敗後從新消費 spring.rabbitmq.listener.simple.default-requeue-rejected=false #自定義的vhost spring.rabbitmq.dev-virtual-host=devVir spring.rabbitmq.test-virtual-host=testVir
3.配置信息:此處爲多個Vhost配置,單個可直接使用,無需另外配置,只需聲明隊列信息便可服務器
package com.rabbit.config; import java.util.HashMap; import java.util.Map; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; /** * 2019年7月7日15:43:38 Joelan整合 RabbitConfig 概念介紹: * 1.Queue:隊列,是RabbitMq的內部對象,用於存儲消息,RabbitMq的多個消費者能夠訂閱同一個隊列,此時隊列會以輪詢的方式給多個消費者消費,而非多個消費者都收到全部的消息進行消費 * 注意:RabbitMQ不支持隊列層面的廣播消費,若是須要廣播消費,能夠採用一個交換器經過路由Key綁定多個隊列,由多個消費者來訂閱這些隊列的方式。 * 2.Exchange:交換器,在RabbitMq中,生產者並不是直接將消息投遞到隊列中。真實狀況是,生產者將消息發送到Exchange(交換器),由交換器將消息路由到一個或多個隊列中。 * 注意:若是路由不到,或返回給生產者,或直接丟棄,或作其它處理。 * 3.RoutingKey:路由Key,生產者將消息發送給交換器的時候,通常會指定一個RoutingKey,用來指定這個消息的路由規則。這個路由Key須要與交換器類型和綁定鍵(BindingKey)聯合使用才能 * 最終生效。在交換器類型和綁定鍵固定的狀況下,生產者能夠在發送消息給交換器時經過指定RoutingKey來決定消息流向哪裏。 * 4.Binding:RabbitMQ經過綁定將交換器和隊列關聯起來,在綁定的時候通常會指定一個綁定鍵,這樣RabbitMQ就能夠指定如何正確的路由到隊列了。 */ @Configuration public class RabbitConfig { /** * RabbitMq的主機地址 */ @Value("${spring.rabbitmq.host}") private String host; /** * RabbitMq的端口 */ @Value("${spring.rabbitmq.port}") private Integer port; /** * 用戶帳號 */ @Value("${spring.rabbitmq.username}") private String username; /** * 用戶密碼 */ @Value("${spring.rabbitmq.password}") private String password; /** * 消息確認,回調機制 */ @Value("${spring.rabbitmq.publisher-confirms}") private boolean confirms; @Value("${spring.rabbitmq.publisher-returns}") private boolean returns; /** * vhost:dev */ @Value("${spring.rabbitmq.dev-virtual-host}") private String hrmDevVirtualHost; /** * vhost:test */ @Value("${spring.rabbitmq.test-virtual-host}") private String hrmTestVirtualHost; /** * 若一個項目只使用一個virtualHost的話,默認只須要在配置文件中配置其屬性便可 * 若項目中使用到多個virtualHost,那麼能夠以經過建立ConnectionFactory的方式指定不一樣的virtualHost */ public ConnectionFactory createConnectionFactory(String host, Integer port, String username, String password, String vHost) { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setHost(host); connectionFactory.setPort(port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setSimplePublisherConfirms(confirms); connectionFactory.setPublisherReturns(returns); connectionFactory.setVirtualHost(vHost); return connectionFactory; } // ----------------------------------------------------------------------------------------第一步,建立消息鏈接,第一個VirtualHost /** * 建立指定vhost:test的鏈接工廠 */ @Primary @Bean(name = "devConnectionFactory") public ConnectionFactory devConnectionFactory() { return createConnectionFactory(host, port, username, password, hrmDevVirtualHost); } /** * 如有多個vhost則自定義RabbitMqTemplate 經過名稱指定對應的vhost */ @Primary @Bean(name = "devRabbitTemplate") public RabbitTemplate devRabbitTemplate( @Qualifier(value = "devConnectionFactory") ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 消息確認機制,ConnectionFactory中必須設置回調機制(publisher-confirms,publisher-returns) rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack) { System.out.println("消息id爲: " + correlationData + "的消息,已經被ack成功"); } else { System.out.println("消息id爲: " + correlationData + "的消息,消息nack,失敗緣由是:" + cause); } } }); return rabbitTemplate; } // ----------------------------------------------------------------------------------------第二個VirtualHost,以此類推 /** * 建立指定vhost:test的鏈接工廠 */ @Bean(name = "testConnectionFactory") public ConnectionFactory testConnectionFactory() { return createConnectionFactory(host, port, username, password, hrmTestVirtualHost); } /** * 如有多個vhost則自定義RabbitMqTemplate 經過名稱指定對應的vhost,此處未使用回調 */ @Bean(name = "testRabbitTemplate") public RabbitTemplate testRabbitTemplate( @Qualifier(value = "testConnectionFactory") ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); } // ----------------------------------------------------------------------------------------引入:死信隊列 /** * 所謂死信:即(1)消息被拒絕(basic.reject 或者 basic.nack),而且requeue=false;(2)消息的過時時間到期了; * (3)隊列長度限制超過了 等三個因素形成。 * 咱們會將以上緣由形成的隊列存入死信隊列,死信隊列其實也是一個普通的隊列,咱們能夠根據自身須要,能夠對死信進行操做。 * 如下爲死信隊列的演示(將正常隊列監聽關閉並設置超時):首先聲明一個正常的隊列,並設置死信隊列的相關聲明【死信交換器(與正常隊列一致便可),死信路由Key等】 * 設置完後,準備一個新的隊列,此隊列用於接收上一個正常隊列發生死信後,將由此隊列代替(即候補隊列),而後將新隊列經過上一個交換器以及正常隊列中聲明的死信路由Key進行綁定 * 該操做與正常聲明一致(聲明交換器(可以使用正常隊列的交換器,無需另外聲明),隊列,將隊列綁定到交換器) */ /** * 聲明交換器(此處正常的與死信的交換器一致) */ @Bean public Exchange testExchange() { return new DirectExchange("test_exchange", true, false); } /** * 聲明一個正常的隊列,並設置死信相關信息(交換器,路由Key),確保發生死信後會將死信存入交換器 */ @Bean public Queue testQueue() { Map<String, Object> args = new HashMap<>(4); // x-dead-letter-exchange 聲明 死信交換機 args.put("x-dead-letter-exchange", "test_exchange"); // x-dead-letter-routing-key 聲明死信路由鍵 args.put("x-dead-letter-routing-key", "test_dead_rout"); return new Queue("test_queue", true, false, false, args); } /** * 將隊列綁定到指定交換器並設置路由 */ @Bean public Binding testBinding() { return BindingBuilder.bind(testQueue()).to(testExchange()).with("test_rout").noargs(); } /** * 死信隊列(候補隊列) 若上面的正常隊列發生死信時,需將發生死信的隊列信息路由到此隊列中 * 路由過程:正常隊列發送->信息到交換器->交換器路由到正常隊列->監聽,發生死信->死信回到指定的交換器->再由交換器路由到死信隊列->死信監聽 */ @Bean public Queue testDeadQueue() { return new Queue("test_dead_queue", true, false, false); } /** * 綁定死信的隊列到候補隊列 */ @Bean public Binding testDeadBinding() { return BindingBuilder.bind(testDeadQueue()).to(testExchange()).with("test_dead_rout").noargs(); } // ----------------------------------------------------------------------------------------第二步,聲明隊列信息,Fanout模式 /** * 此處使用第一個正常隊列來示例完整隊列過程 建立隊列 參數name:隊列的名稱,不能爲空;設置爲「」以使代理生成該名稱。 * 參數durable:true表示爲持久隊列,該隊列將在服務器從新啓動後繼續存在 * 參數exclusive:若是聲明獨佔隊列,則爲true,該隊列將僅由聲明者的鏈接使用 * 參數autoDelete:若是服務器再也不使用隊列時應將其刪除,則自動刪除爲true 參數arguments:用於聲明隊列的參數 */ @Bean public Queue testFanoutQueue() { /* * 1.new Queue(name); return new Queue("test_fanout_queue"); */ /* * 2.new Queue(name,durable); */ return new Queue("test_fanout_queue", true, false, false); /* * 3.new Queue(name,durable,exclusive,autoDelete); return new * Queue("test_fanout_queue", true, false, false); */ /* * 4.new Queue(name,durable,exclusive,autoDelete,arguments); return new * Queue("test_fanout_queue", true, true, true, null); */ } /** * 建立交換機 1.fanout:扇形交換器,它會把發送到該交換器的消息路由到全部與該交換器綁定的隊列中,若是使用扇形交換器,則不會匹配路由Key * 白話:一個交換機能夠綁定N個隊列,此模式會將寫入的隊列發送到一個交換機,由此交換機發送到N個隊列中,那麼監聽該隊列的消費者都能收到對應的消息 */ @Bean @Primary public Exchange testFanoutExchange() { return new FanoutExchange("test_fanout_exchange"); } /** * 綁定隊列到交換機 Fanout模式不須要RoutingKey */ @Bean public Binding testFanoutBinding() { return BindingBuilder.bind(testFanoutQueue()).to(testFanoutExchange()).with("").noargs(); } // ----------------------------------------------------------------------------------------Direct模式 /** * 建立隊列 */ @Bean public Queue testDirectQueue() { return new Queue("test_direct_queue", true, false, false); } /** * 建立交換機 2.direct交換器 直連模式,會把消息路由到RoutingKey與BindingKey徹底匹配的隊列中。 * 白話:直連模式在綁定隊列到交換機的時候,RoutingKey與發送隊列的RoutingKey要徹底保持一致 */ @Bean public Exchange testDirectExchange() { return new TopicExchange("test_direct_exchange"); } /** * 綁定隊列到交換機並指定一個路由,此處的RoutingKey爲test,發送隊列時也必須使用test */ @Bean public Binding testDirectBinding() { return BindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with("test").noargs(); } // ----------------------------------------------------------------------------------------Topic模式 /** * 建立隊列 */ @Bean public Queue testTopicQueue() { return new Queue("test_topic_queue", true, false, false); } /** * 建立交換機 2.topic 匹配模式(我的)與直連模式區別:RoutingKey能夠模糊匹配,兩種匹配風格: *匹配 #匹配 * 咱們的RoutingKey和BindKey爲一個點分隔的字符串,例:test.routing.client * 那麼咱們的模糊匹配,*能夠匹配一個單詞,即:*.routing.* 能夠匹配到 test.routing.client, * #能夠匹配多個單詞,即:#.client 能夠匹配到 test.routing.client,以此類推 */ @Bean public Exchange testTopicExchange() { return new TopicExchange("test_topic_exchange"); } /** * 綁定隊列到交換機並指定一個路由 */ @Bean public Binding testTopicBinding() { return BindingBuilder.bind(testTopicQueue()).to(testTopicExchange()).with("test.*").noargs(); } // ----------------------結束強調:第一步建立鏈接,第二步聲明隊列,交換器,路由Key信息,第三步發送隊列,第四步監聽隊列 }
4.發送隊列dom
package com.rabbit.send; import java.util.UUID; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; /** * RabbitSend */ @Component public class RabbitSend { @Autowired @Qualifier(value = "devRabbitTemplate") private RabbitTemplate rabbitTemplate; /** * 發送死信隊列 */ public void sendDeadMsg(String msg) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 聲明消息處理器 這個對消息進行處理 能夠設置一些參數 對消息進行一些定製化處理 咱們這裏 來設置消息的編碼 以及消息的過時時間 // 由於在.net 以及其餘版本過時時間不一致 這裏的時間毫秒值 爲字符串 MessagePostProcessor messagePostProcessor = message -> { MessageProperties messageProperties = message.getMessageProperties(); // 設置編碼 messageProperties.setContentEncoding("utf-8"); // 設置過時時間10*1000毫秒 messageProperties.setExpiration("10000"); return message; }; // 向test_queue 發送消息 10*1000毫秒後過時 造成死信,具體的時間能夠根據本身的業務指定 rabbitTemplate.convertAndSend("test_exchange", "test_rout", msg, messagePostProcessor, correlationData); } /** * 發送一條Fanout扇形隊列 */ public void sendTestFanoutMsg(String msg) { rabbitTemplate.convertAndSend("test_fanout_exchange", "", msg, new CorrelationData("2")); } /** * 發送一條Direct直連隊列 如有開啓回調機制,必須傳此參數new CorrelationData("1"),用於聲明ID */ public void sendTestDirectMsg(String msg) { rabbitTemplate.convertAndSend("test_direct_exchange", "test", msg, new CorrelationData("1")); } /** * 發送一條Topic消息隊列 */ public void sendTestMsg(String msg) { rabbitTemplate.convertAndSend("test_topic_exchange", "test.mq", msg); } }
5.監聽隊列ide
package com.rabbit.receiver; import java.io.IOException; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * RabbitReceiver */ @Component public class RabbitReceiver { /** * 若死信隊列監聽到信息,表示咱們的死信隊列設置是沒有問題的 */ @RabbitHandler @RabbitListener(queues = "test_dead_queue") public void redirect(Message message, Channel channel) throws IOException { System.out.println("監聽到死信隊列有消息進來"); } /** * 爲了測試死信隊列效果,此處註銷監聽 */ //@RabbitHandler //@RabbitListener(queues = "test_queue") //public void handlerTestQueue(Message message, Channel channel) throws IOException { // System.out.println("監聽到正常隊列有消息進來"); //} @RabbitHandler @RabbitListener(queues = "test_fanout_queue") public void handlerFanout(String msg) { System.out.println("RabbitReceiver:" + msg + "test_fanout_queue"); } @RabbitHandler @RabbitListener(queues = "test_direct_queue") public void handlerDirect(String msg) { System.out.println("RabbitReceiver:" + msg + "test_direct_queue"); } @RabbitHandler @RabbitListener(queues = "test_topic_queue") public void handlerTopic(String msg) { System.out.println("RabbitReceiver:" + msg + "test_topic_queue"); } }