java 實現websocket(轉)

Java web項目使用webSocket

 

前端:

複製代碼
<%@ page language="java" import="java.util.*" pageEncoding="UTF-8" %>
<%
    // Context path of this web application.
    String path = request.getContextPath();
    // Absolute base URL (scheme://host:port/context/) used by the <base href> tag below.
    String basePath = request.getScheme() + "://"
            + request.getServerName() + ":" + request.getServerPort()
            + path + "/";
    // host:port/context WITHOUT a trailing slash. Every use in the script below
    // appends a path that already starts with "/", so a trailing slash here would
    // produce "//" inside the WebSocket/SockJS URLs.
    String ppp = request.getServerName() + ":" + request.getServerPort()
            + path;
%>

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
    <base href="<%=basePath%>">

    <title>My JSP 'MyJsp.jsp' starting page</title>

    <meta http-equiv="pragma" content="no-cache">
    <meta http-equiv="cache-control" content="no-cache">
    <meta http-equiv="expires" content="0">
    <meta http-equiv="keywords" content="keyword1,keyword2,keyword3">
    <meta http-equiv="description" content="This is my page">
    <!--
        <link rel="stylesheet" type="text/css" href="styles.css">
        -->

    <script src="baseui/js/plugins/jquery.js"></script>
</head>
<script>

    /**
     * Sends one heartbeat frame over the given socket.
     * The frame is the JSON text of an object whose only field is optType = 'hb'.
     *
     * @param ws object exposing send(text), e.g. the page's WebSocket
     */
    function keepalive(ws) {
        var heartbeat = {optType: 'hb'};
        ws.send(JSON.stringify(heartbeat));
    }

    // Connection shared by send() and pause() further down in this script.
    var websocket;
    // Handle of the heartbeat interval; kept so the timer can be cancelled on close.
    var heartbeatTimer = null;

    // Pick the best transport the browser offers: native WebSocket, the legacy
    // Mozilla-prefixed implementation, or SockJS as a last resort.
    if ('WebSocket' in window) {
        websocket = new WebSocket("ws://<%=ppp%>/app/indexConfig/indexConfigWebSocket.do");
    } else if ('MozWebSocket' in window) {
        websocket = new MozWebSocket("ws://<%=ppp%>/IBMS/point/webSocketServer.do");
    } else {
        websocket = new SockJS("http://<%=ppp%>/IBMS/sockjs/webSocketServer.do");
    }

    // Connection established: log it, show the URL, and start a heartbeat every 3 seconds.
    websocket.onopen = function (evnt) {
        var old = $("#msgcount").html();
        $("#msgcount").html(old + "</br><font color='green'>" + "onopen 方法" + "</font>");

        old = $("#msgcount").html();
        $("#msgcount").html(old + "</br>ws URL:<font color='green'>" + "ws://<%=ppp%>/app/indexConfig/indexConfigWebSocket.do" + "</font>");
        heartbeatTimer = setInterval(function () {
            keepalive(websocket);
        }, 3000);
    };

    // Server push: append the raw payload to the log area.
    websocket.onmessage = function (evnt) {
        var old = $("#msgcount").html();
        $("#msgcount").html(old + "</br>(<font color='red'>" + evnt.data + "</font>)");
        $("#msgcount").scrollTop($("#msgcount")[0].offsetHeight);
    };

    // Error: dump every enumerable property of the error event into the log area.
    websocket.onerror = function (e) {
        for (var p in e) {
            var old = $("#msgcount").html();
            $("#msgcount").html(old + "</br>onerror 方法:<font color='green'>" + p + "=" + e[p] + "</font>");
        }
    };

    // Closed: stop the heartbeat (otherwise the interval keeps firing against a
    // closed socket for the lifetime of the page) and log the event.
    websocket.onclose = function (evnt) {
        if (heartbeatTimer !== null) {
            clearInterval(heartbeatTimer);
            heartbeatTimer = null;
        }
        var old = $("#msgcount").html();
        $("#msgcount").html(old + "</br>onclose 方法:<font color='green'>" + "onclose" + "</font>");
    };

    /**
     * Sends the text typed into #msgText to the server unchanged and echoes it
     * into the #msgcount log area.
     *
     * The text is expected to already be a JSON request. An earlier sample kept
     * with this function had the shape
     * {"data":[{"statisticsId":..,"statisticsTypeId":..,"statisticsData":..}],"optType":""}.
     *
     * The previous version carried an unreachable block after an unconditional
     * return (it built a sample point array and round-tripped it through eval);
     * that dead code has been removed.
     */
    function send() {
        var t = $('#msgText').val();
        console.log(t);
        var old = $("#msgcount").html();
        $("#msgcount").html(old + "</br>請求報文:<font color='blue'>" + t + "</font>");
        websocket.send(t);
    }
    /**
     * Builds a 'pausePush' request from the text in #msgText (sent as deviceIds),
     * echoes the JSON into the #msgcount log area, then sends it over the socket.
     */
    function pause() {
        var request = {
            deviceIds: $('#msgText').val(),
            optType: 'pausePush'
        };
        var payload = JSON.stringify(request);

        var logArea = $("#msgcount");
        logArea.html(logArea.html() + "</br>請求報文:<font color='blue'>" + payload + "</font>");

        websocket.send(payload);
    }
