適合新手:從零開發一個IM服務端(基於Netty,有完整源碼)

本文由「yuanrw」分享,博客:juejin.im/user/5cefab8451882510eb758606,收錄時內容有改動和修訂。php

0、引言

站長提示:本文適合IM新手閱讀,但最好有必定的網絡編程經驗,必竟實踐性的代碼上手就是網絡編程。若是你對網絡編程,以及IM的一些理論知識知之甚少,請務必首先閱讀:《新手入門一篇就夠:從零開發移動端IM》,該文爲IM小白分類整理了詳盡的理論資料,請按需補充相關知識。html

配套源碼:本文寫的比較淺顯但不太易懂,建議結合代碼一塊兒來讀,文章配套的完整源碼 請從本文文末 「十一、完整源碼下載」 處下載!java

學習交流:node

- 即時通信/推送技術開發交流5羣:215477170 [推薦]mysql

- 移動端IM開發入門文章:《新手入門一篇就夠:從零開發移動端IMgit

(本文同步發佈於:www.52im.net/thread-2768…
github

一、內容概述

首先講講IM(即時通信)技術能夠用來作什麼:redis

1)聊天:qq、微信;算法

2)直播:鬥魚直播、抖音;sql

3)實時位置共享、遊戲多人互動等等。

能夠說幾乎全部高實時性的應用場景都須要用到IM技術。

本篇將帶你們從零開始搭建一個輕量級的IM服務端。

麻雀雖小,五臟俱全,咱們搭建的IM服務端實現如下功能:

1)一對一的文本消息、文件消息通訊;

2)每一個消息有「已發送」/「已送達」/「已讀」回執;

3)存儲離線消息;

4)支持用戶登陸,好友關係等基本功能;

5)可以方便地水平擴展。

經過這個項目能學到不少後端必備知識:

1)rpc通訊;

2)數據庫;

3)緩存;

4)消息隊列;

5)分佈式、高併發的架構設計;

6)docker部署。

二、相關文章

更多實踐性代碼參考:

相關IM架構方面的文章:

三、消息通訊

3.1 文本消息

咱們先從最簡單的特性開始實現:一個普通消息的發送。

消息格式以下:

message ChatMsg{

id= 1;

//消息id

fromId = Alice

//發送者userId

destId = Bob

//接收者userId

msgBody = hello

//消息體

}

如上圖,咱們如今有兩個用戶:Alice和Bob鏈接到了服務器,當Alice發送消息message(hello)給Bob,服務端接收到消息,根據消息的destId進行轉發,轉發給Bob。

3.2 發送回執

那咱們要怎麼來實現回執的發送呢?

咱們定義一種回執數據格式ACK,MsgType有三種,分別是sent(已發送),delivered(已送達), read(已讀)。

消息格式以下:

message AckMsg {

id;

//消息id

fromId;

//發送者id

destId;

//接收者id

msgType;

//消息類型

ackMsgId;

//確認的消息id

}


enum MsgType {

DELIVERED;

READ;

}

當服務端接受到Alice發來的消息時:

1)向Alice發送一個sent(hello)表示消息已經被髮送到服務器:

message AckMsg {

id= 2;

fromId = Alice;

destId = Bob;

msgType = SENT;

ackMsgId = 1;

}

2)服務器把hello轉發給Bob後,馬上向Alice發送delivered(hello)表示消息已經發送給Bob:

message AckMsg {

id= 3;

fromId = Bob;

destId = Alice;

msgType = DELIVERED;

ackMsgId = 1;

}

3)Bob閱讀消息後,客戶端向服務器發送read(hello)表示消息已讀:

message AckMsg {

id= 4;

fromId = Bob;

destId = Alice;

msgType = READ;

ackMsgId = 1;

}

這個消息會像一個普通聊天消息同樣被服務器處理,最終發送給Alice。

在服務器這裏不區分ChatMsg和AckMsg,處理過程都是同樣的:解析消息的destId並進行轉發。

四、水平擴展

當用戶量愈來愈大,必然須要增長服務器的數量,用戶的鏈接被分散在不一樣的機器上。此時,就須要存儲用戶鏈接在哪臺機器上。

