WebSocket+Netty構建web聊天程序

WebSocket

傳統的瀏覽器和服務器之間的交互模式是基於請求/響應的模式,雖然可使用js發送定時任務讓瀏覽器在服務器中拉取可是弊端很明顯,首先就是不能避免的延遲,其次就是頻繁的請求,讓服務器的壓力驟然提高html

WebSocket是H5新增的協議,用於構建瀏覽器和服務器之間的不受限的長鏈接的通訊模式,再也不侷限於請求/響應式的模型,服務端能夠主動推送消息給客戶端,(遊戲有某個玩家得獎了的彈幕)基於這個特性咱們能夠構建咱們的實時的通訊程序前端

協議詳情:

websocket創建鏈接時,是經過瀏覽器發送的HTTP請求,報文以下:java

GET ws://localhost:3000/ws/chat HTTP/1.1
Host: localhost
Upgrade: websocket
Connection: Upgrade
Origin: http://localhost:3000
Sec-WebSocket-Key: client-random-string
Sec-WebSocket-Version: 13
  • 首先GET請求是以 ws開頭的
  • 其中請求頭中的Upgrade: websocket Connection: Upgrade表示嘗試創建WebSocket鏈接

對於服務端的相應數據web

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: server-random-string

其中的101,表示服務端支持WebSocket協議, 雙方基於Http請求,成功創建起WebSocket鏈接,雙方之間的通訊也再也不經過HTTPajax

JS對WebSocket的封裝對象

對於JS的WebSocket對象來講,它經常使用 4個回調方法,以及兩個主動方法數據庫

方法名 做用
onopen() 和服務端成功創建鏈接後回調
onmessage(e) 收到服務端的的消息後回調,e爲消息對象
onerror() 連接出現異常回調,如服務端關閉
onclose() 客戶端單方面斷開鏈接時回調
send(e) 主動向服務端推送消息
close() 主動關閉通道

再次對WebSocket進行封裝

知道了回調函數回調時機,咱們接下來要作的就是在他的整個生命週期的不一樣回調函數中,添加咱們指定的動做就ok了,下面是經過Window定義一個全局的聊天對象CHATjson

window.CHAT={
var socket = null;
// 初始化socket
init:function(){
// 判斷當前的瀏覽器是否支持WebSocket
if(window.WebSocket){
    // 檢驗當前的webSocket是否存在,以及鏈接的狀態,如已經鏈接,直接返回
    if(CHAT.socket!=null&&CHAT.socket!=undefined&&CHAT.socket.readyState==WebSocket.OPEN){
        return false;
    }else{// 實例化 , 第二個ws是咱們能夠自定義的, 根據後端的路由來
        CHAT.socket=new WebSocket("ws://192.168.43.10:9999/ws");
        // 初始化WebSocket原生的方法
        CHAT.socket.onopen=CHAT.myopen();
        CHAT.socket.onmessage=CHAT.mymessage();
        CHAT.socket.onerror=CHAT.myerror();
        CHAT.socket.onclose=CHAT.myclose(); 
    
    }
}else{
    alert("當前設備不支持WebSocket");
}
}
// 發送聊天消息
chat:function(msg){
    // 若是的當前的WebSocket是鏈接的狀態,直接發送 不然重新鏈接
    if(CHAT.socket.readyState==WebSocket.OPEN&&CHAT.socket!=null&&CHAT.socket!=undefined){
        socket.send(msg);
    }else{
        // 從新鏈接
        CHAT.init();
        // 延遲一會,重新發送
        setTimeout(1000);
        CHAT.send(msg);
    }
}
// 當鏈接創建完成後對調
myopen:function(){
    // 拉取鏈接創建以前的未簽收的消息記錄
    // 發送心跳包
}
mymessage:function(msg){
    // 由於服務端能夠主動的推送消息,咱們提早定義和後端統一msg的類型, 如,拉取好友信息的消息,或 聊天的消息
    if(msg==聊天內容){
    // 發送請求籤收消息,改變請求的狀態
    // 將消息緩存到本地
    // 將msg 轉換成消息對象, 植入html進行渲染
    }else if(msg==拉取好友列表){
    // 發送請求更新好友列表
    }
    
}
myerror:function(){
    console.log("鏈接出現異常...");
}
myclose:function(){
    console.log("鏈接關閉...");
}
keepalive: function() {
    // 構建對象
    var dataContent = new app.DataContent(app.KEEPALIVE, null, null);
    // 發送心跳
    CHAT.chat(JSON.stringify(dataContent));
    
    // 定時執行函數, 其餘操做
    // 拉取未讀消息
    // 拉取好友信息
}

}

