5 RabbitMQ整合Spring

重點

  • RabbitAdmin
  • Spring AMQP 聲明
  • RabbitTemplate
  • SimpleMessageListenerContainer
  • MessageListenerAdaper
  • MessageConverter

使用RabbitMQ結合Spring AMQP

RabbitAdmin

  • 用於聲明RabbitMQ相關配置、操做RabbitMQ
  • autoStartup設爲true:表示Spring容器啓動時自動初始化RabbitAdmin
  • 底層實現:從Spring容器中獲取Exchange、Binding、RoutingKey以及Queue的@Bean聲明
  • rabbitTemplate的execute方法執行對應的聲明等操做

SpringAMQP聲明

  • exchange
    • TopicExchange
    • FanoutExchange
    • DirectExchange
  • queue
  • binding
    • BindingBuilder

spring使用@bean聲明exchange queue binding 例子以下:java

@Bean  
 public TopicExchange exchange001() {  
        return new TopicExchange("topic001", true, false);  
    }  

    @Bean  
    public Queue queue001() {  
        return new Queue("queue001", true); //隊列持久 
    }  
    
    @Bean  
    public Binding binding001() {  
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
    }  
複製代碼

RabbitTemplate

  • 與SpringAMQP整合的時候進行發送消息的關鍵類
  • 提供豐富的發送消息方法,包括可靠性投遞消息方法、回調監聽消息接口
  • ConfirmCallback
  • ReturnCallback
  • 與Spring整合時須要實例化,可是與Springboot整合時不須要,在配置文件添加配置便可

如下是RabbitTemplate實例化的例子:spring

@Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(//);
        rabbitTemplate.setReturnCallback(//);
    	return rabbitTemplate;
    }
複製代碼

使用RabbitTemplate sendMessage

  • rabbitTemplate.convertAndSend方法是主要的發送消息的方法
  • MessageProperties 用於構造消息體
  • MessagePostProcessor:消息發送以後對消息進行的設置

例子:springboot

@Test
	public void testSendMessage() throws Exception {
		//1 建立消息
		MessageProperties messageProperties = new MessageProperties();
		messageProperties.getHeaders().put("desc", "信息描述..");
		messageProperties.getHeaders().put("type", "自定義消息類型..");
		Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
		
		rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
			@Override
			public Message postProcessMessage(Message message) throws AmqpException {
				System.err.println("------添加額外的設置---------");
				message.getMessageProperties().getHeaders().put("desc", "額外修改的信息描述");
				message.getMessageProperties().getHeaders().put("attr", "額外新加的屬性");
				return message;
			}
		});
	}
複製代碼

SimpleMessageListenerContainer

簡單消息容器 功能:架構

  • 監聽多個隊列、自動啓動、自動聲明
  • 設置事務特性、事務管理器、事務屬性、事務容器、是否開啓事務、回滾消息
  • 設置消費者數量、最小最大數量、批量消費
  • 設置消息確認和自動確認模式、是否重回隊列、異常捕獲函數
  • 設置消費者標籤生成策略、是否獨佔模式、消費者屬性等
  • 設置具體的監聽器、消息轉換器 message convert
  • SimpleMessageListenerContainer能夠進行動態設置,好比在運行中的應用能夠動態的修改其消費者的大小、接收消息的模式等,能夠基於此開發rabbitmq自定義後臺管控平臺

屬性併發

  • queues
  • concurrentConsumers:當前消費者數
  • maxConcurrentConsumers:最大消費者併發
  • defaultRequeueRejected: 是否重回隊列 false
  • acknowledgeMode:消息確認模型 AUTO
  • exposeListenerChannel:
  • messageListener: 消息監聽
  • consumerTagStrategy:consumerTag生成策略
@Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    	SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    	container.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
    	container.setConcurrentConsumers(1);
    	container.setMaxConcurrentConsumers(5);
    	container.setDefaultRequeueRejected(false);
    	container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    	container.setExposeListenerChannel(true);
    	container.setConsumerTagStrategy(new ConsumerTagStrategy() {
			@Override
			public String createConsumerTag(String queue) {
				return queue + "_" + UUID.randomUUID().toString();
			}
		});
        }
複製代碼

MessageListenerApapter 適配器模式

  • extends AbstractAdaptableMessageListener 消息listener
  • queueOrTagToMethodName 隊列標識與方法名稱組成的集合
  • defaultListenerMethod 默認監聽方法名稱
  • Delegate 委託對象:實際真實的委託對象
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate);

//設定queue使用哪一個adapter方法處理
Map<String,String> queueOrTagToMethodName = new HashMap<>();
queueOrTagToMethodName.put("queue001","method1");
queueOrTagToMethodName.put("queue002","method2");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);
//設置默認處理方法,默認處理方法是handleMessage
adapter.setDefaultListenerMethod("handleMessage");
//設置消息轉換方式
adapter.setMessageConverter(new TextMessageConvert());
//消息監聽
container.setMessageListener(adapter);
複製代碼