</script>


<body>
<input type="text" id="msgText">
<input type="button" onclick="send()" value="發送">
<input type="button" onclick="pause()" value="暫停">

<br>
<div id="msgcount"></div>
</body>
</html>
複製代碼

後端需要三個類:註冊類、握手類、處理類(終端類)

握手類:

複製代碼
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;

import javax.servlet.http.HttpSession;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;


/**
 * Handshake interceptor that copies the currently logged-in user into the
 * WebSocket session attributes before the handshake completes.
 *
 * <p>Handlers later read the user back under
 * {@code IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER}; per the original note,
 * the user is needed at push time to resolve point permissions.
 */
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {

    // Logged under this class (it was previously registered under the
    // HandshakeInterceptor interface, which made log filtering misleading).
    private static final Logger logger = LoggerFactory
            .getLogger(WebSocketHandshakeInterceptor.class);

    /**
     * Stores the current user in the handshake attributes for servlet-based requests.
     *
     * @param request    the incoming handshake request
     * @param response   the handshake response (not used)
     * @param wsHandler  the target handler (not used)
     * @param attributes attributes that become the WebSocket session attributes
     * @return always {@code true}; the handshake is never rejected here
     */
    @Override
    public boolean beforeHandshake(ServerHttpRequest request,
            ServerHttpResponse response, WebSocketHandler wsHandler,
            Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            // NOTE(review): the user may be null here; the value is stored as-is and
            // the message handler is the one that rejects sessions without a user.
            final IbmsUser user = IbmsUserHolder.getUser();
            attributes.put(IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER, user);
        }
        return true;
    }

    /**
     * Logs the request URI and remote address once the handshake is done.
     * Output goes through the logger only; it is no longer duplicated to System.err.
     */
    @Override
    public void afterHandshake(ServerHttpRequest request,
            ServerHttpResponse response, WebSocketHandler wsHandler,
            Exception exception) {
        logger.debug("afterHandshake*******************\r\nuri:{};\r\nremoteAddress:{}",
                request.getURI(), request.getRemoteAddress());
    }
}
複製代碼

 

WebSocket 註冊類:註冊類依賴握手類,可以用編碼方式實現,也可以直接透過 Spring 配置實現:

複製代碼
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * Registers the application's WebSocket endpoints.
 *
 * <p>Three endpoints are exposed, each with its own
 * {@link WebSocketHandshakeInterceptor} instance: the system handler over plain
 * WebSocket, the same handler with the SockJS fallback, and the index-config
 * handler. No allowed-origin restriction is configured here.
 */
@Configuration
@EnableWebMvc
@EnableWebSocket
public class WebSocketConfig  implements WebSocketConfigurer {

    /** Handler behind both the plain and the SockJS system endpoints. */
    @Bean
    public WebSocketHandler systemWebSocketHandler(){
        return new IbmsWebSocketHandler();
    }

    /** Handler behind the index-config subscription endpoint. */
    @Bean
    public WebSocketHandler indexConfigWebSocketHandler(){
        return new IndexConfigWebSocketHandler();
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // System handler, native WebSocket transport.
        registry
                .addHandler(systemWebSocketHandler(), "/point/webSocketServer.do")
                .addInterceptors(newHandshakeInterceptor());

        // System handler again, with the SockJS fallback enabled.
        registry
                .addHandler(systemWebSocketHandler(), "/point/sockjs/webSocketServer.do")
                .addInterceptors(newHandshakeInterceptor())
                .withSockJS();

        // Index-config subscriptions.
        registry
                .addHandler(indexConfigWebSocketHandler(), "/app/indexConfig/indexConfigWebSocket.do")
                .addInterceptors(newHandshakeInterceptor());
    }

