瘋狂創客圈 Java 分佈式聊天室【 億級流量】實戰系列之 -25【 博客園 總入口 】html
Java基礎練習中,一個重要的實戰練習是: java的聊天程序。基本上,每個java工程師,都有寫過本身的聊天程序。java
實現一個Java的分佈式的聊天程序的分佈式練習,一樣很是重要的是。有如下幾個方面的最重要做用:node
1 體驗高併發的程序的開發:從研究承載千、萬QPS級的流量,拓展可以承載百萬級、千萬級、億萬級流量git
2 有分佈式、高併發的實戰經驗,面試談薪水的時候,能提高很多web
3 Netty集羣的分佈式原理,和大數據的分佈式原理,elasticsearch 的分佈式原理,和redis集羣的分佈式原理,和mongodb的分佈式原理,很大程度上,都是想通。 Netty集羣做爲一個實戰開發, 是一個很是好的分佈式基礎練習面試
4 更多的理由,請參考機械工業出版社的書籍 《Netty Zookeeper Redis 高併發實戰》redis
本文的代碼,來自於開源項目CrazyIm , 項目的地址爲
https://gitee.com/crazymaker/crazy_tourist_circle__im.gitspring源碼 目前已經完成了基本的通訊,在不斷迭代中,很多的羣友,經過瘋狂創客圈的QQ羣,溝通迭代過程當中的問題。mongodb
zookeeper做爲註冊中心,每個netty服務啓動的時候,把節點的信息好比ip地址+端口號註冊到zookeeper上。apache
具體的原理,請參見書籍《Netty Zookeeper Redis 高併發實戰》。
package com.crazymakercircle.entity; import lombok.Data; import java.io.Serializable; import java.util.Objects; /** * IM節點的POJO類 * create by 尼恩 @ 瘋狂創客圈 **/ @Data public class ImNode implements Comparable<ImNode>, Serializable { private static final long serialVersionUID = -499010884211304846L; //worker 的Id,zookeeper負責生成 private long id; //Netty 服務 的鏈接數 private Integer balance = 0; //Netty 服務 IP private String host; //Netty 服務 端口 private Integer port; public ImNode() { } public ImNode(String host, Integer port) { this.host = host; this.port = port; } @Override public String toString() { return "ImNode{" + "id='" + id + '\'' + "host='" + host + '\'' + ", port='" + port + '\'' + ",balance=" + balance + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ImNode node = (ImNode) o; // return id == node.id && return Objects.equals(host, node.host) && Objects.equals(port, node.port); } @Override public int hashCode() { return Objects.hash(id, host, port); } /** * 升序排列 */ public int compareTo(ImNode o) { int weight1 = this.balance; int weight2 = o.balance; if (weight1 > weight2) { return 1; } else if (weight1 < weight2) { return -1; } return 0; } public void incrementBalance() { balance++; } public void decrementBalance() { balance--; } }
利用zk有一個監聽機制,就是針對某個節點進行監聽,一點這個節點發生了變化就會收到zk的通知。咱們就是利用zk的這個watch來進行服務的上線和下線的通知,也就是咱們的服務發現功能。
package com.crazymakercircle.imServer.distributed; import com.crazymakercircle.constants.ServerConstants; import com.crazymakercircle.entity.ImNode; import com.crazymakercircle.im.common.bean.msg.ProtoMsg; import com.crazymakercircle.imServer.protoBuilder.NotificationMsgBuilder; import com.crazymakercircle.util.JsonUtil; import com.crazymakercircle.util.ObjectUtil; import com.crazymakercircle.zk.ZKclient; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import java.util.concurrent.ConcurrentHashMap; /** * create by 尼恩 @ 瘋狂創客圈 **/ @Slf4j public class PeerManager { //Zk客戶端 private CuratorFramework client = null; private String pathRegistered = null; private ImNode node = null; private static PeerManager singleInstance = null; private static final String path = ServerConstants.MANAGE_PATH; private ConcurrentHashMap<Long, PeerSender> peerMap = new ConcurrentHashMap<>(); public static PeerManager getInst() { if (null == singleInstance) { singleInstance = new PeerManager(); singleInstance.client = ZKclient.instance.getClient(); } return singleInstance; } private PeerManager() { } /** * 初始化節點管理 */ public void init() { try { //訂閱節點的增長和刪除事件 PathChildrenCache childrenCache = new PathChildrenCache(client, path, true); PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { log.info("開始監聽其餘的ImWorker子節點:-----"); ChildData data = event.getData(); switch (event.getType()) { case CHILD_ADDED: log.info("CHILD_ADDED : " + data.getPath() + " 數據:" + data.getData()); processNodeAdded(data); break; case CHILD_REMOVED: log.info("CHILD_REMOVED : " + data.getPath() + " 數據:" + data.getData()); processNodeRemoved(data); break; case CHILD_UPDATED: log.info("CHILD_UPDATED : " + data.getPath() + " 數據:" + new String(data.getData())); break; default: log.debug("[PathChildrenCache]節點數據爲空, path={}", data == null ? "null" : data.getPath()); break; } } }; childrenCache.getListenable().addListener(childrenCacheListener); System.out.println("Register zk watcher successfully!"); childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } catch (Exception e) { e.printStackTrace(); } } private void processNodeRemoved(ChildData data) { byte[] payload = data.getData(); ImNode n = ObjectUtil.JsonBytes2Object(payload, ImNode.class); long id = ImWorker.getInst().getIdByPath(data.getPath()); n.setId(id); log.info("[TreeCache]節點刪除, path={}, data={}", data.getPath(), JsonUtil.pojoToJson(n)); PeerSender peerSender = peerMap.get(n.getId()); if (null != peerSender) { peerSender.stopConnecting(); peerMap.remove(n.getId()); } } private void processNodeAdded(ChildData data) { byte[] payload = data.getData(); ImNode n = ObjectUtil.JsonBytes2Object(payload, ImNode.class); long id = ImWorker.getInst().getIdByPath(data.getPath()); n.setId(id); log.info("[TreeCache]節點更新端口, path={}, data={}", data.getPath(), JsonUtil.pojoToJson(n)); if (n.equals(getLocalNode())) { log.info("[TreeCache]本地節點, path={}, data={}", data.getPath(), JsonUtil.pojoToJson(n)); return; } PeerSender peerSender = peerMap.get(n.getId()); if (null != peerSender && peerSender.getNode().equals(n)) { log.info("[TreeCache]節點重複增長, path={}, data={}", data.getPath(), JsonUtil.pojoToJson(n)); return; } if (null != peerSender) { //關閉老的鏈接 peerSender.stopConnecting(); } peerSender = new PeerSender(n); peerSender.doConnect(); peerMap.put(n.getId(), peerSender); } public PeerSender getPeerSender(long id) { PeerSender peerSender = peerMap.get(id); if (null != peerSender) { return peerSender; } return null; } public void sendNotification(String json) { peerMap.keySet().stream().forEach( key -> { if (!key.equals(getLocalNode().getId())) { PeerSender peerSender = peerMap.get(key); ProtoMsg.Message pkg = NotificationMsgBuilder.buildNotification(json); peerSender.writeAndFlush(pkg); } } ); } public ImNode getLocalNode() { return ImWorker.getInst().getLocalNodeInfo(); } public void remove(ImNode remoteNode) { peerMap.remove(remoteNode.getId()); log.info("[TreeCache]移除遠程節點信息, node={}", JsonUtil.pojoToJson(remoteNode)); } }
什麼是臨時節點?服務啓動後建立臨時節點, 服務斷掉後臨時節點就不存在了
正常的思路多是註冊的時候,咱們像zk註冊一個正常的節點,而後在服務下線的時候刪除這個節點,可是這樣的話會有一個弊端。好比咱們的服務掛機,沒法去刪除臨時節點,那麼這個節點就會被咱們錯誤的提供給了客戶端。
另外咱們還要考慮持久化的節點建立以後刪除之類的問題,問題會更加的複雜化,因此咱們使用了臨時節點。
在咱們解決了服務的註冊和發現問題以後,那麼咱們究竟提供給客戶端那臺服務呢,這時候就須要咱們作出選擇,爲了讓客戶端可以均勻的鏈接到咱們的服務器上(好比有個100個客戶端,2臺服務器,每臺就分配50個),咱們須要使用一個負載均衡的策略。
這裏咱們使用輪詢的方式來爲每一個請求的客戶端分配ip。具體的代碼實現以下:
package com.crazymakercircle.Balance; import com.crazymakercircle.constants.ServerConstants; import com.crazymakercircle.entity.ImNode; import com.crazymakercircle.util.JsonUtil; import com.crazymakercircle.zk.ZKclient; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.curator.framework.CuratorFramework; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * create by 尼恩 @ 瘋狂創客圈 **/ @Data @Slf4j @Service public class ImLoadBalance { //Zk客戶端 private CuratorFramework client = null; private String managerPath; public ImLoadBalance() { this.client = ZKclient.instance.getClient(); // managerPath=ServerConstants.MANAGE_PATH+"/"; managerPath=ServerConstants.MANAGE_PATH; } /** * 獲取負載最小的IM節點 * * @return */ public ImNode getBestWorker() { List<ImNode> workers = getWorkers(); log.info("所有節點以下:"); workers.stream().forEach(node -> { log.info("節點信息:{}", JsonUtil.pojoToJson(node)); }); ImNode best = balance(workers); return best; } /** * 按照負載排序 * * @param items 全部的節點 * @return 負載最小的IM節點 */ protected ImNode balance(List<ImNode> items) { if (items.size() > 0) { // 根據balance值由小到大排序 Collections.sort(items); // 返回balance值最小的那個 ImNode node = items.get(0); log.info("最佳的節點爲:{}", JsonUtil.pojoToJson(node)); return node; } else { return null; } } /** * 從zookeeper中拿到全部IM節點 */ protected List<ImNode> getWorkers() { List<ImNode> workers = new ArrayList<ImNode>(); List<String> children = null; try { children = client.getChildren().forPath(managerPath); } catch (Exception e) { e.printStackTrace(); return null; } for (String child : children) { log.info("child:", child); byte[] payload = null; try { payload = client.getData().forPath(managerPath+"/"+child); } catch (Exception e) { e.printStackTrace(); } if (null == payload) { continue; } ImNode worker = JsonUtil.jsonBytes2Object(payload, ImNode.class); workers.add(worker); } return workers; } /** * 從zookeeper中刪除全部IM節點 */ public void removeWorkers() { try { client.delete().deletingChildrenIfNeeded().forPath(managerPath); } catch (Exception e) { e.printStackTrace(); } } }
Zookeeper的安裝和原理,以及開發的基礎知識,請參見書籍《Netty Zookeeper Redis 高併發實戰》。
啓動zookeeper的兩個節點,原本有三個,啓動二個便可
客戶端鏈接zookeeper集羣。命令以下:
./zkCli.cmd -server localhost:2181
Redis的安裝和原理,以及開發的基礎知識,請參見請參見書籍《Netty Zookeeper Redis 高併發實戰》。
redis 的客戶端界面。
使用一個WEBGate,做爲負載均衡的服務器,具體的原理,請參見書籍《Netty Zookeeper Redis 高併發實戰》。
除了負載均衡,從WEBGate還能夠從 zookeeper中刪除全部IM節點
鏈接爲: http://localhost:8080/swagger-ui.html
swagger 的界面以下:
服務端的端口爲7000
服務端的端口爲7001,自動遞增的
啓動後輸入登陸的信息
請輸入登陸信息,格式爲:用戶名@密碼
z1@1
啓動客戶端後,而且登陸後,會自動鏈接一個netty節點, 這裏爲7001,第二個Netty服務節點。
啓動後輸入登陸的信息
請輸入登陸信息,格式爲:用戶名@密碼
z2@1
啓動客戶端後,而且登陸後,按照負載均衡的機制,會自動鏈接一個netty節點, 這裏爲7000,第一個Netty服務節點。
下面演示,不一樣的客戶端,經過各自的服務器節點,進行通訊。
在第二個客戶端(用戶爲z2),發送消息給第一個客戶端(用戶爲z1),消息的格式爲 :「 內容@用戶名」
請輸入聊天信息,格式爲:內容@用戶名
hello@z1
請輸入聊天信息,格式爲:內容@用戶名
helloworld@z1
經過Netty服務節點的轉發,第一個客戶端收到的消息以下:
收到消息 from uid:z2 -> hello
收到消息 from uid:z2 -> helloworld
經過Netty+Zookeep+Redis的架構,整個Netty的集羣,具有了服務節點的自動發現,節點之間的消息路由的能力。
說明一下,整個程序,仍是比較複雜的,若是看不懂,建議不要捉急,慢慢來。
若是能從0到1的本身實現一版,開發的水平,也就不通常了。
全面的理論基礎,請參見 《Netty Zookeeper Redis 高併發實戰》 一書
《Netty Zookeeper Redis 高併發實戰》 一書,對Netty 集羣的基本原理,進行了詳盡的介紹,大體的目錄以下:
12.1 【面試必備】如何支撐億級流量的高併發IM架構的理論基礎
12.1.1 億級流量的系統架構的開發實踐 338
12.1.2 高併發架構的技術選型 338
12.1.3 詳解IM消息的序列化協議選型 339
12.1.4 詳解長鏈接和短鏈接 339
12.2 分佈式IM的命名服務的實踐案例 340
12.2.1 IM節點的POJO類 341
12.2.2 IM節點的ImWorker類 342
12.3 Worker集羣的負載均衡之實踐案例 345
12.3.1 ImLoadBalance負載均衡器 346
12.3.2 與WebGate的整合 348
12.4 即時通訊消息的路由和轉發的實踐案例 349
12.4.1 IM路由器WorkerRouter 349
12.4.2 IM轉發器WorkerReSender 352
12.5 Feign短鏈接RESTful調用 354
12.5.1 短鏈接API的接口準備 355
12.5.2 聲明遠程接口的本地代理 355
12.5.3 遠程API的本地調用 356
12.6 分佈式的在線用戶統計的實踐案例 358
12.6.1 Curator的分佈式計數器 358
12.6.2 用戶上線和下線的統計 360
Java (Netty) 聊天程序【 億級流量】實戰 開源項目實戰
瘋狂創客圈 【 博客園 總入口 】