從零開始開發IM(即時通信)服務端(二)

好消息:IM1.0.0版本已經上線啦,支持特性html

  • 私聊發送文本/文件
  • 已發送/已送達/已讀回執
  • 支持使用ldap登陸
  • 支持接入外部的登陸認證系統
  • 提供客戶端jar包,方便客戶端開發

github連接: https://github.com/yuanrw/IMnode

本篇將帶你們從零開始搭建一個輕量級的IM服務端,IM的總體設計思路和架構在個人上篇博客中已經講過了,沒看過的同窗請點擊從零開始開發IM(即時通信)服務端git

這篇將給你們帶來更多的細節實現。我將從三個方面來闡述如何構建一個完整可靠的IM系統。github

  1. 可靠性
  2. 安全性
  3. 存儲設計

可靠性

什麼是可靠性?對於一個IM系統來講,可靠的定義至少是不丟消息消息不重複不亂序,知足這三點,才能說有一個好的聊天體驗。數據庫

不丟消息

咱們先從不丟消息開始講起。數組

首先複習一下上一篇設計的服務端架構
im-structure.png安全

咱們先從一個簡單例子開始思考:當Alice給Bob發送一條消息時,可能要通過這樣一條鏈路:
route服務器

  1. client-->connecter
  2. connector-->transfer
  3. transfer-->connector
  4. connector-->client

在這整個鏈路中的每一個環節都有可能出問題,雖然tcp協議是可靠的,可是它只能保證鏈路層的可靠,沒法保證應用層的可靠。網絡

例如在第一步中,connector收到了從client發出的消息,可是轉發給transfer失敗,那麼這條消息Bob就沒法收到,而Alice也不會意識到消息發送失敗了。session

若是Bob狀態是離線,那麼消息鏈路就是:

  1. client-->connector
  2. connector-->transfer
  3. transfer-->mq

若是在第三步中,transfer收到了來自connector的消息,可是離線消息入庫失敗,
那麼這個消息也是傳遞失敗了。
爲了保證應用層的可靠,咱們必需要有一個ack機制,使發送方可以確認對方收到了這條消息。

具體的實現,咱們模仿tcp協議作一個應用層的ack機制。

tcp的報文是以字節(byte)爲單位的,而咱們以message單位。
ack
發送方每次發送一個消息,就要等待對方的ack迴應,在ack確認消息中應該帶有收到的id以便發送方識別。

其次,發送方須要維護一個等待ack的隊列。 每次發送一個消息以後,就將消息和一個計時器入隊。

另外存在一個線程一直輪詢隊列,若是有超時未收到ack的,就取出消息重發。

超時未收到ack的消息有兩種處理方式:

  1. 和tcp同樣不斷髮送直到收到ack爲止。
  2. 設定一個最大重試次數,超過這個次數還沒收到ack,就使用失敗機制處理,節約資源。例如若是是connector長時間未收到client的ack,那麼能夠主動斷開和客戶端的鏈接,剩下未發送的消息就做爲離線消息入庫,客戶端斷連後嘗試重連服務器便可。

不重複、不亂序

有的時候由於網絡緣由可能致使ack收到較慢,發送方就會重複發送,那麼接收方必須有一個去重機制。
去重的方式是給每一個消息增長一個惟一id。這個惟一id並不必定是全局的,只須要在一個會話中惟一便可。例如某兩我的的會話,或者某一個羣。若是網絡斷連了,從新鏈接後,就是新的會話了,id會從新從0開始。

接收方須要在當前會話中維護收到的最後一個消息的id,叫作lastId
每次收到一個新消息, 就將id與lastId做比較看是否連續,若是不連續,就放入一個暫存隊列 queue中稍後處理。

例如:

  • 當前會話的lastId=1,接着服務器收到了消息msg(id=2),能夠判斷收到的消息是連續的,就處理消息,將lastId修改成2。

  • 可是若是服務器收到消息msg(id=3),就說明消息亂序到達了,那麼就將這個消息入隊,等待lastId變爲2後,(即服務器收到消息msg(id=2)並處理完了),再取出這個消息處理。

所以,判斷消息是否重複只須要判斷msgId>lastId && !queue.contains(msgId)便可。若是收到重複的消息,能夠判斷是ack未送達,就再發送一次ack。

接收方收到消息後完整的處理流程以下:
offer.png

僞代碼以下:

class ProcessMsgNode{
    /**
     * 接收到的消息
     */
    private Message message;
    /**
     * 處理消息的方法
     */
    private Consumer<Message> consumer;
}

public CompletableFuture<Void> offer(Long id,Message     message,Consumer<Message> consumer) {
    if (isRepeat(id)) {
    //消息重複
        sendAck(id);
        return null;
    }
    if (!isConsist(id)) {
    //消息不連續
        notConsistMsgMap.put(id, new ProcessMsgNode(message, consumer));
        return null;
    }
    //處理消息
    return process(id, message, consumer);
}

