GitHub 地址
Server.java
啓動類javascript
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import java.net.InetSocketAddress; //websocket長鏈接示例 public class Server { public static void main(String[] args) throws Exception{ // 主線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 從線程組 EventLoopGroup wokerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,wokerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ServerChannelInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } } }
ServerChannelInitializer.java
html
import com.example.nettydemo.protobuf.MessageData; import com.google.protobuf.MessageLite; import com.google.protobuf.MessageLiteOrBuilder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.MessageToMessageDecoder; import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.stream.ChunkedWriteHandler; import java.util.List; import static io.netty.buffer.Unpooled.wrappedBuffer; public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // HTTP請求的解碼和編碼 pipeline.addLast(new HttpServerCodec()); // 把多個消息轉換爲一個單一的FullHttpRequest或是FullHttpResponse, // 緣由是HTTP解碼器會在每一個HTTP消息中生成多個消息對象HttpRequest/HttpResponse,HttpContent,LastHttpContent pipeline.addLast(new HttpObjectAggregator(65536)); // 主要用於處理大數據流,好比一個1G大小的文件若是你直接傳輸確定會撐暴jvm內存的; 增長以後就不用考慮這個問題了 pipeline.addLast(new ChunkedWriteHandler()); // WebSocket數據壓縮 pipeline.addLast(new WebSocketServerCompressionHandler()); // 協議包長度限制 pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true)); // 協議包解碼 pipeline.addLast(new MessageToMessageDecoder<WebSocketFrame>() { @Override protected void decode(ChannelHandlerContext ctx, WebSocketFrame frame, List<Object> objs) throws Exception { ByteBuf buf = ((BinaryWebSocketFrame) frame).content(); objs.add(buf); buf.retain(); } }); // 協議包編碼 pipeline.addLast(new MessageToMessageEncoder<MessageLiteOrBuilder>() { @Override protected void encode(ChannelHandlerContext ctx, MessageLiteOrBuilder msg, List<Object> out) throws Exception { ByteBuf result = null; if (msg instanceof MessageLite) { result = wrappedBuffer(((MessageLite) msg).toByteArray()); } if (msg instanceof MessageLite.Builder) { result = wrappedBuffer(((MessageLite.Builder) msg).build().toByteArray()); } // ==== 上面代碼片斷是拷貝自TCP ProtobufEncoder 源碼 ==== // 而後下面再轉成websocket二進制流,由於客戶端不能直接解析protobuf編碼生成的 WebSocketFrame frame = new BinaryWebSocketFrame(result); out.add(frame); } }); // 協議包解碼時指定Protobuf字節數實例化爲CommonProtocol類型 pipeline.addLast(new ProtobufDecoder(MessageData.RequestUser.getDefaultInstance())); // websocket定義了傳遞數據的6中frame類型 pipeline.addLast(new ServerFrameHandler()); } }
ServerFrameHandler.java
前端
import com.example.nettydemo.protobuf.MessageData; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.List; //處理文本協議數據,處理TextWebSocketFrame類型的數據,websocket專門處理文本的frame就是TextWebSocketFrame public class ServerFrameHandler extends SimpleChannelInboundHandler<MessageData.RequestUser> { private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //讀到客戶端的內容而且向客戶端去寫內容 @Override protected void channelRead0(ChannelHandlerContext ctx, MessageData.RequestUser msg) throws Exception { // channelGroup.add(); Channel channel = ctx.channel(); System.out.println(msg.getUserName()); System.out.println(msg.getAge()); System.out.println(msg.getPassword()); MessageData.ResponseUser bank = MessageData .ResponseUser.newBuilder() .setUserName("你好,請問有什麼能夠幫助你!") .setAge(18).setPassword("11111").build(); channel.writeAndFlush(bank); } //每一個channel都有一個惟一的id值 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //打印出channel惟一值,asLongText方法是channel的id的全名 // System.out.println("handlerAdded:"+ctx.channel().id().asLongText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // System.out.println("handlerRemoved:" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("異常發生"); ctx.close(); } }
proto 文件
vue
syntax ="proto2"; package com.example.nettydemo.protobuf; //optimize_for 加快解析的速度 option optimize_for = SPEED; option java_package = "com.example.nettydemo.protobuf"; option java_outer_classname="MessageData"; // 客戶端發送過來的消息實體 message RequestUser{ optional string user_name = 1; optional int32 age = 2; optional string password = 3; } // 返回給客戶端的消息實體 message ResponseUser{ optional string user_name = 1; optional int32 age = 2; optional string password = 3; }
批量生成工具,直接找到這個 bat 或者 sh 文件,在對應的平臺執行就能夠了具體能夠自行百度 protobuf 怎麼使用
Windows 版本
java
set outPath=../../java set fileArray=(MessageDataProto ATestProto) # 將.proto文件生成java類 for %%i in %fileArray% do ( echo generate cli protocol java code: %%i.proto protoc --java_out=%outPath% ./%%i.proto ) pause
sh 版本
地址: https://github.com/lmxdawn/ne...node
#!/bin/bash outPath=../../java fileArray=(MessageDataProto ATestProto) for i in ${fileArray[@]}; do echo "generate cli protocol java code: ${i}.proto" protoc --java_out=$outPath ./$i.proto done
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>WebSocket客戶端</title> </head> <body> <script src="protobuf.min.js"></script> <script type="text/javascript"> var socket; //若是瀏覽器支持WebSocket if (window.WebSocket) { //參數就是與服務器鏈接的地址 socket = new WebSocket("ws://localhost:8899/ws"); //客戶端收到服務器消息的時候就會執行這個回調方法 socket.onmessage = function (event) { var ta = document.getElementById("responseText"); // 解碼 responseUserDecoder({ data: event.data, success: function (responseUser) { var content = "客服小姐姐: " + responseUser.userName + ", 小姐姐年齡: " + responseUser.age + ", 密碼: " + responseUser.password; ta.value = ta.value + "\n" + content; }, fail: function (err) { console.log(err); }, complete: function () { console.log("解碼所有完成") } }) } //鏈接創建的回調函數 socket.onopen = function (event) { var ta = document.getElementById("responseText"); ta.value = "鏈接開啓"; } //鏈接斷掉的回調函數 socket.onclose = function (event) { var ta = document.getElementById("responseText"); ta.value = ta.value + "\n" + "鏈接關閉"; } } else { alert("瀏覽器不支持WebSocket!"); } //發送數據 function send(message) { if (!window.WebSocket) { return; } // socket.binaryType = "arraybuffer"; // 判斷是否開啓 if (socket.readyState !== WebSocket.OPEN) { alert("鏈接沒有開啓"); return; } var data = { userName: message, age: 18, password: "11111" }; requestUserEncoder({ data: data, success: function (buffer) { console.log("編碼成功"); socket.send(buffer); }, fail: function (err) { console.log(err); }, complete: function () { console.log("編碼所有完成") } }); } /** * 發送的消息編碼成 protobuf */ function requestUserEncoder(obj) { var data = obj.data; var success = obj.success; // 成功的回調 var fail = obj.fail; // 失敗的回調 var complete = obj.complete; // 成功或者失敗都會回調 protobuf.load("../proto/MessageDataProto.proto", function (err, root) { if (err) { if (typeof fail === "function") { fail(err) } if (typeof complete === "function") { complete() } return; } // Obtain a message type var RequestUser = root.lookupType("com.example.nettydemo.protobuf.RequestUser"); // Exemplary payload var payload = data; // Verify the payload if necessary (i.e. when possibly incomplete or invalid) var errMsg = RequestUser.verify(payload); if (errMsg) { if (typeof fail === "function") { fail(errMsg) } if (typeof complete === "function") { complete() } return; } // Create a new message var message = RequestUser.create(payload); // or use .fromObject if conversion is necessary // Encode a message to an Uint8Array (browser) or Buffer (node) var buffer = RequestUser.encode(message).finish(); if (typeof success === "function") { success(buffer) } if (typeof complete === "function") { complete() } }); } /** * 接收到服務器二進制流的消息進行解碼 */ function responseUserDecoder(obj) { var data = obj.data; var success = obj.success; // 成功的回調 var fail = obj.fail; // 失敗的回調 var complete = obj.complete; // 成功或者失敗都會回調 protobuf.load("../proto/MessageDataProto.proto", function (err, root) { if (err) { if (typeof fail === "function") { fail(err) } if (typeof complete === "function") { complete() } return; } // Obtain a message type var ResponseUser = root.lookupType("com.example.nettydemo.protobuf.ResponseUser"); var reader = new FileReader(); reader.readAsArrayBuffer(data); reader.onload = function (e) { var buf = new Uint8Array(reader.result); var responseUser = ResponseUser.decode(buf); if (typeof success === "function") { success(responseUser) } if (typeof complete === "function") { complete() } } }); } </script> <h1>歡迎訪問客服系統</h1> <form onsubmit="return false"> <textarea name="message" style="width: 400px;height: 200px"></textarea> <input type="button" value="發送數據" onclick="send(this.form.message.value);"> <h3>回覆消息:</h3> <textarea id="responseText" style="width: 400px;height: 300px;"></textarea> <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空數據"> </form> </body> </html>
spring boot 實現的後臺管理系統
vue + element-ui 實現的後臺管理界面,接入 spring boot API接口git