對消息類型的約定

WebSocket對象經過send(msg);方法向後端提交數據,常見的數據以下:後端

  • 客戶端發送聊天消息
  • 客戶端簽收消息
  • 客戶端發送心跳包
  • 客戶端請求創建鏈接

爲了使後端接收到不一樣的類型的數據作出不一樣的動做, 因而咱們約定發送的msg的類型;瀏覽器

// 消息action的枚舉,這個枚舉和後端約定好,統一值
CONNECT: 1,     // 第一次(或重連)初始化鏈接
CHAT: 2,        // 聊天消息
SIGNED: 3,      // 消息簽收
KEEPALIVE: 4,   // 客戶端保持心跳
PULL_FRIEND:5,  // 從新拉取好友

// 消息模型的構造函數
ChatMsg: function(senderId, receiverId, msg, msgId){
    this.senderId = senderId;
    this.receiverId = receiverId;
    this.msg = msg;
    this.msgId = msgId;
}

//  進一步封裝兩個獲得最終版消息模型的構造函數
DataContent: function(action, chatMsg, extand){
    this.action = action;
    this.chatMsg = chatMsg;
    this.extand = extand;
}

如何發送數據?

咱們使用js,給發送按鈕綁定點擊事件,一經觸發,從緩存中獲取出咱們須要的參數,調用緩存

CHAT.chat(Json.stringify(dataContent));

後端netty會解析dataContent的類型,進一步處理

如何簽收未與服務器鏈接時好友發送的消息?

  • 消息的簽收時機:
    之因此會有未簽收的信息,是由於客戶端未與服務端創建WebSocket鏈接, 當服務端判斷他維護的channel組中沒有接受者的channel時,不會發送數據,而是把數據持久化到數據庫,而且標記flag=未讀, 因此咱們簽收信息天然放在客戶端和服務端創建起鏈接時的回調函數中執行

  • 步驟:
    • 客戶端經過js請求,拉取所有的和本身相關的flag=未讀的消息實體列表
    • 從回調函數數中,把列表中的數據取出,緩存在本地
    • 將列表中的數據回顯在html頁面中
    • 和後端約定,將該列表中全部的實例的id取出,用逗號分隔拼接成字符串, 以action=SIGNED的方式發送給後端,讓其進行簽收

Netty對WebSocket的支持

首先每個Netty服務端的程序都是神似的,想建立不一樣的服務端,就得給Netty裝配的pipeline不一樣的Handler

針對聊天程序,處理String類型的Json信息,咱們選取SimpleChannelInboundHandler, 他是個典型的入站處理器,而且若是咱們沒有出來數據,她會幫咱們回收 重寫它裏面未實現抽象方法,這些抽象方法一樣是回調方法, 當一個新的Channel進來, 它註冊進Selector上的過程當中,會回調不一樣的抽象方法

方法名 回調時機
handlerAdded(ChannelHandlerContext ctx) Pepiline中的Handler添加完成回調
channelRegistered(ChannelHandlerContext ctx) channel註冊進Selector後回調
channelActive(ChannelHandlerContext ctx) channel處於活動狀態回調
channelReadComplete(ChannelHandlerContext ctx) channel, read結束後回調
userEventTriggered(ChannelHandlerContext ctx, Object evt) 當出現用戶事件時回調,如 讀/寫
channelInactive(ChannelHandlerContext ctx) 客戶端斷開鏈接時回調
channelUnregistered(ChannelHandlerContext ctx) 客戶端斷開鏈接後,取消channel的註冊時回調
handlerRemoved(ChannelHandlerContext ctx) 取消channel的註冊後,將channel移除ChannelGroup後回調
exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 出現異常時回調

handler的設計編碼

要作到點對點的聊天,前提是服務端擁有所有的channel由於全部數據的讀寫都依賴於它,而 netty爲咱們提供了ChannelGroup 用來保存全部新添加進來的channel, 此外點對點的聊天,咱們須要將用戶信息和它所屬的channel進行一對一的綁定,才能夠精準的匹配出兩個channel進而數據交互, 所以添加UserChannel映射類

