Netty搭建WebSocket服務端

Netty服務端

1.引入依賴

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build descriptor for the netty-demo module.
  It inherits from the Spring Boot starter parent, declares the Netty, fastjson,
  commons-lang3 and Thymeleaf dependencies, and repackages the build under the
  final name "netty-server" with WebSocketServer configured as the main class.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.9.RELEASE</version> <!-- Spring Boot 1.5.9 is the version used in this article -->
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <!-- Coordinates of this module -->
    <groupId>com.blaze</groupId>
    <artifactId>netty-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>netty-demo</name>
    <description>Demo project for Spring Boot</description>

    <!-- Source/report encoding and the Java language level (1.8) -->
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- No explicit version here: presumably supplied by the Spring Boot parent - confirm -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>

        <!-- Apache Commons Lang, pinned to 3.4 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>

        <!-- fastjson (JSON library), pinned to 1.2.50 -->
        <!-- NOTE(review): 1.2.50 is an old fastjson release; check security advisories before reusing this version -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.50</version>
        </dependency>

        <!-- Netty dependency (the all-in-one artifact), pinned to 4.1.43.Final -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.43.Final</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- Class whose main() is used as the entry point of the packaged application -->
                    <mainClass>com.blaze.nettydemo.server.WebSocketServer</mainClass>
                </configuration>
            </plugin>
        </plugins>
        <!-- Base name of the built artifact -->
        <finalName>netty-server</finalName>
    </build>

</project>

2.服務端

WebSocketServer

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; /** * create by zy 2019/11/28 14:50 * TODO */
/**
 * Entry point of the Netty WebSocket server.
 *
 * <p>Binds {@link #PORT}, installs {@code WebSocketServerInitializer} on every accepted
 * connection and then blocks until the server channel is closed.
 */
public final class WebSocketServer {

    /**
     * Whether connections are wrapped in TLS using a freshly generated self-signed certificate.
     * The original author's alternative was to derive this from the "ssl" system property.
     */
    static final boolean SSL = false;

    /**
     * TCP port the server listens on.
     * The original author's alternative was the "port" system property (default 8443 with SSL,
     * 8888 without).
     */
    static final int PORT = 8888;

    /**
     * Starts the server and blocks until its channel closes.
     *
     * @param args unused
     * @throws Exception if the SSL context cannot be created or the bind fails
     */
    public static void main(String[] args) throws Exception {
        // Configure SSL (only when enabled).
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // bossGroup (one thread) accepts incoming connections;
        // workerGroup performs the network reads/writes of the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap is Netty's helper class for starting an NIO server.
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new WebSocketServerInitializer(sslCtx));

            // bind() starts listening; sync() waits for the bind to complete.
            Channel ch = b.bind(PORT).sync().channel();
            System.out.println("服務已開啓,等待客戶端鏈接......");

            // Block here until the server channel is closed; main() then returns.
            ch.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers/JVM shutdown logic can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            // Release the event-loop threads on exit.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

WebSocketServerInitializer

import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.ssl.SslContext; /** * create by zy 2019/11/28 14:53 * TODO */
/**
 * Builds the handler pipeline for every channel accepted by the WebSocket server.
 *
 * <p>Pipeline order: optional TLS handler, HTTP codec, HTTP aggregator, WebSocket
 * compression, WebSocket protocol handler, and finally the application frame handler.
 */
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    /** Path handed to the WebSocket protocol handler. */
    private static final String WEBSOCKET_PATH = "/";

    /** TLS context, or {@code null} when the server runs without TLS. */
    private final SslContext sslCtx;

    /**
     * @param sslCtx TLS context to install on each channel; {@code null} disables TLS
     */
    public WebSocketServerInitializer(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    /**
     * Installs the handlers on a newly accepted channel.
     *
     * @param ch the channel being initialised
     */
    @Override
    public void initChannel(SocketChannel ch) {
        final ChannelPipeline handlers = ch.pipeline();

        // TLS must sit in front of everything else when enabled.
        final boolean tlsEnabled = sslCtx != null;
        if (tlsEnabled) {
            handlers.addLast(sslCtx.newHandler(ch.alloc()));
        }

        handlers.addLast(
                new HttpServerCodec(),                   // HTTP request/response codec
                new HttpObjectAggregator(65536),         // HTTP message aggregator
                new WebSocketServerCompressionHandler(), // compression; optional
                new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true), // WebSocket protocol
                new WebSocketFrameHandler());            // application handling of WebSocketFrame
    }
}

WebSocketFrameHandler