    /** Creates a fresh interceptor; every endpoint gets its own instance. */
    private WebSocketHandshakeInterceptor newHandshakeInterceptor() {
        return new WebSocketHandshakeInterceptor();
    }

}
複製代碼

後端可以註冊多個 handler,如上方配置所示。

handler:

複製代碼
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;

import java.util.List;

/**
 * WebSocket handler for statistics subscriptions.
 *
 * <p>Clients send JSON text with an {@code optType} field and a {@code data}
 * array of {@link SubscribeBO} items. Depending on {@code optType} the handler
 * ignores the message (heartbeat), subscribes the session to the listed items,
 * or pauses pushes for them. Subscriptions are kept in the per-type
 * {@link BaseWSSHolder} singletons.
 */
public class IndexConfigWebSocketHandler implements WebSocketHandler {

    private static final Logger logger = LoggerFactory.getLogger(IndexConfigWebSocketHandler.class);

    /** optType value of a keep-alive message. */
    private static final String OPT_HEARTBEAT = "hb";
    /** optType value asking to stop pushes. */
    private static final String OPT_PAUSE_PUSH = "pausePush";

    @Override
    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws Exception {
        logger.debug("indexconfig connect to the websocket success......");
    }

    /**
     * Handles a subscription request sent by the front end.
     *
     * <ul>
     *   <li>{@code optType == "hb"}: heartbeat, nothing to do.</li>
     *   <li>empty {@code data} with {@code optType == "pausePush"}: the session is
     *       removed from every holder.</li>
     *   <li>non-empty {@code data}: each item with a non-blank id and type is
     *       subscribed, or unsubscribed when {@code optType == "pausePush"}.</li>
     * </ul>
     *
     * @param session the sending session; must carry the logged-in user attribute
     * @param message the message; its payload is cast to {@code String}
     * @throws RuntimeException if the session has no logged-in user attribute
     * @throws Exception        if an item names an unknown subscription type
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        String jsontext = (String) message.getPayload();
        logger.info("收到統計項訂閱:::" + jsontext);
        Object objUser = session.getAttributes().get(
                IBMSConstant.SESSION_WEBSOCKET_LOGINED_USER);
        if (objUser == null) {
            // No user information available in the session.
            throw new RuntimeException("會話中無用戶信息");
        }
        JSONObject jsonObject = JSONObject.parseObject(jsontext);
        if (jsonObject == null) {
            // Guard: the parser can yield null (e.g. for a blank payload); there is
            // nothing to act on in that case.
            return;
        }
        Object optType = jsonObject.get("optType");
        if (OPT_HEARTBEAT.equals(optType)) {
            // Heartbeat: answered before the data array is parsed, since a heartbeat
            // never changes any subscription.
            return;
        }
        boolean pausePush = OPT_PAUSE_PUSH.equals(optType);

        // Parse the data field into the list of requested subscriptions.
        List<SubscribeBO> subscribeBOs = JSON.parseArray(jsonObject.getString("data"), SubscribeBO.class);
        if (subscribeBOs == null || subscribeBOs.isEmpty()) {
            if (pausePush) {
                // No items given together with pausePush: stop every push for this session.
                this.removeReader(session);
            }
            return;
        }

        for (SubscribeBO item : subscribeBOs) {
            // Skip null array elements and items missing their id or type.
            if (item == null
                    || StringUtils.isBlank(item.getSubscribeId())
                    || StringUtils.isBlank(item.getSubscribeTypeId())) {
                continue;
            }
            // Each subscription type has its own storage.
            BaseWSSHolder holder = this.getHolderByType(item.getSubscribeTypeId());
            // Sessions currently subscribed to this item.
            List<WebSocketSession> sessions = holder.getSessionBySubscribe(item);
            boolean exists = sessions.contains(session);

            String msg = pausePush ? "取消關注" : "關注";
            logger.info("websocket會話:" + session + msg + "了:"
                    + SubscribeType.getDes(item.getSubscribeTypeId()) + "  " + item.getSubscribeData());

            if (pausePush) {
                if (exists) {
                    // Pause pushes of this item for the session.
                    sessions.remove(session);
                }
            } else if (!exists) {
                // First time this session subscribes to the item.
                holder.putSession(session, item);
            }
        }
    }


    /**
     * Logs the transport error (with its cause) and closes the session if it is
     * still open.
     */
    @Override
    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) throws Exception {
        logger.warn("indexconfig websocket transport error......", throwable);
        if (webSocketSession.isOpen()) {
            webSocketSession.close();
        }
    }

    /**
     * Drops the closed session from every holder so that it does not linger in the
     * subscription lists after the connection is gone.
     */
    @Override
    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) throws Exception {
        this.removeReader(webSocketSession);
        logger.debug("indexconfig websocket connection closed......");
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * Returns the session holder responsible for the given subscription type code.
     *
     * @param type subscription type code as sent by the client
     * @return the matching holder, never {@code null}
     * @throws Exception if the code is unknown or has no holder
     */
    private BaseWSSHolder getHolderByType(String type) throws Exception {
        SubscribeType subscribeType = SubscribeType.getByCode(type);
        BaseWSSHolder holder = null;
        if (subscribeType == null) {
            throw new Exception("數據傳入錯誤");
        }
        switch (subscribeType) {
            case DEVICE_COUNT:
                // device count
                holder = DeviceWSSHolder.getInstance();
                break;
            case ALARM_DEVICE_COUNT:
                // alarm count
                holder = AlarmDeviceWSSHolder.getInstance();
                break;
            case STATE_DEVICE_COUNT:
                // number of devices in a given state
                holder = StateDeviceWSSHolder.getInstance();
                break;
            case POINT_COUNT:
                // point value
                holder = PointWSSHolder.getInstance();
                break;
            case WEATHER:
                // weather
                holder = WeatherWSSHolder.getInstance();
                break;
        }
        if (holder == null) {
            logger.error("不存在對應的存儲:" + type);
            throw new Exception("不存在對應的存儲:" + type);
        }
        return holder;
    }

    /** Asks every holder to drop the session (a null subscription means "all"). */
    private void removeReader(WebSocketSession session) {
        AlarmDeviceWSSHolder.getInstance().removeReader(session, null);
        DeviceWSSHolder.getInstance().removeReader(session, null);
        PointWSSHolder.getInstance().removeReader(session, null);
        StateDeviceWSSHolder.getInstance().removeReader(session, null);
        WeatherWSSHolder.getInstance().removeReader(session, null);
    }
}
複製代碼