咱們引入一個新的模塊來管理用戶的鏈接信息。

4.1 管理用戶狀態

模塊叫作user status,共有三個接口:

public interface UserStatusService {


/**

* 用戶上線,存儲userId與機器id的關係

*

* @param userId

* @param connectorId

* @return 若是當前用戶在線,則返回他鏈接的機器id,不然返回null

*/

String online(String userId, String connectorId);


/**

* 用戶下線

*

* @param userId

*/

voidoffline(String userId);


/**

* 經過用戶id查找他當前鏈接的機器id

*

* @param userId

* @return

*/

String getConnectorId(String userId);

}

這樣咱們就可以對用戶鏈接狀態進行管理了,具體的實現應考慮服務的用戶量、指望性能等進行實現。

此處咱們使用redis來實現,將userId和connectorId的關係以key-value的形式存儲。

4.2 消息轉發

除此以外,還須要一個模塊在不一樣的機器上轉發消息,以下結構:

此時咱們的服務被拆分紅了connector和transfer兩個模塊,connector模塊用於維持用戶的長連接,而transfer的做用是將消息在多個connector之間轉發。

如今Alice和Bob鏈接到了兩臺connector上,那麼消息要如何傳遞呢?

1)Alice上線,鏈接到機器[1]上時:

1.1)將Alice和它的鏈接存入內存中。

1.2)調用user status的online方法記錄Alice上線。

2)Alice發送了一條消息給Bob:

2.1)機器[1]收到消息後,解析destId,在內存中查找是否有Bob。

2.2)若是沒有,表明Bob未鏈接到這臺機器,則轉發給transfer。

3)transfer調用user status的getConnectorId(Bob)方法找到Bob所鏈接的connector,返回機器[2],則轉發給機器[2]。

流程圖:

4.3 總結

引入user status模塊管理用戶鏈接,transfer模塊在不一樣的機器之間轉發,使服務能夠水平擴展。爲了知足實時轉發,transfer須要和每臺connector機器都保持長連接。

五、離線消息

若是用戶當前不在線,就必須把消息持久化下來,等待用戶下次上線再推送,這裏使用mysql存儲離線消息。

爲了方便地水平擴展,咱們使用消息隊列進行解耦:

1)transfer接收到消息後若是發現用戶不在線,就發送給消息隊列入庫;

2)用戶登陸時,服務器從庫里拉取離線消息進行推送。

六、用戶登陸、好友關係

用戶的註冊登陸、帳戶管理、好友關係鏈等功能更適合使用http協議,所以咱們將這個模塊作成一個restful服務,對外暴露http接口供客戶端調用。

至此服務端的基本架構就完成了:

七、中場休息 ... ...

本文以上內容,本篇幫你們構建了IM服務端的架構,但還有不少細節須要咱們去思考。

例如:

1)如何保證消息的順序和惟一

2)多個設備在線如何保證消息一致性

3)如何處理消息發送失敗

4)消息的安全性

5)若是要存儲聊天記錄要怎麼作

6)數據庫分表分庫

7)服務高可用

……

更多細節實現請繼續讀下半部分啦~

八、可靠性

什麼是可靠性?對於一個IM系統來講,可靠的定義至少是不丟消息、消息不重複、不亂序,知足這三點,才能說有一個好的聊天體驗。

8.1 不丟消息

咱們先從不丟消息開始講起。

首先複習一下上面章節中設計的服務端架構:

咱們先從一個簡單例子開始思考:當Alice給Bob發送一條消息時,可能要通過這樣一條鏈路:

1)client-->connecter

2)connector-->transfer

3)transfer-->connector

4)connector-->client

在這整個鏈路中的每一個環節都有可能出問題,雖然tcp協議是可靠的,可是它只能保證鏈路層的可靠,沒法保證應用層的可靠。

例如在第一步中,connector收到了從client發出的消息,可是轉發給transfer失敗,那麼這條消息Bob就沒法收到,而Alice也不會意識到消息發送失敗了。

若是Bob狀態是離線,那麼消息鏈路就是:

1)client-->connector

2)connector-->transfer

3)transfer-->mq