import com.alibaba.fastjson.JSON; import com.rising.netty.model.RequestModel; import com.rising.netty.model.ResultModel; import com.rising.netty.util.MQUtil; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; /** * create by zy 2019/11/28 14:57 * TODO */
/**
 * Server-side handler that logs every received text or binary WebSocket frame and answers
 * with a fixed acknowledgement in the same frame type.
 */
public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    /** Charset used for both decoding binary requests and encoding binary replies. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    /** Acknowledgement sent back for every handled frame. */
    private static final String ACK = "接收成功";

    /**
     * Handles one inbound frame.
     *
     * @param ctx   context of the channel the frame arrived on
     * @param frame the received frame
     * @throws UnsupportedOperationException if the frame is neither text nor binary
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        if (frame instanceof TextWebSocketFrame) {
            String request = ((TextWebSocketFrame) frame).text();
            System.out.println("接收消息:" + request);
            // Reply with a text frame.
            ctx.channel().writeAndFlush(new TextWebSocketFrame(ACK));
        } else if (frame instanceof BinaryWebSocketFrame) {
            // Binary payload: copy the readable bytes out and decode them as UTF-8.
            ByteBuf content = frame.content();
            byte[] reg = new byte[content.readableBytes()];
            content.readBytes(reg);
            String request = new String(reg, UTF_8);
            System.out.println("接收消息:" + request);
            // Encode the reply with the same charset used for decoding; the platform default
            // charset could otherwise corrupt the non-ASCII acknowledgement text.
            ByteBuf respByteBuf = Unpooled.copiedBuffer(ACK.getBytes(UTF_8));
            ctx.channel().writeAndFlush(new BinaryWebSocketFrame(respByteBuf));
        } else {
            String message = "unsupported frame type: " + frame.getClass().getName();
            throw new UnsupportedOperationException(message);
        }
    }
}

3.客戶端

WebSocketClient

import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.*; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.URI; /** * create by zy 2019/11/28 14:57 * TODO */
public final class WebSocketClient { static final String URL = System.getProperty("url", "ws://127.0.0.1:8888/"); public static void main(String[] args) throws Exception { URI uri = new URI(URL); String scheme = uri.getScheme() == null ? "ws" : uri.getScheme(); final String host = uri.getHost() == null ? "127.0.0.1" : uri.getHost(); final int port; if (uri.getPort() == -1) { if ("ws".equalsIgnoreCase(scheme)) { port = 80; } else if ("wss".equalsIgnoreCase(scheme)) { port = 443; } else { port = -1; } } else { port = uri.getPort(); } if (!"ws".equalsIgnoreCase(scheme) && !"wss".equalsIgnoreCase(scheme)) { System.err.println("Only WS(S) is supported."); return; } final boolean ssl = "wss".equalsIgnoreCase(scheme); final SslContext sslCtx; if (ssl) { sslCtx = SslContextBuilder.forClient() .trustManager(InsecureTrustManagerFactory.INSTANCE).build(); } else { sslCtx = null; } //配置客戶端 NIO 線程組/池
        EventLoopGroup group = new NioEventLoopGroup(); try { // Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00. // If you change it to V00, ping is not supported and remember to change // HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
            final WebSocketClientHandler handler = new WebSocketClientHandler( WebSocketClientHandshakerFactory.newHandshaker( uri, WebSocketVersion.V13, null, true, new DefaultHttpHeaders())); /** * Bootstrap 與 ServerBootstrap 都繼承(extends)於 AbstractBootstrap * 建立客戶端輔助啓動類,並對其配置,與服務器稍微不一樣,這裏的 Channel 設置爲 NioSocketChannel * 而後爲其添加 Handler,這裏直接使用匿名內部類,實現 initChannel 方法 * 做用是當建立 NioSocketChannel 成功後,在進行初始化時,將它的ChannelHandler設置到ChannelPipeline中,用於處理網絡I/O事件 */ Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); if (sslCtx != null) { p.addLast(sslCtx.newHandler(ch.alloc(), host, port)); } p.addLast( new HttpClientCodec(), new HttpObjectAggregator(8192), WebSocketClientCompressionHandler.INSTANCE, handler); } }); //客戶端與服務端創建鏈接
            Channel ch = b.connect(uri.getHost(), port).sync().channel(); handler.handshakeFuture().sync(); /** * 將輸入信息傳輸到 server 端 */ BufferedReader console = new BufferedReader(new InputStreamReader(System.in)); while (true) { String msg = console.readLine(); if (msg == null) { break; } else if ("bye".equals(msg.toLowerCase())) { //輸入bye 斷開鏈接
                    ch.writeAndFlush(new CloseWebSocketFrame()); ch.closeFuture().sync(); break; } else if ("ping".equals(msg.toLowerCase())) { WebSocketFrame frame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[]{8, 1, 8, 1})); ch.writeAndFlush(frame); } else { WebSocketFrame frame = new TextWebSocketFrame(msg); ch.writeAndFlush(frame); } } } finally { group.shutdownGracefully(); } } }

