1.引入 spring-boot-starter-amqp
<dependencies>
    <!-- Runtime: AMQP (RabbitMQ) starter and the web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- Test scope only -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
自動配置原理說明:RabbitAutoConfiguration
一、自動配置了連接工廠 CachingConnectionFactory,用於獲取與 RabbitMQ 的連接信息
@Configuration @ConditionalOnClass({ RabbitTemplate.class, Channel.class }) @EnableConfigurationProperties(RabbitProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public class RabbitAutoConfiguration { @Configuration @ConditionalOnMissingBean(ConnectionFactory.class) protected static class RabbitConnectionFactoryCreator { @Bean public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception { ... } ...
二、RabbitProperties 封裝了 RabbitMQ 的配置
@ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitProperties { /** * RabbitMQ host. */ private String host = "localhost"; /** * RabbitMQ port. */ private int port = 5672; ....
application.yml 配置
# Connection settings bound to RabbitProperties (prefix "spring.rabbitmq").
spring:
  rabbitmq:
    host: xxx.xxx.xxx.xxx
    # NOTE: by default a RabbitMQ broker only accepts the built-in "guest" account
    # on loopback connections; use a dedicated user when host is a remote broker.
    username: guest
    password: guest
    port: 5672
三、RabbitTemplate:給 RabbitMQ 發送和接收消息
@Configuration @ConditionalOnClass({ RabbitTemplate.class, Channel.class }) @EnableConfigurationProperties(RabbitProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public class RabbitAutoConfiguration { .... @Configuration @Import(RabbitConnectionFactoryCreator.class) protected static class RabbitTemplateConfiguration { private final RabbitProperties properties; private final ObjectProvider<MessageConverter> messageConverter; private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers; public RabbitTemplateConfiguration(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { this.properties = properties; this.messageConverter = messageConverter; this.retryTemplateCustomizers = retryTemplateCustomizers; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean(RabbitOperations.class) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { PropertyMapper map = PropertyMapper.get(); RabbitTemplate template = new RabbitTemplate(connectionFactory); MessageConverter messageConverter = this.messageConverter.getIfUnique(); if (messageConverter != null) { template.setMessageConverter(messageConverter); } template.setMandatory(determineMandatoryFlag()); RabbitProperties.Template properties = this.properties.getTemplate(); if (properties.getRetry().isEnabled()) { template.setRetryTemplate(new RetryTemplateFactory( this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList())).createRetryTemplate( properties.getRetry(), RabbitRetryTemplateCustomizer.Target.SENDER)); } map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis) .to(template::setReceiveTimeout); map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout); map.from(properties::getExchange).to(template::setExchange); 
map.from(properties::getRoutingKey).to(template::setRoutingKey); map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue); return template; } }
四、AmqpAdmin:RabbitMQ系統管理組件,用來聲明隊列、交換器等。當沒有在網頁端手動建立queue、exchange、Binding時,可採用AmqpAdmin建立和刪除queue、exchange、Binding
@Configuration @ConditionalOnClass({ RabbitTemplate.class, Channel.class }) @EnableConfigurationProperties(RabbitProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public class RabbitAutoConfiguration { ... @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } }
2.測試RabbitMQ
1)單播-點對點測試
@RunWith(SpringRunner.class) @SpringBootTest public class Springboot02AmqpApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Test public void contextLoads() { //message須要本身定義,定義一個消息體內容 //rabbitTemplate.send(exchage,routeKey,message); //經常使用的convertAndSend,消息體會自動轉換,object:默認當成消息體,只要傳入要發送的對象,自動序列化Babbitmq //rabbitTemplate.convertAndSend(exchange,routeKey,object); Map<String,Object> maps = new HashMap<String,Object>(); maps.put("msg","這是一個消息"); maps.put("data", Arrays.asList("helloworld",123,true)); //對象被默認序列化之後發送出去(jdk) rabbitTemplate.convertAndSend("exchange.direct","jatpeo.news",new Book("西遊記","吳承恩")); } //接收數據,如何將數據自動轉爲json發送出去? @Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("jatpeo.news"); System.out.println(o.getClass()); System.out.println(o); }
常用的 convertAndSend:消息體會自動轉換。object 默認被當成消息體,只要傳入要發送的對象,就會自動序列化後發送給 RabbitMQ;對象默認使用 JDK 序列化之後發送出去。
源碼分析:
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count implements BeanFactoryAware, RabbitOperations, MessageListener, ListenerContainerAware, PublisherCallbackChannel.Listener, Lifecycle, BeanNameAware { private MessageConverter messageConverter = new SimpleMessageConverter();
調用SimpleMessageConverter
public class SimpleMessageConverter extends WhiteListDeserializingMessageConverter implements BeanClassLoaderAware { content = new String(message.getBody(), encoding); } catch (UnsupportedEncodingException var8) { throw new MessageConversionException("failed to convert text-based Message content", var8); } } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) { try { content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl)); } catch (IllegalArgumentException | IllegalStateException | IOException var7) { throw new MessageConversionException("failed to convert serialized Message content", var7); } } }
自定義MessageConverter
新建MyAMQPConfig
@EnableRabbit//開啓基於註解的RabbitMQ @Configuration public class MyAMQPConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
2)廣播
@RunWith(SpringRunner.class) @SpringBootTest public class Springboot02AmqpApplicationTests { @Autowired RabbitTemplate rabbitTemplate; /** * 二、廣播 * * 廣播路由鍵無所謂 * */ @Test public void Test(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("紅樓夢","曹雪芹")); }
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-TNAhBooF-1571057027287)(C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\1571056120180.png)]
@EnableRabbit + @RabbitListener 監聽消息隊列的內容
@RabbitListener:監聽隊列
@EnableRabbit:開啓基於註解的RabbitMq
@Service public class BookService { //只要這個消息隊列收到消息就打印消息,要讓此註解生效要在配置類中開啓註解@EnableRabbit @RabbitListener(queues = "jatpeo.news") public void receive(Book book){ System.out.println("收到消息。。。打印"); } @RabbitListener(queues = "jatpeo") public void receive02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } }
5.1.3、AmqpAdmin:RabbitMQ
AmqpAdmin:RabbitMQ系統管理組件,用來聲明隊列,交換器等
當沒有在網頁端手動建立queue、exchange、Binding時,可採用AmqpAdmin建立和刪除queue、exchange、Binding
注入AmqpAdmin
@RunWith(SpringRunner.class) @SpringBootTest public class Springboot02AmqpApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Autowired AmqpAdmin amqpAdmin; @Test public void createExchange(){ //建立DirectExchange /* amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange")); System.out.println("建立完成");*/ //建立隊列 //amqpAdmin.declareQueue(new Queue("amqpAdmin.queue",true)); //建立綁定規則 amqpAdmin.declareBinding(new Binding("amqpAdmin.queue", Binding.DestinationType.QUEUE, "amqpAdmin.exchange","amqpAdmin.haha",null)); }
網頁端查看:
等。