pom依賴github
<!--amqp依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置文件spring
server.port=8080 spring.application.name=springboot-rabbitmq spring.rabbitmq.host=192.168.242.131 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 開啓發送確認 spring.rabbitmq.publisher-confirms=true # 開啓發送失敗退回 spring.rabbitmq.publisher-returns=true # 開啓ACK spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual
即一個生產者對一個消費者模式安全
配置類springboot
@Configuration public class RabbitMqConfig { @Bean public Queue kinsonQueue() { return new Queue("kinson"); } }
消費者服務器
@Component //監聽隊列kinson @RabbitListener(queues = {"kinson"}) public class MyReceiver1 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver1 :" + msg); } }
消息生產者測試接口app
/** * 單條消息發送給單個隊列,該隊列只有一個消費者 * * @return */ @GetMapping(value = "send") public String send() { String content = "Date:" + System.currentTimeMillis(); //發送默認交換機對應的的隊列kinson amqpTemplate.convertAndSend("kinson", content); return content; }
即一個生產者對多個消費者,該模式下能夠是一個生產者將消息投遞到一個隊列,該隊列對應多個消費者,此時每條消息只會被消費一次,多個消費者循環處理。另外也能夠是一個生產者將消息投遞到多個隊列裏,此時消息是被複制處理。異步
模式一:ide
配置類spring-boot
@Configuration public class RabbitMqConfig { @Bean public Queue kinsonQueue() { return new Queue("kinson"); } }
消費者1
@Component //監聽隊列kinson @RabbitListener(queues = {"kinson"}) public class MyReceiver1 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver1 :" + msg); } }
消費者2
@Component //監聽隊列kinson @RabbitListener(queues = {"kinson"}) public class MyReceiver2 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver2 :" + msg); } }
消息生產者測試接口
/** * 發送多條消息到一個隊列,該隊列有多個消費者 * * @return */ @GetMapping(value = "sendMore") public String sendMore() { List<String> result = new ArrayList<String>(); //發送10條數據 for (int i = 0; i < 10; i++) { String content = "第" + (i + 1) + "次發送 Date:" + System.currentTimeMillis(); //發送默認交換機對應的的隊列kinson,此時有兩個消費者MyReceiver1和MyReceiver2,每條消息只會被消費一次 amqpTemplate.convertAndSend("kinson", content); result.add(content); } return String.join("<br/>", result); }
模式二:
配置類
@Configuration public class RabbitMqConfig { @Bean public Queue kinsonQueue() { return new Queue("kinson"); } @Bean public Queue kinsonQueue2() { return new Queue("kinson2"); } }
kinson隊列消費者
@Component //監聽隊列kinson @RabbitListener(queues = {"kinson"}) public class MyReceiver1 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver1 :" + msg); } }
kinson2隊列消費者
@Component //監聽隊列kinson2 @RabbitListener(queues = {"kinson2"}) public class MyReceiver3 { @RabbitHandler public void receiver(String msg) { System.out.println("MyReceiver3 :" + msg); } }
消息生產者測試接口
/** * 發送多條消息到多個隊列 * * @return */ @GetMapping(value = "sendMoreQueue") public String sendMoreQueue() { List<String> result = new ArrayList<String>(); //發送10條數據 for (int i = 0; i < 10; i++) { String content = "第" + (i + 1) + "次發送 Date:" + System.currentTimeMillis(); //發送默認交換機對應的的隊列kinson amqpTemplate.convertAndSend("kinson", content); //發送默認交換機對應的的隊列kinson2 amqpTemplate.convertAndSend("kinson2", content); result.add(content); } return String.join("<br/>", result); }
相應測試結果請自測
配置文件加入相應配置
# 開啓發送確認 spring.rabbitmq.publisher-confirms=true # 開啓發送失敗退回 spring.rabbitmq.publisher-returns=true # 開啓ACK spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual
配置類,使用Fanout類型的Exchange,主要是設置隊列,交換機及綁定
@Configuration public class RabbitMqFanoutACKConfig { @Bean public Queue ackQueue() { return new Queue("ackQueue"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingAckQueue2Exchange(Queue ackQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(ackQueue).to(fanoutExchange); } }
消息發送服務
@Service public class AckSenderService implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("AckSender returnedMessage " + message.toString() + " === " + i + " === " + s1 + " === " + s2);
} /** * 消息發送 */ public void send() { final String content = "如今時間是" + LocalDateTime.now(ZoneId.systemDefault()); //設置返回回調 rabbitTemplate.setReturnCallback(this); //設置確認回調 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息發送成功!"); } else { System.out.println("消息發送失敗," + cause + correlationData.toString()); } }); rabbitTemplate.convertAndSend("ackQueue", content); } }
消息消費者
@Component @RabbitListener(queues = {"ackQueue"}) public class MyAckReceiver { @RabbitHandler public void process(String sendMsg, Channel channel, Message message) { System.out.println("AckReceiver : 收到發送消息 " + sendMsg + ",收到消息時間" + LocalDateTime.now(ZoneId.systemDefault())); try { //告訴服務器收到這條消息已經被當前消費者消費了,能夠在隊列安全刪除,這樣後面就不會再重發了, //不然消息服務器覺得這條消息沒處理掉,後續還會再發 //第二個參數是消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("process success"); } catch (Exception e) { System.out.println("process fail"); e.printStackTrace(); } } }
測試訪問接口
/** * @return */ @GetMapping(value = "ackSend") public String ackSend() { senderService.send(); return "ok"; }
測試將Consumer確認代碼註釋掉,即
@Component @RabbitListener(queues = {"ackQueue"}) public class MyAckReceiver { @RabbitHandler public void process(String sendMsg, Channel channel, Message message) { System.out.println("AckReceiver : 收到發送消息 " + sendMsg + ",收到消息時間" + LocalDateTime.now(ZoneId.systemDefault())); try { //告訴服務器收到這條消息已經被當前消費者消費了,能夠在隊列安全刪除,這樣後面就不會再重發了, //不然消息服務器覺得這條消息沒處理掉,後續還會再發 //第二個參數是消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息 //channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("process success"); } catch (Exception e) { System.out.println("process fail"); e.printStackTrace(); } } }
此時訪問測試接口,能夠看到當消息發送完被消費掉以後,隊列的狀態變爲unacked。
當停掉服務時,unacked狀態變爲Ready
再從新啓動服務時會從新發送消息
事務的實現主要是對信道(Channel)的設置,主要的方法有三個: //聲明啓動事務模式 channel.txSelect(); //提交事務 channel.txComment(); //回滾事務 channel.txRollback();
消息發送示例
public void publish() throws KeyManagementException, NoSuchAlgorithmException, URISyntaxException, IOException, TimeoutException { // 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/"); factory.setHost(host); factory.setPort(port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(TX_QUEUE, true, false, false, null); try { long startTime = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { // 聲明事務 channel.txSelect(); String message = String.format("時間 => %s", System.currentTimeMillis()); // 發送消息 channel.basicPublish("", TX_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); // 提交事務 channel.txCommit(); } long endTime = System.currentTimeMillis(); System.out.println("事務模式,發送10條數據,執行花費時間:" + (endTime - startTime) + "s"); } catch (Exception e) { channel.txRollback(); } finally { channel.close(); conn.close(); } }
消息消費示例
public void consume() throws IOException, TimeoutException, InterruptedException { Connection conn = RabbitMqConnFactoryUtil.getRabbitConn(); Channel channel = conn.createChannel(); channel.queueDeclare(TX_QUEUE, true, false, false, null); // 聲明事務 channel.txSelect(); try { //單條消息獲取進行消費 GetResponse resp = channel.basicGet(TX_QUEUE, false); String message = new String(resp.getBody(), "UTF-8"); System.out.println("收到消息:" + message); //消息拒絕 // channel.basicReject(resp.getEnvelope().getDeliveryTag(), true); // 消息確認 channel.basicAck(resp.getEnvelope().getDeliveryTag(), false); // 提交事務 channel.txCommit(); } catch (Exception e) { // 回滾事務 channel.txRollback(); } finally { //關閉通道、鏈接 channel.close(); conn.close(); } }
Confirm發送方確認模式使用和事務相似,也是經過設置Channel進行發送方確認的,Confirm的三種實現方式: //方式一:普通發送方確認模式 channel.waitForConfirms(); //方式二:批量確認模式 channel.waitForConfirmsOrDie(); //方式三:異步監聽發送方確認模式 channel.addConfirmListener();
消息發佈示例
public void publish() throws IOException, TimeoutException, InterruptedException { // 建立鏈接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost("/"); factory.setHost(host); factory.setPort(port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 聲明隊列 channel.queueDeclare(CONFIRM_QUEUE, false, false, false, null); long startTime = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { // 開啓發送方確認模式 channel.confirmSelect(); String message = String.format("時間 => %s", System.currentTimeMillis()); channel.basicPublish("", CONFIRM_QUEUE, null, message.getBytes("UTF-8")); } //添加確認監聽器 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未確認消息,標識:" + deliveryTag); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("已確認消息,標識:%d,多個消息:%b", deliveryTag, multiple)); } }); long endTime = System.currentTimeMillis(); System.out.println("執行花費時間:" + (endTime - startTime) + "s"); }