public class UserChanelRelationship {
    private static HashMap<String, Channel> manager = new HashMap<>();
    public static  void put(String sendId,Channel channel){
        manager.put(sendId,channel);
    }
    public static Channel get(String sendId){
        return  manager.get(sendId);
    }
    public static void outPut(){
        for (HashMap.Entry<String,Channel> entry:manager.entrySet()){
            System.out.println("UserId: "+entry.getKey() + "channelId: "+entry.getValue().id().asLongText());
        }
    }
}

咱們把User和Channel之間的關係以鍵值對的形式存放進Map中,服務端啓動後,程序就會維護這個map, 那麼問題來了? 何時添加二者之間的映射關係呢? 看上handler的回調函數,咱們選擇 channelRead0() 當咱們判斷出 客戶端發送過來的信息是 CONNECT類型時,添加映射關係

下面是handler的處理編碼

public class MyHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 用於管理整個客戶端的 組
public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame frame) throws Exception {
Channel currentChanenl = channelHandlerContext.channel();

// 1. 獲取客戶端發送的消息
String content = frame.text();
System.out.println("  content:  "+content);

// 2. 判斷不一樣的消息的類型, 根據不一樣的類型進行不一樣的處理
    // 當創建鏈接時, 第一次open , 初始化channel,將channel和數據庫中的用戶作一個惟一的關聯
DataContent dataContent = JsonUtils.jsonToPojo(content,DataContent.class);
Integer action = dataContent.getAction();

if (action == MsgActionEnum.CHAT.type) {

    // 3. 把聊天記錄保存到數據庫
    // 4. 同時標記消息的簽收狀態 [未簽收]
    // 5. 從咱們的映射中獲取接受方的chanel  發送消息
    // 6. 從 chanelGroup中查找 當前的channel是否存在於 group, 只有存在,咱們才進行下一步發送
    //  6.1 若是沒有接受者用戶channel就不writeAndFlush, 等着用戶上線後,經過js發起請求拉取未接受的信息
    //  6.2 若是沒有接受者用戶channel就不writeAndFlush, 能夠選擇推送

}else if (action == MsgActionEnum.CONNECT.type){
    // 當創建鏈接時, 第一次open , 初始化channel,將channel和數據庫中的用戶作一個惟一的關聯
    String sendId = dataContent.getChatMsg().getSenderId();
    UserChanelRelationship.put(sendId,currentChanenl);
    
}else if(action == MsgActionEnum.SINGNED.type){
    // 7. 當用戶沒有上線時,發送消息的人把要發送的消息持久化在數據庫,可是卻沒有把信息寫回到接受者的channel, 把這種消息稱爲未簽收的消息
    
    // 8. 簽收消息, 就是修改數據庫中消息的簽收狀態, 咱們和前端約定,前端如何簽收消息在上面有提到
    String extend = dataContent.getExtand();
    // 擴展字段在 signed類型表明 須要被簽收的消息的id, 用逗號分隔
    String[] msgIdList = extend.split(",");
    List<String> msgIds = new ArrayList<>();
    Arrays.asList(msgIdList).forEach(s->{
        if (null!=s){
            msgIds.add(s);
        }
    });
    if (!msgIds.isEmpty()&&null!=msgIds&&msgIds.size()>0){
        // 批量簽收
    }

}else if (action == MsgActionEnum.KEEPALIVE.type){
    // 6. 心跳類型
    System.out.println("收到來自channel 爲" +currentChanenl+" 的心跳包... ");
}

}

// handler 添加完成後回調
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 獲取連接, 而且若想要羣發的話,就得往每個channel中寫數據, 所以咱們得在建立鏈接時, 把channel保存起來
System.err.println("handlerAdded");
users .add(ctx.channel());
}

// 用戶關閉了瀏覽器回調
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 斷開鏈接後, channel會自動移除group
// 咱們主動的關閉進行, channel會被移除, 可是咱們若是是開啓的飛行模式,不會被移除
System.err.println("客戶端channel被移出: "+ctx.channel().id().asShortText());
users.remove(ctx.channel());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 發生異常關閉channel, 並從ChannelGroup中移除Channel
ctx.channel().close();
users.remove(ctx.channel());
}

... 其餘方法

先後端的心跳維持

雙方創建起WebSocket鏈接後,服務端須要明確的知道,本身維護的諸多channel中,誰已經掛掉了, 爲了提升性能,須要及早把廢棄的channel移除ChanelGroup

