你們新年快樂!git
新的一年第一篇技術文章但願開個好頭,因此元旦三天我也沒怎麼閒着,但願給你們帶來一篇比較感興趣的乾貨內容。github
老讀者應該還記得我在去年國慶節前分享過一篇《設計一個百萬級的消息推送系統》;雖然我在文中有貼一些僞代碼,依然有些朋友但願能直接分享一些能夠運行的源碼;這麼久了是時候把坑填上了。算法
本文較長,高能預警;帶好瓜子板凳。緩存
因而在以前的基礎上我完善了一些內容,先來看看這個項目的介紹吧:服務器
CIM(CROSS-IM)
一款面向開發者的 IM(即時通信)
系統;同時提供了一些組件幫助開發者構建一款屬於本身可水平擴展的 IM
。架構
藉助 CIM
你能夠實現如下需求:負載均衡
IM
即時通信系統。APP
的消息推送中間件。IOT
海量鏈接場景中的消息透傳中間件。完整源碼託管在 GitHub : https://github.com/crossoverJie/cimide
本次主要涉及到 IM 即時通信,因此特意錄了兩段視頻演示(羣聊、私聊)。工具
點擊下方連接能夠查看視頻版 Demo。性能
YouTube | Bilibili |
---|---|
羣聊 私聊 | 羣聊 私聊 |
![]() |
![]() |
也在公網部署了一套演示環境,想要試一試的能夠聯繫我加入內測羣獲取帳號一塊兒尬聊😋。
下面來看看具體的架構設計。
CIM
中的各個組件均採用 SpringBoot
構建。Netty + Google Protocol Buffer
構建底層通訊。Redis
存放各個客戶端的路由信息、帳號信息、在線狀態等。Zookeeper
用於 IM-server
服務的註冊與發現。總體主要由如下模塊組成:
IM
服務端;用於接收 client
鏈接、消息透傳、消息推送等功能。
支持集羣部署。
消息路由服務器;用於處理消息路由、消息轉發、用戶登陸、用戶下線以及一些運營工具(獲取在線用戶數等)。
IM
客戶端;給用戶使用的消息終端,一個命令便可啓動並向其餘人發起通信(羣聊、私聊);同時內置了一些經常使用命令方便使用。
總體的流程也比較簡單,流程圖以下:
route
發起登陸。Zookeeper
中選擇可用 IM-server
返回給客戶端,並保存登陸、路由信息到 Redis
。IM-server
發起長鏈接,成功後保持心跳。route
清除狀態信息。因此當咱們本身部署時須要如下步驟:
Redis、Zookeeper
。cim-server
,這是真正的 IM 服務器,爲了知足性能需求因此支持水平擴展,只須要註冊到同一個 Zookeeper
便可。cim-forward-route
,這是路由服務器,全部的消息都須要通過它。因爲它是無狀態的,因此也能夠利用 Nginx
代理提升可用性。cim-client
真正面向用戶的客戶端;啓動以後會自動鏈接 IM 服務器即可以在控制檯收發消息了。更多使用介紹能夠參考快速啓動。
接下來重點看看具體的實現,好比羣聊、私聊消息如何流轉;IM 服務端負載均衡;服務如何註冊發現等等。
先來看看服務端;主要是實現客戶端上下線、消息下發等功能。
首先是服務啓動:
因爲是在 SpringBoot
中搭建的,因此在應用啓動時須要啓動 Netty
服務。
從 pipline
中能夠看出使用了 Protobuf
的編解碼(具體報文在客戶端中分析)。
須要知足 IM
服務端的水平擴展需求,因此 cim-server
是須要將自身數據發佈到註冊中心的。
這裏參考以前分享的《搞定服務註冊與發現》有具體介紹。
因此在應用啓動成功後須要將自身數據註冊到 Zookeeper
中。
最主要的目的就是將當前應用的 ip + cim-server-port+ http-port
註冊上去。
上圖是我在演示環境中註冊的兩個 cim-server
實例(因爲在一臺服務器,因此只是端口不一樣)。
這樣在客戶端(監聽這個 Zookeeper
節點)就能實時的知道目前可用的服務信息。
當客戶端請求 cim-forward-route
中的登陸接口(詳見下文)作完業務驗證(就至關於平常登陸其餘網站同樣)以後,客戶端會向服務端發起一個長鏈接,如以前的流程所示:
這時客戶端會發送一個特殊報文,代表當前是登陸信息。
服務端收到後就須要將該客戶端的 userID
和當前 Channel
通道關係保存起來。
同時也緩存了用戶的信息,也就是 userID
和 用戶名。
當客戶端斷線後也須要將剛纔緩存的信息清除掉。
同時也須要調用 route
接口清除相關信息(具體接口看下文)。
從架構圖中能夠看出,路由層是很是重要的一環;它提供了一系列的 HTTP
服務承接了客戶端和服務端。
目前主要是如下幾個接口。
因爲每個客戶端都是須要登陸才能使用的,因此第一步天然是註冊。
這裏就設計的比較簡單,直接利用 Redis
來存儲用戶信息;用戶信息也只有 ID
和 userName
而已。
只是爲了方便查詢在 Redis
中的 KV
又反過來存儲了一份 VK
,這樣 ID
和 userName
都必須惟一。
這裏的登陸和 cim-server
中的登陸不同,具備業務性質,
Zookeeper
中獲取服務列表(cim-server
)並根據某種算法選擇一臺服務返回給客戶端。Redis
中。爲了實現只能一個用戶登陸,使用了 Redis
中的 set
來保存登陸信息;利用 userID
做爲 key
,重複的登陸就會寫入失敗。
相似於 Java 中的 HashSet,只能去重保存。
獲取一臺可用的路由實例也比較簡單:
Zookeeper
獲取全部的服務實例作一個內部緩存。固然要獲取 Zookeeper
中的服務實例前天然是須要監聽 cim-server
以前註冊上去的那個節點。
具體代碼以下:
也是在應用啓動以後監聽 Zookeeper
中的路由節點,一旦發生變化就會更新內部緩存。
這裏使用的是 Guava 的 cache,它基於
ConcurrentHashMap
,因此能夠保證清除、新增緩存
的原子性。
這是一個真正發消息的接口,實現的效果就是其中一個客戶端發消息,其他全部客戶端都能收到!
流程確定是客戶端發送一條消息到服務端,服務端收到後在上文介紹的 SessionSocketHolder
中遍歷全部 Channel
(通道)而後下發消息便可。
服務端是單機倒也能夠,但如今是集羣設計。因此全部的客戶端會根據以前的輪詢算法分配到不一樣的 cim-server
實例中。
所以就須要路由層來發揮做用了。
路由接口收到消息後首先遍歷出全部的客戶端和服務實例的關係。
路由關係在 Redis
中的存放以下:
因爲 Redis
單線程的特質,當數據量大時;一旦使用 keys 匹配全部 cim-route:*
數據,會致使 Redis 不能處理其餘請求。
因此這裏改成使用 scan 命令來遍歷全部的 cim-route:*
。
接着會挨個調用每一個客戶端所在的服務端的 HTTP
接口用於推送消息。
在 cim-server
中的實現以下:
cim-server
收到消息後會在內部緩存中查詢該 userID 的通道,接着只須要發消息便可。
這是一個輔助接口,能夠查詢出當前在線用戶信息。
實現也很簡單,也就是查詢以前保存 」用戶登陸狀態的那個去重 set
「便可。
之因此說獲取在線用戶是一個輔助接口,其實就是用於輔助私聊使用的。
通常咱們使用私聊的前提確定得知道當前哪些用戶在線,接着你纔會知道你要和誰進行私聊。
相似於這樣:
在咱們這個場景中,私聊的前提就是須要得到在線用戶的 userID
。
因此私聊接口在收到消息後須要查詢到接收者所在的 cim-server
實例信息,後續的步驟就和羣聊一致了。調用接收者所在實例的 HTTP
接口下發信息。
只是羣聊是遍歷全部的在線用戶,私聊只發送一個的區別。
一旦客戶端下線,咱們就須要將以前存放在 Redis
中的一些信息刪除掉(路由信息、登陸狀態)。
客戶端中的一些邏輯其實在上文已經談到一些了。
第一步也就是登陸,須要在啓動時調用 route
的登陸接口,得到 cim-server
信息再建立鏈接。
登陸過程當中 route
接口會判斷是否爲重複登陸,重複登陸則會直接退出程序。
接下來是利用 route
接口返回的 cim-server
實例信息(ip+port
)建立鏈接。
最後一步就是發送一個登陸標誌的信息到服務端,讓它保持客戶端和 Channel
的關係。
上文提到的一些登陸報文、真正的消息報文
這些其實都是在咱們自定義協議中能夠區別出來的。
因爲是使用 Google Protocol Buffer
編解碼,因此先看看原始格式。
其實這個協議中目前一共就三個字段:
requestId
能夠理解爲 userId
。reqMsg
就是真正的消息。type
也就是上文提到的消息類別。目前主要是三種類型,分別對應不一樣的業務:
爲了保持客戶端和服務端的鏈接,每隔一段時間沒有發送消息都須要自動的發送心跳。
目前的策略是每隔一分鐘就是發送一個心跳包到服務端:
這樣服務端每隔一分鐘沒有收到業務消息時就會收到 ping
的心跳包:
客戶端也內置了一些基本命令來方便使用。
命令 | 描述 |
---|---|
:q |
退出客戶端 |
:olu |
獲取全部在線用戶信息 |
:all |
獲取全部命令 |
: |
更多命令正在開發中。。 |
好比輸入 :q
就會退出客戶端,同時會關閉一些系統資源。
當輸入 :olu
(onlineUser
的簡寫)就會去調用 route
的獲取全部在線用戶接口。
羣聊的使用很是簡單,只須要在控制檯輸入消息回車便可。
這時會去調用 route
的羣聊接口。
私聊也是同理,但前提是須要觸發關鍵字;使用 userId;;消息內容
這樣的格式纔會給某個用戶發送消息,因此通常都須要先使用 :olu
命令獲取因此在線用戶才方便使用。
爲了知足一些定製需求,好比消息須要保存之類的。
因此在客戶端收到消息以後會回調一個接口,在這個接口中能夠自定義實現。
所以先建立了一個 caller
的 bean
,這個 bean
中包含了一個 CustomMsgHandleListener
接口,須要自行處理只須要實現此接口便可。
因爲我本身不怎麼會寫界面,但保不許有其餘大牛會寫。因此客戶端中的羣聊、私聊、獲取在線用戶、消息回調等業務(以及以後的業務)都是以接口形式提供。
也方便後面作頁面集成,只須要調這些接口就好了;具體實現不用怎麼關心。
cim
目前只是初版,BUG 多,功能少(只拉了幾個羣友作了測試);不事後續還會接着完善,至少這一版會給那些沒有相關經驗的朋友帶來一些思路。
後續計劃:
完整源碼:
https://github.com/crossoverJie/cim
若是這篇對你有所幫助還請不吝轉發。