MessageConverter 消息轉換器

自定義經常使用轉換器 都須要實現 MessageConverter接口app

Json convert: Jackson2JsonMessageConverter 能夠進行java對象的轉換功能 DefaultJackson2JavaTypeMapper映射器: 能夠進行java對象的映射關係 二進制 convert: 如Image、PDF、PPT、stream 框架

Springboot整合RabbitMQ

基本概念

@EnableRabbit

@EnableRabbit和@Configuration一塊兒使用,能夠加在類或者方法上,這個註解開啓了容器對註冊的bean的@RabbitListener檢查。dom

@RabbitListener

@RabbitListener用於註冊Listener時使用的信息:如queue,exchange,key、ListenerContainerFactory和RabbitAdmin的bean name。ide

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue}", durable = "true"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
        key = "${mq.config.key}"), admin = "rabbitAdmin")
複製代碼

掃描到bean帶有該註解後,首先會將註解的內容封裝到Endpoint對象中並和ListenerContainerFactory的實例一塊兒添加到上面的RabbitListenerEndpointRegistry實例中。添加的時候會建立相應的ListenerContainer實例並添加Listener對象。函數

@RabbitHandler

@RabbitListener 和 @RabbitHandler結合使用,不一樣類型的消息使用不一樣的方法來處理。

  • 不一樣的消息類型走不一樣的handler
public class CommandListener{

    @RabbitHandler
    public void handler1(ApiMessage msg){
        System.out.println(msg);
    }

    @RabbitHandler
    public void handler2(Map msg){
        System.out.println(msg);
    }
}
複製代碼

生產端發送消息

  • ConfirmCallback
  • ReturnCallback
  • CorrelationData
  • convertAndSend
//回調函數: confirm確認 message ->send —> broker -> ack
	final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
		@Override
		public void confirm(CorrelationData correlationData, boolean ack, String cause) {
			System.err.println("correlationData: " + correlationData);
			System.err.println("ack: " + ack);
			if(!ack){
				System.err.println("異常處理....");
			}
		}
	};
	
	//回調函數: return返回 
	final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
		@Override
		public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) {
			System.err.println("return exchange: " + exchange + ", routingKey: " 
				+ routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
		}
	};

//發送消息方法調用: 構建Message消息
	public void send(Object message, Map<String, Object> properties) throws Exception {
		MessageHeaders mhs = new MessageHeaders(properties);
		Message msg = MessageBuilder.createMessage(message, mhs);
		rabbitTemplate.setConfirmCallback(confirmCallback);
		rabbitTemplate.setReturnCallback(returnCallback);
		//id + 時間戳 全局惟一 
		CorrelationData correlationData = new CorrelationData("1234567890");
		rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
	}
複製代碼

消費端監聽方式

  • @RabbitListener
    • bindings @QueueBindingvalue
      • @Queue
        • durable 隊列是否持久化
        • value 值
        • exchange @Exchange
          • type 交換機的類型
          • value 值
          • ignoreDeclarationExceptions 是否忽略聲明異常
        • key routing Key
  • @RabbitHandler
  • onMessage
    • @Payload: message body
      • deliveryTag
      • basicAck
    • Channel:
    • @Headers: message headers
/** * *spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-1 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=springboot.* * @param order * @param channel * @param headers * @throws Exception */
	@RabbitListener(bindings = @QueueBinding(
			value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", 
			durable="${spring.rabbitmq.listener.order.queue.durable}"),
			exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", 
			durable="${spring.rabbitmq.listener.order.exchange.durable}", 
			type= "${spring.rabbitmq.listener.order.exchange.type}", 
			ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
			key = "${spring.rabbitmq.listener.order.key}"
			)
	)
	@RabbitHandler
	public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel, @Headers Map<String, Object> headers) throws Exception {
		System.err.println("--------------------------------------");
		System.err.println("消費端order: " + order.getId());
		Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
		//手工ACK
		channel.basicAck(deliveryTag, false);
	}
複製代碼

Spring cloud stream with RabbitMQ

特色: Spring cloud stream 生產端和消費端可使用不一樣的中間件

總體架構核心概念圖:

基本概念

Barista

Barista接口: Barista接口是定義來做爲後面類的參數. 這一接口定義了通道類型和通道名稱,通道名稱是做爲配置用,通道類型則決定了app會使用這一通道進行發送消息仍是從中接收消息。

  • @Output 輸入註解
  • @Input 輸出註解
  • @StreamListener 監聽消息

使用Spring Cloud Stream很是簡單,使用好這3個註解便可,在實現高性能消息的生產和消費場景很是適合,可是使用SpringCloudStream框架有一個很是大的問題就是不能實現可靠性的投遞,也就是沒法保證消息的100%可靠性,會存在少許消息丟失的問題。緣由是爲了兼容Kafka。

使用流程圖

相關文章
相關標籤/搜索