Springboot2.x整合RabbitMQ

一、RabbitMQ介紹

可參照RabbitMQ筆記git

二、接入配置

pom依賴github

<!--amqp依賴-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件spring

server.port=8080 spring.application.name=springboot-rabbitmq spring.rabbitmq.host=192.168.242.131 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 開啓發送確認 spring.rabbitmq.publisher-confirms=true # 開啓發送失敗退回 spring.rabbitmq.publisher-returns=true # 開啓ACK spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual

三、一對一模式

  即一個生產者對一個消費者模式安全

配置類springboot

@Configuration public class RabbitMqConfig { @Bean public Queue kinsonQueue() { return new Queue("kinson"); } }

消費者服務器

@Component //監聽隊列kinson
@RabbitListener(queues = {"kinson"}) public class MyReceiver1 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver1 :" + msg); } }

消息生產者測試接口app

/** * 單條消息發送給單個隊列,該隊列只有一個消費者 * * @return
     */ @GetMapping(value = "send") public String send() { String content = "Date:" + System.currentTimeMillis(); //發送默認交換機對應的的隊列kinson
        amqpTemplate.convertAndSend("kinson", content); return content; }

四、一對多模式

  即一個生產者對多個消費者,該模式下能夠是一個生產者將消息投遞到一個隊列,該隊列對應多個消費者,此時每條消息只會被消費一次,多個消費者循環處理。另外也能夠是一個生產者將消息投遞到多個隊列裏,此時消息是被複制處理。異步

模式一:ide

配置類spring-boot

@Configuration public class RabbitMqConfig { @Bean public Queue kinsonQueue() { return new Queue("kinson"); } }

消費者1

@Component //監聽隊列kinson
@RabbitListener(queues = {"kinson"}) public class MyReceiver1 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver1 :" + msg); } }

消費者2

@Component //監聽隊列kinson
@RabbitListener(queues = {"kinson"}) public class MyReceiver2 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver2 :" + msg); } }

消息生產者測試接口

/** * 發送多條消息到一個隊列,該隊列有多個消費者 * * @return
     */ @GetMapping(value = "sendMore") public String sendMore() { List<String> result = new ArrayList<String>(); //發送10條數據
        for (int i = 0; i < 10; i++) { String content = "第" + (i + 1) + "次發送 Date:" + System.currentTimeMillis(); //發送默認交換機對應的的隊列kinson,此時有兩個消費者MyReceiver1和MyReceiver2,每條消息只會被消費一次
            amqpTemplate.convertAndSend("kinson", content); result.add(content); } return String.join("<br/>", result); }

模式二:

配置類

@Configuration public class RabbitMqConfig { @Bean public Queue kinsonQueue() { return new Queue("kinson"); } @Bean public Queue kinsonQueue2() { return new Queue("kinson2"); } }

kinson隊列消費者

@Component //監聽隊列kinson
@RabbitListener(queues = {"kinson"}) public class MyReceiver1 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver1 :" + msg); } }

kinson2隊列消費者

@Component //監聽隊列kinson2
@RabbitListener(queues = {"kinson2"}) public class MyReceiver3 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver3 :" + msg); } }

消息生產者測試接口

  /** * 發送多條消息到多個隊列 * * @return
     */ @GetMapping(value = "sendMoreQueue") public String sendMoreQueue() { List<String> result = new ArrayList<String>(); //發送10條數據
        for (int i = 0; i < 10; i++) { String content = "第" + (i + 1) + "次發送 Date:" + System.currentTimeMillis(); //發送默認交換機對應的的隊列kinson
            amqpTemplate.convertAndSend("kinson", content); //發送默認交換機對應的的隊列kinson2
            amqpTemplate.convertAndSend("kinson2", content); result.add(content); } return String.join("<br/>", result); }

相應測試結果請自測

五、ACK消息確認

配置文件加入相應配置

# 開啓發送確認 spring.rabbitmq.publisher-confirms=true # 開啓發送失敗退回 spring.rabbitmq.publisher-returns=true # 開啓ACK spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual

配置類,使用Fanout類型的Exchange,主要是設置隊列,交換機及綁定