若是在第三步中,transfer收到了來自connector的消息,可是離線消息入庫失敗,那麼這個消息也是傳遞失敗了。

爲了保證應用層的可靠,咱們必需要有一個ack機制,使發送方可以確認對方收到了這條消息。

具體的實現,咱們模仿tcp協議作一個應用層的ack機制。

tcp的報文是以字節(byte)爲單位的,而咱們以message單位。

發送方每次發送一個消息,就要等待對方的ack迴應,在ack確認消息中應該帶有收到的id以便發送方識別。

其次,發送方須要維護一個等待ack的隊列。 每次發送一個消息以後,就將消息和一個計時器入隊。

另外存在一個線程一直輪詢隊列,若是有超時未收到ack的,就取出消息重發。

超時未收到ack的消息有兩種處理方式:

1)和tcp同樣不斷髮送直到收到ack爲止。

2)設定一個最大重試次數,超過這個次數還沒收到ack,就使用失敗機制處理,節約資源。例如若是是connector長時間未收到client的ack,那麼能夠主動斷開和客戶端的鏈接,剩下未發送的消息就做爲離線消息入庫,客戶端斷連後嘗試重連服務器便可。

8.2 不重複、不亂序

有的時候由於網絡緣由可能致使ack收到較慢,發送方就會重複發送,那麼接收方必須有一個去重機制。

去重的方式是給每一個消息增長一個惟一id。這個惟一id並不必定是全局的,只須要在一個會話中惟一便可。例如某兩我的的會話,或者某一個羣。若是網絡斷連了,從新鏈接後,就是新的會話了,id會從新從0開始。

關於消息ID的生成算法方面的文章,請詳細參考:

融雲技術分享:解密融雲IM產品的聊天消息ID生成策略

微信技術分享:微信的海量IM聊天消息序列號生成實踐(算法原理篇)

微信技術分享:微信的海量IM聊天消息序列號生成實踐(容災方案篇)

美團技術分享:深度解密美團的分佈式ID生成算法

接收方須要在當前會話中維護收到的最後一個消息的id,叫作lastId。

每次收到一個新消息, 就將id與lastId做比較看是否連續,若是不連續,就放入一個暫存隊列 queue中稍後處理。

例如:

1)當前會話的lastId=1,接着服務器收到了消息msg(id=2),能夠判斷收到的消息是連續的,就處理消息,將lastId修改成2;

2)可是若是服務器收到消息msg(id=3),就說明消息亂序到達了,那麼就將這個消息入隊,等待lastId變爲2後,(即服務器收到消息msg(id=2)並處理完了),再取出這個消息處理。

所以,判斷消息是否重複只須要判斷msgId>lastId && !queue.contains(msgId)便可。若是收到重複的消息,能夠判斷是ack未送達,就再發送一次ack。

接收方收到消息後完整的處理流程以下:

僞代碼以下:

class ProcessMsgNode{

/**

* 接收到的消息

*/

privateMessage message;

/**

* 處理消息的方法

*/

privateConsumer<Message> consumer;

}


public CompletableFuture<Void> offer(Long id,Message message,Consumer<Message> consumer) {

if(isRepeat(id)) {

//消息重複

sendAck(id);

return null;

}

if(!isConsist(id)) {

//消息不連續

notConsistMsgMap.put(id, newProcessMsgNode(message, consumer));

return null;

}

//處理消息

returnprocess(id, message, consumer);

}


private CompletableFuture<Void> process(Long id, Message message, Consumer<Message> consumer) {

return CompletableFuture

.runAsync(() -> consumer.accept(message))

.thenAccept(v -> sendAck(id))

.thenAccept(v -> lastId.set(id))

.thenComposeAsync(v -> {

Long nextId = nextId(id);

if(notConsistMsgMap.containsKey(nextId)) {

//隊列中有下個消息

ProcessMsgNode node = notConsistMsgMap.get(nextId);

returnprocess(nextId, node.getMessage(), consumer);

} else{

//隊列中沒有下個消息

CompletableFuture<Void> future = newCompletableFuture<>();

future.complete(null);

returnfuture;

}

})

.exceptionally(e -> {

logger.error("[process received msg] has error", e);

returnnull;

});

}

