websocket+rabbitmq實戰

1. websocket+rabbitmq實戰

1.1. 前言

  接到的需求是後臺定向給指定web登陸用戶推送消息,且可能同一帳號會登陸多個客戶端都要接收到消息前端

1.2. 遇坑

  1. 基於springboot環境搭建的websocket+rabbitmq,搭建完成後發現websocket每隔一段時間會斷開,看網上有人由於nginx的鏈接超時機制斷開,而我這彷佛是由於長鏈接空閒時間太長而斷開
  2. 通過測試,若是一直保持每隔段時間發送消息,那麼鏈接不會斷開,因此我採用了斷開重連機制,分三種狀況
    1. 服務器正常,客戶端正常且空閒時間不超過1分鐘,則狀況正常,超過一分鐘會斷線,前端發起請求重連
    2. 服務器正常,客戶端關閉或註銷,服務器正常收到通知,去除對應客戶端session
    3. 服務器異常,客戶端正常,客戶端發現連不上服務器會嘗試重連3次,3次都連不上放棄重連
  3. rabbitmq定向推送,按需求須要一臺機器對應一批用戶,因此定製化須要服務啓動的時候定向訂閱該ip對應的隊列名,簡單說就是動態隊列名的設定,因此又複雜了點,不能直接在註解寫死。同時由於使用的apollo配置中心,同一集羣應該相同的配置,因此也不能經過提取配置的方式設定值,爲了這個點設置apollo的集羣方式有點小題大作,因此採用動態讀取數據庫對應的ip取出對應的隊列名。
  4. 部署線上tomcat的話,不須要加上一塊代碼
/**
 * 使用tomcat啓動無需配置
 */
//@Configuration
//@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

1.3. 正式代碼

1.3.1. rabbimq部分

  1. application.properties配置
spring.rabbitmq.addresses = i.tzxylao.com:5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = 123456
spring.rabbitmq.virtual-host = /
spring.rabbitmq.connection-timeout = 15000
  1. 交換機和隊列配置
/**
 * @author laoliangliang
 * @date 2019/3/29 11:41
 */
@Configuration
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class RabbitmqConfig {

    final public static String EXCHANGENAME = "websocketExchange";

    /**
     * 建立交換器
     */
    @Bean
    FanoutExchange exchange() {
        return new FanoutExchange(EXCHANGENAME);
    }

    @Bean
    public Queue queue(){
        return new Queue(orderQueueName());
    }

    @Bean
    Binding bindingExchangeMessage(Queue queue,FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(OrderReceiver orderReceiver, @Qualifier("rabbitConnectionFactory") CachingConnectionFactory cachingConnectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
        // 監聽隊列的名稱
        container.setQueueNames(orderQueueName());
        container.setExposeListenerChannel(true);
        // 設置每一個消費者獲取的最大消息數量
        container.setPrefetchCount(100);
        // 消費者的個數
        container.setConcurrentConsumers(1);
        // 設置確認模式爲自動確認
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(orderReceiver);
        return container;
    }


    /**
     * 在這裏寫獲取訂單隊列名的具體過程
     * @return
     */
    public String orderQueueName(){
        return "orderChannel";
    }
}
  1. 消息監聽類
/**
 * @author laoliangliang
 * @date 2019/3/29 11:38
 */
@Component
@Slf4j
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class OrderReceiver implements ChannelAwareMessageListener {

    @Autowired
    private MyWebSocket myWebSocket;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        byte[] body = message.getBody();
        log.info("接收到消息:" + new String(body));
        try {
            myWebSocket.sendMessage(new String(body));
        } catch (IOException e) {
            log.error("send rabbitmq message error", e);
        }
    }
}

1.3.2. websocket部分

  1. 配置服務端點
@Configuration
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
  1. 核心代碼
/**
 * @author laoliangliang
 * @date 2019/3/28 14:40
 */
public abstract class AbstractWebSocket {

    protected static Map<String, CopyOnWriteArraySet<Session>> sessionStore = new HashMap<>();

    public void sendMessage(String message) throws IOException {
        List<String> userCodes = beforeSendMessage();
        for (String userCode : userCodes) {
            CopyOnWriteArraySet<Session> sessions = sessionStore.get(userCode);
            //阻塞式的(同步的)
            if (sessions !=null && sessions.size() != 0) {
                for (Session s : sessions) {
                    if (s != null) {
                        s.getBasicRemote().sendText(message);
                    }
                }
            }
        }
    }

    /**
     * 刪選給誰發消息
     * @return
     */
    protected abstract List<String> beforeSendMessage();