訂閱的信息存儲在內存中,形式爲 <訂閱對象(SubscribeBO), List<session>> 的鍵值對

存儲工具類:

複製代碼
import com.google.common.collect.Lists;
import org.springframework.web.socket.WebSocketSession;

import java.util.*;

/**
 * Singleton registry of device-count subscriptions: which WebSocket sessions
 * subscribed to which {@link SubscribeBO}. It stores no statistic values.
 *
 * <p>The class performs no synchronization of its own; the map and the lists it
 * hands out are plain {@code HashMap}/{@code ArrayList} instances.
 */
public class DeviceWSSHolder implements BaseWSSHolder {
    /**
     * Key: the subscription; value: the sessions interested in it.
     * Created eagerly so no method has to null-check or lazily initialize it.
     */
    private final Map<SubscribeBO, List<WebSocketSession>> sessions =
            new HashMap<SubscribeBO, List<WebSocketSession>>();

    private DeviceWSSHolder() {
    }

    /** Initialization-on-demand holder for the single instance. */
    private static class SingletonHolder {
        public final static DeviceWSSHolder holder = new DeviceWSSHolder();
    }

    public static DeviceWSSHolder getInstance() {
        return SingletonHolder.holder;
    }

    /**
     * Records that session {@code s} subscribed to {@code subscribeBO}.
     * No duplicate check is done; callers are expected to check first.
     *
     * @param s           the subscribing session
     * @param subscribeBO the subscription
     */
    @Override
    public void putSession(WebSocketSession s, SubscribeBO subscribeBO) {
        getSessionBySubscribe(subscribeBO).add(s);
    }

