小改下把tio-websocket-showcase變成可集羣方式

    原先譚總(他不讓咱們叫老譚,他說他還小。。。不知道他哪裏「小」,開個玩笑哈)的tio社區版是不帶集羣功能的,雖然大部分狀況下已經能知足要求了,可是你們仍是很關心集羣方案,對於新手來講最好別太複雜,那就基於redis來作一個發佈/訂閱的方式達到多節點協做。java

基於簡單原則,就不考慮代碼或者結構方面的修理了,直接基於譚總的websocket-showcase來改改。node

大概原理相對來講好理解,相似以下圖git

客戶端和服務端節點經過代理服務器來分發,服務端節點全部的接收到的消息均發佈到redis指定頻道,而後各個服務器節點去訂閱此頻道的消息,這樣即便客戶端不在同一個服務器節點均能接收到訂閱消息。web

實際上,以最簡單的需求來講,各個節點之間只須要知道誰在線,誰離開,消息發送時可以到達對方,有這三個便可知足最簡單的集羣了,因此就這麼幹了。redis

先預覽下代碼修改處(社會我人傻話很少,直接上代碼吧):apache

其中pojo包是封裝用的bean,封裝用戶和消息體:json

import java.io.Serializable;
import java.util.Set;

/**
 * 用戶模型
 *
 * @author huanglin
 */
public class User implements Serializable {
    private String username;
    private Set<String> group;
    /**
     * 所在節點,只是爲了方便後期的操做,暫時沒用上
     */
    private String node;

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public Set<String> getGroup() {
        return group;
    }

    public void setGroup(Set<String> group) {
        this.group = group;
    }

    public String getNode() {
        return node;
    }

    public void setNode(String node) {
        this.node = node;
    }
}
import java.io.Serializable;

/**
 * 消息bean對象.
 *
 * @author : huanglin
 * @version : 1.0
 * @since :2018/5/10 上午9:36
 */
public class Msg implements Serializable {
    private int action;
    private String msg;
    private String from;
    private String to;
    private String status;

    public int getAction() {
        return action;
    }

    public void setAction(int action) {
        this.action = action;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }
}

processor包是將握手後onAfterHandshaked、斷開鏈接onBeforeClose、接收到文本onText消息時的抽象處理(多節點時要用到),其中DefaultServerProcessor只是將原來老譚的代碼原本來本放回去,不作任何改動;ServerProcessorOnPubSub則是將這幾個動做經過redis的發佈訂閱完成多節點協做(也算是集羣了)消息互通。api

抽象接口ServerProcessor:服務器

import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;

/**
 * 主要的操做抽象
 *
 * @author huanglin
 */
public interface ServerProcessor {
    /**
     * 當關閉前作通知
     *
     * @param channelContext
     * @param throwable
     * @param remark
     * @param isRemove
     * @throws Exception
     */
    void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception;

    /**
     * 握手成功後的通知
     *
     * @param httpRequest
     * @param httpResponse
     * @param channelContext
     * @throws Exception
     */
    void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception;

    /**
     * 收到文本信息時的通知操做
     *
     * @param wsRequest
     * @param text
     * @param channelContext
     * @return
     * @throws Exception
     */
    Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception;
}

默認實現DefaultServerProcessor:websocket

import net.hlin.wss.server.Const;
import net.hlin.wss.server.ShowcaseServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.common.WsResponse;
import org.tio.websocket.common.WsSessionContext;

import java.util.Objects;

/**
 * 原來的showcase裏面的東西不懂
 * @author huanglin
 */
public class DefaultServerProcessor implements ServerProcessor {

    private static Logger log = LoggerFactory.getLogger(DefaultServerProcessor.class);

    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
        if (log.isInfoEnabled()) {
            log.info("onBeforeClose\r\n{}", channelContext);
        }

        WsSessionContext wsSessionContext = (WsSessionContext) channelContext.getAttribute();

