springboot 集成rabbitmq 並採用ack模式 以及封裝隊列定義

  • rabbitmq簡介

rabbitmq 是spring所在公司Pivotal本身的產品  是基於AMQP高級隊列協議的消息中間件 採用erlang開發   所以安裝須要erlang環境  具體安裝根據本身的環境  由於跟spring有共同的血緣關係 因此spring 全家桶對其的支持應該是至關完善的java

  •  簡單概念

通常消息隊列 都是生產者將消息發送到隊列   消費者監聽隊列進行消費     rabbitmq 一個虛擬主機(默認 /)持有一個或者多個交換機(Exchange) 用戶只能在虛擬主機的粒度進行權限控制  交換機根據必定的策略(RoutingKey)綁定(Binding)到隊列(Queue)上 這樣生產者和隊列就沒有直接聯繫 而是將消息發送的交換機  交換機再把消息轉發到對應綁定的隊列上   此處須要詳細熟悉rabbitmq的工做流程  不清楚能夠找相關資料進行學習   git

上面說了 Exchange 做爲rabbitmq的一個獨特的重要的概念  這裏有必要着重強調一下  咱們從 spring對rabbitmq的封裝來解讀一下這個東西 web

package org.springframework.amqp.core;

/**
 * Constants for the standard Exchange type names.
 *
 * @author Mark Fisher
 * @author Gary Russell
 */
public abstract class ExchangeTypes {

	public static final String DIRECT = "direct";

	public static final String TOPIC = "topic";

	public static final String FANOUT = "fanout";

	public static final String HEADERS = "headers";

	public static final String SYSTEM = "system";

	/**
	 * The constant to represent {@code x-delayed-message} exchange mode.
	 * @deprecated since 1.6.4, it's not a user-available exchange type,
	 * the delayed {@code boolean} is used for that.
	 */
	@Deprecated
	public static final String DELAYED = "x-delayed-message";
}

  上面是 交換機類型的定義類    說明了6種交換機類型  最後一種由於即將棄用 因此是五種 咱們經常使用的有四種  下面這個建造類說明了一切spring

package org.springframework.amqp.core;

import java.util.Map;

/**
 * Builder providing a fluent API for building {@link Exchange}s.
 *
 * @author Gary Russell
 * @since 1.6
 *
 */
public final class ExchangeBuilder extends AbstractBuilder {

	private final String name;

	private final String type;

	private boolean durable;

	private boolean autoDelete;

	private boolean internal;

	private boolean delayed;

	/**
	 * Construct an instance of the appropriate type.
	 * @param name the exchange name
	 * @param type the type name
	 * @see ExchangeTypes
	 * @since 1.6.7
	 */
	public ExchangeBuilder(String name, String type) {
		this.name = name;
		this.type = type;
	}

	/**
	 * Return a {@link DirectExchange} builder.
	 * @param name the name.
	 * @return the builder.
	 */
	public static ExchangeBuilder directExchange(String name) {
		return new ExchangeBuilder(name, ExchangeTypes.DIRECT);
	}

	/**
	 * Return a {@link TopicExchange} builder.
	 * @param name the name.
	 * @return the builder.
	 */
	public static ExchangeBuilder topicExchange(String name) {
		return new ExchangeBuilder(name, ExchangeTypes.TOPIC);
	}

	/**
	 * Return a {@link FanoutExchange} builder.
	 * @param name the name.
	 * @return the builder.
	 */
	public static ExchangeBuilder fanoutExchange(String name) {
		return new ExchangeBuilder(name, ExchangeTypes.FANOUT);
	}

	/**
	 * Return a {@link HeadersExchange} builder.
	 * @param name the name.
	 * @return the builder.
	 */
	public static ExchangeBuilder headersExchange(String name) {
		return new ExchangeBuilder(name, ExchangeTypes.HEADERS);
	}

	/**
	 * Set the auto delete flag.
	 * @return the builder.
	 */
	public ExchangeBuilder autoDelete() {
		this.autoDelete = true;
		return this;
	}

	/**
	 * Set the durable flag to true.
	 * @return the builder.
	 * @deprecated - in 2.0, durable will be true by default
	 * @see #durable(boolean)
	 */
	@Deprecated
	public ExchangeBuilder durable() {
		this.durable = true;
		return this;
	}

