netty 基於 protobuf 協議 實現 websocket 版本的簡易客服系統

結構

  • netty 做爲服務端
  • protobuf 做爲序列化數據的協議
  • websocket 前端通信

演示

GitHub 地址

clipboard.png

netty 服務端實現

Server.java 啓動類

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

import java.net.InetSocketAddress;

/**
 * Entry point of the WebSocket long-connection demo: boots a Netty server
 * and keeps it running until the server channel is closed.
 */
public class Server {

    /**
     * Binds the server to port 8899 and blocks until its channel closes.
     * Both event loop groups are shut down on the way out, whether startup
     * succeeded or not.
     *
     * @param args command-line arguments (not read)
     * @throws Exception if binding or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        // Parent ("boss") group, passed first to ServerBootstrap.group().
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        // Child ("worker") group, passed second to ServerBootstrap.group().
        EventLoopGroup ioGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(new ServerChannelInitializer());

            // Wait for the bind to complete, then park on the close future.
            ChannelFuture bound = bootstrap.bind(new InetSocketAddress(8899)).sync();
            bound.channel().closeFuture().sync();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}

ServerChannelInitializer.java

import com.example.nettydemo.protobuf.MessageData;
import com.google.protobuf.MessageLite;
import com.google.protobuf.MessageLiteOrBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.util.List;

import static io.netty.buffer.Unpooled.wrappedBuffer;


/**
 * Builds the pipeline for each accepted connection: HTTP codec and
 * aggregation, WebSocket upgrade on {@code /ws}, and conversion between
 * binary WebSocket frames and protobuf messages.
 */
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs the handlers on the channel's pipeline.
     *
     * @param ch the newly accepted channel
     * @throws Exception if a handler cannot be installed
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Decodes HTTP requests and encodes HTTP responses.
        pipeline.addLast(new HttpServerCodec());
        // Merges the HttpRequest / HttpContent / LastHttpContent parts produced by
        // the HTTP decoder into a single full message (at most 65536 content bytes).
        pipeline.addLast(new HttpObjectAggregator(65536));
        // Adds support for writing large data streams in chunks.
        pipeline.addLast(new ChunkedWriteHandler());
        // WebSocket data compression.
        pipeline.addLast(new WebSocketServerCompressionHandler());
        // WebSocket protocol handling on path "/ws": no subprotocol, extensions allowed.
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true));
        // Inbound: unwrap the payload of binary WebSocket frames.
        pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() {
            @Override
            protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception {
                if (!(frame instanceof BinaryWebSocketFrame)) {
                    // Only binary frames carry protobuf payloads. Anything else
                    // (e.g. a text frame) is dropped here instead of failing the
                    // connection with a ClassCastException.
                    return;
                }
                // The frame is released by the decoder base class once decode()
                // returns, so the forwarded content needs its own reference.
                ByteBuf payload = frame.content().retain();
                objs.add(payload);
            }
        });
        // Outbound: serialize protobuf messages and wrap them in binary WebSocket frames.
        pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() {
            @Override
            protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception {
                // Same type dispatch as Netty's TCP ProtobufEncoder.
                final MessageLite message;
                if (msg instanceof MessageLite) {
                    message = (MessageLite) msg;
                } else if (msg instanceof MessageLite.Builder) {
                    message = ((MessageLite.Builder) msg).build();
                } else {
                    // Previously this path handed a null buffer to BinaryWebSocketFrame.
                    throw new IllegalArgumentException(
                            "Unsupported protobuf message type: " + msg.getClass().getName());
                }

                // Browsers cannot consume the raw protobuf bytes directly on a
                // WebSocket, so they are sent as a binary WebSocket frame.
                ByteBuf serialized = wrappedBuffer(message.toByteArray());
                out.add(new BinaryWebSocketFrame(serialized));
            }
        });

        // Parses the unwrapped bytes into MessageData.RequestUser instances.
        pipeline.addLast(new ProtobufDecoder(MessageData.RequestUser.getDefaultInstance()));

        // Application handler that receives the decoded RequestUser messages.
        pipeline.addLast(new ServerFrameHandler());

    }
}

ServerFrameHandler.java

import com.example.nettydemo.protobuf.MessageData;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.List;

/**
 * Receives decoded {@link MessageData.RequestUser} messages and answers each
 * one with a fixed {@link MessageData.ResponseUser} greeting.
 */
public class ServerFrameHandler extends SimpleChannelInboundHandler<MessageData.RequestUser> {

