具體代碼可參考個人github:https://github.com/UniqueDong/springboot-studyhtml
RabbitMQ是一個消息中間件,因此最主要的做用就是:信息緩衝區,實現應用程序的異步和解耦。java
RabbitMQ是實現AMQP(高級消息隊列協議)的消息中間件的一種,最初起源於金融系統,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。RabbitMQ主要是爲了實現系統之間的雙向解耦而實現的。當生產者大量產生數據時,消費者沒法快速消費,那麼須要一箇中間層。保存這個數據。git
AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。AMQP的主要特徵是面向消息、隊列、路由(包括點對點和發佈/訂閱)、可靠性、安全。詳細概念能夠參考官方指南 RabbitMQgithub
一般咱們談到隊列服務, 會有三個概念: 發消息者、隊列、收消息者,RabbitMQ 在這個基本概念之上, 多作了一層抽象, 在發消息者和 隊列之間, 加入了交換器 (Exchange). 這樣發消息者和隊列就沒有直接聯繫, 轉而變成發消息者把消息給交換器, 交換器根據調度策略再把消息再給隊列。web
那麼,其中比較重要的概念有 4 個,分別爲:虛擬主機,交換機,隊列,和綁定。spring
交換機的功能主要是接收消息而且轉發到綁定的隊列,交換機不存儲消息,在啓用ack模式後,交換機找不到隊列會返回錯誤。交換機有四種類型:Direct, topic, Headers and Fanout安全
Direct Exchange是RabbitMQ默認的交換機模式,也是最簡單的模式,根據key全文匹配去尋找隊列。
springboot
第一個 X - Q1 就有一個 binding key,名字爲 orange; X - Q2 就有 2 個 binding key,名字爲 black 和 green。當消息中的 路由鍵 和 這個 binding key 對應上的時候,那麼就知道了該消息去到哪個隊列中。服務器
Ps:爲何 X 到 Q2 要有 black,green,2個 binding key呢,一個不就好了嗎? - 這個主要是由於可能又有 Q3,而Q3只接受 black 的信息,而Q2不只接受black 的信息,還接受 green 的信息。微信
根據通配符轉發消息到隊列,在這種交換機下,隊列和交換機的綁定會定義一種路由模式,那麼,通配符就要在這種路由模式和路由鍵之間匹配後交換機才能轉發消息。
headers 也是根據規則匹配, 相較於 direct 和 topic 固定地使用 routing_key , headers 則是一個自定義匹配規則的類型.
在隊列與交換器綁定時, 會設定一組鍵值對規則, 消息中也包括一組鍵值對( headers 屬性), 當這些鍵值對有一對, 或所有匹配時, 消息被投送到對應隊列.
消息廣播的模式,也就是咱們的發佈訂閱模式。Fanout Exchange 消息廣播的模式,無論路由鍵或者是路由模式,會把消息發給綁定給它的所有隊列,若是配置了routing_key會被忽略。
消息消費者如何通知 Rabbit 消息消費成功?
消息經過 ACK 確認是否被正確接收,每一個 Message 都要被確認(acknowledged),能夠手動去 ACK 或自動 ACK 自動確認會在消息發送給消費者後當即確認,但存在丟失消息的可能,若是消費端消費邏輯拋出異常,也就是消費端沒有處理成功這條消息,那麼就至關於丟失了消息 若是消息已經被處理,但後續代碼拋出異常,使用 Spring 進行管理的話消費端業務邏輯會進行回滾,這也一樣形成了實際意義的消息丟失 若是手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確承認以在業務失敗後進行一些操做,若是消息未被 ACK 則會發送到下一個消費者 若是某個服務忘記 ACK 了,則 RabbitMQ 不會再發送數據給它,由於 RabbitMQ 認爲該服務的處理能力有限 ACK 機制還能夠起到限流做用,好比在接收到某條消息時休眠幾秒鐘 消息確認模式有:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency>
配置springboot的yaml文件
server: servlet: context-path: /rabbitmq port: 9004 spring: application: name: rabbitmq rabbitmq: host: localhost virtual-host: /crawl username: xxxx password: xxx port: 5672 # 消息失敗返回,好比路由不到隊列時觸發回調 publisher-returns: true # 消息正確發送確認 publisher-confirms: true template: retry: enabled: true initial-interval: 2s listener: simple: # 手動ACK 不開啓自動ACK模式,目的是防止報錯後未正確處理消息丟失 默認 爲 none acknowledge-mode: manual
另外咱們還要配置ACK確認回調的配置,經過實現RabbitTemplate.ConfirmCallback接口,消息發送到Broker後觸發回調,也就是隻能確認是否正確到達Exchange中。
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @author lijianqing * @version 1.0 * @ClassName RabbitTemplateConfirmCallback * @date 2019/4/23 12:55 */ @Component @Slf4j public class RabbitTemplateConfirmCallback implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { //指定 ConfirmCallback rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息惟一標識:{},確認結果:{},失敗緣由:{}", correlationData, ack, cause); } }
消息失敗返回,好比路由步到隊列就會觸發,若是西區奧西發送到交換器成功,可是沒有匹配的隊列就會觸發回調
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * @author lijianqing * @version 1.0 * @ClassName RabbitTemplateReturnCallback * @date 2019/4/23 12:55 */ @Component @Slf4j public class RabbitTemplateReturnCallback implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { //指定 ReturnCallback rabbitTemplate.setReturnCallback(this); rabbitTemplate.setMandatory(true); } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主體 message : " + message); log.info("消息主體 message : " + replyCode); log.info("描述:" + replyText); log.info("消息使用的交換器 exchange : " + exchange); log.info("消息使用的路由鍵 routing : " + routingKey); } }
以下圖:
「P」是咱們的生產者,「C」是咱們的消費者。中間的框是一個隊列 - RabbitMQ表明消費者保留的消息緩衝區。
新增SimpleConfig,建立咱們要投放的隊列:代碼以下
/** * 隊列直接投放 * @author lijianqing * @version 1.0 * @ClassName SimpleConfig * @date 2019/4/26 15:11 */ @Configuration public class SimpleConfig { @Bean public Queue simpleQueue() { return new Queue("simple"); } }
再分別建立消息發送者與消息接收者:
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import zero.springboot.study.rabbitmq.model.User; import java.util.UUID; /** * @author lijianqing * @version 1.0 * @ClassName HelloSender * @date 2019/4/23 11:22 */ @Component @Slf4j public class HelloSender { @Autowired private RabbitTemplate rabbitTemplate; public void send() { User user = new User(); user.setName("青"); user.setPass("111111"); //發送消息到hello隊列 log.info("發送消息:{}", user); rabbitTemplate.convertAndSend("hello", user, new CorrelationData(UUID.randomUUID().toString())); String msg = "hello qing"; log.info("發送消息:{}", msg); rabbitTemplate.convertAndSend("simple", msg); } }
import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import zero.springboot.study.rabbitmq.model.User; import java.io.IOException; /** * 監聽hello隊列 * * @author lijianqing * @version 1.0 * @ClassName HelloReceiver * @date 2019/4/23 11:42 */ @Component @Slf4j @RabbitListener(queues = "simple") public class HelloReceiver { @RabbitHandler public void processUser(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("收到消息:{}", user); // 手動ACK try { // //消息確認,表明消費者確認收到當前消息,語義上表示消費者成功處理了當前消息。 channel.basicAck(tag, false); // 表明消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把當前消息從新入隊 // channel.basicNack(deliveryTag, false, false); // 表明消費者拒絕當前消息,第二個參數表示是否把當前消息從新入隊 // channel.basicReject(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } @RabbitHandler public void processString(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("收到消息:{}", message); // 手動ACK try { // //消息確認,表明消費者確認收到當前消息,語義上表示消費者成功處理了當前消息。 channel.basicAck(tag, false); // 表明消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把當前消息從新入隊 // channel.basicNack(deliveryTag, false, false); // 表明消費者拒絕當前消息,第二個參數表示是否把當前消息從新入隊 // channel.basicReject(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } }
這樣就實現了簡單的消息發送到指定隊列的模式。咱們寫一個測試類
主要配置咱們的Direct Exchange交換機,而且建立隊列經過routing key 綁定到交換機上
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * * @author lijianqing * @version 1.0 * @ClassName DirectConfig * @date 2019/4/23 11:15 */ @Configuration public class DirectConfig { //隊列名字 public static final String QUEUE_NAME = "direct_name"; //交換機名稱 public static final String EXCHANGE = "zero-exchange"; //路由鍵名稱 public static final String ROUTING_KEY = "routingKey"; @Bean public Queue blueQueue() { return new Queue(QUEUE_NAME, true); } @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE); } @Bean public Binding bindingBlue() { return BindingBuilder.bind(blueQueue()).to(defaultExchange()).with(ROUTING_KEY); } }
接下來咱們建立生產者與消費者
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import zero.springboot.study.rabbitmq.config.DirectConfig; import zero.springboot.study.rabbitmq.model.User; import java.util.UUID; /** * @author lijianqing * @version 1.0 * @ClassName HelloSender * @date 2019/4/23 11:22 */ @Component @Slf4j public class DirectSender { @Autowired private RabbitTemplate rabbitTemplate; public void send() { User user = new User(); user.setName("青"); user.setPass("111111"); //發送消息到hello隊列 log.info("DirectReceiver發送消息:{}", user); rabbitTemplate.convertAndSend(DirectConfig.EXCHANGE, DirectConfig.ROUTING_KEY, user, new CorrelationData(UUID.randomUUID().toString())); String msg = "hello qing"; log.info("DirectReceiver發送消息:{}", msg); rabbitTemplate.convertAndSend(DirectConfig.EXCHANGE, DirectConfig.ROUTING_KEY, msg); } }
/** * * @author lijianqing * @version 1.0 * @ClassName HelloReceiver * @date 2019/4/23 11:42 */ @Component @Slf4j @RabbitListener(queues = "direct_name") public class DirectReceiver { @RabbitHandler public void processUser(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("DirectReceiver收到消息:{}", user); // 手動ACK try { // //消息確認,表明消費者確認收到當前消息,語義上表示消費者成功處理了當前消息。 channel.basicAck(tag, false); // 表明消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把當前消息從新入隊 // channel.basicNack(deliveryTag, false, false); // 表明消費者拒絕當前消息,第二個參數表示是否把當前消息從新入隊 // channel.basicReject(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } @RabbitHandler public void processString(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("收到消息:{}", message); // 手動ACK try { // //消息確認,表明消費者確認收到當前消息,語義上表示消費者成功處理了當前消息。 channel.basicAck(tag, false); // 表明消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把當前消息從新入隊 // channel.basicNack(deliveryTag, false, false); // 表明消費者拒絕當前消息,第二個參數表示是否把當前消息從新入隊 // channel.basicReject(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } }
建立隊列以及交換機。並經過路由匹配規則將隊列與交換機綁定上
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * queueMessages 匹配topic.#,queueMessage 只匹配 "topic.message" * * @author lijianqing * @version 1.0 * @ClassName TopicRabbitConfig * @date 2019/4/23 15:03 */ @Configuration public class TopicRabbitConfig { final static String message = "topic.message"; final static String messages = "topic.messages"; @Bean public Queue queueMessage() { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages() { return new Queue(TopicRabbitConfig.messages); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } @Bean Binding bindingExchangeMessage(@Qualifier("queueMessage") Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessages(@Qualifier("queueMessages") Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author lijianqing * @version 1.0 * @ClassName TopicSender * @date 2019/4/23 15:10 */ @Component @Slf4j public class TopicSender { @Autowired private RabbitTemplate rabbitTemplate; /** * 匹配topic.message,兩個隊列都會收到 */ public void send1() { String context = "hi, i am message 1"; log.info("主題發送 : {}" , context); rabbitTemplate.convertAndSend("topicExchange", "topic.message", context); } /** * 匹配topic.messages */ public void send2() { String context = "hi, i am messages 2"; log.info("主題發送 : {}" , context); rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context); } }
@Component @RabbitListener(queues = "topic.message") @Slf4j public class TopicReceiver { @RabbitHandler public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("topic.message Receiver1 {}: ", message); // 手動ACK try { // //消息確認,表明消費者確認收到當前消息,語義上表示消費者成功處理了當前消息。 channel.basicAck(tag, false); // 表明消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把當前消息從新入隊 // channel.basicNack(deliveryTag, false, false); // 表明消費者拒絕當前消息,第二個參數表示是否把當前消息從新入隊 // channel.basicReject(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } }
第二個消費者
@Component @RabbitListener(queues = "topic.messages") @Slf4j public class TopicReceiver2 { @RabbitHandler public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("topic.messages Receiver2 : {}", message); // 手動ACK try { // //消息確認,表明消費者確認收到當前消息,語義上表示消費者成功處理了當前消息。 channel.basicAck(tag, false); // 表明消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把當前消息從新入隊 // channel.basicNack(deliveryTag, false, false); // 表明消費者拒絕當前消息,第二個參數表示是否把當前消息從新入隊 // channel.basicReject(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } }
也就是發佈、訂閱。全部綁定在交換機上的隊列都會收到消息,發送端指定的routing key的任何字符都會被忽略
配置交換機與隊列
@Configuration public class FanoutRabbitConfig { @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
@Component @Slf4j public class FanoutSender { @Autowired private RabbitTemplate rabbitTemplate; public void send() { String context = "hi, fanout msg "; rabbitTemplate.convertAndSend("fanoutExchange", null, context); } }
@Component @RabbitListener(queues = "fanout.A") @Slf4j public class FanoutReceiverA { @RabbitHandler public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) { log.info("fanout Receiver A : {}" , message); // 手動ACK try { // //消息確認,表明消費者確認收到當前消息,語義上表示消費者成功處理了當前消息。 channel.basicAck(tag, false); // 表明消費者拒絕一條或者多條消息,第二個參數表示一次是否拒絕多條消息,第三個參數表示是否把當前消息從新入隊 // channel.basicNack(deliveryTag, false, false); // 表明消費者拒絕當前消息,第二個參數表示是否把當前消息從新入隊 // channel.basicReject(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } } }
剩下的B、C就不重複貼了。
import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import zero.springboot.study.rabbitmq.direct.DirectSender; import zero.springboot.study.rabbitmq.fanout.FanoutSender; import zero.springboot.study.rabbitmq.simple.HelloSender; import zero.springboot.study.rabbitmq.topic.TopicSender; @RunWith(SpringRunner.class) @SpringBootTest(classes = RabbitmqApplication.class) public class RabbitmqApplicationTests { @Autowired private DirectSender directSender; @Autowired private TopicSender topicSender; @Autowired private FanoutSender fanoutSender; @Autowired private HelloSender helloSender; @Test public void testDirect() { directSender.send(); } @Test public void topic1() { topicSender.send1(); } @Test public void topic2() { topicSender.send2(); } @Test public void testFanout() { fanoutSender.send(); } @Test public void testSimple() { helloSender.send(); } }
全部的代碼已在個人github上分享,你們能夠具體查看與提出意見。github rabbitmq模塊
您的點贊與轉發是我最大的確定。
歡迎關注個人微信公衆號 JavaStorm