For the details of integrating RabbitMQ with Spring Boot, see: SpringBoot 2.0 Applications (3): Integrating RabbitMQ with SpringBoot 2.0.
When both org.springframework.amqp.rabbit.core.RabbitTemplate and com.rabbitmq.client.Channel are present in the project, Spring Boot registers the objects that RabbitMQ needs as beans so the project can inject them. Let's look at the RabbitAutoConfiguration class.
    @Configuration
    @ConditionalOnClass({ RabbitTemplate.class, Channel.class })
    @EnableConfigurationProperties(RabbitProperties.class)
    @Import(RabbitAnnotationDrivenConfiguration.class)
    public class RabbitAutoConfiguration {

        @Configuration
        @ConditionalOnMissingBean(ConnectionFactory.class)
        protected static class RabbitConnectionFactoryCreator {

            @Bean
            public CachingConnectionFactory rabbitConnectionFactory(
                    RabbitProperties properties,
                    ObjectProvider<ConnectionNameStrategy> connectionNameStrategy)
                    throws Exception {
                PropertyMapper map = PropertyMapper.get();
                CachingConnectionFactory factory = new CachingConnectionFactory(
                        getRabbitConnectionFactoryBean(properties).getObject());
                ......
                return factory;
            }
        }

        @Configuration
        @Import(RabbitConnectionFactoryCreator.class)
        protected static class RabbitTemplateConfiguration {

            private final ObjectProvider<MessageConverter> messageConverter;
            private final RabbitProperties properties;
            ......

            @Bean
            @ConditionalOnSingleCandidate(ConnectionFactory.class)
            @ConditionalOnMissingBean
            public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
                PropertyMapper map = PropertyMapper.get();
                RabbitTemplate template = new RabbitTemplate(connectionFactory);
                ......
                return template;
            }
            ......
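RabbitAutoConfiguration activates the RabbitMQ-related configuration, creates the connection to RabbitMQ from the configured properties, and builds the rabbitTemplate on top of the connection factory. It also imports RabbitAnnotationDrivenConfiguration, which registers the rabbitListenerContainerFactory bean.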
Let's start with an example:
    @Bean
    Binding bindingExchangeMessage(Queue queue, TopicExchange exchange) {
        // Bind the queue to the topic exchange using the routing key "routingKey"
        return BindingBuilder.bind(queue).to(exchange).with("routingKey");
    }
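This binds the queue to the exchange through the routing key "routingKey". The initialize method of the RabbitAdmin class picks up the Exchange, Queue and Binding beans that were registered and declares them.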
    public void initialize() {
        ......
        Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
                this.applicationContext.getBeansOfType(Exchange.class).values());
        Collection<Queue> contextQueues = new LinkedList<Queue>(
                this.applicationContext.getBeansOfType(Queue.class).values());
        Collection<Binding> contextBindings = new LinkedList<Binding>(
                this.applicationContext.getBeansOfType(Binding.class).values());
        ......
        final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
        final Collection<Queue> queues = filterDeclarables(contextQueues);
        final Collection<Binding> bindings = filterDeclarables(contextBindings);
        ......
        this.rabbitTemplate.execute(channel -> {
            declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
            declareQueues(channel, queues.toArray(new Queue[queues.size()]));
            declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
            return null;
        });
    }
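The collect-then-declare pattern in initialize can be tried out without a broker. The following is a minimal sketch using only the JDK: the Exchange, Queue and Binding classes here are hypothetical stand-ins for the Spring AMQP types, the map plays the role of the application context, and "declaring" just appends to a log. It illustrates that exchanges and queues are declared before bindings regardless of bean registration order, which matters because a binding refers to an exchange and a queue that must already exist.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DeclareOrderDemo {
    // Hypothetical stand-ins for the Spring AMQP Exchange, Queue and Binding types.
    static final class Exchange {
        final String name;
        Exchange(String name) { this.name = name; }
    }

    static final class Queue {
        final String name;
        Queue(String name) { this.name = name; }
    }

    static final class Binding {
        final String queue;
        final String exchange;
        final String routingKey;
        Binding(String queue, String exchange, String routingKey) {
            this.queue = queue;
            this.exchange = exchange;
            this.routingKey = routingKey;
        }
    }

    // Plays the role of applicationContext.getBeansOfType(type).values().
    static <T> List<T> beansOfType(Map<String, Object> context, Class<T> type) {
        List<T> result = new ArrayList<>();
        for (Object bean : context.values()) {
            if (type.isInstance(bean)) {
                result.add(type.cast(bean));
            }
        }
        return result;
    }

    // Declares exchanges, then queues, then bindings; returns a log of the declarations.
    static List<String> initialize(Map<String, Object> context) {
        List<String> log = new ArrayList<>();
        for (Exchange e : beansOfType(context, Exchange.class)) {
            log.add("exchange " + e.name);
        }
        for (Queue q : beansOfType(context, Queue.class)) {
            log.add("queue " + q.name);
        }
        for (Binding b : beansOfType(context, Binding.class)) {
            log.add("binding " + b.queue + " -> " + b.exchange + " [" + b.routingKey + "]");
        }
        return log;
    }

    // Registers the beans in the "wrong" order on purpose: binding first.
    static Map<String, Object> sampleContext() {
        Map<String, Object> context = new LinkedHashMap<>();
        context.put("binding", new Binding("topic.message1", "topic.exchange", "routingKey"));
        context.put("queue", new Queue("topic.message1"));
        context.put("exchange", new Exchange("topic.exchange"));
        return context;
    }

    public static void main(String[] args) {
        for (String line : initialize(sampleContext())) {
            System.out.println(line);
        }
    }
}
```

The names topic.exchange and topic.message1 are made up for the sketch; in a real application they come from the Exchange and Queue beans you define.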
Take the following send as an example:
rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEY1, "from key1");
This method first converts and pre-processes the message, then calls the basicPublish method of ChannelN, which submits an AMQCommand. The AMQCommand performs the actual send:
    public void transmit(AMQChannel channel) throws IOException {
        int channelNumber = channel.getChannelNumber();
        AMQConnection connection = channel.getConnection();
        synchronized (assembler) {
            Method m = this.assembler.getMethod();
            connection.writeFrame(m.toFrame(channelNumber));
            if (m.hasContent()) {
                byte[] body = this.assembler.getContentBody();
                connection.writeFrame(this.assembler.getContentHeader()
                        .toFrame(channelNumber, body.length));
                int frameMax = connection.getFrameMax();
                int bodyPayloadMax = (frameMax == 0) ? body.length
                        : frameMax - EMPTY_FRAME_SIZE;
                for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                    int remaining = body.length - offset;
                    int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                            : bodyPayloadMax;
                    Frame frame = Frame.fromBodyFragment(channelNumber, body,
                            offset, fragmentLength);
                    connection.writeFrame(frame);
                }
            }
        }
        connection.flush();
    }
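The body-splitting loop in transmit is easier to follow with concrete numbers. Below is a stand-alone sketch of just that arithmetic, using only the JDK. The class name BodyFragmenter and the argument checks are additions for illustration, and the value 8 for EMPTY_FRAME_SIZE is an assumption about the per-frame overhead rather than something taken from the listing.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch of the body-splitting arithmetic in transmit(); no RabbitMQ classes are used.
class BodyFragmenter {
    // Assumed per-frame overhead in bytes; stands in for EMPTY_FRAME_SIZE in the listing.
    static final int EMPTY_FRAME_SIZE = 8;

    /** Returns the payload length of each body frame for a body of bodyLength bytes. */
    static List<Integer> fragmentLengths(int bodyLength, int frameMax) {
        if (bodyLength < 0 || frameMax < 0
                || (frameMax != 0 && frameMax <= EMPTY_FRAME_SIZE)) {
            throw new IllegalArgumentException("invalid bodyLength or frameMax");
        }
        // frameMax == 0 means "no limit": the whole body fits into one frame.
        int bodyPayloadMax = (frameMax == 0) ? bodyLength : frameMax - EMPTY_FRAME_SIZE;
        List<Integer> lengths = new ArrayList<>();
        for (int offset = 0; offset < bodyLength; offset += bodyPayloadMax) {
            int remaining = bodyLength - offset;
            lengths.add(Math.min(remaining, bodyPayloadMax));
        }
        return lengths;
    }

    public static void main(String[] args) {
        // 250-byte body, frameMax 108 -> at most 100 payload bytes per frame.
        System.out.println(fragmentLengths(250, 108)); // prints [100, 100, 50]
    }
}
```

An empty body produces no body frames at all, because the loop condition fails on the first check; only the method frame and the content header are written in that case.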
Now an example of consuming a message:
    @Component
    public class TopicConsumer {

        @RabbitListener(queues = MQConst.TOPIC_QUEUENAME1)
        @RabbitHandler
        public void process1(String message) {
            System.out.println("queue:topic.message1,message:" + message);
        }
    }
In a Spring Boot application, the @RabbitListener and @RabbitHandler annotations are parsed by the buildMetadata method of RabbitListenerAnnotationBeanPostProcessor:
    private TypeMetadata buildMetadata(Class<?> targetClass) {
        Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<ListenerMethod> methods = new ArrayList<>();
        final List<Method> multiMethods = new ArrayList<>();
        ReflectionUtils.doWithMethods(targetClass, method -> {
            Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
            if (listenerAnnotations.size() > 0) {
                methods.add(new ListenerMethod(method,
                        listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
            }
            if (hasClassLevelListeners) {
                RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
                if (rabbitHandler != null) {
                    multiMethods.add(method);
                }
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        if (methods.isEmpty() && multiMethods.isEmpty()) {
            return TypeMetadata.EMPTY;
        }
        return new TypeMetadata(
                methods.toArray(new ListenerMethod[methods.size()]),
                multiMethods.toArray(new Method[multiMethods.size()]),
                classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
    }
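The two branches of the listing can be reproduced with plain reflection. The sketch below uses only the JDK: @Listener and @Handler are hypothetical stand-ins for @RabbitListener and @RabbitHandler, and the two consumer classes are invented for the demonstration. It shows that handler methods are collected only when the class itself carries the listener annotation, while listener methods are collected either way.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ListenerScanDemo {
    // Hypothetical stand-in for @RabbitListener (allowed on types and methods).
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Listener {
        String queue();
    }

    // Hypothetical stand-in for @RabbitHandler (methods only).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Handler {
    }

    @Listener(queue = "orders")
    static class ClassLevelConsumer {
        @Handler public void onText(String body) { }
        @Handler public void onNumber(Integer body) { }
        public void helper() { }
    }

    static class MethodLevelConsumer {
        @Listener(queue = "audit") public void onAudit(String body) { }
        @Handler public void ignored(String body) { }
    }

    // Names of methods that carry @Listener themselves (the "methods" list in the listing).
    static List<String> listenerMethods(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(Listener.class)) {
                names.add(m.getName());
            }
        }
        Collections.sort(names); // getDeclaredMethods() has no guaranteed order
        return names;
    }

    // Names of @Handler methods, collected only for classes annotated with @Listener
    // (the "multiMethods" list in the listing).
    static List<String> handlerMethods(Class<?> type) {
        List<String> names = new ArrayList<>();
        if (!type.isAnnotationPresent(Listener.class)) {
            return names;
        }
        for (Method m : type.getDeclaredMethods()) {
            if (m.isAnnotationPresent(Handler.class)) {
                names.add(m.getName());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(handlerMethods(ClassLevelConsumer.class));   // prints [onNumber, onText]
        System.out.println(listenerMethods(MethodLevelConsumer.class)); // prints [onAudit]
        System.out.println(handlerMethods(MethodLevelConsumer.class));  // prints []
    }
}
```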
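@RabbitListener can be placed on a method or on a class. A class-level listener is bound together with the methods marked @RabbitHandler; a method-level listener is not. Once parsing is complete, the processListener method handles each listener.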
    protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener,
            Object bean, Object adminTarget, String beanName) {
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(rabbitListener));
        endpoint.setQueueNames(resolveQueues(rabbitListener));
        endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
        endpoint.setBeanFactory(this.beanFactory);
        endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
        String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
        }
        String group = rabbitListener.group();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }
        String autoStartup = rabbitListener.autoStartup();
        if (StringUtils.hasText(autoStartup)) {
            endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
        }
        endpoint.setExclusive(rabbitListener.exclusive());
        String priority = resolve(rabbitListener.priority());
        if (StringUtils.hasText(priority)) {
            try {
                endpoint.setPriority(Integer.valueOf(priority));
            }
            catch (NumberFormatException ex) {
                throw new BeanInitializationException("Invalid priority value for "
                        + rabbitListener + " (must be an integer)", ex);
            }
        }
        String rabbitAdmin = resolve(rabbitListener.admin());
        if (StringUtils.hasText(rabbitAdmin)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
            try {
                endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register rabbit listener endpoint on ["
                        + adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '"
                        + rabbitAdmin + "' was found in the application context", ex);
            }
        }
        RabbitListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register rabbit listener endpoint on ["
                        + adminTarget + "] for bean " + beanName + ", no "
                        + RabbitListenerContainerFactory.class.getSimpleName() + " with id '"
                        + containerFactoryBeanName + "' was found in the application context", ex);
            }
        }
        this.registrar.registerEndpoint(endpoint, factory);
    }
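The method first sets the endpoint's properties and then obtains the rabbitListenerContainerFactory. From the endpoint and the rabbitListenerContainerFactory it creates a messageListenerContainer, and it records a mapping from the endpoint's id to that messageListenerContainer. When a message arrives, Spring Boot invokes the bound method.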
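The id-to-container mapping and the final method call can be pictured with a small JDK-only model. Everything in this sketch is hypothetical: Registry, Container and the endpoint id "topicConsumer.process1" are invented names, and deliver simply stands in for a container receiving a message from its queue. It is a simplified illustration of the idea, not how the framework is implemented.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class EndpointRegistryDemo {
    // Plays the role of the annotated consumer bean.
    static final class TopicConsumer {
        final List<String> received = new ArrayList<>();

        public void process1(String message) {
            received.add(message);
        }
    }

    // Hypothetical stand-in for a message listener container bound to one method.
    static final class Container {
        private final Object bean;
        private final Method method;

        Container(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }

        void onMessage(String message) {
            try {
                method.invoke(bean, message); // call the bound listener method
            } catch (ReflectiveOperationException ex) {
                throw new IllegalStateException("listener invocation failed", ex);
            }
        }
    }

    // Hypothetical registry keeping the endpoint id -> container mapping.
    static final class Registry {
        private final Map<String, Container> containers = new HashMap<>();

        void registerEndpoint(String id, Object bean, Method method) {
            if (containers.putIfAbsent(id, new Container(bean, method)) != null) {
                throw new IllegalStateException("duplicate endpoint id: " + id);
            }
        }

        // Stands in for a message arriving at the container registered under this id.
        void deliver(String id, String message) {
            Container container = containers.get(id);
            if (container == null) {
                throw new IllegalArgumentException("unknown endpoint id: " + id);
            }
            container.onMessage(message);
        }
    }

    // Registers one endpoint, delivers one message, and returns what the consumer saw.
    static List<String> demo() {
        TopicConsumer consumer = new TopicConsumer();
        Registry registry = new Registry();
        try {
            registry.registerEndpoint("topicConsumer.process1", consumer,
                    TopicConsumer.class.getMethod("process1", String.class));
        } catch (NoSuchMethodException ex) {
            throw new IllegalStateException(ex);
        }
        registry.deliver("topicConsumer.process1", "from key1");
        return consumer.received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [from key1]
    }
}
```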