環境:Spring Boot 2.0
<!-- Spring Boot RabbitMQ 依賴 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
yml 配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 開啓發送確認
    #publisher-confirms: true
    # 開啓發送失敗退回
    #publisher-returns: true
    # 開啓ACK
    #listener.direct.acknowledge-mode: manual
    #listener.simple.acknowledge-mode: manual
RabbitConfig
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the queues, exchanges and bindings used by the topic, fanout and
 * direct messaging examples.
 */
@Configuration
public class RabbitConfig {

    // Topic example names.
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";

    // Fanout example names.
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";
    public static final String FANOUT_EXCHANGE = "fanout.exchange";

    // Direct example names.
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    // NOTE(review): no Queue bean or binding is declared for this name in this class.
    public static final String DIRECT_QUEUE2 = "direct.queue2";
    public static final String DIRECT_EXCHANGE = "direct.exchange";

    // ---------------------------------------------------------------- topic

    /**
     * First queue of the topic example.
     *
     * @return a queue named {@link #TOPIC_QUEUE1}
     */
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1);
    }

    /**
     * Second queue of the topic example.
     *
     * @return a queue named {@link #TOPIC_QUEUE2}
     */
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2);
    }

    /**
     * Exchange of the topic example.
     *
     * @return a topic exchange named {@link #TOPIC_EXCHANGE}
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    /**
     * Binds the first topic queue with the exact routing key {@code zns.message}.
     *
     * @return the binding between {@link #topicQueue1()} and {@link #topicExchange()}
     */
    @Bean
    public Binding topicBinding1() {
        Queue queue = topicQueue1();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with("zns.message");
    }

    /**
     * Binds the second topic queue with the pattern {@code zns.#}
     * ({@code #} is the topic wildcard for zero or more words).
     *
     * @return the binding between {@link #topicQueue2()} and {@link #topicExchange()}
     */
    @Bean
    public Binding topicBinding2() {
        Queue queue = topicQueue2();
        TopicExchange exchange = topicExchange();
        return BindingBuilder.bind(queue).to(exchange).with("zns.#");
    }

    // --------------------------------------------------------------- fanout
    // Fanout is the familiar broadcast / publish-subscribe mode: a message sent
    // to a fanout exchange is delivered to every queue bound to that exchange.

    /**
     * First queue of the fanout example.
     *
     * @return a queue named {@link #FANOUT_QUEUE1}
     */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue(FANOUT_QUEUE1);
    }

    /**
     * Second queue of the fanout example.
     *
     * @return a queue named {@link #FANOUT_QUEUE2}
     */
    @Bean
    public Queue fanoutQueue2() {
        return new Queue(FANOUT_QUEUE2);
    }

    /**
     * Exchange of the fanout example.
     *
     * @return a fanout exchange named {@link #FANOUT_EXCHANGE}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE);
    }

    /**
     * Binds the first fanout queue to the fanout exchange (no routing key).
     *
     * @return the binding between {@link #fanoutQueue1()} and {@link #fanoutExchange()}
     */
    @Bean
    public Binding fanoutBinding1() {
        Queue queue = fanoutQueue1();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    /**
     * Binds the second fanout queue to the fanout exchange (no routing key).
     *
     * @return the binding between {@link #fanoutQueue2()} and {@link #fanoutExchange()}
     */
    @Bean
    public Binding fanoutBinding2() {
        Queue queue = fanoutQueue2();
        FanoutExchange exchange = fanoutExchange();
        return BindingBuilder.bind(queue).to(exchange);
    }

    // --------------------------------------------------------------- direct
    // Direct mode: when a message's routing key equals the binding key, the
    // exchange delivers the message to the corresponding queue.

    /**
     * Queue of the direct example.
     *
     * @return a queue named {@link #DIRECT_QUEUE1}
     */
    @Bean
    public Queue directQueue1() {
        return new Queue(DIRECT_QUEUE1);
    }

    /**
     * Exchange of the direct example.
     *
     * @return a direct exchange named {@link #DIRECT_EXCHANGE}
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    /**
     * Binds the direct queue with the routing key {@code direct.pwl}.
     *
     * @return the binding between {@link #directQueue1()} and {@link #directExchange()}
     */
    @Bean
    public Binding directBinding1() {
        Queue queue = directQueue1();
        DirectExchange exchange = directExchange();
        return BindingBuilder.bind(queue).to(exchange).with("direct.pwl");
    }
}
發送消息
import com.zns.admin.api.TestModel; import com.zns.admin.api.ZNSAdminApiApplication; import com.zns.admin.api.config.RabbitConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class RabbitmqTest { @Autowired private AmqpTemplate rabbitTemplate; @Test public void testTopic() throws Exception { TestModel data=new TestModel(1,"topic..."); this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,"zns.message", data); this.rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "zns.lzc", data); } @Test public void testFanout() throws Exception { TestModel data=new TestModel(2,"fanout..."); this.rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", data); } @Test public void testDirect() throws Exception { TestModel data=new TestModel(3,"direct..."); this.rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, "direct.pwl", data); } }
接收消息
import com.zns.admin.api.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Listeners for the topic, fanout and direct example queues. Each listener
 * prints the received {@code TestModel} to standard output.
 */
@Component
public class RabbitmqReceiver {

    /** Consumes from {@link RabbitConfig#TOPIC_QUEUE1}. */
    @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE1)
    public void receiveTopic1(TestModel data) {
        report("【receiveTopic1監聽到消息】", data);
    }

    /** Consumes from {@link RabbitConfig#TOPIC_QUEUE2}. */
    @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE2)
    public void receiveTopic2(TestModel data) {
        report("【receiveTopic2監聽到消息】", data);
    }

    /** Consumes from {@link RabbitConfig#FANOUT_QUEUE1}. */
    @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE1)
    public void receiveFanout1(TestModel data) {
        report("【receiveFanout1監聽到消息】", data);
    }

    /** Consumes from {@link RabbitConfig#FANOUT_QUEUE2}. */
    @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE2)
    public void receiveFanout2(TestModel data) {
        report("【receiveFanout2監聽到消息】", data);
    }

    /** Consumes from {@link RabbitConfig#DIRECT_QUEUE1}. */
    @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE1)
    public void receiveDirect1(TestModel data) {
        report("【receiveDirect1監聽到消息】", data);
    }

    /**
     * Consumes from {@link RabbitConfig#DIRECT_QUEUE1}.
     *
     * NOTE(review): this is the same queue as {@link #receiveDirect1}; possibly
     * DIRECT_QUEUE2 was intended, but no such queue is declared in RabbitConfig.
     * Confirm before changing.
     */
    @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE1)
    public void receiveDirect2(TestModel data) {
        report("【receiveDirect2監聽到消息】", data);
    }

    /** Prints the given prefix followed by the payload's {@code toString()}. */
    private static void report(String prefix, TestModel data) {
        System.out.println(prefix + data.toString());
    }
}
配置文件打開相關配置開關
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    # 開啓發送確認
    publisher-confirms: true
    # 開啓發送失敗退回
    publisher-returns: true
    # 開啓ACK
    listener.direct.acknowledge-mode: manual
    listener.simple.acknowledge-mode: manual