WebSocketClientHandler

import io.netty.channel.*; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.CharsetUtil; /** * create by zy 2019/11/28 14:58 * TODO */
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { private final WebSocketClientHandshaker handshaker; private ChannelPromise handshakeFuture; public WebSocketClientHandler(WebSocketClientHandshaker handshaker) { this.handshaker = handshaker; } public ChannelFuture handshakeFuture() { return handshakeFuture; } @Override public void handlerAdded(ChannelHandlerContext ctx) { handshakeFuture = ctx.newPromise(); } @Override public void channelActive(ChannelHandlerContext ctx) { handshaker.handshake(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) { System.out.println("WebSocket Client disconnected!"); } @Override public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { Channel ch = ctx.channel(); if (!handshaker.isHandshakeComplete()) { try { //握手成功 創建鏈接
 handshaker.finishHandshake(ch, (FullHttpResponse) msg); System.out.println("WebSocket Client connected!"); handshakeFuture.setSuccess(); } catch (WebSocketHandshakeException e) { //握手失敗
                System.out.println("WebSocket Client failed to connect"); handshakeFuture.setFailure(e); } return; } if (msg instanceof FullHttpResponse) { FullHttpResponse response = (FullHttpResponse) msg; throw new IllegalStateException( "Unexpected FullHttpResponse (getStatus=" + response.status() +
                            ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); } WebSocketFrame frame = (WebSocketFrame) msg; if (frame instanceof TextWebSocketFrame) { //接收客戶端返回消息
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame; System.out.println("WebSocket Client received message: " + textFrame.text()); } else if (frame instanceof PongWebSocketFrame) { System.out.println("WebSocket Client received pong"); } else if (frame instanceof CloseWebSocketFrame) { System.out.println("WebSocket Client received closing"); ch.close(); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); if (!handshakeFuture.isDone()) { handshakeFuture.setFailure(cause); } ctx.close(); } }

4.web客戶端

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        // Browser-side WebSocket client for the Netty server described in this article.
        var socket;

        // Fall back to the legacy Mozilla-prefixed constructor when the standard one is absent.
        if (!window.WebSocket) {
            window.WebSocket = window.MozWebSocket;
        }

        if (window.WebSocket) {
            // Port 8888 matches the PORT the WebSocketServer in this article binds to
            // (the page previously pointed at 9898, where nothing is listening).
            socket = new WebSocket("ws://127.0.0.1:8888/");

            // Append every server message to the response text area.
            socket.onmessage = function (event) {
                var ta = document.getElementById('responseText');
                ta.value += event.data + "\r\n";
            };

            // On connect: show a status line and send the greeting message.
            socket.onopen = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服務器。。。。。。鏈接 \r\n";
                login();
            };

            // On close: replace the text area content with a status line.
            socket.onclose = function (event) {
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服務器。。。。。。關閉 \r\n";
            };
        } else {
            alert("您的瀏覽器不支持WebSocket協議!");
        }

        // Sends msg over the socket when it is open; otherwise alerts the user.
        function send(msg) {
            if (!window.WebSocket) { return; }
            if (socket.readyState == WebSocket.OPEN) {
                socket.send(msg);
            } else {
                alert("WebSocket 鏈接沒有創建成功!");
            }
        }

        // Sends the fixed greeting right after the connection opens.
        function login() {
            send("創建鏈接成功!");
        }

        // Closes the socket (no-op when WebSocket is unsupported).
        function closeSocket() {
            if (!window.WebSocket) { return; }
            socket.close();
        }
    </script>
</head>
<body>
<form onSubmit="return false;">   
    <label>TEXT</label><input type="text" name="blaze" value="" /> <br />
    <br /> <input type="button" value="發送ws消息" onClick="send(this.form.blaze.value)" />
    <hr color="black" />
    <br /> <input type="button" value="斷開鏈接" onClick="closeSocket()" />
    <hr color="black" />
    <h3>服務端返回的應答消息</h3>
    <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>
相關文章
相關標籤/搜索