private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) {
    return CompletableFuture
        .runAsync(() -> consumer.accept(message))
        .thenAccept(v -> sendAck(id))
        .thenAccept(v -> lastId.set(id))
        .thenComposeAsync(v -> {
            Long nextId = nextId(id);
            if (notConsistMsgMap.containsKey(nextId)) {
                //隊列中有下個消息
                ProcessMsgNode node = notConsistMsgMap.get(nextId);
                return process(nextId, node.getMessage(), consumer);
            } else {
                //隊列中沒有下個消息
                CompletableFuture<Void> future = new CompletableFuture<>();
                future.complete(null);
                return future;
            }
        })
        .exceptionally(e -> {
            logger.error("[process received msg] has error", e);
            return null;
        });
}

安全性

不管是聊天記錄仍是離線消息,確定都會在服務端存儲備份,那麼消息的安全性,保護客戶的隱私也相當重要。
所以全部的消息都必需要加密處理。
在存儲模塊裏,維護用戶信息和關係鏈有兩張基礎表,分別是im_user用戶表和im_relation關係鏈表。

  • im_user表用於存放用戶常規信息,例如用戶名密碼等,結構比較簡單。
  • im_relation表用於記錄好友關係,結構以下:
CREATE TABLE `im_relation` (
  `id` bigint(20) COMMENT '關係id',
  `user_id1` varchar(100) COMMENT '用戶1id',
  `user_id2` varchar(100) COMMENT '用戶2id',
  `encrypt_key` char(33) COMMENT 'aes密鑰',
  `gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP,
  `gmt_update` timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, 
  PRIMARY KEY (`id`),
  UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`)
);
  • user_id1user_id2是互爲好友的用戶id,爲了不重複,存儲時按照user_id1<user_id2的順序存,而且加上聯合索引。
  • encrypt_key是隨機生成的密鑰。當客戶端登陸時,就會從數據庫中獲取該用戶的全部的relation,存在內存中,以便後續加密解密。
  • 當客戶端給某個好友發送消息時,取出內存中該關係的密鑰,加密後發送。一樣,當收到一條消息時,取出相應的密鑰解密。

客戶端完整登陸流程以下:
login process

  1. client調用rest接口登陸。
  2. client調用rest接口獲取該用戶全部relation
  3. client向connector發送greet消息,通知上線。
  4. connector拉取離線消息推送給client。
  5. connector更新用戶session。

那爲何connector要先推送離線消息再更新session呢?咱們思考一下若是順序倒過來會發生什麼:

  1. 用戶Alice登陸服務器
  2. connector更新session
  3. 推送離線消息
  4. 此時Bob發送了一條消息給Alice

若是離線消息還在推送的過程當中,Bob發送了新消息給Alice,服務器獲取到Alice的session,就會馬上推送。這時新消息就有可能夾在一堆離線消息當中推過去了,那這時,Alice收到的消息就亂序了。

而咱們必須保證離線消息的順序在新消息以前。

那麼若是先推送離線消息,以後才更新session。在離線消息推送的過程當中,Alice的狀態就是「未上線」,這時Bob新發送的消息只會入庫im_offlineim_offline表中的數據被讀完以後纔會「上線」開始接受新消息。這也就避免了亂序。

存儲設計

存儲離線消息

當用戶不在線時,離線消息必然要存儲在服務端,等待用戶上線再推送。理解了上一個小節後,離線消息的存儲就很是容易了。增長一張離線消息表im_offline,表結構以下:

CREATE TABLE `im_offline` (
  `id` int(11) COMMENT '主鍵',
  `msg_id` bigint(20) COMMENT '消息id',
  `msg_type` int(2) COMMENT '消息類型',
  `content` varbinary(5000) COMMENT '消息內容',
  `to_user_id` varchar(100) COMMENT '收件人id',
  `has_read` tinyint(1) COMMENT '是否閱讀',
  `gmt_create` timestamp COMMENT '建立時間',
  PRIMARY KEY (`id`)
);

msg_type用於區分消息類型(chat,ack),content加密後的消息內容以byte數組的形式存儲。
用戶上線時按照條件to_user_id=用戶id拉取記錄便可。

防止離線消息重複推送

咱們思考一下多端登陸的狀況,Alice有兩臺設備同時登錄,在這種併發的狀況下,咱們就須要某種機制來保證離線消息只被讀取一次。

這裏利用CAS機制來實現:

  1. 首先取出全部has_read=false的字段。
  2. 檢查每條消息的has_read值是否爲false,若是是,則改成true。這是原子操做。
update im_offline set has_read = true where id = ${msg_id} and has_read = false
  1. 修改爲功則推送,失敗則不推送。

相信到這裏,同窗們已經能夠本身動手搭建一個完整可用的IM服務端了。更多問題歡迎評論區留言~~

IM1.0.0版本已上線,github連接:
https://github.com/yuanrw/IM
以爲對你有幫助請點個star吧~!

相關文章
相關標籤/搜索