原先譚總(他不讓咱們叫老譚,他說他還小。。。不知道他哪裏「小」,開個玩笑哈)的tio社區版是不帶集羣功能的,雖然大部分狀況下已經能知足要求了,可是你們仍是很關心集羣方案,對於新手來講最好別太複雜,那就基於redis來作一個發佈/訂閱的方式達到多節點協做。java
基於簡單原則,就不考慮代碼或者結構方面的修理了,直接基於譚總的websocket-showcase來改改。node
大概原理相對來講好理解,相似以下圖git
客戶端和服務端節點經過代理服務器來分發,服務端節點全部的接收到的消息均發佈到redis指定頻道,而後各個服務器節點去訂閱此頻道的消息,這樣即便客戶端不在同一個服務器節點均能接收到訂閱消息。web
實際上,以最簡單的需求來講,各個節點之間只須要知道誰在線,誰離開,消息發送時可以到達對方,有這三個便可知足最簡單的集羣了,因此就這麼幹了。redis
先預覽下代碼修改處(社會我人傻話很少,直接上代碼吧):apache
其中pojo包是封裝用的bean,封裝用戶和消息體:json
import java.io.Serializable; import java.util.Set; /** * 用戶模型 * * @author huanglin */ public class User implements Serializable { private String username; private Set<String> group; /** * 所在節點,只是爲了方便後期的操做,暫時沒用上 */ private String node; public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public Set<String> getGroup() { return group; } public void setGroup(Set<String> group) { this.group = group; } public String getNode() { return node; } public void setNode(String node) { this.node = node; } }
import java.io.Serializable; /** * 消息bean對象. * * @author : huanglin * @version : 1.0 * @since :2018/5/10 上午9:36 */ public class Msg implements Serializable { private int action; private String msg; private String from; private String to; private String status; public int getAction() { return action; } public void setAction(int action) { this.action = action; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public String getFrom() { return from; } public void setFrom(String from) { this.from = from; } public String getTo() { return to; } public void setTo(String to) { this.to = to; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } }
processor包是將握手後onAfterHandshaked、斷開鏈接onBeforeClose、接收到文本onText消息時的抽象處理(多節點時要用到),其中DefaultServerProcessor只是將原來老譚的代碼原本來本放回去,不作任何改動;ServerProcessorOnPubSub則是將這幾個動做經過redis的發佈訂閱完成多節點協做(也算是集羣了)消息互通。api
抽象接口ServerProcessor:服務器
import org.tio.core.ChannelContext; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; import org.tio.websocket.common.WsRequest; /** * 主要的操做抽象 * * @author huanglin */ public interface ServerProcessor { /** * 當關閉前作通知 * * @param channelContext * @param throwable * @param remark * @param isRemove * @throws Exception */ void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception; /** * 握手成功後的通知 * * @param httpRequest * @param httpResponse * @param channelContext * @throws Exception */ void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception; /** * 收到文本信息時的通知操做 * * @param wsRequest * @param text * @param channelContext * @return * @throws Exception */ Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception; }
默認實現DefaultServerProcessor:websocket
import net.hlin.wss.server.Const; import net.hlin.wss.server.ShowcaseServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; import org.tio.websocket.common.WsRequest; import org.tio.websocket.common.WsResponse; import org.tio.websocket.common.WsSessionContext; import java.util.Objects; /** * 原來的showcase裏面的東西不懂 * @author huanglin */ public class DefaultServerProcessor implements ServerProcessor { private static Logger log = LoggerFactory.getLogger(DefaultServerProcessor.class); @Override public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception { if (log.isInfoEnabled()) { log.info("onBeforeClose\r\n{}", channelContext); } WsSessionContext wsSessionContext = (WsSessionContext) channelContext.getAttribute(); if (wsSessionContext.isHandshaked()) { int count = Aio.getAllChannelContexts(channelContext.getGroupContext()).getObj().size(); String msg = channelContext.getClientNode().toString() + " 離開了,如今共有【" + count + "】人在線"; //用tio-websocket,服務器發送到客戶端的Packet都是WsResponse WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET); //羣發 Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse); } } @Override public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception { //綁定到羣組,後面會有羣發 Aio.bindGroup(channelContext, Const.GROUP_ID); int count = Aio.getAllChannelContexts(channelContext.getGroupContext()).getObj().size(); String msg = channelContext.getClientNode().toString() + " 進來了,如今共有【" + count + "】人在線"; //用tio-websocket,服務器發送到客戶端的Packet都是WsResponse WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET); //羣發 Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse); } @Override public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception { WsSessionContext wsSessionContext = (WsSessionContext) channelContext.getAttribute(); HttpRequest httpRequest = wsSessionContext.getHandshakeRequestPacket();//獲取websocket握手包 if (log.isDebugEnabled()) { log.debug("握手包:{}", httpRequest); } log.info("收到ws消息:{}", text); if (Objects.equals("心跳內容", text)) { return null; } String msg = channelContext.getClientNode().toString() + " 說:" + text; //用tio-websocket,服務器發送到客戶端的Packet都是WsResponse WsResponse wsResponse = WsResponse.fromText(msg, ShowcaseServerConfig.CHARSET); //羣發 Aio.sendToGroup(channelContext.getGroupContext(), Const.GROUP_ID, wsResponse); //返回值是要發送給客戶端的內容,通常都是返回null return null; } }
基於redis的發佈訂閱實現:
import cn.hutool.core.io.resource.ResourceUtil; import com.alibaba.fastjson.JSON; import net.hlin.wss.server.Const; import net.hlin.wss.server.pojo.Msg; import net.hlin.wss.server.pojo.User; import net.hlin.wss.server.util.MsgUtil; import org.redisson.Redisson; import org.redisson.api.RBucket; import org.redisson.api.RTopic; import org.redisson.api.RedissonClient; import org.redisson.api.listener.MessageListener; import org.redisson.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; import org.tio.websocket.common.WsRequest; import java.io.IOException; public class ServerProcessorOnPubSub implements ServerProcessor { private static Logger log = LoggerFactory.getLogger(ServerProcessorOnPubSub.class); private RedissonClient client; private RTopic<Msg> topic; public ServerProcessorOnPubSub() { try { Config config = Config.fromJSON(ResourceUtil.getStream("classpath:redisson.json")); client = Redisson.create(config); topic = client.getTopic(Const.WS_MSG_TOPIC_CHANNEL); subcribeMsg(); } catch (IOException e) { e.printStackTrace(); } } @Override public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception { String username = channelContext.getUserid(); //TODO 如查詢當前用戶所在組的功能 //Set<String> groups = userService.getUserGroups(username); // for 循環 :Aio.bindGroup(channelContext, group); //無論以前是否已經登陸,直接覆蓋,實際業務時會有具體處理 User user = new User(); // user.setGroup(groups); user.setUsername(username); user.setNode(channelContext.getServerNode().toString()); RBucket<User> userRBucket = client.getBucket(Const.WS_USER_PREFIX + username); userRBucket.set(user); log.info("用戶{}加入", username); } @Override public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception { String username = channelContext.getUserid(); client.getBucket(Const.WS_USER_PREFIX + username).delete(); log.info("用戶{}離開", username); } @Override public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception { Msg msg = JSON.parseObject(text, Msg.class); topic.publish(msg); return null; } private void subcribeMsg() { topic.addListener(new MessageListener<Msg>() { @Override public void onMessage(String channel, Msg msg) { int action = msg.getAction(); Msg respMsg = new Msg(); //響應信息則直接返回給客戶端便可 if (action % 11 == 0 && MsgUtil.existsUser(msg.getTo())) { //從新包裝下後再發送 respMsg.setMsg(msg.getMsg()); respMsg.setAction(msg.getAction()); respMsg.setStatus(msg.getStatus()); MsgUtil.sendToUser(msg.getTo(), respMsg); } else { respMsg.setTo(msg.getFrom()); respMsg.setStatus("200"); if (action == Const.Action.P2P_MSG_REQ.val()) { respMsg.setAction(Const.Action.P2P_MSG_RESP.val()); if (MsgUtil.existsUser(msg.getTo())) { MsgUtil.sendToUser(msg.getTo(), msg); topic.publish(respMsg); } } else if (action == Const.Action.GROUP_MSG_REQ.val()) { MsgUtil.sendToGroup(msg.getTo(), msg); respMsg.setAction(Const.Action.GROUP_MSG_RESP.val()); topic.publish(respMsg); } } } }); } }
其中工具包中的MsgUtil是封裝了Aio的部分功能,簡化代碼目的:
import cn.hutool.json.JSONUtil; import net.hlin.wss.server.ShowcaseServerConfig; import net.hlin.wss.server.pojo.Msg; import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.utils.lock.SetWithLock; import org.tio.websocket.common.WsResponse; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 聊天工具類. * * @author : huanglin * @version : 1.0 * @since :2018/5/8 上午11:23 */ public class MsgUtil { public static boolean existsUser(String userId) { SetWithLock<ChannelContext> set = Aio.getChannelContextsByUserid(ShowcaseServerConfig.groupContext, userId); if(set == null || set.size() < 1) { return false; } return true; } /** * 發送到指定用戶 * @param userId * @param message */ public static void sendToUser(String userId, Msg message) { SetWithLock<ChannelContext> toChannleContexts = Aio.getChannelContextsByUserid(ShowcaseServerConfig.groupContext, userId); if(toChannleContexts == null || toChannleContexts.size() < 1) { return; } ReentrantReadWriteLock.ReadLock readLock = toChannleContexts.getLock().readLock(); readLock.lock(); try{ Set<ChannelContext> channels = toChannleContexts.getObj(); for(ChannelContext channelContext : channels){ send(channelContext, message); } }finally{ readLock.unlock(); } } /** * 功能描述:[發送到羣組(全部不一樣協議端)] * @param group * @param msg */ public static void sendToGroup(String group, Msg msg){ if(msg == null) { return; } SetWithLock<ChannelContext> withLockChannels = Aio.getChannelContextsByGroup(ShowcaseServerConfig.groupContext, group); if(withLockChannels == null) { return; } ReentrantReadWriteLock.ReadLock readLock = withLockChannels.getLock().readLock(); readLock.lock(); try{ Set<ChannelContext> channels = withLockChannels.getObj(); if(channels != null && channels.size() > 0){ for(ChannelContext channelContext : channels){ send(channelContext,msg); } } }finally{ readLock.unlock(); } } /** * 發送到指定通道; * @param channelContext * @param msg */ public static void send(ChannelContext channelContext,Msg msg){ if(channelContext == null) { return; } WsResponse response = WsResponse.fromText(JSONUtil.toJsonStr(msg), ShowcaseServerConfig.CHARSET); Aio.sendToId(channelContext.getGroupContext(), channelContext.getId(), response); } }
其餘幾個用紅色箭頭的代碼表示在原來的基礎上有所改動。
常量類Const:
/** * @author tanyaowu * @modify huanglin */ public class Const { /** * 用於羣聊的group id */ public static final String GROUP_ID = "showcase-websocket"; public static final String WS_MSG_TOPIC_CHANNEL = "WS_MSG_TOPIC_CHANNEL"; public static final String WS_USER_PREFIX = "WS_USER_PREFIX:"; /** * 客戶端和服務端的交互動做枚舉 */ public enum Action { /** * 點對點消息請求 */ P2P_MSG_REQ(3), /** * 點對點消息響應 */ P2P_MSG_RESP(33), /** * 羣組消息請求 */ GROUP_MSG_REQ(4), /** * 羣組消息響應 */ GROUP_MSG_RESP(44); private int action; Action(int action) { this.action = action; } public int val(){ return this.action; } } }
ShowcaseIpStatListener無變化,這裏就不列了
ShowcaseServerAioListener:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.ChannelContext; import org.tio.core.intf.Packet; import org.tio.websocket.server.WsServerAioListener; /** * @author tanyaowu * 用戶根據狀況來完成該類的實現 * @modify huanglin */ public class ShowcaseServerAioListener extends WsServerAioListener { private static Logger log = LoggerFactory.getLogger(ShowcaseServerAioListener.class); public static final ShowcaseServerAioListener me = new ShowcaseServerAioListener(); private ShowcaseServerAioListener() { } @Override public void onAfterConnected(ChannelContext channelContext, boolean isConnected, boolean isReconnect) throws Exception { super.onAfterConnected(channelContext, isConnected, isReconnect); if (log.isInfoEnabled()) { log.info("onAfterConnected\r\n{}", channelContext); } } @Override public void onAfterSent(ChannelContext channelContext, Packet packet, boolean isSentSuccess) throws Exception { super.onAfterSent(channelContext, packet, isSentSuccess); if (log.isInfoEnabled()) { log.info("onAfterSent\r\n{}\r\n{}", packet.logstr(), channelContext); } } @Override public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String remark, boolean isRemove) throws Exception { super.onBeforeClose(channelContext, throwable, remark, isRemove); ShowcaseServerConfig.processor.onBeforeClose(channelContext, throwable, remark, isRemove); } @Override public void onAfterDecoded(ChannelContext channelContext, Packet packet, int packetSize) throws Exception { super.onAfterDecoded(channelContext, packet, packetSize); if (log.isInfoEnabled()) { log.info("onAfterDecoded\r\n{}\r\n{}", packet.logstr(), channelContext); } } @Override public void onAfterReceivedBytes(ChannelContext channelContext, int receivedBytes) throws Exception { super.onAfterReceivedBytes(channelContext, receivedBytes); if (log.isInfoEnabled()) { log.info("onAfterReceivedBytes\r\n{}", channelContext); } } @Override public void onAfterHandled(ChannelContext channelContext, Packet packet, long cost) throws Exception { super.onAfterHandled(channelContext, packet, cost); if (log.isInfoEnabled()) { log.info("onAfterHandled\r\n{}\r\n{}", packet.logstr(), channelContext); } } }
ShowcaseServerConfig:
增長部分
/** * 若是使用DefaultServerProcessor就是單節點,shiyong ServerProcessorOnPubSub就能集羣了 */ public static ServerProcessor processor; /** * 給MsgUtil hold住實例,直接調用 */ public static ServerGroupContext groupContext;
完整代碼:
import net.hlin.wss.server.processor.ServerProcessor; import org.tio.server.ServerGroupContext; import org.tio.utils.time.Time; /** * @author tanyaowu * @modify Huang lin * */ public abstract class ShowcaseServerConfig { /** * 協議名字(能夠隨便取,主要用於開發人員辨識) */ public static final String PROTOCOL_NAME = "showcase"; public static final String CHARSET = "utf-8"; /** * 監聽的ip null表示監聽全部,並不指定ip */ public static final String SERVER_IP = null; /** * 監聽端口 */ public static final int SERVER_PORT = 9326; /** * 心跳超時時間,單位:毫秒 */ public static final int HEARTBEAT_TIMEOUT = 1000 * 60; /** * 若是使用DefaultServerProcessor就是單節點,shiyong ServerProcessorOnPubSub就能集羣了 */ public static ServerProcessor processor; /** * 給MsgUtil hold住實例,直接調用 */ public static ServerGroupContext groupContext; /** * ip數據監控統計,時間段 * @author tanyaowu * */ public interface IpStatDuration { Long DURATION_1 = Time.MINUTE_1 * 5; Long[] IP_STAT_DURATIONS = new Long[] { DURATION_1 }; } }
ShowcaseWebsocketStarter:
import java.io.IOException; import net.hlin.wss.server.processor.ServerProcessorOnPubSub; import org.apache.commons.lang3.StringUtils; import org.tio.core.ssl.SslConfig; import org.tio.server.ServerGroupContext; import org.tio.websocket.server.WsServerStarter; /** * @author tanyaowu * 2017年6月28日 下午5:34:04 */ public class ShowcaseWebsocketStarter { private WsServerStarter wsServerStarter; private ServerGroupContext serverGroupContext; /** * * @author tanyaowu */ public ShowcaseWebsocketStarter(int port, ShowcaseWsMsgHandler wsMsgHandler) throws Exception { wsServerStarter = new WsServerStarter(port, wsMsgHandler); serverGroupContext = wsServerStarter.getServerGroupContext(); serverGroupContext.setName(ShowcaseServerConfig.PROTOCOL_NAME); serverGroupContext.setServerAioListener(ShowcaseServerAioListener.me); //設置ip統計時間段 serverGroupContext.ipStats.addDurations(ShowcaseServerConfig.IpStatDuration.IP_STAT_DURATIONS); //設置ip監控 serverGroupContext.setIpStatListener(ShowcaseIpStatListener.me); //設置心跳超時時間 serverGroupContext.setHeartbeatTimeout(ShowcaseServerConfig.HEARTBEAT_TIMEOUT); //若是你但願經過wss來訪問,就加上下面這一行吧,不過首先你得有證書哦 //initSsl(serverGroupContext); } private static void initSsl(ServerGroupContext serverGroupContext) throws Exception { String keyStoreFile = "classpath:config/ssl/keystore.jks"; String trustStoreFile = "classpath:config/ssl/keystore.jks"; String keyStorePwd = "214323428310224"; if (StringUtils.isNotBlank(keyStoreFile) && StringUtils.isNotBlank(trustStoreFile)) { SslConfig sslConfig = SslConfig.forServer(keyStoreFile, trustStoreFile, keyStorePwd); serverGroupContext.setSslConfig(sslConfig); } } /** * @author tanyaowu * @throws IOException */ public static void start() throws Exception { ShowcaseWebsocketStarter appStarter = new ShowcaseWebsocketStarter(ShowcaseServerConfig.SERVER_PORT, ShowcaseWsMsgHandler.me); ShowcaseServerConfig.processor = new ServerProcessorOnPubSub(); ShowcaseServerConfig.groupContext = appStarter.getServerGroupContext(); appStarter.wsServerStarter.start(); } /** * @return the serverGroupContext */ public ServerGroupContext getServerGroupContext() { return serverGroupContext; } public WsServerStarter getWsServerStarter() { return wsServerStarter; } public static void main(String[] args) throws Exception { start(); } }
ShowcaseWsMsgHandler:
import cn.hutool.core.util.StrUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.Aio; import org.tio.core.ChannelContext; import org.tio.http.common.HttpRequest; import org.tio.http.common.HttpResponse; import org.tio.websocket.common.WsRequest; import org.tio.websocket.server.handler.IWsMsgHandler; /** * @author tanyaowu * 2017年6月28日 下午5:32:38 */ public class ShowcaseWsMsgHandler implements IWsMsgHandler { private static Logger log = LoggerFactory.getLogger(ShowcaseWsMsgHandler.class); public static ShowcaseWsMsgHandler me = new ShowcaseWsMsgHandler(); private ShowcaseWsMsgHandler() { } /** * 握手時走這個方法,業務能夠在這裏獲取cookie,request參數等 */ @Override public HttpResponse handshake(HttpRequest request, HttpResponse httpResponse, ChannelContext channelContext) throws Exception { String username = request.getParam("username"); //不作合法性處理了,根據具體業務來處理就行了 if (StrUtil.isNotEmpty(username)) { Aio.bindUser(channelContext, username); return httpResponse; } return null; } /** * @param httpRequest * @param httpResponse * @param channelContext * @throws Exception * @author tanyaowu */ @Override public void onAfterHandshaked(HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception { ShowcaseServerConfig.processor.onAfterHandshaked(httpRequest, httpResponse, channelContext); } /** * 字節消息(binaryType = arraybuffer)過來後會走這個方法 */ @Override public Object onBytes(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception { return null; } /** * 當客戶端發close flag時,會走這個方法 */ @Override public Object onClose(WsRequest wsRequest, byte[] bytes, ChannelContext channelContext) throws Exception { Aio.remove(channelContext, "receive close flag"); return null; } /** * 字符消息(binaryType = blob)過來後會走這個方法 */ @Override public Object onText(WsRequest wsRequest, String text, ChannelContext channelContext) throws Exception { return ShowcaseServerConfig.processor.onText(wsRequest, text, channelContext); } }
最後來看看運行結果(啓用了兩個端口分別啓動了2個服務器節點):
源碼請移步:https://gitee.com/hlinwork/tio-websocket-showcase