	/**
	 * Set the durable flag.
	 * @param durable the durable flag (default false).
	 * @return the builder.
	 * @see #durable
	 */
	public ExchangeBuilder durable(boolean durable) {
		this.durable = durable;
		return this;
	}

	/**
	 * Add an argument.
	 * @param key the argument key.
	 * @param value the argument value.
	 * @return the builder.
	 */
	public ExchangeBuilder withArgument(String key, Object value) {
		getOrCreateArguments().put(key, value);
		return this;
	}

	/**
	 * Add the arguments.
	 * @param arguments the arguments map.
	 * @return the builder.
	 */
	public ExchangeBuilder withArguments(Map<String, Object> arguments) {
		this.getOrCreateArguments().putAll(arguments);
		return this;
	}

	/**
	 * Set the internal flag.
	 * @return the builder.
	 */
	public ExchangeBuilder internal() {
		this.internal = true;
		return this;
	}

	/**
	 * Set the delayed flag.
	 * @return the builder.
	 */
	public ExchangeBuilder delayed() {
		this.delayed = true;
		return this;
	}

	public Exchange build() {
		AbstractExchange exchange;
		if (ExchangeTypes.DIRECT.equals(this.type)) {
			exchange = new DirectExchange(this.name, this.durable, this.autoDelete, getArguments());
		}
		else if (ExchangeTypes.TOPIC.equals(this.type)) {
			exchange = new TopicExchange(this.name, this.durable, this.autoDelete, getArguments());
		}
		else if (ExchangeTypes.FANOUT.equals(this.type)) {
			exchange = new FanoutExchange(this.name, this.durable, this.autoDelete, getArguments());
		}
		else if (ExchangeTypes.HEADERS.equals(this.type)) {
			exchange = new HeadersExchange(this.name, this.durable, this.autoDelete, getArguments());
		}
		else {
			throw new IllegalStateException("Invalid type: " + this.type);
		}
		exchange.setInternal(this.internal);
		exchange.setDelayed(this.delayed);
		return exchange;
	}

}

 

這四種的說明springboot

  1. Direct: 先策略匹配到對應綁定的隊列後 纔會被投送到該隊列  交換機跟隊列必須是精確的對應關係 這種最爲簡單
  2. Topic: 轉發消息主要是根據通配符 在這種交換機下,隊列和交換機的綁定會定義一種路由模式,那麼,通配符就要在這種路由模式和路由鍵之間匹配後交換機才能轉發消息 這種能夠認爲是Direct 的靈活版  
  3. Headers:也是根據規則匹配, 相較於 direct 和 topic 固定地使用 routingkey , headers 則是一個自定義匹配規則的類型
    在隊列與交換器綁定時 會設定一組鍵值對規則 消息中也包括一組鍵值對( headers 屬性) 當這些鍵值對有一對 或所有匹配時 消息被投送到對應隊列
  4. Fanout : 消息廣播模式 無論路由鍵或者是路由模式 會把消息發給綁定給它的所有隊列  若是配置了routingkey會被忽略

 

 

 

  • springboot集成rabbitmq       

在熟悉了相關概念後咱們開始搞一搞這個東西   首先你要安裝好rabbitmq  相關方法資料不少 此處不表    在本機安裝好 並啓用了管理頁面後打開  localhost:15672 會顯示一個管理頁面   以下  能夠進行一些可視化操做app

新建springboot工程  springboot 版本 1.5.10  依賴以下dom

<dependencies>
		<!--amqp rabbitmq 依賴必須 必須-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<!--springboot單元測試 選-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<!--springboot健康監控 選-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
		<!--web支持  選-->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
	</dependencies>

application.yml 配置文件 rabbitmq 相關:spring-boot

spring:
  rabbitmq:
    username: rabbitAdmin
    password: 123456789
#    支持發佈確認
    publisher-confirms: true
#    支持發佈返回
    publisher-returns: true
    listener:
      simple:
#      採用手動應答
        acknowledge-mode: manual
#        當前監聽容器數
        concurrency: 1
#        最大數
        max-concurrency: 1
#        是否支持重試
        retry:
          enabled: true
#        日誌配置 
logging:
  config: classpath:logback.xml

 

定製模版類   聲明交換機  隊列     綁定交換機到隊列單元測試

