RabbitMQ Exchange詳解以及Spring中Topic實戰

前言  

   AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦。web

業務需求

   後端(集羣)經過websocket往各自維持的websocket session推送消息,若是採用每一個實例監聽同一個queue,那麼生產者往該queue中推送一條消息,該消息只能被集羣中某個實例消費一次。json

   想要實現後端每一個實例同時消費該消息,即可採用RabbitMQ中的topic模式,即每一個實例啓動時,新建一個topic類型的exchange,routing key爲"#.queue",而且每一個實例的queue與該exchange綁定。每一個實例中,根據hostname+".queue"(queue爲配置文件中指定的默認隊列名稱)來建立各自的queue,這樣每一個實例都各自監聽本身的queue。後端

   生產者往該exchange中推送消息,routing key使用默認隊列(或者每臺實例本身的隊列均可以,只要能匹配到"#.queue"便可),這樣exchange接收到消息後,會根據routingkey來匹配與該exchange綁定的queues,並將消息發送到符合條件的queues中,這樣每臺實例都能收到該消息並消費。服務器

RabbitMQ介紹

   RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。 websocket

總體結構圖

    session

概念介紹
  • Broker:簡單來講就是消息隊列服務器實體
  • Virtual Host:虛擬主機,一個Broker裏能夠開設多個virtual host,用做不一樣用戶的權限分離
  • Exchange:消息交換機,它指定消息按什麼規則,路由到哪一個隊列
  • Queue:消息隊列載體,每一個消息都會被投入到一個或多個隊列
  • Binding:綁定,它的做用就是把exchange和queue按照路由規則綁定起來
  • Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞
  • Publisher:消息生產者
  • Consumer:消息消費者
  • Channel:消息通道,在客戶端的每一個鏈接裏,可創建多個channel,每一個channel表明一個會話任務
Exchange詳解
   Exchange意思爲交換機,從圖中能夠看出,publisher發送消息,先進入Exchange,而後由Exchange分配到隊列Queue
   1. Direct-Exchange

    Direct Exchange是RabbitMQ Broker的默認Exchange,在此類型下,沒必要指定routing key的名字,建立的Queue有一個默認的routing key,通常與建立的Queue同名。
    socket

   2. Topic-Exchange分佈式

    Topic Exchange是根據routing key和Exchange的類型將message發送到一個或者多個Queue中,能夠經過它來實現pub/sub模式,即發佈訂閱。
    ide

    注:「#」表示0個或若干個關鍵字,「*」表示一個關鍵字
    若是Exchange沒有發現可以與RouteKey匹配的Queue,則會拋棄此消息ui

   3. Fanout-Exchange

    此類型的Exchange比較特殊,會忽略routing key的存在,直接將message廣播到全部的Queue中。
    

   4. Headers-Exchange

    Headers Exchange不一樣於上面三種Exchange,它是根據Message的一些頭部信息來分發過濾Message,忽略routing key的屬性,若是Header信息和message消息的頭信息相匹配,那麼這條消息就匹配上了。
    

Spring集成RabbitMQ實現Topic模式

   1. rabbitmq.properties(mq配置文件)

rabbitmq.host=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=admin
rabbitmq.password=admin
rabbitmq.my.exchange=my-exchange
rabbitmq.my.queue=my-queue

   2. 自動注入配置類(注意:須要建立自定義rabbitAdmin,不然啓動時不會自動建立exchange、queue、bindings相關信息,具體能夠查看RabbitAdmin類中的initialize()方法)    

/**
 * @Description: rabbit mq相關配置
 * @author yehaixiao
 * @date 2018年5月30日
 */
@Configuration
public class RabbitMQConfig {

    private static final Logger LOGGER = Logger.getLogger(RabbitMQConfig.class);

    @Value("${rabbitmq.host}")
    String host;
    @Value("${rabbitmq.port}")
    int port;
    @Value("${rabbitmq.username}")
    String username;
    @Value("${rabbitmq.password}")
    String password;
    @Value("${rabbitmq.my.exchange}")
    String exchange;
    @Value("${rabbitmq.my.queue}")
    String queue;

    /**
     * 建立rabbit mq鏈接工廠
     */
    @Bean(name = "rabbitConnectionFactory")
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(host + ":" + port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    /**
     * 自定義connectionFactory,須要聲明rabbitAdmin 內部initialize()方法會進行exchanges、queues、bindings聲明
     */
    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * 建立amqp消息模版,用於發送者發送消息
     */
    @Bean
    public AmqpTemplate amqpTemplate() {
        RabbitTemplate amqpTemplate = new RabbitTemplate(connectionFactory());
        // json消息轉化
        amqpTemplate.setMessageConverter(messageConverter());
        return amqpTemplate;
    }

    /**
     * 建立topic類型交換器
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(exchange, false, true);
    }

    /**
     * 根據hostname建立queue(目的:爲了實現多個集羣中多個實例共享一條消息)
     * durable:是否持久化 
     * exclusive:僅建立者可使用的私有隊列,斷開後自動刪除 
     * auto-delete:當全部消費端鏈接斷開後,是否自動刪除隊列
     */
    @Bean
    public Queue queue() {
        InetAddress netAddress = null;
        try {
            netAddress = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        String hostName = netAddress.getHostName();
        String queueName = String.format("%s." + queue, hostName);
        LOGGER.info("dynamic create queue name:" + queueName);
        Queue queue = new Queue(queueName, false, false, true);
        return queue;
    }

    /**
     * 將隊列queue與exchange綁定,binding_key爲#.queue,模糊匹配
     */
    @Bean
    public Binding binding(Queue queue, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with("#." + queue);
    }

    /**
     * 建立消息監聽器
     */
    @Bean
    public MessageListener customizeMessageListener() {
        return new CustomizeMessageListener();
    }

    /**
     * 建立json消息轉化器
     */
    @Bean
    public MessageConverter messageConverter() {
        Jackson2JsonMessageConverter fastJsonMessageConverter = new Jackson2JsonMessageConverter();
        return fastJsonMessageConverter;
    }

    /**
     * 綁定queue和listener關係
     */
    @Bean
    public SimpleMessageListenerContainer mqMessageContainer(Queue queue, MessageListener customizeMessageListener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue);
        container.setExposeListenerChannel(true);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(customizeMessageListener);
        return container;
    }
}

   3. 消息消費者(經過實現MessageListener來重寫onMessage()方法)

/**
 * @Description: 消息監聽器
 * @author yehaixiao
 * @date 2018年5月24日
 */
@Service
public class CustomizeMessageListener implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomizeMessageListener.class);

    @Override
    public void onMessage(Message message) {
        try {
            String queue = message.getMessageProperties().getConsumerQueue();
            String msg = new String(message.getBody());
            LOGGER.info("consumer msg : {}, from queue : {}", msg, queue);
        } catch (Exception e) {
            LOGGER.error("consumer message error!", e);
        }
    }
}

   4. 消息生產者(經過AmqpTemplate發送消息) 

/**
 * @Description: 消息生產者
 * @author yehaixiao
 * @date 2018年5月28日
 */
@Service
public class MessageProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageProducer.class);

    @Value("${rabbitmq.my.exchange}")
    private String exchange;

    @Value("${rabbitmq.my.queue}")
    private String queue;

    @Resource
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Object message) {
        LOGGER.info("send message : {} , to queue : {}", JSON.toJSONString(message), queue);
        // message是須要傳遞的信息, 指定特定的queue
        amqpTemplate.convertAndSend(exchange, queue, message);
    }
}
相關文章
相關標籤/搜索