    /**
     * Unsubscribes a session.
     *
     * @param reader      the session to remove; {@code null} is ignored
     * @param subscribeBO the subscription to remove the session from, or
     *                    {@code null} to remove it from every subscription
     */
    @Override
    public void removeReader(WebSocketSession reader, SubscribeBO subscribeBO) {
        if (reader == null) {
            return;
        }
        if (subscribeBO != null) {
            // Remove one specific subscription of the session.
            getSessionBySubscribe(subscribeBO).remove(reader);
            return;
        }
        // Remove ALL subscriptions of the session. The earlier version stopped after
        // the first list that contained the session, leaving the rest subscribed.
        for (List<WebSocketSession> readers : sessions.values()) {
            readers.remove(reader);
        }
    }

    /**
     * Returns the live list of sessions subscribed to {@code subscribeBO},
     * registering an empty list for it when none exists yet.
     *
     * @param subscribeBO the subscription to look up
     * @return the mutable list backing this holder, never {@code null}
     */
    @Override
    public List<WebSocketSession> getSessionBySubscribe(SubscribeBO subscribeBO) {
        List<WebSocketSession> readers = sessions.get(subscribeBO);
        if (readers == null) {
            readers = new ArrayList<WebSocketSession>();
            sessions.put(subscribeBO, readers);
        }
        return readers;
    }

    /**
     * Collects the subscribe-data values of every subscription that currently has
     * at least one session. Per the original note these business ids carry a
     * "de" prefix.
     *
     * @return a new list, possibly empty
     */
    public List<String> getEffectDataIds() {
        List<String> ids = Lists.newArrayList();
        for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry : sessions.entrySet()) {
            List<WebSocketSession> readers = entry.getValue();
            // Only subscriptions that still have a subscriber count.
            if (readers != null && readers.size() > 0) {
                ids.add(entry.getKey().getSubscribeData());
            }
        }
        return ids;
    }

    /**
     * Finds the stored entry whose key equals {@code condition}. The entry (not
     * just the list) is returned so callers can read fields of the stored key.
     *
     * @param condition the subscription to match; may be {@code null}
     * @return the matching entry, or {@code null} when there is none
     */
    public Map.Entry<SubscribeBO, List<WebSocketSession>> getItemBySubscribeBO(SubscribeBO condition) {
        if (condition != null) {
            for (Map.Entry<SubscribeBO, List<WebSocketSession>> entry : sessions.entrySet()) {
                if (entry.getKey().equals(condition)) {
                    return entry;
                }
            }
        }
        return null;
    }
}
複製代碼

訂閱工具類(SubscribeBO):主要就是將接收到的 websocket 信息轉成 Java 對象

複製代碼
import com.alibaba.fastjson.annotation.JSONField;

/**
 * A single subscription as exchanged with the front end.
 *
 * <p>JSON field names differ from the Java property names and are mapped with
 * {@code @JSONField}. Equality and hashing use only the type id and the
 * subscribe data; the subscribe id and value are ignored.
 */
public class SubscribeBO {
    @JSONField(name="statisticsId")
    private String subscribeId;
    @JSONField(name="statisticsTypeId")
    private String subscribeTypeId;
    @JSONField(name="statisticsData")
    private String subscribeData;
    @JSONField(name="statisticsValue")
    private String subscribeValue;


    public SubscribeBO() {
    }

    public SubscribeBO(String subscribeTypeId, String subscribeData) {
        this.subscribeTypeId = subscribeTypeId;
        this.subscribeData = subscribeData;
    }

    public SubscribeBO(String subscribeId, String subscribeTypeId, String subscribeData) {
        this.subscribeId = subscribeId;
        this.subscribeTypeId = subscribeTypeId;
        this.subscribeData = subscribeData;
    }

    public String getSubscribeId() {
        return subscribeId;
    }

    public void setSubscribeId(String subscribeId) {
        this.subscribeId = subscribeId;
    }

    public String getSubscribeTypeId() {
        return subscribeTypeId;
    }

    public void setSubscribeTypeId(String subscribeTypeId) {
        this.subscribeTypeId = subscribeTypeId;
    }

    public String getSubscribeData() {
        return subscribeData;
    }

    public void setSubscribeData(String subscribeData) {
        this.subscribeData = subscribeData;
    }

    public String getSubscribeValue() {
        return subscribeValue;
    }

    public void setSubscribeValue(String subscribeValue) {
        this.subscribeValue = subscribeValue;
    }