九、安全性

不管是聊天記錄仍是離線消息,確定都會在服務端存儲備份,那麼消息的安全性,保護客戶的隱私也相當重要。

所以全部的消息都必需要加密處理。

在存儲模塊裏,維護用戶信息和關係鏈有兩張基礎表,分別是im_user用戶表和im_relation關係鏈表。

im_user表用於存放用戶常規信息,例如用戶名密碼等,結構比較簡單。

im_relation表用於記錄好友關係。

結構以下:

CREATE TABLE `im_relation` (

`id` bigint(20) COMMENT '關係id',

`user_id1` varchar(100) COMMENT '用戶1id',

`user_id2` varchar(100) COMMENT '用戶2id',

`encrypt_key` char(33) COMMENT 'aes密鑰',

`gmt_create` timestamp DEFAULT CURRENT_TIMESTAMP,

`gmt_update` timestamp DEFAUL TCURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

PRIMARYKEY(`id`),

UNIQUE KEY `USERID1_USERID2` (`user_id1`,`user_id2`)

);

1)user_id1和user_id2是互爲好友的用戶id,爲了不重複,存儲時按照user_id1<user_id2的順序存,而且加上聯合索引;

2)encrypt_key是隨機生成的密鑰。當客戶端登陸時,就會從數據庫中獲取該用戶的全部的relation,存在內存中,以便後續加密解密;

3)當客戶端給某個好友發送消息時,取出內存中該關係的密鑰,加密後發送。一樣,當收到一條消息時,取出相應的密鑰解密。

客戶端完整登陸流程以下:

1)client調用rest接口登陸;

2)client調用rest接口獲取該用戶全部relation;

3)client向connector發送greet消息,通知上線;

4)connector拉取離線消息推送給client;

5)connector更新用戶session。

那爲何connector要先推送離線消息再更新session呢?

咱們思考一下若是順序倒過來會發生什麼:

1)用戶Alice登陸服務器;

2)connector更新session;

3)推送離線消息;

4)此時Bob發送了一條消息給Alice。

若是離線消息還在推送的過程當中,Bob發送了新消息給Alice,服務器獲取到Alice的session,就會馬上推送。這時新消息就有可能夾在一堆離線消息當中推過去了,那這時,Alice收到的消息就亂序了。

而咱們必須保證離線消息的順序在新消息以前。

那麼若是先推送離線消息,以後才更新session。在離線消息推送的過程當中,Alice的狀態就是「未上線」,這時Bob新發送的消息只會入庫im_offline,im_offline表中的數據被讀完以後纔會「上線」開始接受新消息。這也就避免了亂序。

十、存儲設計

10.1 存儲離線消息

當用戶不在線時,離線消息必然要存儲在服務端,等待用戶上線再推送。理解了上一個小節後,離線消息的存儲就很是容易了。

增長一張離線消息表im_offline,表結構以下:

CREATE TABLE `im_offline` (

`id` int(11) COMMENT '主鍵',

`msg_id` bigint(20) COMMENT '消息id',

`msg_type` int(2) COMMENT '消息類型',

`content` varbinary(5000) COMMENT '消息內容',

`to_user_id` varchar(100) COMMENT '收件人id',

`has_read` tinyint(1) COMMENT '是否閱讀',

`gmt_create` timestamp COMMENT '建立時間',

PRIMARY KEY(`id`)

);

msg_type用於區分消息類型(chat,ack),content加密後的消息內容以byte數組的形式存儲。

用戶上線時按照條件to_user_id=用戶id拉取記錄便可。

10.2 防止離線消息重複推送

咱們思考一下多端登陸的狀況,Alice有兩臺設備同時登錄,在這種併發的狀況下,咱們就須要某種機制來保證離線消息只被讀取一次。

這裏利用CAS機制來實現:

1)首先取出全部has_read=false的字段;

2)檢查每條消息的has_read值是否爲false,若是是,則改成true。這是原子操做:

1updateim_offline sethas_read = truewhereid = ${msg_id} andhas_read = false

3)修改爲功則推送,失敗則不推送。

相信到這裏,同窗們已經能夠本身動手搭建一個完整可用的IM服務端了。

