使用Netty框架實現簡單WebSocket服務器javascript
NettyServer.javahtml
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class NettyServer { public static void main(String[] args) { new NettyServer().run(); } public void run(){ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup); b.channel(NioServerSocketChannel.class); b.childHandler(new ChildChannelHandler()); System.out.println("服務端開啓等待客戶端鏈接 ... ..."); Channel ch = b.bind(7397).sync().channel(); ch.closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
ChildChannelHandler.javajava
import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; public class ChildChannelHandler extends ChannelInitializer<SocketChannel>{ @Override protected void initChannel(SocketChannel e) throws Exception { e.pipeline().addLast("http-codec",new HttpServerCodec()); e.pipeline().addLast("aggregator",new HttpObjectAggregator(65536)); e.pipeline().addLast("http-chunked",new ChunkedWriteHandler()); e.pipeline().addLast("handler",new MyWebSocketServerHandler()); } }
Global.javaweb
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class Global { public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); }
MyWebSocketServerHandler.javabootstrap
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.util.CharsetUtil; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<Object> { private static final Logger logger = Logger .getLogger(WebSocketServerHandshaker.class.getName()); private static final Map<String,Channel> allContext=new HashMap<String, Channel>(16); private WebSocketServerHandshaker handshaker; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 添加 Global.group.add(ctx.channel()); System.out.println("客戶端與服務端鏈接開啓"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 移除 Global.group.remove(ctx.channel()); System.out.println("客戶端與服務端鏈接關閉"); } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, ((FullHttpRequest) msg)); } else if (msg instanceof WebSocketFrame) { handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判斷是否關閉鏈路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame .retain()); } // 判斷是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write( new PongWebSocketFrame(frame.content().retain())); return; } // 本例程僅支持文本消息,不支持二進制消息 if (!(frame instanceof TextWebSocketFrame)) { System.out.println("本例程僅支持文本消息,不支持二進制消息"); throw new UnsupportedOperationException(String.format( "%s frame types not supported", frame.getClass().getName())); } // 返回應答消息 String request = ((TextWebSocketFrame) frame).text(); allContext.put(request, ctx.channel()); System.out.println("服務端收到:" + request); if (logger.isLoggable(Level.FINE)) { logger .fine(String.format("%s received %s", ctx.channel(), request)); } TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); TextWebSocketFrame twsToAdmin = new TextWebSocketFrame("Message from " + ctx.channel().id() + ":" + request); // 羣發 // Global.group.writeAndFlush(tws); Channel admin=allContext.get("admin"); admin.writeAndFlush(twsToAdmin); // 返回【誰發的發給誰】 ctx.channel().writeAndFlush(tws); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://localhost:7397/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory .sendUnsupportedWebSocketVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回應答給客戶端 if (res.getStatus().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } // 若是是非Keep-Alive,關閉鏈接 ChannelFuture f = ctx.channel().writeAndFlush(res); if (!isKeepAlive(req) || res.getStatus().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } private static boolean isKeepAlive(FullHttpRequest req) { return false; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>無標題文檔</title> </head> </head> <script type="text/javascript"> var socket; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket = new WebSocket("ws://localhost:7397/websocket"); socket.onmessage = function(event){ var ta = document.getElementById('responseText'); ta.value += event.data+"\r\n"; }; socket.onopen = function(event){ var ta = document.getElementById('responseText'); ta.value = "打開WebSoket 服務正常,瀏覽器支持WebSoket!"+"\r\n"; }; socket.onclose = function(event){ var ta = document.getElementById('responseText'); ta.value = ""; ta.value = "WebSocket 關閉"+"\r\n"; }; }else{ alert("您的瀏覽器不支持WebSocket協議!"); } function send(message){ if(!window.WebSocket){return;} if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("WebSocket 鏈接沒有創建成功!"); } } </script> <body> <form onSubmit="return false;"> <input type = "text" name="message" value="Netty The Sinper"/> <br/><br/> <input type="button" value="發送 WebSocket 請求消息" onClick="send(this.form.message.value)"/> <hr color="blue"/> <h3>服務端返回的應答消息</h3> <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea> </form> </body> </html>