        if (wsSessionContext.isHandshaked()) {
            int count = Aio.getAllChannelContexts(channelContext.getGroupContext()).getObj().size();

            String msg = channelContext.getClientNode().toString() + " 離開了,如今共有【" + count + "】人在線";
            //用tio-websocket,服務器發送到客戶端的Packet都是WsResponse
            WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET);
            //羣發
            Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse);
        }
    }

    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        //綁定到羣組,後面會有羣發
        Aio.bindGroup(channelContext, Const.GROUP_ID);
        int count = Aio.getAllChannelContexts(channelContext.getGroupContext()).getObj().size();
        String msg = channelContext.getClientNode().toString() + " 進來了,如今共有【" + count + "】人在線";
        //用tio-websocket,服務器發送到客戶端的Packet都是WsResponse
        WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET);
        //羣發
        Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse);
    }

    @Override
    public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
        WsSessionContext wsSessionContext = (WsSessionContext) channelContext.getAttribute();
        HttpRequest httpRequest = wsSessionContext.getHandshakeRequestPacket();//獲取websocket握手包
        if (log.isDebugEnabled()) {
            log.debug("握手包:{}", httpRequest);
        }

        log.info("收到ws消息:{}", text);

        if (Objects.equals("心跳內容", text)) {
            return null;
        }

        String msg = channelContext.getClientNode().toString() + " 說:" + text;
        //用tio-websocket,服務器發送到客戶端的Packet都是WsResponse
        WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET);
        //羣發
        Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse);

        //返回值是要發送給客戶端的內容,通常都是返回null
        return null;
    }
}

基於redis的發佈訂閱實現:

import cn.hutool.core.io.resource.ResourceUtil;
import com.alibaba.fastjson.JSON;
import net.hlin.wss.server.Const;
import net.hlin.wss.server.pojo.Msg;
import net.hlin.wss.server.pojo.User;
import net.hlin.wss.server.util.MsgUtil;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.api.listener.MessageListener;
import org.redisson.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;

import java.io.IOException;

public class ServerProcessorOnPubSub implements ServerProcessor {

    private static Logger log = LoggerFactory.getLogger(ServerProcessorOnPubSub.class);
    private RedissonClient client;
    private RTopic<Msg> topic;

    public ServerProcessorOnPubSub() {
        try {
            Config config = Config.fromJSON(ResourceUtil.getStream("classpath:redisson.json"));
            client = Redisson.create(config);
            topic = client.getTopic(Const.WS_MSG_TOPIC_CHANNEL);
            subcribeMsg();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
        String username = channelContext.getUserid();
        //TODO 如查詢當前用戶所在組的功能
        //Set<String> groups = userService.getUserGroups(username);
        // for 循環 :Aio.bindGroup(channelContext, group);
        //無論以前是否已經登陸,直接覆蓋,實際業務時會有具體處理
        User user = new User();
        // user.setGroup(groups);
        user.setUsername(username);
        user.setNode(channelContext.getServerNode().toString());
        RBucket<User> userRBucket = client.getBucket(Const.WS_USER_PREFIX + username);
        userRBucket.set(user);
        log.info("用戶{}加入", username);
    }

    @Override
    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
        String username = channelContext.getUserid();
        client.getBucket(Const.WS_USER_PREFIX + username).delete();
        log.info("用戶{}離開", username);
    }

    @Override
    public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
        Msg msg = JSON.parseObject(text, Msg.class);
        topic.publish(msg);
        return null;
    }

    private void subcribeMsg() {
        topic.addListener(new MessageListener<Msg>() {
            @Override
            public void onMessage(String channel, Msg msg) {
                int action = msg.getAction();
                Msg respMsg = new Msg();
                //響應信息則直接返回給客戶端便可
                if (action % 11 == 0 && MsgUtil.existsUser(msg.getTo())) {
                    //從新包裝下後再發送
                    respMsg.setMsg(msg.getMsg());
                    respMsg.setAction(msg.getAction());
                    respMsg.setStatus(msg.getStatus());
                    MsgUtil.sendToUser(msg.getTo(), respMsg);
                } else {
                    respMsg.setTo(msg.getFrom());
                    respMsg.setStatus("200");
                    if (action == Const.Action.P2P_MSG_REQ.val()) {
                        respMsg.setAction(Const.Action.P2P_MSG_RESP.val());
                        if (MsgUtil.existsUser(msg.getTo())) {
                            MsgUtil.sendToUser(msg.getTo(), msg);
                            topic.publish(respMsg);
                        }
                    } else if (action == Const.Action.GROUP_MSG_REQ.val()) {
                        MsgUtil.sendToGroup(msg.getTo(), msg);
                        respMsg.setAction(Const.Action.GROUP_MSG_RESP.val());
                        topic.publish(respMsg);
                    }
                }
            }
        });
    }
}

