RabbitMQ是實現了高級消息隊列協議(Advanced Message Queueing Protocol , AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。java
一、AMQP協議spring
RocketMQ基於JMS(Java Messaging Service , java 消息服務)協議。安全
重點:協議的做用:二者的交互基礎。springboot
三個主要的功能模塊:dom
1.一、「exchange」接收發布應用程序發送的消息,並根據必定的規則將這些消息路由到「消息隊列」。(交換器類型:direct, fanout, topic, header)spring-boot
1.二、「message queue」存儲消息,直到這些消息被消費者安全處理完爲止。ui
1.三、「binding」定義了exchange和message queue之間的關聯,提供路由規則。spa
AMQP協議是一個二進制協議。代理
協議模型圖:code
二、RabbitMQ
2.一、交換器類型:
2.1.一、direct (默認):每一個隊列都會使用它的隊列名字做爲路由關鍵字(routing key)去自動地綁定到默認交換器上。
2.1.二、fanout : 該類型的交換器會將消息轉發給全部與之綁定的隊列上。(相似廣播機制)
2.1.三、topic : 該類型的交換器會視消息路由關鍵字和綁定路由關鍵字之間的匹配狀況,進行消息的路由轉發。解析:一個消息過來後,將根據路由key轉發給符合要求的全部隊列。
2.1.四、headers : 用於路由的屬性是取自於消息header屬性的,當消息header的值與隊列綁定時指定的值相同時,消息就會路由至相應的隊列中。
2.二、虛擬機(virtual hosts)
AMQP使用了虛擬機的概念,在一個broker上面劃分出多個隔離的環境(各環境下的用戶、交換器以及隊列等互不影響)。這樣一來,AMQP客戶端們在進行鏈接的時候,須要協商指定同一個vhost才能進行正常的往來業務。
三、spring-boot-starter-amqp 的使用
一、BeanConfig
@Configuration public class AmqpConfig { @Resource private RabbitTemplate rabbitTemplate; @Bean public AmqpTemplate amqpTemplate() { Logger log = LoggerFactory.getLogger(RabbitTemplate.class); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setEncoding("UTF-8"); rabbitTemplate.setMandatory(true); rabbitTemplate.setUsePublisherConnection(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息發送到exchange成功,id: {}", correlationData.getId()); } else { log.info("消息發送到exchange失敗,緣由: {}", cause); } }); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationId(); log.info("消息:{} 發送失敗, 應答碼:{} 緣由:{} 交換機: {} 路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey); }); return rabbitTemplate; } /** * 修改好友備註交換機 */ @Bean(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE) public Exchange modifyFriendRemakeNameDirectExchange() { return ExchangeBuilder .directExchange(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE) .durable(true) .build(); } /** * 隊列 */ @Bean(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_QUEUE) public Queue modifyFriendRemakeNameQueue() { return QueueBuilder.durable(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_QUEUE).build(); } /** * 隊列 關聯 路由key 關聯 交換機 * @return */ @Bean public Binding modifyFriendRemakeNameBinding(Queue modifyFriendRemakeNameQueue, Exchange modifyFriendRemakeNameDirectExchange) { return BindingBuilder .bind(modifyFriendRemakeNameQueue) .to(modifyFriendRemakeNameDirectExchange) .with(AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_ROUTING_KEY) .noargs(); } }
二、AmqpConstant
public interface AmqpConstant { /** * 修改好友備註交換機 */ String MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE = "modifyFriendRemakeNameDirectExchange"; /** * 修改好友備註隊列 */ String MODIFY_FRIEND_REMAKE_NAME_QUEUE = "modifyFriendRemakeNameQueue"; /** * 修改好友備註路由鍵 */ String MODIFY_FRIEND_REMAKE_NAME_ROUTING_KEY = "modifyFriendRemakeNameRoutingKey"; }
三、RabbitUtils
public class RabbitUtils { private static final Logger logger = LoggerFactory.getLogger(RabbitUtils.class); public static void sendModifyRemakeNameMsg(RabbitTemplate rabbitTemplate, RabbitFriend rabbitFriend){ try{ if(rabbitFriend.getLists().size() < 1){ return; } logger.info("sendModifyRemakeNameMsg request:"+JSON.toJSONString(rabbitFriend)); CorrelationData correlationDataId = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend( AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_DIRECT_EXCHANGE, AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_ROUTING_KEY, rabbitFriend, correlationDataId ); }catch (Exception e){ logger.info("mq消息發送錯誤",e); } } }
四、生產者
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitTest { @Autowired RabbitTemplate rabbitTemplate; @Test public void test1(){ RabbitFriend rabbitFriend = new RabbitFriend(); rabbitFriend.setUserId(100000022); rabbitFriend.setLists(new ArrayList<>()); rabbitFriend.getLists().add(new RabbitNickName("2637845647931","2637845647931123哈哈")); rabbitFriend.getLists().add(new RabbitNickName("2637","2637哈哈")); RabbitUtils.sendModifyRemakeNameMsg(rabbitTemplate,rabbitFriend); try { int read = System.in.read(); } catch (IOException e) { e.printStackTrace(); } System.out.println("end!"); } }
五、消費者
@Component public class RemakeNameMessageListener { private static final Logger log = LoggerFactory.getLogger(RemakeNameMessageListener.class); @RabbitListener(queues = AmqpConstant.MODIFY_FRIEND_REMAKE_NAME_QUEUE) public void remakeNameQueueListener(Message message, Channel channel) throws IOException { try { RabbitFriend rabbitFriend = RabbitUtils.buildMessage(message, RabbitFriend.class); log.info("RemakeNameMessageListener|remakeNameQueueListener,correlationDataId:{},RabbitFriend:{}", message.getMessageProperties().getCorrelationId(), rabbitFriend); //do you things log.info("RemakeNameMessageListener|remakeNameQueueListener,Consumption of success"); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } catch (Exception e) { log.error("RemakeNameMessageListener|remakeNameQueueListener,Consumption of failed,cause:{}", e.getMessage()); channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true); } } }
六、springboot 配置文件
spring: rabbitmq: host: 192.168.4.21 port: 5672 username: sasa password: sasa publisher-confirms: true #手動確認 publisher-returns: true virtualHost: / #虛擬機路徑 template: mandatory: true retry: enabled: true #重試 multiplier: 2 #重試次數