客戶端殺掉了進程,或者開啓了飛行模式, 這時服務端是感知不到它維護的channel中已經有一個不能使用了,首先來講,維護一個不能使用的channel會影響性能,並且當這個channel的好友給他發送消息時,服務端認爲用戶在線,因而向一個不存在的channel寫入刷新數據,會帶來額外的麻煩

這時咱們就須要添加心跳機制,客戶端設置定時任務,每一個一段時間就往服務端發送心跳包,心跳包的內容是什麼不是重點,它的做用就是告訴服務端本身還active, N多個客戶端都要向服務端發送心跳,這並不會增長服務端的請求,由於這個請求是經過WebSocket的send方法發送過去的,只不過dataContent的類型是 KEEPALIVE , 一樣這是咱們提早約定好的(此外,服務端向客戶端發送心跳看起來是沒有必要的)

因而對於後端來講,咱們發送的心跳包,會使得當前客戶端對應的channel的channelRead0()方法回調, netty爲咱們提供了心跳相關的handler, 每一次的chanelRead0()的回調,都是read/write事件, 下面是netty對心跳的支持的實現

/**
 * @Author: Changwu
 * @Date: 2019/7/2 9:33
 * 咱們的心跳handler不須要實現handler0方法,咱們選擇,直接繼承SimpleInboundHandler的父類
*/
public class HeartHandler extends ChannelInboundHandlerAdapter {
// 咱們重寫  EventTrigger 方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 當出現read/write  讀寫寫空閒時觸發
if(evt instanceof IdleStateEvent){
    IdleStateEvent event = (IdleStateEvent) evt;

    if (event.state()== IdleState.READER_IDLE){ // 讀空閒
        System.out.println(ctx.channel().id().asShortText()+" 讀空閒... ");
    }else if (event.state()==IdleState.WRITER_IDLE){
        System.out.println(ctx.channel().id().asShortText()+" 寫空閒... ");
    }else if (event.state()==IdleState.ALL_IDLE){
        System.out.println("channel 讀寫空閒, 準備關閉當前channel  , 當前UsersChanel的數量: "+MyHandler.users.size());
        Channel channel = ctx.channel();
        channel.close();
        System.out.println("channel 關閉後, UsersChanel的數量: "+MyHandler.users.size());
    }
}
}

Handler咱們再也不使用SimpleChannelInboundHandler了,由於它當中的方法都是抽象方法,而咱們須要回調的函數時機是,每次當有用戶事件時回調, 好比read,write事件, 這些事件能夠證實channel還活着,對應的方法是userEventTriggered()

此外, ChannelInboundHandlerAdapter是netty中,適配器模式的體現, 它實現了全都抽象方法,而後他的實現方法中並非在幹活,而是把這個事件往下傳播下去了,如今咱們重寫userEventTriggered() 執行的就是咱們的邏輯

另外,咱們須要在pipeline中添加handler

... 
/ 添加netty爲咱們提供的 檢測空閒的處理器,  每 20 40 60 秒, 會觸發userEventTriggered事件的回調
pipeline.addLast(new IdleStateHandler(10,20,30));
// todo 添加心跳的支持
pipeline.addLast("heartHandler",new HeartHandler());

服務端主動向客戶端推送數據

如, 添加好友的操做中, A向B發送添加好友請求的過程,會通過以下幾步

  • A向服務端發送ajax請求,將本身的id, 目標朋友的id持久化到 數據庫,請求friend_request表
  • 用戶B上線,經過js,向後端拉取friend_request表中有沒有關於本身的信息,因而服務端把A的請求給B推送過去
  • 在B的前端回顯A的請求, B進一步處理這個信息, 此時兩種狀況
    • B拒絕了A的請求: 後端把friend_request表關於AB的信息清除
    • B贊成了A的請求: 後端在firend_List表中,將AB雙方的信息都持久化進去, 這時咱們能夠順勢在後端的方法中,給B推送最新的聯繫人信息, 可是這不屬於主動推送,由於此次會話是客戶端主動發起的

可是A殊不知道,B已經贊成了,因而須要給A主動的推送數據, 怎麼推送呢? 咱們須要在上面的UserChannel的關係中,拿出發送者的channel, 而後往回writeAndFlush內容,這時A就得知B已經贊成了,從新加載好友列表

相關文章
相關標籤/搜索