這裏 聲明瞭Direct 交換機  並經過路由鍵綁定到一個隊列中        來測試Direct模式學習

        聲明瞭Fanout交換機  並綁定到2個隊列    來測試廣播模式

package cn.felord.message.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * 隊列配置.
 *
 * @author dax.
 * @version v1.0
 * @since 2018 /2/23 14:28
 */
@Configuration
public class RabbitConfig {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 定製化amqp模版      可根據須要定製多個
     * 
     * 
     * 此處爲模版類定義 Jackson消息轉換器
     * ConfirmCallback接口用於實現消息發送到RabbitMQ交換器後接收ack回調   即消息發送到exchange  ack
     * ReturnCallback接口用於實現消息發送到RabbitMQ 交換器,但無相應隊列與交換器綁定時的回調  即消息發送不到任何一個隊列中  ack
     *
     * @return the amqp template
     */
//    @Primary
    @Bean
    public AmqpTemplate amqpTemplate() {
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
//          使用jackson 消息轉換器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
//        開啓returncallback     yml 須要 配置    publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            log.debug("消息:{} 發送失敗, 應答碼:{} 緣由:{} 交換機: {}  路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
        //        消息確認  yml 須要配置   publisher-returns: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("消息發送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息發送到exchange失敗,緣由: {}", cause);
            }
        });
        return rabbitTemplate;
    }

    /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */

    /**
     * 聲明Direct交換機 支持持久化.
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }

    /**
     * 聲明一個隊列 支持持久化.
     *
     * @return the queue
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").build();
    }

    /**
     * 經過綁定鍵 將指定隊列綁定到一個指定的交換機 .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
    }

    /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */

    /**
     * 聲明 fanout 交換機.
     *
     * @return the exchange
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
    }

    /**
     * Fanout queue A.
     *
     * @return the queue
     */
    @Bean("fanoutQueueA")
    public Queue fanoutQueueA() {
        return QueueBuilder.durable("FANOUT_QUEUE_A").build();
    }

    /**
     * Fanout queue B .
     *
     * @return the queue
     */
    @Bean("fanoutQueueB")
    public Queue fanoutQueueB() {
        return QueueBuilder.durable("FANOUT_QUEUE_B").build();
    }

    /**
     * 綁定隊列A 到Fanout 交換機.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    /**
     * 綁定隊列B 到Fanout 交換機.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

 

編寫監聽器 來監聽隊列消息 

package cn.felord.message.comsumer;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 監聽器.
 *
 * @author dax.
 * @version v1.0
 * @since 2018 /2/24 9:36
 */
@Component
public class Receiver {
    private static final Logger log= LoggerFactory.getLogger(Receiver.class);
    /**
     * FANOUT廣播隊列監聽一.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  這裏異常須要處理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_A"})
    public void on(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_A "+new String(message.getBody()));
    }

    /**
     * FANOUT廣播隊列監聽二.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception   這裏異常須要處理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_B"})
    public void t(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_B "+new String(message.getBody()));
    }

    /**
     * DIRECT模式.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  這裏異常須要處理
     */
    @RabbitListener(queues = {"DIRECT_QUEUE"})
    public void message(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("DIRECT "+new String (message.getBody()));
    }
}

 

編寫 發送消息接口 來進行測試

package cn.felord.message.controller;

import cn.felord.message.bean.ResponseEntity;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 消息接口.
 *
 * @author dax.
 * @version v1.0
 * @since 2018 /2/23 17:27
 */
@RestController
@RequestMapping("/rabbit")
public class SendController {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     *  測試廣播模式.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/fanout")
    public ResponseEntity send(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
        return ResponseEntity.ok();
    }

    /**
     *  測試Direct模式.
     *
     * @param p the p
     * @return the response entity
     */
    @RequestMapping("/direct")
    public ResponseEntity direct(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData);
        return ResponseEntity.ok();
    }
}

測試廣播模式

控制檯輸出

一樣 本身能夠測試Direct模式     能夠打開rabbitmq控制檯進行追蹤 相關運行信息

 

配套源碼 :https://gitee.com/felord/springboot-message

下一篇會在 springboot中實現 rabbitmq死信隊列

 

            

相關文章
相關標籤/搜索