    // Currently unused: nothing in this class adds channels to the group.
    private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Prints the fields of the incoming request and writes the canned reply
     * back to the same channel.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded client request
     * @throws Exception if handling fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MessageData.RequestUser msg) throws Exception {
        Channel channel = ctx.channel();
        System.out.println(msg.getUserName());
        System.out.println(msg.getAge());
        System.out.println(msg.getPassword());
        MessageData.ResponseUser bank = MessageData
                .ResponseUser.newBuilder()
                .setUserName("你好,請問有什麼能夠幫助你!")
                .setAge(18).setPassword("11111").build();

        channel.writeAndFlush(bank);
    }

    /**
     * Hook invoked when this handler is added to a pipeline; intentionally a no-op.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // Nothing to do.
    }

    /**
     * Hook invoked when this handler is removed from a pipeline; intentionally a no-op.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // Nothing to do.
    }

    /**
     * Reports the failure and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("異常發生");
        // Surface the actual cause; without it the failure cannot be diagnosed.
        // This file has no logger, so the stack trace goes to stderr.
        cause.printStackTrace();
        ctx.close();
    }

}

protobuf 文件的使用

proto 文件

// Schema of the messages exchanged between the browser client and the server.
syntax ="proto2";

package com.example.nettydemo.protobuf;

// optimize_for = SPEED: generate code optimized for parsing speed.
option optimize_for = SPEED;
// Java package and outer class name of the generated code.
option java_package = "com.example.nettydemo.protobuf";
option java_outer_classname="MessageData";

// Message entity sent by the client.
message RequestUser{
    optional string user_name = 1;
    optional int32 age = 2;
    optional string password = 3;
}

// Message entity returned to the client.
message ResponseUser{
    optional string user_name = 1;
    optional int32 age = 2;
    optional string password = 3;
}

生成 proto 的Java 類

批量生成工具,直接找到這個 bat 或者 sh 文件,在對應的平臺執行就可以了;具體用法可以自行百度 protobuf 怎麼使用。

Windows 版本

REM Output directory for the generated Java sources.
set outPath=../../java
REM Base names of the .proto files to compile; the parentheses form the FOR set below.
set fileArray=(MessageDataProto ATestProto)

REM Generate Java classes from each .proto file.
REM (Batch has no "#" comments: a "#" line is executed as a command and fails.)
for %%i in %fileArray% do (
    echo generate cli protocol java code: %%i.proto
    protoc --java_out=%outPath% ./%%i.proto
)

pause

sh 版本 地址: https://github.com/lmxdawn/ne...

#!/bin/bash
# Generates Java classes from the listed .proto files using protoc.

# Output directory for the generated Java sources.
outPath=../../java
# Base names (without extension) of the .proto files to compile.
fileArray=(MessageDataProto ATestProto)

# Expansions are quoted so that names or paths containing spaces or glob
# characters are passed to protoc unchanged.
for i in "${fileArray[@]}"; do
    echo "generate cli protocol java code: ${i}.proto"
    protoc --java_out="$outPath" "./${i}.proto"
done

websocket 實現

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>WebSocket客戶端</title>
</head>
<body>

<script src="protobuf.min.js"></script>

<script type="text/javascript">
    var socket;
    if (!window.WebSocket) {
        // No WebSocket support: nothing can be set up.
        alert("瀏覽器不支持WebSocket!");
    } else {
        // The argument is the address of the server endpoint.
        socket = new WebSocket("ws://localhost:8899/ws");

        // Connection established: reset the response area.
        // The textarea is looked up inside each callback because this script
        // runs before the element appears in the document.
        socket.onopen = function (event) {
            document.getElementById("responseText").value = "鏈接開啓";
        };

        // Connection closed: append a notice to the response area.
        socket.onclose = function (event) {
            var output = document.getElementById("responseText");
            output.value += "\n" + "鏈接關閉";
        };

        // Message from the server: decode it and append a formatted line.
        socket.onmessage = function (event) {
            var output = document.getElementById("responseText");

            function showReply(responseUser) {
                var line = "客服小姐姐: " + responseUser.userName +
                    ", 小姐姐年齡: " + responseUser.age +
                    ", 密碼: " + responseUser.password;
                output.value += "\n" + line;
            }

            function logError(err) {
                console.log(err);
            }

            function logDone() {
                console.log("解碼所有完成");
            }

            responseUserDecoder({
                data: event.data,
                success: showReply,
                fail: logError,
                complete: logDone
            });
        };
    }
    /**
     * Encodes the given text as a RequestUser message and sends it over the
     * socket. Does nothing without WebSocket support; alerts when the
     * connection is not open.
     *
     * @param {string} message text placed in the userName field
     */
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        // Only an open connection can send.
        var isOpen = socket.readyState === WebSocket.OPEN;
        if (!isOpen) {
            alert("鏈接沒有開啓");
            return;
        }

        function onEncoded(buffer) {
            console.log("編碼成功");
            socket.send(buffer);
        }

        function onError(err) {
            console.log(err);
        }

        function onDone() {
            console.log("編碼所有完成");
        }

        requestUserEncoder({
            data: {
                userName: message,
                age: 18,
                password: "11111"
            },
            success: onEncoded,
            fail: onError,
            complete: onDone
        });
    }
    /**
     * Encodes an outgoing message as a protobuf RequestUser.
     *
     * Exactly one of success/fail is invoked, followed by complete. Errors
     * thrown while looking up the type, verifying or encoding are routed to
     * fail instead of escaping from the load callback.
     *
     * @param {Object}   obj
     * @param {Object}   obj.data       payload with the RequestUser fields
     * @param {Function} [obj.success]  called with the encoded bytes
     * @param {Function} [obj.fail]     called with the load/verify/encode error
     * @param {Function} [obj.complete] called after success or fail
     */
    function requestUserEncoder(obj) {
        var data = obj.data;
        var success = obj.success; // success callback
        var fail = obj.fail; // failure callback
        var complete = obj.complete; // called after either outcome

        function finish() {
            if (typeof complete === "function") {
                complete();
            }
        }

        function reject(reason) {
            if (typeof fail === "function") {
                fail(reason);
            }
            finish();
        }

        protobuf.load("../proto/MessageDataProto.proto", function (err, root) {
            if (err) {
                reject(err);
                return;
            }

            var failed = false;
            var reason;
            var buffer;
            try {
                // Obtain the message type.
                var RequestUser = root.lookupType("com.example.nettydemo.protobuf.RequestUser");
                // verify() returns an error string for an invalid payload, otherwise null.
                var errMsg = RequestUser.verify(data);
                if (errMsg) {
                    failed = true;
                    reason = errMsg;
                } else {
                    // Create the message and encode it to a Uint8Array (browser) or Buffer (node).
                    buffer = RequestUser.encode(RequestUser.create(data)).finish();
                }
            } catch (e) {
                failed = true;
                reason = e;
            }

            // Callbacks run outside the try block so that an exception thrown
            // by a caller's callback is not misreported as an encoding failure.
            if (failed) {
                reject(reason);
                return;
            }
            if (typeof success === "function") {
                success(buffer);
            }
            finish();
        });
    }
    /**
     * Decodes a binary message received from the server into a ResponseUser.
     *
     * Accepts a Blob (read through FileReader), an ArrayBuffer or a typed
     * array / DataView. Exactly one of success/fail is invoked, followed by
     * complete; read errors and decode errors are routed to fail.
     *
     * @param {Object}   obj
     * @param {Blob|ArrayBuffer|ArrayBufferView} obj.data raw message bytes
     * @param {Function} [obj.success]  called with the decoded message
     * @param {Function} [obj.fail]     called with the load/read/decode error
     * @param {Function} [obj.complete] called after success or fail
     */
    function responseUserDecoder(obj) {
        var data = obj.data;
        var success = obj.success; // success callback
        var fail = obj.fail; // failure callback
        var complete = obj.complete; // called after either outcome

        function finish() {
            if (typeof complete === "function") {
                complete();
            }
        }

        function reject(reason) {
            if (typeof fail === "function") {
                fail(reason);
            }
            finish();
        }

        protobuf.load("../proto/MessageDataProto.proto", function (err, root) {
            if (err) {
                reject(err);
                return;
            }

            // Obtain the message type.
            var ResponseUser;
            try {
                ResponseUser = root.lookupType("com.example.nettydemo.protobuf.ResponseUser");
            } catch (e) {
                reject(e);
                return;
            }

            // Decodes the bytes and reports the outcome. Callbacks run outside
            // the try block so their own exceptions are not misreported.
            function decodeBytes(bytes) {
                var responseUser;
                try {
                    responseUser = ResponseUser.decode(bytes);
                } catch (e) {
                    reject(e);
                    return;
                }
                if (typeof success === "function") {
                    success(responseUser);
                }
                finish();
            }

            // Bytes already in memory need no FileReader round trip.
            if (data instanceof ArrayBuffer) {
                decodeBytes(new Uint8Array(data));
                return;
            }
            if (ArrayBuffer.isView(data)) {
                decodeBytes(new Uint8Array(data.buffer, data.byteOffset, data.byteLength));
                return;
            }

            var reader;
            try {
                reader = new FileReader();
                // Handlers are attached before the read starts.
                reader.onload = function (e) {
                    decodeBytes(new Uint8Array(reader.result));
                };
                reader.onerror = function (e) {
                    reject(reader.error);
                };
                reader.readAsArrayBuffer(data);
            } catch (e) {
                // e.g. data is not a Blob.
                reject(e);
            }
        });
    }
</script>

<h1>歡迎訪問客服系統</h1>

<!-- Chat form; the submit handler returns false. -->
<form onsubmit="return false">

    <!-- Text of the message to send. -->
    <textarea name="message" style="width: 400px;height: 200px"></textarea>

    <!-- Passes the current value of the message textarea to send(). -->
    <input type="button" value="發送數據" onclick="send(this.form.message.value);">

    <h3>回覆消息:</h3>

    <!-- Output area written to by the socket callbacks (looked up by id "responseText"). -->
    <textarea id="responseText" style="width: 400px;height: 300px;"></textarea>

    <!-- Empties the output area. -->
    <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空數據">
</form>
</body>
</html>

擴展閱讀

spring boot 實現的後臺管理系統
vue + element-ui 實現的後臺管理界面,接入 spring boot API接口

相關文章
相關標籤/搜索