其中工具包中的MsgUtil是封裝了Aio的部分功能,簡化代碼目的:

import cn.hutool.json.JSONUtil;
import net.hlin.wss.server.ShowcaseServerConfig;
import net.hlin.wss.server.pojo.Msg;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.utils.lock.SetWithLock;
import org.tio.websocket.common.WsResponse;

import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 聊天工具類.
 *
 * @author : huanglin
 * @version : 1.0
 * @since :2018/5/8 上午11:23
 */
public class MsgUtil {

    public static boolean existsUser(String userId) {
        SetWithLock<ChannelContext> set = Aio.getChannelContextsByUserid(ShowcaseServerConfig.groupContext, userId);
        if(set == null || set.size() < 1) {
            return false;
        }
        return true;
    }

    /**
     * 發送到指定用戶
     * @param userId
     * @param message
     */
    public static void sendToUser(String userId, Msg message) {
        SetWithLock<ChannelContext> toChannleContexts = Aio.getChannelContextsByUserid(ShowcaseServerConfig.groupContext, userId);
        if(toChannleContexts == null || toChannleContexts.size() < 1) {
            return;
        }
        ReentrantReadWriteLock.ReadLock readLock = toChannleContexts.getLock().readLock();
        readLock.lock();
        try{
            Set<ChannelContext> channels = toChannleContexts.getObj();
            for(ChannelContext channelContext : channels){
                send(channelContext, message);
            }
        }finally{
            readLock.unlock();
        }
    }

    /**
     * 功能描述:[發送到羣組(全部不一樣協議端)]
     * @param group
     * @param msg
     */
    public static void sendToGroup(String group, Msg msg){
        if(msg == null) {
            return;
        }
        SetWithLock<ChannelContext> withLockChannels = Aio.getChannelContextsByGroup(ShowcaseServerConfig.groupContext, group);
        if(withLockChannels == null) {
            return;
        }
        ReentrantReadWriteLock.ReadLock readLock = withLockChannels.getLock().readLock();
        readLock.lock();
        try{
            Set<ChannelContext> channels = withLockChannels.getObj();
            if(channels != null && channels.size() > 0){
                for(ChannelContext channelContext : channels){
                    send(channelContext,msg);
                }
            }
        }finally{
            readLock.unlock();
        }
    }

    /**
     * 發送到指定通道;
     * @param channelContext
     * @param msg
     */
    public static void send(ChannelContext channelContext,Msg msg){
        if(channelContext == null) {
            return;
        }
        WsResponse response = WsResponse.fromText(JSONUtil.toJsonStr(msg), ShowcaseServerConfig.CHARSET);
        Aio.sendToId(channelContext.getGroupContext(), channelContext.getId(), response);
    }
}

 

其餘幾個用紅色箭頭的代碼表示在原來的基礎上有所改動。

常量類Const:

/**
 * @author tanyaowu
 * @modify huanglin
 */
public class Const {
	/**
	 * 用於羣聊的group id
	 */
	public static final String GROUP_ID = "showcase-websocket";

	public static final String WS_MSG_TOPIC_CHANNEL = "WS_MSG_TOPIC_CHANNEL";

	public static final String WS_USER_PREFIX = "WS_USER_PREFIX:";

	/**
	 * 客戶端和服務端的交互動做枚舉
	 */
	public enum Action {
		/**
		 * 點對點消息請求
		 */
		P2P_MSG_REQ(3),
		/**
		 * 點對點消息響應
		 */
		P2P_MSG_RESP(33),
		/**
		 * 羣組消息請求
		 */
		GROUP_MSG_REQ(4),
		/**
		 * 羣組消息響應
		 */
		GROUP_MSG_RESP(44);

		
		private int action;
		Action(int action) {
			this.action = action;
		}
		public int val(){
			return this.action;
		}
	}
}

ShowcaseIpStatListener無變化,這裏就不列了

ShowcaseServerAioListener:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
import org.tio.websocket.server.WsServerAioListener;

