RabbitMQ踩坑記

 

以前咱們給咱們的系統加了一個使用SpringAOP+RabbitMQ+WebSocket進行實時消息通知功能(http://www.javashuo.com/article/p-tzzbgrle-be.html)。在測試環境下沒有問題,但上到生產環境後部分用戶反映出現了丟消息的狀況,針對這個問題咱們進行了排查,發現,本來咱們的系統是單機的,但用戶在以前作了調整,在內外網服務器分別部署了系統,兩個服務器都公用一個RabbitMQ。那麼問題就來了。html

以前生產者向Exchange生產消息,消費者從queue消費消息使用的是direct模式,經過routing-key保證生產的消息只有指定的消費者可消費。但當兩臺服務器共用一個MQ時即有了兩個消費者鏈接同一個queue,此時rabbitmq並非一個消息兩個消費者都能消費,而是採用默認的輪詢發送方式,A服務器收到消息一、三、5 。。。而B服務器收到二、四、6 。。。java

這就出現了用戶感受的丟消息現象。因此咱們考慮將通知改成廣播形式即fanout。web

       RabbitMQ將消息中間件的實現分紅了Exchange+Queue的形式,Exchange和Queue使用Binding;生產者向Exchange生產消息,消息根據指定的binding進入Queue,消費者從Queue取消息。Fanout模式如圖:服務器

一個消費者會對應一個queue,那麼多個消費者要有多個queue。websocket

修改後代碼以下:socket

RabbitMQConfig.javaide

//聲明exchange 
Connection connection = factory.createConnection();
Channel channel = connection.createChannel(false);
//生產環境中有多個server,每一個server都是一個消費者,對同一個消息都要進行處理。選用廣播模式
channel.exchangeDeclare("exchange.websocket.msg", BuiltinExchangeType.FANOUT);

RabbitMessageQueue.java測試

@Override
public void send(WebSocketMsgEntity entity) {
    logger.warn("::product msg to MQ-websocket_msg_queue!");
    //若沒有指定exchange,則使用默認名爲「」的exchange,binding名與queue名相同
    rabbitTemplate.convertAndSend("exchange.websocket.msg","", entity);
}

RabbitMQListener.javaui

@Component
public class RabbitMQListener {
    private static Logger logger = LoggerFactory.getLogger(RabbitMQListener.class);
    @Autowired
    private RabbitMQService mqService;
    /**
     * WebSocket推送監聽器
     * @param socketEntity
     * @param deliveryTag
     * @param channel
     */
    @RabbitListener(bindings ={@QueueBinding(value = @Queue(exclusive = "true"), exchange = @Exchange(value = "exchange.websocket.msg", type = ExchangeTypes.FANOUT))})
    public void webSocketMsgListener(@Payload WebSocketMsgEntity socketMsgEntity, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
        logger.warn("::consume msg from MQ-websocket_msg_queue!");
        mqService.handleWebSocketMsg(socketMsgEntity, deliveryTag, channel);
    }

}

本來的客戶端監聽是監聽Queue,而如今改爲監聽Binding。並不顯式的指定一個Queue,而是將Queue設置成exclusive = true,這樣每一個消費者在監聽Binding時都會默認建立一個Queue與指定的Exchange綁定,在消費者斷開鏈接後Queue自動刪除。如有兩個消費者,則建立兩個Queue,他們綁定的Exchange相同,當生產者有消息時會向兩個Queue各插入一條,那麼兩個系統的用戶就都能收到通知啦!spa

相關文章
相關標籤/搜索