@Configuration public class RabbitMqFanoutACKConfig { @Bean public Queue ackQueue() { return new Queue("ackQueue"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(ackQueue).to(fanoutExchange); } }

消息發送服務

@Service public class AckSenderService implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("AckSender returnedMessage " + message.toString() + " === " + i + " === " + s1 + " === " + s2);
} /** * 消息發送 */ public void send() { final String content = "如今時間是" + LocalDateTime.now(ZoneId.systemDefault()); //設置返回回調 rabbitTemplate.setReturnCallback(this); //設置確認回調 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息發送成功!"); } else { System.out.println("消息發送失敗," + cause + correlationData.toString()); } }); rabbitTemplate.convertAndSend("ackQueue", content); } }

消息消費者

@Component @RabbitListener(queues = {"ackQueue"}) public class MyAckReceiver { @RabbitHandler public void process(String sendMsg, Channel channel, Message message) { System.out.println("AckReceiver  : 收到發送消息 " + sendMsg + ",收到消息時間"
                + LocalDateTime.now(ZoneId.systemDefault())); try { //告訴服務器收到這條消息已經被當前消費者消費了,能夠在隊列安全刪除,這樣後面就不會再重發了, //不然消息服務器覺得這條消息沒處理掉,後續還會再發 //第二個參數是消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("process success"); } catch (Exception e) { System.out.println("process fail"); e.printStackTrace(); } } }

測試訪問接口

  /** * @return
     */ @GetMapping(value = "ackSend") public String ackSend() { senderService.send(); return "ok"; }

測試將Consumer確認代碼註釋掉,即

@Component @RabbitListener(queues = {"ackQueue"}) public class MyAckReceiver { @RabbitHandler public void process(String sendMsg, Channel channel, Message message) { System.out.println("AckReceiver  : 收到發送消息 " + sendMsg + ",收到消息時間"
                + LocalDateTime.now(ZoneId.systemDefault())); try { //告訴服務器收到這條消息已經被當前消費者消費了,能夠在隊列安全刪除,這樣後面就不會再重發了, //不然消息服務器覺得這條消息沒處理掉,後續還會再發 //第二個參數是消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息 //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            System.out.println("process success"); } catch (Exception e) { System.out.println("process fail"); e.printStackTrace(); } } }

此時訪問測試接口,能夠看到當消息發送完被消費掉以後,隊列的狀態變爲unacked。

當停掉服務時,unacked狀態變爲Ready

再從新啓動服務時會從新發送消息

六、事務機制

事務的實現主要是對信道(Channel)的設置,主要的方法有三個: //聲明啓動事務模式
channel.txSelect(); //提交事務
channel.txComment(); //回滾事務
channel.txRollback();

消息發送示例

public void publish() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException { // 建立鏈接
        ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/"); factory.setHost(host); factory.setPort(port); Connection conn = factory.newConnection(); // 建立信道
        Channel channel = conn.createChannel(); // 聲明隊列
        channel.queueDeclare(TX_QUEUE, true, false, false, null); try { long startTime = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { // 聲明事務
 channel.txSelect(); String message = String.format("時間 => %s", System.currentTimeMillis()); // 發送消息
                channel.basicPublish("", TX_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); // 提交事務
 channel.txCommit(); } long endTime = System.currentTimeMillis(); System.out.println("事務模式,發送10條數據,執行花費時間:" + (endTime - startTime) + "s"); } catch (Exception e) { channel.txRollback(); } finally { channel.close(); conn.close(); } }

消息消費示例

public void consume() throws IOException, TimeoutException, InterruptedException { Connection conn = RabbitMqConnFactoryUtil.getRabbitConn(); Channel channel = conn.createChannel(); channel.queueDeclare(TX_QUEUE, true, false, false, null); // 聲明事務
 channel.txSelect(); try { //單條消息獲取進行消費
            GetResponse resp = channel.basicGet(TX_QUEUE, false); String message = new String(resp.getBody(), "UTF-8"); System.out.println("收到消息:" + message); //消息拒絕 // channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); // 消息確認
            channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 提交事務
 channel.txCommit(); } catch (Exception e) { // 回滾事務
 channel.txRollback(); } finally { //關閉通道、鏈接
 channel.close(); conn.close(); } }

七、Confirm消息確認

Confirm發送方確認模式使用和事務相似,也是經過設置Channel進行發送方確認的,Confirm的三種實現方式: //方式一:普通發送方確認模式
channel.waitForConfirms(); //方式二:批量確認模式
channel.waitForConfirmsOrDie(); //方式三:異步監聽發送方確認模式
channel.addConfirmListener();

消息發佈示例

public void publish() throws IOException, TimeoutException, InterruptedException { // 建立鏈接
        ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/"); factory.setHost(host); factory.setPort(port); Connection conn = factory.newConnection(); // 建立信道
        Channel channel = conn.createChannel(); // 聲明隊列
        channel.queueDeclare(CONFIRM_QUEUE, false, false, false, null); long startTime = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { // 開啓發送方確認模式
 channel.confirmSelect(); String message = String.format("時間 => %s", System.currentTimeMillis()); channel.basicPublish("", CONFIRM_QUEUE, null, message.getBytes("UTF-8")); } //添加確認監聽器
        channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未確認消息,標識:" + deliveryTag); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple)); } }); long endTime = System.currentTimeMillis(); System.out.println("執行花費時間:" + (endTime - startTime) + "s"); }

 

RabbitMQ簡單示例源碼參照Github

相關文章
相關標籤/搜索