Netty 即時通訊 後端 (五)

基於netty 和 websocket 作一個即時通訊 聊天的小應用練習前端

 

先來了解即時通訊 , 通常會使用三種實現方式:java

  • Ajax 輪訓
  • Long pull
  • websocket

有不少的例子,好比一些電腦上羣組聊天室,手遊中的聊天平臺等等,都須要一個實時通訊,如何實現雙向通訊web

Ajax輪訓,是制定每過幾秒鐘,去ajax異步請求同步服務器新的數據ajax

Long pull也是採用循環的方式,是一種阻塞的模式,當發出請求,若是服務器不響應,他就會一直卡住不動,早期的通訊方式bootstrap

websocket最初由H5提起,是一種協議,http1.1是支持長連接,http1.0是不支持長連接的,websocket基於TCP協議之上,提供具備持久性的協議後端

對比http每發起一個,必然存在request和response,且是1:1對應的服務器

websocket的優勢使得客戶端和服務器之間的數據交換變得更加簡單,容許服務端主動向客戶端推送數據, 只須要一次連接便可保持長久連接傳輸數據,除非本身退出遊戲了,從新上線websocket

 

Web即時通訊異步

基於以前的經驗,先寫一個server服務端socket

package com.yus.netty.server;

import io.netty.bootstrap.BootstrapConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * websocket 服務端
 */
public class WebSocketServer {

    public static void main(String[] args) throws InterruptedException {
        //採用主從線程組模型
        //主線程組
        EventLoopGroup primaryGroup = new NioEventLoopGroup();

        //從線程組
        EventLoopGroup subGroup = new NioEventLoopGroup();

        try {
            //服務啓動器
            ServerBootstrap bootstrap = new ServerBootstrap();

            //創建通道,管道以及助手處理類 入口
            bootstrap.group(primaryGroup, subGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new WebSocketChannelInit());

            //綁定端口
            ChannelFuture future = bootstrap.bind(8081).sync();
            future.channel().closeFuture().sync();
        } finally {
            //關閉
            primaryGroup.shutdownGracefully();
            subGroup.shutdownGracefully();
        }

    }
}

 

初始化器

package com.yus.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * 通道初始化器
 */
public class WebSocketChannelInit extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        //獲取管道
        ChannelPipeline pipeline = ch.pipeline();

        //=====開始============用於支持Http協議的處理類=================
        //通訊 netty提供的編解碼處理類
        pipeline.addLast(new HttpServerCodec());

        //對處理數量大的數據流的支持
        pipeline.addLast(new ChunkedWriteHandler());

        //聚合器,方便處理http消息 1024*64 爲消息最大長度(byte)支持http協議的必要處理類
        pipeline.addLast(new HttpObjectAggregator(1024*64));
        //=====結束============用於支持Http協議的處理類=================

        // 支持websocket協議的處理類,創建連接時使用
        // /ws指定客戶端訪問服務端的路由,可隨便自定義,這邊寫ws是websocket簡寫
        // 該處理類幫咱們處理繁重的事情並run websocket服務端,
        // 並管理通訊握手動做(包括close關閉,Ping請求,Pong響應)Ping+Pong=心跳,關於心跳後續再作說明
        // 並以frame進行數據傳輸,不一樣的數據類型,frame也不一樣
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        //自定義 處理類,主要用於讀取客戶端消息,而後對消息進行處理,最後能夠返回給客戶端
        pipeline.addLast("myHandle", new MyHandle());
    }
}

 

自定義處理類

package com.yus.netty.server;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * 自定義處理類
 * 在寫初識化器時有說明,關於websocket傳輸時,主要以frames方式傳輸
 * 在Netty中frame會專門爲websocket處理 文本 的對象 - TextWebSocketFrame
 * frame是消息的載體
 */
public class MyHandle extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // 第四步
    // ChannelGroup:記錄和管理Channel,使用DefaultChannelGroup默認實現,GlobalEventExecutor全局初始化
    private static ChannelGroup channelClient = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        //第一步:從消息載體中獲取客戶端的消息
        String content = msg.text();
        System.out.println("消息:" + content);

        //第二步:
        //拿到消息文本,而後將消息發給全部客戶端,這時無論有多少個客戶端
        //均可以將此客戶端的消息給全部的客戶端,每個客戶端會註冊一個channel進來
        //經過channel通道進行消息推送出去,這時候就用到了上次學習的Channel的方法週期,
        //生命週期 重寫handlerAdded 和 handlerRemoved
        Channel channel = ctx.channel();

        //第七步
        //將數據 刷到全部的客戶端 第一種方式
        for (Channel channels : channelClient){
            //注意 這邊的載體是泛型TextWebSocketFrame ,不能直接String扔出去
            //要將消息放入載體,再送出去
            channels.writeAndFlush(new TextWebSocketFrame("短ID爲" + channel.id().asShortText() + "用戶 發送消息:" + content));
        }

        //將數據 刷到全部的客戶端 第二種 方式直接用ChannelGroup的writeAndFlush
        //channelClient.writeAndFlush(new TextWebSocketFrame("我又在哪裏,我被送出去了嗎?"));
    }

    /**
     * 第三步
     * 客戶端建立了,觸發
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        //獲取客戶端的channel雙向通道
        Channel channel = ctx.channel();

        //第五步
        //添加到ChannelGroup,方便全局管理
        channelClient.add(channel);
    }

    /**
     * 第六步
     * 客戶端離開了,觸發
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //當觸發這個handlerRemoved時,其實ChannelGroup會自動移除對應客戶端的通道channel

        //因此不須要咱們去調remove的方法,測試發現是多餘的
        //channelClient.remove(ctx.channel());

        //ctx.channel().id()中存在兩個ID,一個長ID,一個短ID,若是學習過zookeeper的同窗會熟悉一些
        //服務少的時候,短ID衝突的可能性小,會用短ID進行選擇,反之就是長ID
        System.out.println("Channel 長ID爲 " + ctx.channel().id().asLongText() + "客戶端離開了");
        System.out.println("Channel 短ID爲 " + ctx.channel().id().asShortText() + "客戶端離開了");
    }
}

 

編寫完後端,已經快1點鐘了,睡覺了,明天繼續寫個測試前端頁面

--------------------------------------------------

相關文章
相關標籤/搜索