1.pom.xml添加依賴html
<!--RabbitMq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.application.ymlspring
spring: application: admin: springboot-rabbitmq rabbitmq: host: localhost port: 5672 username: guest password: guest publisher-returns: true listener: direct: acknowledge-mode: manual simple: acknowledge-mode: manual
3.添加配置類RabbitMqConfig springboot
@Configuration public class RabbitMqConfig { @Autowired public ConnectionFactory connectionFactory; @Bean public RabbitAdmin rabbitAdmin() { RabbitAdmin admin = new RabbitAdmin(connectionFactory); return admin; } @Bean public Queue kinsonQueue(){//建立隊列 Map<String, Object> arguments = new HashMap<>(); arguments.put("x-message-ttl", 40000); return new Queue("queue1",true,false,false,arguments); } @Bean public FanoutExchange defaultExchange(){//建立交換機 return new FanoutExchange("exchange-1"); } @Bean public Binding binQueueExchange(){//隊列綁定交換機 return BindingBuilder.bind(kinsonQueue()).to(defaultExchange()); } }
4.添加生產者app
@Component public class RabbitSender { @Autowired private RabbitTemplate rabbitTemplate; private final Logger log = LoggerFactory.getLogger(this.getClass()); //回調函數: confirm確認 final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("correlationData: " + correlationData); log.info("ack: " + ack); if (!ack) { log.info("異常處理...."); } } }; //回調函數: return返回 final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("return exchange: {}, routingKey: {}, replyCode: {}, replyText: {}", exchange, routingKey, replyCode, replyText); } }; //發送消息方法調用: 構建Message消息 public void send(String queue, Map<String, Object> properties) throws Exception { MessageHeaders mhs = new MessageHeaders(properties); Message<Object> msg = MessageBuilder.createMessage("queue1", mhs); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); rabbitTemplate.convertAndSend(queue, msg); } }
5.添加消費者ide
@Component public class RabbitReceiver { @RabbitListener(queues = "queue1") @RabbitHandler public void onMessage(Message message, Channel channel,@Payload String string) throws Exception { Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); // 手工ACK channel.basicAck(deliveryTag, false); System.err.println("消費端: " +message); } }
6.測試函數
@SpringBootTest class DemoApplicationTests { @Autowired private RabbitSender rabbitSender; private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @Test public void testSender1() throws Exception { Map<String, Object> properties = new HashMap<>(); properties.put("number", "12345"); properties.put("send_time", simpleDateFormat.format(new Date())); rabbitSender.send("queue1", properties); } }
RabbitMQ介紹:spring-boot
1.交換機:測試
處理路由鍵ui
須要將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵徹底匹配。這是一個完整的匹配。若是一個隊列
綁定到該交換機上要求路由鍵 「 dog」,則只有被標記爲 「 的消息才被轉發,不會轉發 dog.puppy ,也不會轉發dog.guard ,只會轉發 dog 。this
不處理路由鍵
你只須要將隊列綁定到交換機上, 發送消息 到交換機都會被轉發到與該交換機綁定的全部隊列上。
將路由鍵和某模式進行匹配
此時隊列須要綁定要一個模式上。符號「#」匹配一個或多個詞,符號「*」匹配一個詞。所以「audit.#」可以匹配到「audit.irs.corporate」,可是「audit.*」 只會匹配到「audit.irs」。
不處理路由鍵
而是根據發送的消息內容中的headers屬性進行匹配。在綁定Queue與Exchange時指定一組鍵值對;當消息發送到RabbitMQ時會取到該消息的headers與Exchange綁定時指定的鍵值對進行匹配;若是徹底匹配則消息會路由到該隊列,不然不會路由到該隊列。headers屬性是一個鍵值對,能夠是Hashtable,鍵值對的值能夠是任何類型。而fanout,direct,topic 的路由鍵都須要要字符串形式的。
Default Exchange(默認交換機 )
默認交換機其實是一個由RabbitMQ預先聲明好的名字爲空字符串的直連交換機(direct exchange)。它有一個特殊的屬性使得它對於簡單應用特別有用處:那就是每一個新建隊列(queue)都會自動綁定到默認交換機上,綁定的路由鍵(routing key)名稱與隊列名稱相同。
Dead Letter Exchange(死信交換機)
在默認狀況,若是消息在投遞到交換機時,交換機發現此消息沒有匹配的隊列,則這個消息將被丟棄。爲了解決這個問題,RabbitMQ中有一種交換機叫死信交換機。當消費者不能處理接收到的消息時,將這個消息從新發布到另一個隊列中,等待重試或者人工干預。這個過程當中的exchange和queue就是所謂的」Dead Letter Exchange 和 Queue」
交換機的屬性:
Name:交換機名稱
Durability:是否持久化。若是持久性,則RabbitMQ重啓後,交換機還存在
Auto-delete:當全部與之綁定的消息隊列都完成了對此交換機的使用後刪掉它
Arguments:擴展參數
官網 http://www.rabbitmq.com/getstarted.html
參考資料: https://docs.spring.io/spring-boot/docs/current/reference/html/common-application-properties.html
基礎信息
spring.rabbitmq.host: 默認localhost spring.rabbitmq.port: 默認5672 spring.rabbitmq.username: 用戶名 spring.rabbitmq.password: 密碼 spring.rabbitmq.virtual-host: 鏈接到代理時用的虛擬主機 spring.rabbitmq.addresses: 鏈接到server的地址列表(以逗號分隔),先addresses後host spring.rabbitmq.requested-heartbeat: 請求心跳超時時間,0爲不指定,若是不指定時間單位默認爲妙 spring.rabbitmq.publisher-confirms: 是否啓用【發佈確認】,默認false spring.rabbitmq.publisher-returns: 是否啓用【發佈返回】,默認false spring.rabbitmq.connection-timeout: 鏈接超時時間,單位毫秒,0表示永不超時