AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,爲面向消息的中間件設計。消息中間件主要用於組件之間的解耦。
web
後端(集羣)經過websocket往各自維持的websocket session推送消息,若是採用每一個實例監聽同一個queue,那麼生產者往該queue中推送一條消息,該消息只能被集羣中某個實例消費一次。json
想要實現後端每一個實例同時消費該消息,即可採用RabbitMQ中的topic模式,即每一個實例啓動時,新建一個topic類型的exchange,routing key爲"#.queue",而且每一個實例的queue與該exchange綁定。每一個實例中,根據hostname+".queue"(queue爲配置文件中指定的默認隊列名稱)來建立各自的queue,這樣每一個實例都各自監聽本身的queue。後端
生產者往該exchange中推送消息,routing key使用默認隊列(或者每臺實例本身的隊列均可以,只要能匹配到"#.queue"便可),這樣exchange接收到消息後,會根據routingkey來匹配與該exchange綁定的queues,並將消息發送到符合條件的queues中,這樣每臺實例都能收到該消息並消費。服務器
RabbitMQ是一個開源的AMQP實現,服務器端用Erlang語言編寫,用於在分佈式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。
websocket
 session
Exchange意思爲交換機,從圖中能夠看出,publisher發送消息,先進入Exchange,而後由Exchange分配到隊列Queue。
1. Direct-Exchange
Direct Exchange是RabbitMQ Broker的默認Exchange,在此類型下,沒必要指定routing key的名字,建立的Queue有一個默認的routing key,通常與建立的Queue同名。
 socket
2. Topic-Exchange分佈式
Topic Exchange是根據routing key和Exchange的類型將message發送到一個或者多個Queue中,能夠經過它來實現pub/sub模式,即發佈訂閱。
 ide
注:「#」表示0個或若干個關鍵字,「*」表示一個關鍵字
若是Exchange沒有發現可以與RouteKey匹配的Queue,則會拋棄此消息ui
3. Fanout-Exchange
此類型的Exchange比較特殊,會忽略routing key的存在,直接將message廣播到全部的Queue中。

4. Headers-Exchange
Headers Exchange不一樣於上面三種Exchange,它是根據Message的一些頭部信息來分發過濾Message,忽略routing key的屬性,若是Header信息和message消息的頭信息相匹配,那麼這條消息就匹配上了。
1. rabbitmq.properties(mq配置文件)
rabbitmq.host=127.0.0.1 rabbitmq.port=5672 rabbitmq.username=admin rabbitmq.password=admin rabbitmq.my.exchange=my-exchange rabbitmq.my.queue=my-queue
2. 自動注入配置類(注意:須要建立自定義rabbitAdmin,不然啓動時不會自動建立exchange、queue、bindings相關信息,具體能夠查看RabbitAdmin類中的initialize()方法)
/** * @Description: rabbit mq相關配置 * @author yehaixiao * @date 2018年5月30日 */ @Configuration public class RabbitMQConfig { private static final Logger LOGGER = Logger.getLogger(RabbitMQConfig.class); @Value("${rabbitmq.host}") String host; @Value("${rabbitmq.port}") int port; @Value("${rabbitmq.username}") String username; @Value("${rabbitmq.password}") String password; @Value("${rabbitmq.my.exchange}") String exchange; @Value("${rabbitmq.my.queue}") String queue; /** * 建立rabbit mq鏈接工廠 */ @Bean(name = "rabbitConnectionFactory") public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(host + ":" + port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setPublisherConfirms(true); return connectionFactory; } /** * 自定義connectionFactory,須要聲明rabbitAdmin 內部initialize()方法會進行exchanges、queues、bindings聲明 */ @Bean public RabbitAdmin rabbitAdmin() { return new RabbitAdmin(connectionFactory()); } /** * 建立amqp消息模版,用於發送者發送消息 */ @Bean public AmqpTemplate amqpTemplate() { RabbitTemplate amqpTemplate = new RabbitTemplate(connectionFactory()); // json消息轉化 amqpTemplate.setMessageConverter(messageConverter()); return amqpTemplate; } /** * 建立topic類型交換器 */ @Bean public TopicExchange topicExchange() { return new TopicExchange(exchange, false, true); } /** * 根據hostname建立queue(目的:爲了實現多個集羣中多個實例共享一條消息) * durable:是否持久化 * exclusive:僅建立者可使用的私有隊列,斷開後自動刪除 * auto-delete:當全部消費端鏈接斷開後,是否自動刪除隊列 */ @Bean public Queue queue() { InetAddress netAddress = null; try { netAddress = InetAddress.getLocalHost(); } catch (UnknownHostException e) { e.printStackTrace(); } String hostName = netAddress.getHostName(); String queueName = String.format("%s." + queue, hostName); LOGGER.info("dynamic create queue name:" + queueName); Queue queue = new Queue(queueName, false, false, true); return queue; } /** * 將隊列queue與exchange綁定,binding_key爲#.queue,模糊匹配 */ @Bean public Binding binding(Queue queue, TopicExchange topicExchange) { return BindingBuilder.bind(queue).to(topicExchange).with("#." + queue); } /** * 建立消息監聽器 */ @Bean public MessageListener customizeMessageListener() { return new CustomizeMessageListener(); } /** * 建立json消息轉化器 */ @Bean public MessageConverter messageConverter() { Jackson2JsonMessageConverter fastJsonMessageConverter = new Jackson2JsonMessageConverter(); return fastJsonMessageConverter; } /** * 綁定queue和listener關係 */ @Bean public SimpleMessageListenerContainer mqMessageContainer(Queue queue, MessageListener customizeMessageListener) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue); container.setExposeListenerChannel(true); container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setMessageListener(customizeMessageListener); return container; } }
3. 消息消費者(經過實現MessageListener來重寫onMessage()方法)
/** * @Description: 消息監聽器 * @author yehaixiao * @date 2018年5月24日 */ @Service public class CustomizeMessageListener implements MessageListener { private static final Logger LOGGER = LoggerFactory.getLogger(CustomizeMessageListener.class); @Override public void onMessage(Message message) { try { String queue = message.getMessageProperties().getConsumerQueue(); String msg = new String(message.getBody()); LOGGER.info("consumer msg : {}, from queue : {}", msg, queue); } catch (Exception e) { LOGGER.error("consumer message error!", e); } } }
4. 消息生產者(經過AmqpTemplate發送消息)
/** * @Description: 消息生產者 * @author yehaixiao * @date 2018年5月28日 */ @Service public class MessageProducer { private static final Logger LOGGER = LoggerFactory.getLogger(MessageProducer.class); @Value("${rabbitmq.my.exchange}") private String exchange; @Value("${rabbitmq.my.queue}") private String queue; @Resource private AmqpTemplate amqpTemplate; public void sendMessage(Object message) { LOGGER.info("send message : {} , to queue : {}", JSON.toJSONString(message), queue); // message是須要傳遞的信息, 指定特定的queue amqpTemplate.convertAndSend(exchange, queue, message); } }