/**
 * @author tanyaowu
 * 用戶根據狀況來完成該類的實現
 * @modify huanglin
 */
public class ShowcaseServerAioListener extends WsServerAioListener {
	private static Logger log = LoggerFactory.getLogger(ShowcaseServerAioListener.class);

	public static final ShowcaseServerAioListener me = new ShowcaseServerAioListener();

	private ShowcaseServerAioListener() {

	}

	@Override
	public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception {
		super.onAfterConnected(channelContext, isConnected, isReconnect);
		if (log.isInfoEnabled()) {
			log.info("onAfterConnected\r\n{}", channelContext);
		}

	}

	@Override
	public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception {
		super.onAfterSent(channelContext, packet, isSentSuccess);
		if (log.isInfoEnabled()) {
			log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), channelContext);
		}
	}

	@Override
	public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception {
		super.onBeforeClose(channelContext, throwable, remark, isRemove);
		ShowcaseServerConfig.processor.onBeforeClose(channelContext, throwable, remark, isRemove);
	}

	@Override
	public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception {
		super.onAfterDecoded(channelContext, packet, packetSize);
		if (log.isInfoEnabled()) {
			log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), channelContext);
		}
	}

	@Override
	public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception {
		super.onAfterReceivedBytes(channelContext, receivedBytes);
		if (log.isInfoEnabled()) {
			log.info("onAfterReceivedBytes\r\n{}", channelContext);
		}
	}

	@Override
	public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception {
		super.onAfterHandled(channelContext, packet, cost);
		if (log.isInfoEnabled()) {
			log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), channelContext);
		}
	}

}

ShowcaseServerConfig:

增長部分

/**
 * 若是使用DefaultServerProcessor就是單節點,shiyong ServerProcessorOnPubSub就能集羣了
 */
public static ServerProcessor processor;

/**
 * 給MsgUtil hold住實例,直接調用
 */
public static ServerGroupContext groupContext;

完整代碼:

import net.hlin.wss.server.processor.ServerProcessor;
import org.tio.server.ServerGroupContext;
import org.tio.utils.time.Time;

/**
 * @author tanyaowu
 * @modify Huang lin
 *
 */
public abstract class ShowcaseServerConfig {
	/**
	 * 協議名字(能夠隨便取,主要用於開發人員辨識)
	 */
	public static final String PROTOCOL_NAME = "showcase";
	
	public static final String CHARSET = "utf-8";
	/**
	 * 監聽的ip null表示監聽全部,並不指定ip
	 */
	public static final String SERVER_IP = null;

	/**
	 * 監聽端口
	 */
	public static final int SERVER_PORT = 9326;

	/**
	 * 心跳超時時間,單位:毫秒
	 */
	public static final int HEARTBEAT_TIMEOUT = 1000 * 60;

	/**
	 * 若是使用DefaultServerProcessor就是單節點,shiyong ServerProcessorOnPubSub就能集羣了
	 */
	public static ServerProcessor processor;

	/**
	 * 給MsgUtil hold住實例,直接調用
	 */
	public static ServerGroupContext groupContext;

	/**
	 * ip數據監控統計,時間段
	 * @author tanyaowu
	 *
	 */
	public interface IpStatDuration {
		Long DURATION_1 = Time.MINUTE_1 * 5;
		Long[] IP_STAT_DURATIONS = new Long[] { DURATION_1 };
	}

}

ShowcaseWebsocketStarter:

import java.io.IOException;

import net.hlin.wss.server.processor.ServerProcessorOnPubSub;
import org.apache.commons.lang3.StringUtils;
import org.tio.core.ssl.SslConfig;
import org.tio.server.ServerGroupContext;
import org.tio.websocket.server.WsServerStarter;

/**
 * @author tanyaowu
 * 2017年6月28日 下午5:34:04
 */
public class ShowcaseWebsocketStarter {

	private WsServerStarter wsServerStarter;
	private ServerGroupContext serverGroupContext;

