本博客是基於老譚t-io showcase中的tio-websocket-showcase 示例來實現集羣。看showcase 入門仍是挺容易的,入坑(入門)請看老譚寫的用t-io來寫一個網頁聊天室或客服是個怎樣的體驗。 要深刻理解具體實現原理後續的業務擴展,把t-io玩6起來還須要耐心看看源碼,看了以後我相信你必定會有收穫的,祝你好運。java
其實t-io 在2.4
的版本中已加入的集羣實現的邏輯代碼,只是官方沒有寫文檔以及完整的示例而已,在此不得不說t-io 是一個比較良心的開源項目,不少業務場景都有考慮到。大家有需求也能夠去t-io提issues。git
實現思路就是基於redis來作一個發佈/訂閱的方式達到多節點協做的目的,t-io內置的集羣也是使用的此解決方案。下面就來聊聊如何使用t-io的內置集羣。web
在t-io
中是否開啓集羣是經過org.tio.core.GroupContext
中的tioClusterConfig
是否爲空來判斷的。redis
好了,閒話少說直接上菜(代碼)apache
判斷是否開啓集羣(org.tio.core.GroupContext)編程
/** * 是不是集羣 * @return true: 是集羣 * @author: tanyaowu */ public boolean isCluster() { return tioClusterConfig != null; }
tio-websocket-showcase中增長集羣解決方案json
//實例化t-io集羣配置 TioClusterConfig tioClusterConfig = TioClusterConfig.newInstance("Javen", RedissonTemplate.me().getRedissonClient()); //開啓羣組集羣-默認不集羣 tioClusterConfig.setCluster4group(true); //配置t-io集羣 serverGroupContext.setTioClusterConfig(tioClusterConfig);
J-IM
中的部分代碼,目的是來實例化RedissonClient
RedissonTemplate 代碼以下慢慢品讀api
package org.jim.common.cache.redis; import java.io.Serializable; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.redisson.config.SingleServerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * @author WChao * @date 2018年5月18日 下午2:46:55 */ public class RedissonTemplate implements Serializable{ private static final long serialVersionUID = -4528751601700736437L; private static final Logger logger = LoggerFactory.getLogger(RedissonTemplate.class); private static RedissonTemplate instance = null; private static RedisConfiguration redisConfig = null; private static final String REDIS = "redis"; private static RedissonClient redissonClient = null; private RedissonTemplate(){}; public static RedissonTemplate me() throws Exception{ if (instance == null) { synchronized (RedissonTemplate.class) { if(instance == null){ redisConfig = RedisConfigurationFactory.parseConfiguration(); init(); instance = new RedissonTemplate(); } } } return instance; } private static final void init() throws Exception { String host = redisConfig.getHost(); if(host == null) { logger.error("the server ip of redis must be not null!"); throw new Exception("the server ip of redis must be not null!"); } int port = redisConfig.getPort(); String password = redisConfig.getAuth(); Config redissonConfig = new Config(); SingleServerConfig singleServerConfig = redissonConfig.useSingleServer(); singleServerConfig.setAddress(REDIS+"://"+host+":"+port).setPassword(password).setTimeout(redisConfig.getTimeout()).setRetryAttempts(redisConfig.getRetryNum()); try { redissonClient = Redisson.create(redissonConfig); } catch (Exception e) { logger.error("cann't create RedissonClient for server"+redisConfig.getHost()); throw new Exception("cann't create RedissonClient for server"+redisConfig.getHost()); } } /** * 獲取RedissonClient客戶端; * @return */ public final RedissonClient getRedissonClient(){ return redissonClient; } }
看到這裏有人可能要問,在什麼地方發佈消息以及處理訂閱消息!!!微信
什麼地方發佈消息websocket
固然是發送消息的時候,調用Tio.sendXxx()
系列方法的時候。在tio-websocket-showcase
中主要實現的是羣聊,調用的是Tio.sendToGroup()
,具體實現代碼以下:
/** * 發消息到組 * @param groupContext * @param group * @param packet * @param channelContextFilter * @author tanyaowu */ private static Boolean sendToGroup(GroupContext groupContext, String group, Packet packet, ChannelContextFilter channelContextFilter, boolean isBlock) { try { SetWithLock<ChannelContext> setWithLock = groupContext.groups.clients(groupContext, group); if (setWithLock == null) { log.debug("{}, 組[{}]不存在", groupContext.getName(), group); return false; } Boolean ret = sendToSet(groupContext, setWithLock, packet, channelContextFilter, isBlock); return ret; } finally { //判斷是否集羣以及是否是集羣經過topic轉過來的消息包 if (groupContext.isCluster() && !packet.isFromCluster()) { TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig(); //判斷是否開啓了羣組集羣 if (tioClusterConfig.isCluster4group()) { // TioClusterVo tioClusterVo = new TioClusterVo(packet); // tioClusterVo.setGroup(group); // tioClusterConfig.publishAsyn(tioClusterVo); //在集羣環境下,把羣組消息通知到集羣中的其它機器 notifyClusterForGroup(groupContext, group, packet); } } } }
/** * 在集羣環境下,把羣組消息通知到集羣中的其它機器 * @param groupContext * @param group * @param packet */ public static void notifyClusterForGroup(GroupContext groupContext, String group, Packet packet) { TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig(); TioClusterVo tioClusterVo = new TioClusterVo(packet); tioClusterVo.setGroup(group); tioClusterConfig.publishAsyn(tioClusterVo); }
處理訂閱消息
其實在t-io
中有默認實現,具體的代碼以下
public void setTioClusterConfig(TioClusterConfig tioClusterConfig) { this.tioClusterConfig = tioClusterConfig; if (this.tioClusterConfig != null) { this.tioClusterConfig.addMessageListener(new DefaultMessageListener(this)); } }
org.tio.core.cluster.DefaultMessageListener 有詳細的註釋慢慢品讀
package org.tio.core.cluster; import org.apache.commons.lang3.StringUtils; import org.redisson.api.listener.MessageListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.tio.core.Tio; import org.tio.core.GroupContext; import org.tio.core.intf.Packet; import org.tio.utils.json.Json; import java.util.Objects; import java.util.concurrent.atomic.AtomicLong; /** * 默認的集羣消息監聽類 * 做者: 陳磊(Cooppor) * 日期: 2018-05-28 15:08 */ public class DefaultMessageListener implements MessageListener<TioClusterVo> { private static Logger log = LoggerFactory.getLogger(DefaultMessageListener.class); /** * 收到了多少次topic */ private static final AtomicLong RECEIVED_TOPIC_COUNT = new AtomicLong(); private GroupContext groupContext; public DefaultMessageListener(GroupContext groupContext) { this.groupContext = groupContext; } @Override public void onMessage(String channel, TioClusterVo tioClusterVo) { log.info("收到topic:{}, count:{}, tioClusterVo:{}", channel, RECEIVED_TOPIC_COUNT.incrementAndGet(), Json.toJson(tioClusterVo)); String clientid = tioClusterVo.getClientId(); if (StringUtils.isBlank(clientid)) { log.error("clientid is null"); return; } if (Objects.equals(TioClusterVo.CLIENTID, clientid)) { log.info("本身發佈的消息,忽略掉,{}", clientid); return; } Packet packet = tioClusterVo.getPacket(); if (packet == null) { log.error("packet is null"); return; } packet.setFromCluster(true); //發送給全部 boolean isToAll = tioClusterVo.isToAll(); if (isToAll) { Tio.sendToAll(groupContext, packet); } //發送給指定組 String group = tioClusterVo.getGroup(); if (StringUtils.isNotBlank(group)) { Tio.sendToGroup(groupContext, group, packet); } //發送給指定用戶 String userid = tioClusterVo.getUserid(); if (StringUtils.isNotBlank(userid)) { Tio.sendToUser(groupContext, userid, packet); } //發送給指定token String token = tioClusterVo.getToken(); if (StringUtils.isNotBlank(token)) { Tio.sendToToken(groupContext, token, packet); } //發送給指定ip String ip = tioClusterVo.getIp(); if (StringUtils.isNotBlank(ip)) { Tio.sendToIp(groupContext, ip, packet); } //發送給指定channelId String channelId = tioClusterVo.getChannelId(); if (StringUtils.isNotBlank(channelId)) { Tio.sendToId(groupContext, channelId, packet); } } }
哥們,測試時別忘了配置Redis。
/tio-websocket-showcase/src/main/resources/redis.properties
#鏈接池鏈接不夠用時,重試獲取鏈接次數 retrynum = 100 #可用鏈接實例的最大數目,默認值爲8; maxactive = -1 #控制一個pool最多有多少個狀態爲idle(空閒的)的jedis實例,默認值也是8。 maxidle = 20 #等待可用鏈接的最大時間,單位毫秒,默認值爲-1,表示永不超時。 maxwait = 5000 timeout = 2000 #redis所在機器ip host = 127.0.0.1 #redis端口號 port = 6379 #redis密碼 auth =
開啓兩個端口測試
9326
以及9327
到這裏在t-io 中藉助Redis來實現集羣部署實現步驟就介紹完了,我的能力有限若有錯誤歡迎指正。你有更好的解決方案或者建議歡迎一塊兒交流討論,若有疑問歡迎留言。
Fork
的源碼地址 https://gitee.com/javen205/tio-websocket-showcase