瘋狂創客圈,一個Java 高併發研習社羣 【博客園 總入口 】html
瘋狂創客圈,傾力推出: 《Netty Zookeeper Redis 高併發實戰》一書, 面試必備 + 面試必備 + 面試必備java
你們好,我是做者尼恩。目前和幾個小夥伴一塊兒,組織了一個高併發的實戰社羣【瘋狂創客圈】。正在開始高併發、億級流程的 IM 聊天程序 學習和實戰node
順便說明下:
本文的內容只是一個初稿、初稿,本文的知識,在《Netty Zookeeper Redis 高併發實戰》一書時,進行大篇幅的完善和更新,而且進行的源碼的升級。 博客和書不同,書的內容更加系統化、全面化,更加層層升入、井井有條、更屢次的錯誤排查,請你們以書的內容爲準。
本文的最終內容, 具體請參考瘋狂創客圈 傾力編著,機械工業出版社出版的 《Netty Zookeeper Redis 高併發實戰》一書 。
mysql
爲何要開始一個高併發IM的實戰呢?web
首先,實戰完成一個分佈式、高併發的IM系統,具備至關的技術挑戰性。這一點,對於從事傳統的企業級WEB開發的兄弟來講,至關於進入了一片全新的天地。企業級WEB,QPS峯值可能在1000之內,甚至在100之內,沒有多少技術挑戰性和含金量,屬於重複性的CRUD的體力活。面試
而一個分佈式、高併發的IM繫系統,面臨的QPS峯值可能在十萬、百萬、千萬,甚至上億級別。若是此縱深的層次化遞進的高併發需求,直接無極限的考驗着系統的性能。須要不斷的從通信的協議、到系統的架構進行優化,對技術能力是一種很是極致的考驗和訓練。redis
其次,不一樣的QPS峯值規模的IM系統,所處的用戶需求環境是不同的。這就形成了不一樣用戶規模的IM系統,都具備必定的實際需求和市場須要,不必定須要全部的系統,都須要上億級的高併發。可是,做爲一個頂級的架構師,就應該具有全棧式的架構能力。須要能適應不一樣的用戶規模的、差別化的技術場景,提供和架構出和對應的場景相互匹配的高併發IM系統。也就是說,IM系統綜合性相對較強,相關的技術須要覆蓋到知足各類不一樣場景的網絡傳輸、分佈式協調、分佈式緩存、服務化架構等等。算法
來具體看看高併發IM的應用場景吧。spring
一切高實時性通信、消息推送的場景,都須要高併發 IM 。sql
隨着移動互聯網、AI的飛速發展,高性能高併發IM(即時通信),有着很是普遍的應用場景。
典型的應用場景以下:私信、聊天、大規模推送、視頻會議、彈幕、抽獎、互動遊戲、基於位置的應用(Uber、滴滴司機位置)、在線教育、智能家居等。
尤爲是對於APP開發的小夥伴們來講,即時通信,已經成爲大多數APP標配。移動互聯網時代,推送(Push)服務成爲App應用不可或缺的重要組成部分,推送服務能夠提高用戶的活躍度和留存率。咱們的手機天天接收到各類各樣的廣告和提示消息等大多數都是經過推送服務實現的。
隨着5G時代物聯網的發展,將來全部接入物聯網的智能設備,都將是IM系統的客戶端,這就意味着推送服務將來會面臨海量的設備和終端接入。爲了支持這些千萬級、億級終端,必定是須要強悍的後臺系統。
有這麼多的應用場景,對於想成長爲JAVA高手的小夥伴們,高併發IM 都繞不開一個話題。
對於想在後臺有所成就的小夥伴們來講,高併發IM實戰,更是在終極BOSS PK以前的一場不可或缺的打怪練手。
總之,真刀真槍的完成一個高併發IM的實戰,既能夠積累到很是全面的高併發經驗,又能夠得到更多的挑戰機會。
總體的架構以下圖:
主要的集羣介紹以下:
(1)Netty 服務集羣
主要用來負責維持和客戶端的TCP鏈接
(2)鏈接器集羣
負責 Netty Server 集羣的管理,包括註冊、路由、負載均衡。集羣IP註冊和節點ID分配。主要在基於Zookeeper集羣提供底層服務,來完成。
(3)緩存集羣
負責用戶、用戶綁定關係、用戶羣組關係、用戶遠程會話等等數據的緩存。緩存臨時數據、加快讀速度。
(4)DB持久層集羣
存在用戶、羣組、離線消息等等
(5)消息隊列集羣
用戶狀態廣播,羣組消息廣播等等。
並無徹底涉及所有的集羣介紹。只是介紹其中的部分核心功能。 若是所有的功能感興趣,請關注瘋狂創客圈的億級流量實戰學習項目。
理論上,以上集羣具有徹底的擴展能力,進行合理的橫向擴展和局部的優化,支撐億級流量,沒有任何問題。
爲何這麼說呢
單體的Netty服務器,遠遠不止支持10萬併發,在CPU 、內存還不錯的狀況下,若是配置得當,甚至能撐到100萬級別。因此,經過合理的高併發架構,可以讓系統動態擴展到成百上千的Netty節點,支撐億級流量,是沒有任何問題的。
單體的Netty服務器,如何支撐100萬高併發,請查詢瘋狂創客圈社羣的文章《Netty 100萬級高併發服務器配置》
IM通信協議,屬於數據交換協議。IM系統的客戶端和服務器節點之間,須要按照同一種數據交換協議,進行數據的交換。
數據交換協議的功能,簡單的說:就是規定網絡中的字節流數據,如何與應用程序須要的結構化數據相互轉換。
數據交換協議主要的工做分爲兩步:結構化數據到二進制數據的序列化和反序列化。
數據交換協議按序列化類型:分爲文本協議和二進制協議。
常見的文本協議包括XML、JSON。文本協議序列化以後,可讀性好,便於調試,方便擴展。但文本協議的缺點在於解析效率通常,有不少的冗餘數據,這一點主要體如今XML格式上。
常見的二進制協議包括PrototolBuff、Thrift,這些協議都自帶了數據壓縮,編解碼效率高,同時兼具擴展性。二進制協議的優點很明顯,可是劣勢也很是的突出。和文本協議相反,序列化以後的二進制協議報文數據,基本上沒有什麼可讀性,很顯然,這點不利於你們開發和調試。
所以,在協議的選擇上,對於併發度不高的IM系統,建議使用文本協議,好比JSON。對於併發度很是之高,QPS在千萬級、億級的通信系統,儘可能選擇二進制的協議。
聽說,微信所使用的數據交換協議,就是 PrototolBuff二進制協議。
什麼是長鏈接呢?
客戶端client向server發起鏈接,server接受client鏈接,雙方創建鏈接。Client與server完成一次讀寫以後,它們之間的鏈接並不會主動關閉,後續的讀寫操做會繼續使用這個鏈接。
你們知道,TCP協議的鏈接過程是比較繁瑣的,創建鏈接是須要三次握手的,而釋放則須要4次握手,因此說每一個鏈接的創建都是須要資源消耗和時間消耗的。
在高併發的IM系統中,客戶端和服務器之間,須要大量的發送通信的消息,若是每次發送消息,都去創建鏈接,客戶端的和服務器的鏈接創建和斷開的開銷是很是巨大的。因此,IM消息的發送,確定是須要長鏈接。
什麼是短鏈接呢?
客戶端client向server發起鏈接,server接受client鏈接,在三次握手以後,雙方創建鏈接。Client與server完成一次讀寫,發送數據包並獲得返回的結果以後,經過客戶端和服務端的四次握手進行關閉斷開。
短鏈接適用於數據請求頻度較低的場景。好比網站的瀏覽和普通的web請求。短鏈接的優勢是:管理起來比較簡單,存在的鏈接都是有用的鏈接,不須要額外的控制手段。
在高併發的IM系統中,客戶端和服務器之間,除了消息的通信外,還須要用戶的登陸與認證、好友的更新與獲取等等一些低頻的請求,這些都使用短鏈接來實現。
綜上所述,在這個高併發IM系統中,存在兩個類的服務器。一類短鏈接服務器和一個長鏈接服務器。
短鏈接服務器也叫Web服務服務器,主要是功能是實現用戶的登陸鑑權和拉取好友、羣組、數據檔案等相對低頻的請求操做。
長鏈接服務器也叫IM即時通信服務器,主要做用就是用來和客戶端創建並維持長鏈接,實現消息的傳遞和即時的轉發。而且,分佈式網絡很是複雜,長鏈接管理是重中之重,須要考慮到鏈接保活、鏈接檢測、自動重連等方方面面的工做。
短鏈接Web服務器和長鏈接IM服務器之間,是相互配合的。在分佈式集羣的環境下,用戶首先經過短鏈接登陸Web服務器。Web服務器在完成用戶的帳號/密碼驗證,返回uid和token時,還須要經過必定策略,獲取目標IM服務器的IP地址和端口號列表,返回給客戶端。客戶端開始鏈接IM服務器,鏈接成功後,發送鑑權請求,鑑權成功則受權的長鏈接正式創建。
若是用戶規模龐大,不管是短鏈接Web服務器,仍是長鏈接IM服務器,都須要進行橫向的擴展,都須要擴展到上十臺、百臺、甚至上千臺機器。只有這樣,纔能有良好性能,提升良好的用戶體驗。所以,須要引入一個新的角色,短鏈接網關(WebGate)。
WebGate短鏈接網關的職責,首先是代理大量的Web服務器,從而無感知的實現短鏈接的高併發。在客戶端登陸時和進行其餘短鏈接時,不直接鏈接Web服務器,而是鏈接Web網關。圍繞Web網關和Web高併發的相關技術,目前很是成熟,可使用SpringCloud 或者 Dubbo 等分佈式Web技術,也很容易擴展。
除此以外,大量的IM服務器,又如何協同和管理呢?
基於Zookeeper或者其餘的分佈式協調中間件,能夠很是方便、輕鬆的實現一個IM服務器集羣的管理,包括並且不限於命名服務、服務註冊、服務發現、負載均衡等管理。
當用戶登陸成功的時候,WebGate短鏈接網關能夠經過負載均衡技術,從Zookeeper集羣中,找出一個可用的IM服務器的地址,返回給用戶,讓用戶來創建長鏈接。
(1)核心:
Netty4.x + spring4.x + zookeeper 3.x + redis 3.x + rocketMQ 3.x
(2)短鏈接服務:spring cloud
基於restful 短鏈接的分佈式微服務架構, 完成用戶在線管理、單點登陸系統。
(3)長鏈接服務:Netty
Netty就不用太多介紹了。
(4)消息隊列:
rocketMQ 高速隊列。整流做用。
(5)底層數據庫:mysql+mongodb
mysql作業務仍是很方便的,用來存儲結構化數據,如用戶數據。
mongodb 很重要,用來存儲非結構化離線消息。
(6)協議 Protobuf + JSON
Protobuf 是最高效的IM二進制協議,用於長鏈接。
JSON 是最緊湊的文本協議,用於短鏈接。
文本協議 Gson + fastjson。 Gson 谷歌的東西,fastjson 淘寶的東西,二者互補,結合使用。
什麼是會話?
爲了方便客戶端的開發,管理與服務器的鏈接,這裏引入一個很是重要的中間角色——Session (會話)。有點兒像Web開發中的Tomcat的服務器 Session,可是又有很大的不一樣。
客戶端的本地會話概念圖,以下圖所示:
客戶端會話有兩個很重的成員,一個是user,表明了擁有會話的用戶。一個是channel,表明了鏈接的通道。兩個成員的做用是:
(1)user成員 —— 經過它能夠得到當前的用戶信息
(2)channel成員 —— 經過它能夠發送Netty消息
Session須要和 channel 相互綁定,爲何呢?緣由有兩點:
(1)消息發送的時候, 須要從Session 寫入 Channel ,這至關於正向的綁定;
(2)收到消息的時候,消息是從Channel 過來的,因此能夠直接找到 綁定的Session ,這至關於反向的綁定。
Session和 channel 相互綁定的代碼以下:
//正向綁定 ClientSession session = new (channel); //反向綁定 channel.attr(ClientSession.SESSION).set(session);
正向綁定,是直接經過ClientSession構造函數完成。反向綁定是經過channel 自身的所具有的容器能力完成。Netty的Channel類型實現了AttributeMap接口 ,它至關於一個 Map容器。 反向的綁定,利用了channel 的這個特色。
總的來講,會話Session 左手用戶實例,右手服務器的channel鏈接通道,能夠說是左擁右抱,是開發中常用到的類。
在分佈式環境下,本地的Session只能綁定本地的用戶和通道,夠不着其餘Netty節點上的用戶和通道。
如何解決這個難題呢? 一個簡單的思路是:製做一個本地Session的副本,保存在分佈式緩存Redis中。對於其餘的Netty節點來講,能夠取到這份Redis副本,從而進行消息的路由和轉發。
基於redis進行分佈式的Session 緩存,與本地Session的內容不同,不須要保存用戶的具體實例,也不須要保存用戶的Netty Channel通道。只須要可以根據它找到對於的Netty服務器節點便可。
咱們將這個Session,命名爲 SessionDistrubuted。代碼以下:
/** \* create by 尼恩 @ 瘋狂創客圈 **/ @Data public class SessionDistrubuted implements ServerSession { //用戶ID private String userId; //Netty 服務器ID private long nodeId; //sessionId private String sessionId; public SessionDistrubuted( String sessionId, String userId, long nodeId) { this.sessionId = sessionId; this.userId = userId; this.nodeId = nodeId; } //... }
如何判斷這個Session是否有效呢? 能夠根據其nodeId,在本地路由器WorkerRouter中查找對應的消息轉發器,若是沒有找到,說明該Netty服務節點是不能夠鏈接的。因而,該Session爲無效。
判斷Session是否有效的代碼以下:
@Override public boolean isValid() { WorkerReSender sender = WorkerRouter.getInst() .getRedirectSender(nodeId); if (null == sender) { return false; } return true; }
只要該Session爲有效。就能夠經過它,轉發消息到目的nodeId對應的Netty 服務器。
代碼以下:
@Override public void writeAndFlush(Object pkg) { WorkerReSender sender = WorkerRouter .getInst().getRedirectSender(nodeId); sender.writeAndFlush(pkg); }
在分佈式環境下,結合本地Session和遠程Session,發送消息也就變得很是之簡單。若是在本地找到了目標的Session,就直接經過其Channel發送消息到客戶端。反之,就經過遠程Session,將消息轉發到客戶端所在的Netty服務器,由該服務器發送到目標客戶端。
顧名思義,計數器是用來計數的。在分佈式環境中,常規的計數器是不能使用的,在此介紹基本zookeeper實現的分佈式計數器。利用ZooKeeper能夠實現一個集羣共享的計數器,只要使用相同的path就能夠獲得最新的計數器值, 這是由ZooKeeper的一致性保證的。
Curator有兩個計數器, 一個是用int來計數(SharedCount),一個用long來計數(DistributedAtomicLong)。
這裏使用DistributedAtomicLong來實現高併發IM系統中的在線用戶統計。
代碼以下:
/** * create by 尼恩 @ 瘋狂創客圈 **/ public class OnlineCounter { private static final int QTY = 5; private static final String PATH = "/im/OnlineCounter"; //Zk客戶端 private CuratorFramework client = null; //單例模式 private static OnlineCounter singleInstance = null; DistributedAtomicLong onlines = null; public static OnlineCounter getInst() { if (null == singleInstance) { singleInstance = new OnlineCounter(); singleInstance.client = ZKclient.instance.getClient(); singleInstance.init(); } return singleInstance; } private void init() { //分佈式計數器,失敗時重試10,每次間隔30毫秒 onlines = new DistributedAtomicLong( client, PATH, new RetryNTimes(10, 30)); } public boolean increment() { boolean result = false; AtomicValue<Long> val = null; try { val = onlines.increment(); result = val.succeeded(); System.out.println("old cnt: " + val.preValue() + " new cnt : " + val.postValue() + " result:" + val.succeeded()); } catch (Exception e) { e.printStackTrace(); } return result; } public boolean decrement() { boolean result = false; AtomicValue<Long> val = null; try { val = onlines.decrement(); result = val.succeeded(); System.out.println("old cnt: " + val.preValue() + " new cnt : " + val.postValue() + " result:" + val.succeeded()); } catch (Exception e) { e.printStackTrace(); } return result; } }
當用戶上線的時候,使用increase方法,分佈式的增長一次數量:
/** \* 增長本地session */ public void addSession(String sessionId, SessionLocal s) { localMap.put(sessionId, s); String uid = s.getUser().getUid(); //增長用戶數 OnlineCounter.getInst().increment(); //... }
當用戶下線的時候,使用decrease方法,分佈式的減小一次數量:
/** \* 刪除本地session */ public void removeLocalSession(String sessionId) { if (!localMap.containsKey(sessionId)) { return; } localMap.remove(sessionId); //減小用戶數 OnlineCounter.getInst().decrement(); //... }
前面提到,一個高併發系統會有不少的節點組成,並且,節點的數量是不斷動態變化的。
在一個即時消息通信系統中,從0到1到N,用戶量可能會愈來愈多,或者說因爲某些活動影響,會不斷的出現流量洪峯。這時須要動態加入大量的節點。另外,因爲機器或者網絡的緣由,一些節點主動的離開的集羣。如何爲大量的動態節點命名呢?最好的辦法是使用分佈式命名服務,按照必定的規則,爲動態上線和下線的工做節點命名。
瘋狂創客圈的高併發IM實戰學習項目,基於Zookeeper構建分佈式命名服務,爲每個IM工做服務器節點動態命名。
首先定義一個POJO類,保存IM worker節點的基礎信息如Netty 服務IP、Netty 服務端口,以及Netty的服務鏈接數。
具體以下:
/** * create by 尼恩 @ 瘋狂創客圈 **/ @Data public class ImNode implements Comparable<ImNode> { //worker 的Id,由Zookeeper負責生成 private long id; //Netty 服務 的鏈接數 private AtomicInteger balance; //Netty 服務 IP private String host; //Netty 服務 端口 private String port; //... }
這個POJO類的IP、端口、balance負載,和每個節點的Netty服務器相關。而id屬性,則由利用Zookeeper的中Znode子節點能順序編號的性質,由Zookeeper生成。
命名服務的思路是:全部的工做節點,都在Zookeeper的同一個的父節點下,建立順序節點。而後從返回的臨時路徑上,取得屬於本身的那個後綴的編號。
主要的代碼以下:
package com.crazymakercircle.imServer.distributed; import com.crazymakercircle.imServer.server.ServerUtils; import com.crazymakercircle.util.ObjectUtil; import com.crazymakercircle.zk.ZKclient; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; /** * create by 尼恩 @ 瘋狂創客圈 **/ public class ImWorker { //Zk curator 客戶端 private CuratorFramework client = null; //保存當前Znode節點的路徑,建立後返回 private String pathRegistered = null; private ImNode node = ImNode.getLocalInstance(); private static ImWorker singleInstance = null; //取得單例 public static ImWorker getInst() { if (null == singleInstance) { singleInstance = new ImWorker(); singleInstance.client = ZKclient.instance.getClient(); singleInstance.init(); } return singleInstance; } private ImWorker() { } // 在zookeeper中建立臨時節點 public void init() { createParentIfNeeded(ServerUtils.MANAGE_PATH); // 建立一個 ZNode 節點 // 節點的 payload 爲當前worker 實例 try { byte[] payload = ObjectUtil.Object2JsonBytes(node); pathRegistered = client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL_SEQUENTIAL) .forPath(ServerUtils.pathPrefix, payload); //爲node 設置id node.setId(getId()); } catch (Exception e) { e.printStackTrace(); } } public long getId() { String sid = null; if (null == pathRegistered) { throw new RuntimeException("節點註冊失敗"); } int index = pathRegistered.lastIndexOf(ServerUtils.pathPrefix); if (index >= 0) { index += ServerUtils.pathPrefix.length(); sid = index <= pathRegistered.length() ? pathRegistered.substring(index) : null; } if (null == sid) { throw new RuntimeException("節點ID生成失敗"); } return Long.parseLong(sid); } public boolean incBalance() { if (null == node) { throw new RuntimeException("尚未設置Node 節點"); } // 增長負載:增長負載,並寫回zookeeper while (true) { try { node.getBalance().getAndIncrement(); byte[] payload = ObjectUtil.Object2JsonBytes(this); client.setData().forPath(pathRegistered, payload); return true; } catch (Exception e) { return false; } } } public boolean decrBalance() { if (null == node) { throw new RuntimeException("尚未設置Node 節點"); } // 增長負載:增長負載,並寫回zookeeper while (true) { try { int i = node.getBalance().decrementAndGet(); if (i < 0) { node.getBalance().set(0); } byte[] payload = ObjectUtil.Object2JsonBytes(this); client.setData().forPath(pathRegistered, payload); return true; } catch (Exception e) { return false; } } } private void createParentIfNeeded(String managePath) { try { Stat stat = client.checkExists().forPath(managePath); if (null == stat) { client.create() .creatingParentsIfNeeded() .withProtection() .withMode(CreateMode.PERSISTENT) .forPath(managePath); } } catch (Exception e) { e.printStackTrace(); } } }
注意,這裏有三個Znode相關的路徑:
(1)MANAGE_PATH
(2)pathPrefix
(3)pathRegistered
第一個MANAGE_PATH是一個常量。爲全部臨時工做Worker節點的父親節點的路徑,在建立Worker節點以前,首先要檢查一下,父親Znode節點是否存在,不然的話,先建立父親節點。父親節點的建立方式是:持久化節點,而不是臨時節點。
檢查和建立父親節點的代碼以下:
private void createParentIfNeeded(String managePath) { try { Stat stat = client.checkExists().forPath(managePath); if (null == stat) { client.create() .creatingParentsIfNeeded() .withProtection() .withMode(CreateMode.PERSISTENT) .forPath(managePath); } } catch (Exception e) { e.printStackTrace(); } }
第二路徑pathPrefix是全部臨時節點的前綴。例子的值「/im/Workers/」,是在工做路徑後,加上一個「/」分割符。也但是在工做路徑的後面,加上「/」分割符和其餘的前綴字符,如:「/im/Workers/id-」,「/im/Workers/seq-」等等。
第三路徑pathRegistered是臨時節點的建立成功以後,返回的完整的路徑。好比:/im/Workers/0000000000,/im/Workers/0000000001 等等。後邊的編號是順序的。
建立節點成功後,截取後邊的數字,放在POJO對象中,供後邊使用:
//爲node 設置id node.setId(getId());
若是鏈接在不一樣的Netty工做站點的客戶端之間,須要相互進行消息的發送,那麼,就須要在不一樣的Worker節點之間進行路由和轉發。
Worker節點路由是指,根據消息須要轉發的目標用戶,找到用戶的鏈接所在的Worker節點。因爲節點和節點之間,都有可能須要相互轉發,因此,節點之間的關係是一種網狀結構。每個節點,都須要具有路由的能力。
爲每個Worker節點增長一個IM路由器類,叫作WorkerRouter 。爲了可以轉發到全部的節點,須要一是要訂閱到集羣中全部的在線Netty服務器,而且保存起來,二是要其餘的Netty服務器創建一個長鏈接,用於轉發消息。
WorkerRouter 核心代碼,節選以下:
/** \* create by 尼恩 @ 瘋狂創客圈 **/ @Slf4j public class WorkerRouter { //Zk客戶端 private CuratorFramework client = null; //單例模式 private static WorkerRouter singleInstance = null; //監聽路徑 private static final String path = "/im/Workers"; //節點的容器 private ConcurrentHashMap<Long, WorkerReSender> workerMap = new ConcurrentHashMap<>(); public static WorkerRouter getInst() { if (null == singleInstance) { singleInstance = new WorkerRouter(); singleInstance.client = ZKclient.instance.getClient(); singleInstance.init(); } return singleInstance; } private void init() { try { //訂閱節點的增長和刪除事件 TreeCache treeCache = new TreeCache(client, path); TreeCacheListener l = new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { ChildData data = event.getData(); if (data != null) { switch (event.getType()) { case NODE_REMOVED: processNodeRemoved(data); break; case NODE_ADDED: processNodeAdded(data); break; default: break; } } } }; treeCache.getListenable().addListener(l); treeCache.start(); } catch (Exception e) { e.printStackTrace(); } //... }
在上一小節中,咱們已經知道,一個節點上線時,首先要經過命名服務,加入到Netty 集羣中。上面的代碼中,WorkerRouter 路由器使用curator的TreeCache 緩存,訂閱了節點的NODE_ADDED節點新增消息。當一個新的Netty節點加入是,經過processNodeAdded(data) 方法, 在本地保存一份節點的POJO信息,而且創建一個消息中轉的Netty客戶鏈接。
處理節點新增的方法 processNodeAdded(data)比較重要,代碼以下:
private void processNodeAdded(ChildData data) { log.info("[TreeCache]節點更新端口, path={}, data={}", data.getPath(), data.getData()); byte[] payload = data.getData(); String path = data.getPath(); ImNode imNode = ObjectUtil.JsonBytes2Object(payload, ImNode.class); long id = getId(path); imNode.setId(id); WorkerReSender reSender = workerMap.get(imNode.getId()); //重複收到註冊的事件 if (null != reSender && reSender.getNode().equals(imNode)) { return; } //服務器從新上線 if (null != reSender) { //關閉老的鏈接 reSender.stopConnecting(); } //建立一個消息轉發器 reSender = new WorkerReSender(imNode); //創建轉發的鏈接 reSender.doConnect(); workerMap.put(id, reSender); }
router路由器有一個容器成員workerMap,用於封裝和保存全部的在線節點。當一個節點新增時,router取到新增的Znode路徑和負載。Znode路徑中有新節點的ID,Znode的payload負載中,有新節點的Netty服務的IP和端口,這個三個信息共同構成新節點的POJO信息 —— ImNode節點信息。 router在檢查完和肯定本地不存在該節點的轉發器後,新增一個轉發器 WorkerReSender,將新節點的轉發器,保存在本身的容器中。
這裏有一個問題,爲何在router路由器中,不簡單、直接、乾脆的保存新節點的POJO信息呢?
由於router路由器的主要做用,除了路由節點,還須要方便的進行消息的轉發,因此,router路由器保存的是轉發器 WorkerReSender,而新增的遠程Netty節點的POJO信息,封裝在轉發器中。
IM轉發器,封裝了遠程節點的IP、端口、以及ID消息,具體是在ImNode類型的成員中。另外,IM轉發器還維持一個到遠程節點的長鏈接。也就是說,它是一個Netty的NIO客戶端,維護了一個到遠程節點的Netty Channel 通道成員,經過這個通道,將消息轉發給遠程的節點。
IM轉發器的核心代碼,以下:
package com.crazymakercircle.imServer.distributed; import com.crazymakercircle.im.common.bean.User; import com.crazymakercircle.im.common.codec.ProtobufEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.GenericFutureListener; import lombok.Data; import lombok.extern.slf4j.Slf4j; import java.util.Date; import java.util.concurrent.TimeUnit; /** * create by 尼恩 @ 瘋狂創客圈 **/ @Slf4j @Data public class WorkerReSender { //鏈接遠程節點的Netty 通道 private Channel channel; //鏈接遠程節點的POJO信息 private ImNode remoteNode; /** * 鏈接標記 */ private boolean connectFlag = false; GenericFutureListener<ChannelFuture> closeListener = (ChannelFuture f) -> { log.info(": 分佈式鏈接已經斷開……", remoteNode.toString()); channel = null; connectFlag = false; WorkerRouter.getInst().removeWorkerById(remoteNode); }; private GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) -> { final EventLoop eventLoop = f.channel().eventLoop(); if (!f.isSuccess()) { log.info("鏈接失敗!在10s以後準備嘗試重連!"); eventLoop.schedule( () -> WorkerReSender.this.doConnect(), 10, TimeUnit.SECONDS); connectFlag = false; } else { connectFlag = true; log.info("分佈式IM節點鏈接成功:", remoteNode.toString()); channel = f.channel(); channel.closeFuture().addListener(closeListener); } }; private Bootstrap b; private EventLoopGroup g; public WorkerReSender(ImNode n) { this.remoteNode = n; /** * 客戶端的是Bootstrap,服務端的則是 ServerBootstrap。 * 都是AbstractBootstrap的子類。 **/ b = new Bootstrap(); /** * 經過nio方式來接收鏈接和處理鏈接 */ g = new NioEventLoopGroup(); } // 鏈接和重連 public void doConnect() { // 服務器ip地址 String host = remoteNode.getHost(); // 服務器端口 int port = Integer.parseInt(remoteNode.getPort()); try { if (b != null && b.group() == null) { b.group(g); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); b.remoteAddress(host, port); // 設置通道初始化 b.handler( new ChannelInitializer<SocketChannel>() { public void initChannel(SocketChannel ch) { ch.pipeline().addLast(new ProtobufEncoder()); } } ); log.info(new Date() + "開始鏈接分佈式節點", remoteNode.toString()); ChannelFuture f = b.connect(); f.addListener(connectedListener); // 阻塞 // f.channel().closeFuture().sync(); } else if (b.group() != null) { log.info(new Date() + "再一次開始鏈接分佈式節點", remoteNode.toString()); ChannelFuture f = b.connect(); f.addListener(connectedListener); } } catch (Exception e) { log.info("客戶端鏈接失敗!" + e.getMessage()); } } public void stopConnecting() { g.shutdownGracefully(); connectFlag = false; } public void writeAndFlush(Object pkg) { if (connectFlag == false) { log.error("分佈式節點未鏈接:", remoteNode.toString()); return; } channel.writeAndFlush(pkg); } }
IM轉發器中,主體是與Netty相關的代碼,比較簡單。至少,IM轉發器比Netty服務器的代碼,簡單太多了。
轉發器有一個消息轉發的方法,直接經過Netty channel通道,將消息發送到遠程節點。
public void writeAndFlush(Object pkg) { if (connectFlag == false) { log.error("分佈式節點未鏈接:", remoteNode.toString()); return; } channel.writeAndFlush(pkg); }
理論上來講,負載均衡是一種手段,用來把對某種資源的訪問分攤給不一樣的服務器,從而減輕單點的壓力。
在高併發的IM系統中,負載均衡就是須要將IM長鏈接分攤到不一樣的Netty服務器,防止單個Netty服務器負載過大,而致使其不可用。
前面講到,當用戶登陸成功的時候,短鏈接網關WebGate須要返回給用戶一個可用的Netty服務器的地址,讓用戶來創建Netty長鏈接。而每臺Netty工做服務器在啓動時,都會去zookeeper的「/im/Workers」節點下注冊臨時節點。
所以,短鏈接網關WebGate能夠在用戶登陸成功以後,去「/im/Workers」節點下取得全部可用的Netty服務器列表,並經過必定的負載均衡算法計算得出一臺Netty工做服務器,而且返回給客戶端。
短鏈接網關WebGate 得到Netty服務器的地址,經過查詢Zookeeper集羣來實現。定義一個負載均衡器,ImLoadBalance類 ,將計算最佳服務器的算法,放在負載均衡器中。
ImLoadBalance類 的核心代碼,以下:
package com.crazymakercircle.Balance; import com.crazymakercircle.ObjectUtil; import com.crazymakercircle.util.ImNode; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * create by 尼恩 @ 瘋狂創客圈 **/ @Data @Slf4j public class ImLoadBalance { //Zk客戶端 private CuratorFramework client = null; //工做節點的路徑 private String mangerPath = "/im/Workers"; public ImLoadBalance() { } public ImLoadBalance(CuratorFramework client, String mangerPath) { this.client = client; this.mangerPath = mangerPath; } public static ImLoadBalance instance() { } public ImNode getBestWorker() { List<ImNode> workers =getWorkers(); ImNode best= balance(workers); return best; } protected ImNode balance(List<ImNode> items) { if (items.size() > 0) { // 根據balance值由小到大排序 Collections.sort(items); // 返回balance值最小的那個 return items.get(0); } else { return null; } } ///.... }
短鏈接網關WebGate 會調用getBestWorker()方法,取得最佳的IM服務器。而在這個方法中,有兩個很重要的方法。 一個是取得全部的IM服務器列表,注意是帶負載的。二個是經過負載信息,計算最小負載的服務器。
全部的IM服務器列表的代碼以下:
/** * 從zookeeper中拿到全部IM節點 */ protected List<ImNode> getWorkers() { List<ImNode> workers = new ArrayList<ImNode>(); List<String> children = null; try { children = client.getChildren().forPath(mangerPath); } catch (Exception e) { e.printStackTrace(); return null; } for (String child : children) { log.info("child:", child); byte[] payload = null; try { payload = client.getData().forPath(child); } catch (Exception e) { e.printStackTrace(); } if (null == payload) { continue; } ImNode worker = ObjectUtil. JsonBytes2Object(payload, ImNode.class); workers.add(worker); } return workers; }
代碼中,首先取得 "/im/Workers" 目錄下全部的臨時節點,使用的是curator的getChildren 獲取子節點方法。而後,經過getData方法,取得每個子節點的二進制負載。最後,將負載信息轉成成 POJO ImNode 對象。
取到了工做節點的POJO 列表以後,經過一個簡單的算法,計算出balance值最小的ImNode對象。
取得最小負載的 balance 方法的代碼以下:
protected ImNode balance (List<ImNode> items) { if (items.size() > 0) { // 根據balance由小到大排序 Collections.sort(items); // 返回balance值最小的那個 return items.get(0); } else { return null; } }
在用戶登陸的Http API 方法中,調用ImLoadBalance類的getBestWorker()方法,取得最佳的IM服務器信息,返回給登陸的客戶端。
核心代碼以下:
@EnableAutoConfiguration @RestController @RequestMapping(value = "/user", produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public class UserAction extends BaseController { @Resource private UserService userService; @RequestMapping(value = "/login/{username}/{password}") public String loginAction( @PathVariable("username") String username, @PathVariable("password") String password) { User user = new User(); user.setUserName(username); user.setPassWord(password); User loginUser = userService.login(user); ImNode best=ImLoadBalance.instance().getBestWorker(); LoginBack back =new LoginBack(); back.setImNode(best); back.setUser(loginUser); back.setToken(loginUser.getUserId().toString()); String r = super.getJsonResult(back); return r; } //.... }
目前和幾個小夥伴一塊兒,組織了一個高併發的實戰社羣【瘋狂創客圈】,完成整個項目的完整的架構和開發實戰,歡迎參與。
Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
瘋狂創客圈 【 博客園 總入口 】