十一、完整源碼下載

從零開發一個IM服務端(完整源碼)(52im.net).zip或自行從github下載:github.com/52im/IM

附錄:更多IM開發文章

[1] 更多IM代碼實踐(適合新手):
自已開發IM有那麼難嗎?手把手教你自擼一個Andriod版簡易IM (有源碼)
一種Android端IM智能心跳算法的設計與實現探討(含樣例代碼)
手把手教你用Netty實現網絡通訊程序的心跳機制、斷線重連機制
詳解Netty的安全性:原理介紹、代碼演示(上篇)
詳解Netty的安全性:原理介紹、代碼演示(下篇)
微信本地數據庫破解版(含iOS、Android),僅供學習研究 [附件下載]
Java NIO基礎視頻教程、MINA視頻教程、Netty快速入門視頻 [有源碼]
輕量級即時通信框架MobileIMSDK的iOS源碼(開源版)[附件下載]
開源IM工程「蘑菇街TeamTalk」2015年5月前未刪減版完整代碼 [附件下載]
微信本地數據庫破解版(含iOS、Android),僅供學習研究 [附件下載]
NIO框架入門(一):服務端基於Netty4的UDP雙向通訊Demo演示 [附件下載]
NIO框架入門(二):服務端基於MINA2的UDP雙向通訊Demo演示 [附件下載]
NIO框架入門(三):iOS與MINA二、Netty4的跨平臺UDP雙向通訊實戰 [附件下載]
NIO框架入門(四):Android與MINA二、Netty4的跨平臺UDP雙向通訊實戰 [附件下載]
用於IM中圖片壓縮的Android工具類源碼,效果可媲美微信 [附件下載]
高仿Android版手機QQ可拖拽未讀數小氣泡源碼 [附件下載]
一個WebSocket實時聊天室Demo:基於node.js+socket.io [附件下載]
Android聊天界面源碼:實現了聊天氣泡、表情圖標(可翻頁) [附件下載]
高仿Android版手機QQ首頁側滑菜單源碼 [附件下載]
開源libco庫:單機千萬鏈接、支撐微信8億用戶的後臺框架基石 [源碼下載]
分享java AMR音頻文件合併源碼,全網最全
微信團隊原創Android資源混淆工具:AndResGuard [有源碼]
一個基於MQTT通訊協議的完整Android推送Demo [附件下載]
Android版高仿微信聊天界面源碼 [附件下載]
高仿手機QQ的Android版鎖屏聊天消息提醒功能 [附件下載]
高仿iOS版手機QQ錄音及振幅動畫完整實現 [源碼下載]
Android端社交應用中的評論和回覆功能實戰分享[圖文+源碼]
Android端IM應用中的@人功能實現:仿微博、QQ、微信,零入侵、高可擴展[圖文+源碼]
仿微信的IM聊天時間顯示格式(含iOS/Android/Web實現)[圖文+源碼]
Android版仿微信朋友圈圖片拖拽返回效果 [源碼下載]
適合新手:從零開發一個IM服務端(基於Netty,有完整源碼)
>> 更多同類文章 ……

[2] IM羣聊相關的技術文章:
快速裂變:見證微信強大後臺架構從0到1的演進歷程(一)
如何保證IM實時消息的「時序性」與「一致性」?
IM單聊和羣聊中的在線狀態同步應該用「推」仍是「拉」?
IM羣聊消息如此複雜,如何保證不丟不重?
微信後臺團隊:微信後臺異步消息隊列的優化升級實踐分享
移動端IM中大規模羣消息的推送如何保證效率、實時性?
現代IM系統中聊天消息的同步和存儲方案探討
關於IM即時通信羣聊消息的亂序問題討論
IM羣聊消息的已讀回執功能該怎麼實現?
IM羣聊消息到底是存1份(即擴散讀)仍是存多份(即擴散寫)?
一套高可用、易伸縮、高併發的IM羣聊、單聊架構方案設計實踐
[技術腦洞] 若是把14億中國人拉到一個微信羣裏技術上能實現嗎?
IM羣聊機制,除了循環去發消息還有什麼方式?如何優化?
網易雲信技術分享:IM中的萬人羣聊技術方案實踐總結
>> 更多同類文章 ……