    /**
     * Two subscriptions are equal when their type id and subscribe data are equal.
     * Both fields may be {@code null} (the no-arg constructor leaves them unset),
     * so the comparison is null-safe.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SubscribeBO bo = (SubscribeBO) o;

        if (subscribeTypeId != null ? !subscribeTypeId.equals(bo.subscribeTypeId) : bo.subscribeTypeId != null) {
            return false;
        }
        return subscribeData != null ? subscribeData.equals(bo.subscribeData) : bo.subscribeData == null;

    }

    /**
     * Hash over the same two fields as {@link #equals(Object)}; a {@code null}
     * field contributes 0, and non-null values hash exactly as before.
     */
    @Override
    public int hashCode() {
        int result = subscribeTypeId != null ? subscribeTypeId.hashCode() : 0;
        result = 31 * result + (subscribeData != null ? subscribeData.hashCode() : 0);
        return result;
    }
}
複製代碼

推送代碼太多,主要是透過 Spring + Quartz 進行後臺運算,運算完畢之後將值按照訂閱(DeviceWSSHolder)反查 session,發送到客戶端

複製代碼
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import org.apache.commons.lang.math.RandomUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.*;

/**
 * Quartz job that pushes device counts to the WebSocket sessions subscribed to them.
 *
 * <p>Each run reads the subscribed device-type ids from the {@link DeviceWSSHolder}
 * found in the job data map, asks {@code DeviceService} for the counts, and sends
 * one JSON message per subscription to every open subscribed session.
 */
@DisallowConcurrentExecution
public class DevicePushJob extends QuartzJobBean {
    private static final Logger log = LoggerFactory.getLogger(DevicePushJob.class);

    /**
     * Runs one push cycle. Returns without doing anything when there is no active
     * subscription, no parsable device-type id, or no count to report.
     *
     * @param context Quartz context; its job data map must hold the application
     *                context and the device holder
     */
    @Override
    protected void executeInternal(JobExecutionContext context)
            throws JobExecutionException {
        final JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
        final ApplicationContext appCtx = (ApplicationContext) jobDataMap.get(JOBDATA_KEY_APPCTX);
        final DeviceWSSHolder deviceWSSHolder = (DeviceWSSHolder) jobDataMap
                .get(JOBDATA_KEY_INDEXCONFIG_DEVICE);

        List<String> ids = deviceWSSHolder.getEffectDataIds();
        if (ids.isEmpty()) {
            return;
        }
        List<Integer> deviceTypeIds = parseDeviceTypeIds(ids);
        if (deviceTypeIds.isEmpty()) {
            return;
        }

        DeviceService deviceService = appCtx.getBean(DeviceService.class);
        Map<Integer, Integer> deviceCounts = deviceService.countDeviceByDeviceType(deviceTypeIds, false);
        if (deviceCounts == null || deviceCounts.isEmpty()) {
            return;
        }

        // The holder serves only as a lookup dictionary; outgoing data is carried by
        // freshly built SubscribeBO objects.
        for (Map.Entry<Integer, Integer> deviceCount : deviceCounts.entrySet()) {
            SubscribeBO bo = new SubscribeBO(SubscribeType.DEVICE_COUNT.getCode(),
                    PORTLET_TREE_DEVICETYPE_FORMAT + deviceCount.getKey());
            bo.setSubscribeValue(String.valueOf(deviceCount.getValue()));
            pushToSubscribers(bo);
        }
    }

    /**
     * Extracts the numeric device-type id from each prefixed subscription id.
     * Null, empty or non-numeric ids are logged and skipped, so one bad
     * subscription cannot abort the push for everybody else.
     */
    private List<Integer> parseDeviceTypeIds(List<String> ids) {
        // Built once; the split pattern does not change between ids.
        final Splitter splitter = Splitter.onPattern(PORTLET_TREE_DEVICETYPE_FORMAT)
                .omitEmptyStrings();
        List<Integer> deviceTypeIds = Lists.newArrayList();
        for (String typeIdFmt : ids) {
            if (typeIdFmt == null) {
                log.warn("Skipping device subscription without data");
                continue;
            }
            List<String> parts = splitter.splitToList(typeIdFmt);
            if (parts.isEmpty()) {
                log.warn("Skipping malformed device subscription id: {}", typeIdFmt);
                continue;
            }
            try {
                deviceTypeIds.add(Integer.parseInt(parts.get(0)));
            } catch (NumberFormatException e) {
                log.warn("Skipping non-numeric device subscription id: {}", typeIdFmt);
            }
        }
        return deviceTypeIds;
    }

