以前咱們給咱們的系統加了一個使用SpringAOP+RabbitMQ+WebSocket進行實時消息通知功能(http://www.javashuo.com/article/p-tzzbgrle-be.html)。在測試環境下沒有問題,但上到生產環境後部分用戶反映出現了丟消息的狀況,針對這個問題咱們進行了排查,發現,本來咱們的系統是單機的,但用戶在以前作了調整,在內外網服務器分別部署了系統,兩個服務器都公用一個RabbitMQ。那麼問題就來了。html
以前生產者向Exchange生產消息,消費者從queue消費消息使用的是direct模式,經過routing-key保證生產的消息只有指定的消費者可消費。但當兩臺服務器共用一個MQ時即有了兩個消費者鏈接同一個queue,此時rabbitmq並非一個消息兩個消費者都能消費,而是採用默認的輪詢發送方式,A服務器收到消息一、三、5 。。。而B服務器收到二、四、6 。。。java
這就出現了用戶感受的丟消息現象。因此咱們考慮將通知改成廣播形式即fanout。web
RabbitMQ將消息中間件的實現分紅了Exchange+Queue的形式,Exchange和Queue使用Binding;生產者向Exchange生產消息,消息根據指定的binding進入Queue,消費者從Queue取消息。Fanout模式如圖:服務器
一個消費者會對應一個queue,那麼多個消費者要有多個queue。websocket
修改後代碼以下:socket
RabbitMQConfig.javaide
//聲明exchange Connection connection = factory.createConnection(); Channel channel = connection.createChannel(false); //生產環境中有多個server,每一個server都是一個消費者,對同一個消息都要進行處理。選用廣播模式 channel.exchangeDeclare("exchange.websocket.msg", BuiltinExchangeType.FANOUT);
RabbitMessageQueue.java測試
@Override public void send(WebSocketMsgEntity entity) { logger.warn("::product msg to MQ-websocket_msg_queue!"); //若沒有指定exchange,則使用默認名爲「」的exchange,binding名與queue名相同 rabbitTemplate.convertAndSend("exchange.websocket.msg","", entity); }
RabbitMQListener.javaui
@Component public class RabbitMQListener { private static Logger logger = LoggerFactory.getLogger(RabbitMQListener.class); @Autowired private RabbitMQService mqService; /** * WebSocket推送監聽器 * @param socketEntity * @param deliveryTag * @param channel */ @RabbitListener(bindings ={@QueueBinding(value = @Queue(exclusive = "true"), exchange = @Exchange(value = "exchange.websocket.msg", type = ExchangeTypes.FANOUT))}) public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException { logger.warn("::consume msg from MQ-websocket_msg_queue!"); mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel); } }
本來的客戶端監聽是監聽Queue,而如今改爲監聽Binding。並不顯式的指定一個Queue,而是將Queue設置成exclusive = true,這樣每一個消費者在監聽Binding時都會默認建立一個Queue與指定的Exchange綁定,在消費者斷開鏈接後Queue自動刪除。如有兩個消費者,則建立兩個Queue,他們綁定的Exchange相同,當生產者有消息時會向兩個Queue各插入一條,那麼兩個系統的用戶就都能收到通知啦!spa