這裏使用Fanout類型的Exchange爲例,主要是設置隊列、交換機及綁定
/**
 * Declares the queue, fanout exchange and binding used by the manual-ACK example.
 */
@Configuration
public class RabbitMqFanoutACKConfig {

    /**
     * Queue consumed by the manual-ACK listener.
     *
     * @return a queue named {@code ackQueue}
     */
    @Bean
    public Queue ackQueue() {
        return new Queue("ackQueue");
    }

    /**
     * Fanout exchange of the manual-ACK example.
     *
     * The bean is named {@code ackFanoutExchange} rather than {@code fanoutExchange}
     * so it does not share a bean name with {@code RabbitConfig.fanoutExchange()};
     * two bean definitions with the same name override one another (or fail at
     * startup when overriding is disabled). The broker-side exchange name is unchanged.
     *
     * @return a fanout exchange named {@code fanoutExchange}
     */
    @Bean
    FanoutExchange ackFanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * Binds {@link #ackQueue()} to {@link #ackFanoutExchange()}.
     *
     * The factory methods are called directly instead of being injected as
     * parameters, so the binding cannot resolve to a different
     * {@code FanoutExchange} bean in the context.
     *
     * @return the binding between the ACK queue and its fanout exchange
     */
    @Bean
    Binding bindingAckQueue2Exchange() {
        return BindingBuilder.bind(ackQueue()).to(ackFanoutExchange());
    }
}
消息發送服務
@Service public class AckSenderService implements RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("AckSender returnedMessage " + message.toString() + " === " + i + " === " + s1 + " === " + s2); } /** * 消息發送 */ public void send() { final String content = "如今時間是" + LocalDateTime.now(ZoneId.systemDefault()); //設置返回回調 rabbitTemplate.setReturnCallback(this); //設置確認回調 rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息發送成功!"); } else { System.out.println("消息發送失敗," + cause + correlationData.toString()); } }); rabbitTemplate.convertAndSend("ackQueue", content); } }
消息消費者
@Component @RabbitListener(queues = {"ackQueue"}) public class MyAckReceiver { @RabbitHandler public void process(String sendMsg, Channel channel, Message message) { System.out.println("AckReceiver : 收到發送消息 " + sendMsg + ",收到消息時間" + LocalDateTime.now(ZoneId.systemDefault())); try { //告訴服務器收到這條消息已經被當前消費者消費了,能夠在隊列安全刪除,這樣後面就不會再重發了, //不然消息服務器覺得這條消息沒處理掉,後續還會再發 //第二個參數是消息的標識,false只確認當前一個消息收到,true確認全部consumer得到的消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); System.out.println("process success"); } catch (Exception e) { System.out.println("process fail"); e.printStackTrace(); } } }
調用 senderService.send() 測試