rabbitmq websocket fanout 沒有ack致使消息堆積問題

後臺服務往fanoutModelRetValReturnQueue ,fanoutModelRetValReturnQueue發送消息,前端websocket訂閱,實時拿到計算結果,中間遇到了一個大坑,配置的隊列沒有ack致使rabitmq隊列中消息ready狀態的數量一直累積最後消息沒法實時到前臺前端

解決辦法:添加listener同時設置acknowledge="auto",這樣ready狀態的消息自動被消費確認 以下:web

<rabbit:listener-container connection-factory="websocketRabbitConnectionFactory" message-converter="mqMessageConverter" acknowledge="auto"> <rabbit:listener ref="webSocketImgGenerateReturnListener" queues="${websocket.fanout.imgGenerateReturn.queue}"/> <rabbit:listener ref="webSocketModelValReturnListener" queues="${websocket.fanout.modelRetValReturn.queue}"/> </rabbit:listener-container> 服務端配置:websocket

<rabbit:fanout-exchange name="${websocket.fanout.modelRetValReturn.exchange}" durable="true" declared-by="websocketRabbitAdmin"> rabbit:bindings <rabbit:binding queue="fanoutModelRetValReturnQueue"></rabbit:binding> </rabbit:bindings> </rabbit:fanout-exchange>socket

<rabbit:fanout-exchange name="${websocket.fanout.imgGenerateReturn.exchange}" durable="true" declared-by="websocketRabbitAdmin">
    <rabbit:bindings>
        <rabbit:binding queue="fanoutImgGenerateReturnQueue"></rabbit:binding>
    </rabbit:bindings>
</rabbit:fanout-exchange>

前端:this

export function socketClient(callback,debug = false) { var client = generatorWs(); client.debug = debug ? str => { console.log(str); } : () => {}; client.connect(window.export_minas.socketUser, window.export_minas.socketPwd, () => { return callback(client); }, function() { //error, 重連 reconnect(); }, window.export_minas.socketUser);url

//當連接斷開當即重連
client.ws.addEventListener('close', reconnect);
clientPool.push(client);
function reconnect() {
    sokcetPool.forEach(id => {
        client.unsubscribe(id);
    });
    sokcetPool = [];
    sokcetPool.length = 0;
    //重連
    socketClient(callback);
}

} Backspace 14:32:22 socketClient(client => { //訂閱模型計算通道spa

// src="https://test-img.3dker.cn/preview/dd080d075072494481512db9c47d2137/0_0@110w.jpg"
  //window.export_minas.FANOUT_MODEL
  const queue1 = client.subscribe('/topic/FANOUT_MODEL_RETVAL_RETURN', ({
    body
  }) => {
    console.log('data compu:', body);
    var {
      dfsId,
      box,
      volume,
      area,
      errorText,
      error
    } = this.parseModelData(body);

    //模型計算完畢
    var name = dfsId2Name(dfsId);
    //沒有對應關係,則不是本機的消息
    if (!(name && name.length > 0))
      return;

    //查看是否出錯
    if (error) {
      this.updateUFiles(name, {
        "error": true,
        "errorText": errorText,
        "name": name,
        "dfsId": dfsId,
        "loading": false,
        "crafts": null
      });
      return;
    }

    //更新uFiles
    this.updateUFiles(name, {
      "name": name,
      "dfsId": dfsId,
      "loading": false,
      "loaded": 100,
      "crafts": null,
      "box": box,
      "volume": volume,
      "area": area
    });
  });

  sokcetPool.push(queue1.id);

  //圖片生成隊列
  //window.export_minas.FANOUT_IMG
  var queue2 = client.subscribe('/topic/TOPIC_IMGGENERATERETURN', ({
    body
  }) => {
    console.log('img:', body);
    //用dfsid本身拼下url,而後塞進files
    var {
      dfsId,
      imgStatus,
      firstImg
    } = this.parseData(body);
    var name = dfsId2Name(dfsId);
    //沒有對應關係,則不是本機的消息
    if (!(name && name.length > 0))
      return;

    if (firstImg === 'y') {
      this.updateUFiles(name, {
        "image": true
      });
    }
    if (~~imgStatus === 1) {
      this.updateUFiles(name, {
        "preview": true
      });
    }
  });

  sokcetPool.push(queue2.id);
},this.$route.query.debug);

function generatorWs() { //${window.export_minas.socketHost} var ws = new window.WebSocket(wss://test.XXXXX.cn/ws/); return window.Stomp.over(ws); }debug

相關文章
相關標籤/搜索