    /**
     * Sends {@code bo} to every open session subscribed to it and drops sessions
     * that are null or no longer open from the subscription list.
     */
    private void pushToSubscribers(SubscribeBO bo) {
        Map.Entry<SubscribeBO, List<WebSocketSession>> entry = DeviceWSSHolder
                .getInstance().getItemBySubscribeBO(bo);
        if (entry == null) {
            return;
        }
        // Echo back the id the client used when it subscribed.
        bo.setSubscribeId(entry.getKey().getSubscribeId());

        // The payload is identical for every session, so it is serialized once.
        JSONObject ret = new JSONObject();
        ret.put("success", true);
        List<SubscribeBO> retSbo = Lists.newArrayList(bo);
        ret.put("data", retSbo);
        String jsonString = JSON.toJSONString(ret);

        Iterator<WebSocketSession> iterator = entry.getValue().iterator();
        while (iterator.hasNext()) {
            WebSocketSession session = iterator.next();
            if (session == null || !session.isOpen()) {
                iterator.remove();
                continue;
            }
            try {
                log.info(jsonString);
                session.sendMessage(new TextMessage(jsonString));
            } catch (IOException e) {
                // Keep the stack trace; one failing session must not stop the others.
                log.error("Failed to push device count to websocket session " + session.getId(), e);
            }
        }
    }

}
複製代碼

 附:seajs封裝的前端js工具類

複製代碼
/**
 * seajs module exporting the indexConfigSocket constructor.
 *
 * Constructing it opens the index-config WebSocket (falling back to MozWebSocket
 * or SockJS) and republishes every incoming message on window.PubSub under the
 * topic 'indexConfigSocket-onMessage'. The prototype offers send() to subscribe
 * and close() to pause pushes.
 *
 * NOTE(review): `basePath` is not defined in this module; presumably a page-level
 * global holding the context path. Confirm against the hosting page.
 */
define(function (require, exports, module) {

    // One connection shared by all instances created from this module; each
    // constructor call replaces it.
    var devWebSocket = {};

    var indexConfigSocket = function (opt) {
        if ('WebSocket' in window) {
            devWebSocket = new WebSocket("ws://" + window.location.host + basePath + "/app/indexConfig/indexConfigWebSocket.do");
        } else if ('MozWebSocket' in window) {
            devWebSocket = new MozWebSocket(
                "ws://" + window.location.host + basePath + "/ws/point/webSocketServer.do");
        } else {
            devWebSocket = new SockJS(
                "http://" + window.location.host + basePath + "/ws/point/webSocketServer.do");
        }
        devWebSocket.onopen = function (evnt) {
        };
        devWebSocket.onmessage = function (evnt) {
            // Hand the raw event to subscribers; parsing is left to them.
            window.PubSub.publish('indexConfigSocket-onMessage', evnt);
        };
        devWebSocket.onerror = function (e) {
            console.log('indexConfig webSocket error...');
        };
        devWebSocket.onclose = function (evnt) {
            // Previously logged the copy-pasted "error" text, which made a normal
            // close indistinguishable from a failure in the console.
            console.log('indexConfig webSocket closed...');
        };
    };

    /**
     * Subscribes to the given items.
     * Sends {data: indexConfigIdsStr, optType: ''} as JSON text.
     *
     * @param indexConfigIdsStr the value placed in the request's `data` field
     */
    indexConfigSocket.prototype.send = function (indexConfigIdsStr) {
        var indexConfig = {};
        indexConfig.data = indexConfigIdsStr;
        indexConfig.optType = '';
        var t = JSON.stringify(indexConfig);
        console.log("</br>請求報文:<font color='blue'>" + t + "</font>");
        devWebSocket.send(t);
    };

    /**
     * Asks the server to pause pushes. Despite its name this does not close the
     * socket: it sends {data: ..., optType: 'pausePush'} as JSON text.
     *
     * @param indexConfigIdsStr items to pause; null or undefined sends an empty array
     */
    indexConfigSocket.prototype.close = function (indexConfigIdsStr) {
        var indexConfig = {};
        indexConfig.data = indexConfigIdsStr == null ? [] : indexConfigIdsStr;
        indexConfig.optType = 'pausePush';
        var t = JSON.stringify(indexConfig);
        console.log("關閉報文:" + t);
        devWebSocket.send(t);
    };


    module.exports = indexConfigSocket;

})
相關文章
相關標籤/搜索