[3] 有關IM架構設計的文章:
淺談IM系統的架構設計
簡述移動端IM開發的那些坑:架構設計、通訊協議和客戶端
一套海量在線用戶的移動端IM架構設計實踐分享(含詳細圖文)
一套原創分佈式即時通信(IM)系統理論架構方案
從零到卓越:京東客服即時通信系統的技術架構演進歷程
蘑菇街即時通信/IM服務器開發之架構選擇
騰訊QQ1.4億在線用戶的技術挑戰和架構演進之路PPT
微信後臺基於時間序的海量數據冷熱分級架構設計實踐
微信技術總監談架構:微信之道——大道至簡(演講全文)
如何解讀《微信技術總監談架構:微信之道——大道至簡》
快速裂變:見證微信強大後臺架構從0到1的演進歷程(一)
17年的實踐:騰訊海量產品的技術方法論
移動端IM中大規模羣消息的推送如何保證效率、實時性?
現代IM系統中聊天消息的同步和存儲方案探討
IM開發基礎知識補課(二):如何設計大量圖片文件的服務端存儲架構?
IM開發基礎知識補課(三):快速理解服務端數據庫讀寫分離原理及實踐建議
IM開發基礎知識補課(四):正確理解HTTP短鏈接中的Cookie、Session和Token
WhatsApp技術實踐分享:32人工程團隊創造的技術神話
微信朋友圈千億訪問量背後的技術挑戰和實踐總結
王者榮耀2億用戶量的背後:產品定位、技術架構、網絡方案等
IM系統的MQ消息中間件選型:Kafka仍是RabbitMQ?
騰訊資深架構師乾貨總結:一文讀懂大型分佈式系統設計的方方面面
以微博類應用場景爲例,總結海量社交系統的架構設計步驟
快速理解高性能HTTP服務端的負載均衡技術原理
子彈短信光鮮的背後:網易雲信首席架構師分享億級IM平臺的技術實踐
知乎技術分享:從單機到2000萬QPS併發的Redis高性能緩存實踐之路
IM開發基礎知識補課(五):通俗易懂,正確理解並用好MQ消息隊列
微信技術分享:微信的海量IM聊天消息序列號生成實踐(算法原理篇)
微信技術分享:微信的海量IM聊天消息序列號生成實踐(容災方案篇)
新手入門:零基礎理解大型分佈式架構的演進歷史、技術原理、最佳實踐
一套高可用、易伸縮、高併發的IM羣聊、單聊架構方案設計實踐
阿里技術分享:深度揭祕阿里數據庫技術方案的10年變遷史
阿里技術分享:阿里自研金融級數據庫OceanBase的艱辛成長之路
社交軟件紅包技術解密(一):全面解密QQ紅包技術方案——架構、技術實現等
社交軟件紅包技術解密(二):解密微信搖一搖紅包從0到1的技術演進
社交軟件紅包技術解密(三):微信搖一搖紅包雨背後的技術細節
社交軟件紅包技術解密(四):微信紅包系統是如何應對高併發的
社交軟件紅包技術解密(五):微信紅包系統是如何實現高可用性的
社交軟件紅包技術解密(六):微信紅包系統的存儲層架構演進實踐
社交軟件紅包技術解密(七):支付寶紅包的海量高併發技術實踐
社交軟件紅包技術解密(八):全面解密微博紅包技術方案
社交軟件紅包技術解密(九):談談手Q紅包的功能邏輯、容災、運維、架構等
即時通信新手入門:一文讀懂什麼是Nginx?它可否實現IM的負載均衡?
即時通信新手入門:快速理解RPC技術——基本概念、原理和用途
多維度對比5款主流分佈式MQ消息隊列,媽媽不再擔憂個人技術選型了
從游擊隊到正規軍:馬蜂窩旅遊網的IM系統架構演進之路
IM開發基礎知識補課(六):數據庫用NoSQL仍是SQL?讀這篇就夠了!
>> 更多同類文章 ……

(本文同步發佈於:www.52im.net/thread-2768…

相關文章
相關標籤/搜索