    protected void clearSession(Session session) {
        Collection<CopyOnWriteArraySet<Session>> values = sessionStore.values();
        for (CopyOnWriteArraySet<Session> sessions : values) {
            for (Session session1 : sessions) {
                if (session.equals(session1)) {
                    sessions.remove(session);
                }
            }
        }
    }
}
@ServerEndpoint(value = "/websocket")
@Component
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class MyWebSocket extends AbstractWebSocket {


    private static Logger log = LogManager.getLogger(MyWebSocket.class);

    @Autowired
    private AmqpTemplate amqpTemplate;

    @PostConstruct
    public void init() {
        /*ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(new Runnable() {

            int i = 0;

            @Override
            public void run() {
                amqpTemplate.convertAndSend(RabbitFanout.EXCHANGENAME, "",("msg num : " + i).getBytes());
                i++;
            }
        }, 50, 1, TimeUnit.SECONDS);*/
    }

    /**
     * 鏈接創建成功調用的方法
     *
     * @param session 可選的參數。session爲與某個客戶端的鏈接會話,須要經過它來給客戶端發送數據
     */
    @OnOpen
    public void onOpen(Session session) throws TimeoutException {
        log.info("websocket connect");
        //10M
        session.setMaxTextMessageBufferSize(10485760);
    }

    /**
     * 鏈接關閉調用的方法
     */
    @OnClose
    public void onClose(Session session) {
        clearSession(session);
    }

    /**
     * 收到客戶端消息後調用的方法
     *
     * @param message 客戶端發送過來的消息
     * @param session 可選的參數
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("from client request:" + message);
        CopyOnWriteArraySet<Session> sessions = sessionStore.get(message);
        if (sessions == null) {
            sessions = new CopyOnWriteArraySet<>();
        }
        sessions.add(session);
        sessionStore.put(message, sessions);
    }

    /**
     * 發生錯誤時調用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        clearSession(session);
    }

    /**
     * 這裏返回須要給哪些用戶發送消息
     * @return
     */
    @Override
    protected List<String> beforeSendMessage() {
        //TODO 給哪些用戶發送消息
        return Lists.newArrayList("6");
    }
}

1.3.3. 前端代碼

var websocket = null;
var reconnectCount = 0;
function connectSocket(){
    var data = basicConfig();
    if(data.websocketEnable !== "true"){
        return;
    }
    //判斷當前瀏覽器是否支持WebSocket
    if ('WebSocket' in window) {
        if(data.localIp && data.localIp !== "" && data.serverPort && data.serverPort !== ""){
            websocket = new WebSocket("ws://"+data.localIp+":"+data.serverPort+data.serverContextPath+"/websocket");
        }else{
            return;
        }
    }else {
        alert('當前瀏覽器 不支持WebSocket')
    }

    //鏈接發生錯誤的回調方法
    websocket.onerror = function () {
        console.log("鏈接發生錯誤");
    };

    //鏈接成功創建的回調方法
    websocket.onopen = function () {
        reconnectCount = 0;
        console.log("鏈接成功");
    };

    //接收到消息的回調方法,此處添加處理接收消息方法,當前是將接收到的信息顯示在網頁上
    websocket.onmessage = function (event) {
        console.log("receive message:" + event.data);
    };

    //鏈接關閉的回調方法
    websocket.onclose = function () {
        console.log("鏈接關閉,如需登陸請刷新頁面。");
        if(reconnectCount === 3) {
            reconnectCount = 0;
            return;
        }
        connectSocket();
        basicConfig();
        reconnectCount++;
    };

    //添加事件監聽
    websocket.addEventListener('open', function () {
        websocket.send(data.userCode);
    });


    //監聽窗口關閉事件,當窗口關閉時,主動去關閉websocket鏈接,防止鏈接還沒斷開就關閉窗口,server端會拋異常。
    window.onbeforeunload = function () {
        console.log("closeWebSocket");
    };


}

connectSocket();

function basicConfig(){
        var result = {};
        $.ajax({
            type: "post",
            async: false,
            url: "${request.contextPath}/basicConfig",
            data: {},
            success: function (data) {
                result = data;
            }
        });
        return result;
    }

1.3.4. 後端提供接口

@ApolloConfig
    private Config config;

    @RequestMapping(value = {"/basicConfig"})
    @ResponseBody
    public Map<String, Object> getUserCode(HttpSession session) {
        Map<String, Object> map = new HashMap<>(2);
        map.put("userCode",String.valueOf(session.getAttribute("userCode")));
        String websocketEnable = config.getProperty("websocket.enabled", "false");
        String serverContextPath  = config.getProperty("server.context-path", "");
        map.put("websocketEnable", websocketEnable);
        map.put("serverContextPath", serverContextPath);

        String localIp = config.getProperty("local.ip", "");
        String serverPort = config.getProperty("server.port", "80");

        map.put("localIp", localIp);
        map.put("serverPort", serverPort);
        return map;
    }
相關文章
相關標籤/搜索