好消息:IM1.0.0版本已經上線啦,支持特性:html
github連接: https://github.com/yuanrw/IMnode
本篇將帶你們從零開始搭建一個輕量級的IM服務端,IM的總體設計思路和架構在個人上篇博客中已經講過了,沒看過的同窗請點擊從零開始開發IM(即時通信)服務端 。git
這篇將給你們帶來更多的細節實現。我將從三個方面來闡述如何構建一個完整可靠的IM系統。github
什麼是可靠性?對於一個IM系統來講,可靠的定義至少是不丟消息、消息不重複、不亂序,知足這三點,才能說有一個好的聊天體驗。數據庫
咱們先從不丟消息開始講起。數組
首先複習一下上一篇設計的服務端架構:
安全
咱們先從一個簡單例子開始思考:當Alice給Bob發送一條消息時,可能要通過這樣一條鏈路:
服務器
在這整個鏈路中的每一個環節都有可能出問題,雖然tcp協議是可靠的,可是它只能保證鏈路層的可靠,沒法保證應用層的可靠。網絡
例如在第一步中,connector
收到了從client
發出的消息,可是轉發給transfer
失敗,那麼這條消息Bob就沒法收到,而Alice也不會意識到消息發送失敗了。session
若是Bob狀態是離線,那麼消息鏈路就是:
若是在第三步中,transfer
收到了來自connector
的消息,可是離線消息入庫失敗,
那麼這個消息也是傳遞失敗了。
爲了保證應用層的可靠,咱們必需要有一個ack機制,使發送方可以確認對方收到了這條消息。
具體的實現,咱們模仿tcp協議作一個應用層的ack機制。
tcp的報文是以字節(byte)
爲單位的,而咱們以message
單位。
發送方每次發送一個消息,就要等待對方的ack迴應,在ack確認消息中應該帶有收到的id以便發送方識別。
其次,發送方須要維護一個等待ack的隊列。 每次發送一個消息以後,就將消息和一個計時器入隊。
另外存在一個線程一直輪詢隊列,若是有超時未收到ack的,就取出消息重發。
超時未收到ack的消息有兩種處理方式:
connector
長時間未收到client
的ack,那麼能夠主動斷開和客戶端的鏈接,剩下未發送的消息就做爲離線消息入庫,客戶端斷連後嘗試重連服務器便可。有的時候由於網絡緣由可能致使ack收到較慢,發送方就會重複發送,那麼接收方必須有一個去重機制。
去重的方式是給每一個消息增長一個惟一id。這個惟一id並不必定是全局的,只須要在一個會話中惟一便可。例如某兩我的的會話,或者某一個羣。若是網絡斷連了,從新鏈接後,就是新的會話了,id會從新從0開始。
接收方須要在當前會話中維護收到的最後一個消息的id,叫作lastId
。
每次收到一個新消息, 就將id與lastId
做比較看是否連續,若是不連續,就放入一個暫存隊列 queue中稍後處理。
例如:
當前會話的lastId
=1,接着服務器收到了消息msg(id=2)
,能夠判斷收到的消息是連續的,就處理消息,將lastId
修改成2。
可是若是服務器收到消息msg(id=3)
,就說明消息亂序到達了,那麼就將這個消息入隊,等待lastId
變爲2後,(即服務器收到消息msg(id=2)
並處理完了),再取出這個消息處理。
所以,判斷消息是否重複只須要判斷msgId>lastId && !queue.contains(msgId)
便可。若是收到重複的消息,能夠判斷是ack未送達,就再發送一次ack。
接收方收到消息後完整的處理流程以下:
僞代碼以下:
class ProcessMsgNode{ /** * 接收到的消息 */ private Message message; /** * 處理消息的方法 */ private Consumer<Message> consumer; } public CompletableFuture<Void> offer(Long id,Message message,Consumer<Message> consumer) { if (isRepeat(id)) { //消息重複 sendAck(id); return null; } if (!isConsist(id)) { //消息不連續 notConsistMsgMap.put(id, new ProcessMsgNode(message, consumer)); return null; } //處理消息 return process(id, message, consumer); } private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) { return CompletableFuture .runAsync(() -> consumer.accept(message)) .thenAccept(v -> sendAck(id)) .thenAccept(v -> lastId.set(id)) .thenComposeAsync(v -> { Long nextId = nextId(id); if (notConsistMsgMap.containsKey(nextId)) { //隊列中有下個消息 ProcessMsgNode node = notConsistMsgMap.get(nextId); return process(nextId, node.getMessage(), consumer); } else { //隊列中沒有下個消息 CompletableFuture<Void> future = new CompletableFuture<>(); future.complete(null); return future; } }) .exceptionally(e -> { logger.error("[process received msg] has error", e); return null; }); }
不管是聊天記錄仍是離線消息,確定都會在服務端存儲備份,那麼消息的安全性,保護客戶的隱私也相當重要。
所以全部的消息都必需要加密處理。
在存儲模塊裏,維護用戶信息和關係鏈有兩張基礎表,分別是im_user
用戶表和im_relation
關係鏈表。
im_user
表用於存放用戶常規信息,例如用戶名密碼等,結構比較簡單。im_relation
表用於記錄好友關係,結構以下:CREATE TABLE `im_relation` ( `id` bigint(20) COMMENT '關係id', `user_id1` varchar(100) COMMENT '用戶1id', `user_id2` varchar(100) COMMENT '用戶2id', `encrypt_key` char(33) COMMENT 'aes密鑰', `gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP, `gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (`id`), UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`) );
user_id1
和user_id2
是互爲好友的用戶id,爲了不重複,存儲時按照user_id1
<user_id2
的順序存,而且加上聯合索引。encrypt_key
是隨機生成的密鑰。當客戶端登陸時,就會從數據庫中獲取該用戶的全部的relation
,存在內存中,以便後續加密解密。客戶端完整登陸流程以下:
relation
。那爲何connector要先推送離線消息再更新session呢?咱們思考一下若是順序倒過來會發生什麼:
Alice
登陸服務器connector
更新session若是離線消息還在推送的過程當中,Bob發送了新消息給Alice,服務器獲取到Alice的session,就會馬上推送。這時新消息就有可能夾在一堆離線消息當中推過去了,那這時,Alice收到的消息就亂序了。
而咱們必須保證離線消息的順序在新消息以前。
那麼若是先推送離線消息,以後才更新session。在離線消息推送的過程當中,Alice的狀態就是「未上線」,這時Bob新發送的消息只會入庫im_offline
,im_offline
表中的數據被讀完以後纔會「上線」開始接受新消息。這也就避免了亂序。
當用戶不在線時,離線消息必然要存儲在服務端,等待用戶上線再推送。理解了上一個小節後,離線消息的存儲就很是容易了。增長一張離線消息表im_offline
,表結構以下:
CREATE TABLE `im_offline` ( `id` int(11) COMMENT '主鍵', `msg_id` bigint(20) COMMENT '消息id', `msg_type` int(2) COMMENT '消息類型', `content` varbinary(5000) COMMENT '消息內容', `to_user_id` varchar(100) COMMENT '收件人id', `has_read` tinyint(1) COMMENT '是否閱讀', `gmt_create` timestamp COMMENT '建立時間', PRIMARY KEY (`id`) );
msg_type
用於區分消息類型(chat
,ack
),content
加密後的消息內容以byte數組的形式存儲。
用戶上線時按照條件to_user_id=用戶id
拉取記錄便可。
咱們思考一下多端登陸的狀況,Alice有兩臺設備同時登錄,在這種併發的狀況下,咱們就須要某種機制來保證離線消息只被讀取一次。
這裏利用CAS機制來實現:
has_read=false
的字段。has_read
值是否爲false,若是是,則改成true。這是原子操做。update im_offline set has_read = true where id = ${msg_id} and has_read = false
相信到這裏,同窗們已經能夠本身動手搭建一個完整可用的IM服務端了。更多問題歡迎評論區留言~~
IM1.0.0版本已上線,github連接:
https://github.com/yuanrw/IM
以爲對你有幫助請點個star吧~!