	/**
	 *
	 * @author tanyaowu
	 */
	public ShowcaseWebsocketStarter(int port, ShowcaseWsMsgHandler wsMsgHandler) throws Exception {
		wsServerStarter = new WsServerStarter(port, wsMsgHandler);

		serverGroupContext = wsServerStarter.getServerGroupContext();
		serverGroupContext.setName(ShowcaseServerConfig.PROTOCOL_NAME);
		serverGroupContext.setServerAioListener(ShowcaseServerAioListener.me);

		//設置ip統計時間段
		serverGroupContext.ipStats.addDurations(ShowcaseServerConfig.IpStatDuration.IP_STAT_DURATIONS);
		//設置ip監控
		serverGroupContext.setIpStatListener(ShowcaseIpStatListener.me);
		//設置心跳超時時間
		serverGroupContext.setHeartbeatTimeout(ShowcaseServerConfig.HEARTBEAT_TIMEOUT);
		//若是你但願經過wss來訪問,就加上下面這一行吧,不過首先你得有證書哦
		//initSsl(serverGroupContext);
	}
	
	private static void initSsl(ServerGroupContext serverGroupContext) throws Exception {
		String keyStoreFile = "classpath:config/ssl/keystore.jks";
		String trustStoreFile = "classpath:config/ssl/keystore.jks";
		String keyStorePwd = "214323428310224";

		if (StringUtils.isNotBlank(keyStoreFile) && StringUtils.isNotBlank(trustStoreFile)) {
			SslConfig sslConfig = SslConfig.forServer(keyStoreFile, trustStoreFile, keyStorePwd);
			serverGroupContext.setSslConfig(sslConfig);
		}
	}

	/**
	 * @author tanyaowu
	 * @throws IOException
	 */
	public static void start() throws Exception {
		ShowcaseWebsocketStarter appStarter = new ShowcaseWebsocketStarter(ShowcaseServerConfig.SERVER_PORT, ShowcaseWsMsgHandler.me);
		ShowcaseServerConfig.processor = new ServerProcessorOnPubSub();
		ShowcaseServerConfig.groupContext = appStarter.getServerGroupContext();
		appStarter.wsServerStarter.start();
	}

	/**
	 * @return the serverGroupContext
	 */
	public ServerGroupContext getServerGroupContext() {
		return serverGroupContext;
	}

	public WsServerStarter getWsServerStarter() {
		return wsServerStarter;
	}
	
	public static void main(String[] args) throws Exception {
		start();
	}

}

ShowcaseWsMsgHandler:

import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.http.common.HttpRequest;
import org.tio.http.common.HttpResponse;
import org.tio.websocket.common.WsRequest;
import org.tio.websocket.server.handler.IWsMsgHandler;

/**
 * @author tanyaowu
 * 2017年6月28日 下午5:32:38
 */
public class ShowcaseWsMsgHandler implements IWsMsgHandler {
	private static Logger log = LoggerFactory.getLogger(ShowcaseWsMsgHandler.class);

	public static ShowcaseWsMsgHandler me = new ShowcaseWsMsgHandler();

	private ShowcaseWsMsgHandler() {

	}

	/**
	 * 握手時走這個方法,業務能夠在這裏獲取cookie,request參數等
	 */
	@Override
	public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
		String username = request.getParam("username");
		//不作合法性處理了,根據具體業務來處理就行了
		if (StrUtil.isNotEmpty(username)) {
			Aio.bindUser(channelContext, username);
			return httpResponse;
		}
		return null;
	}

	/** 
	 * @param httpRequest
	 * @param httpResponse
	 * @param channelContext
	 * @throws Exception
	 * @author tanyaowu
	 */
	@Override
	public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception {
		ShowcaseServerConfig.processor.onAfterHandshaked(httpRequest, httpResponse, channelContext);
	}

	/**
	 * 字節消息(binaryType = arraybuffer)過來後會走這個方法
	 */
	@Override
	public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
		return null;
	}

	/**
	 * 當客戶端發close flag時,會走這個方法
	 */
	@Override
	public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception {
		Aio.remove(channelContext, "receive close flag");
		return null;
	}

	/**
	 * 字符消息(binaryType = blob)過來後會走這個方法
	 */
	@Override
	public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception {
		return ShowcaseServerConfig.processor.onText(wsRequest, text, channelContext);
	}

}

最後來看看運行結果(啓用了兩個端口分別啓動了2個服務器節點):

源碼請移步:https://gitee.com/hlinwork/tio-websocket-showcase

相關文章
相關標籤/搜索