public class MyServer { public static void main(String[] args) throws Exception{ // 負責鏈接的NioEventLoopGroup線程數爲1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup wokerGroup = new NioEventLoopGroup(); try{ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class) // 加入日誌 .handler(new LoggingHandler(LogLevel.INFO)) // 自定義channel初始化器 .childHandler(new WebSocketChannelInitializer()); // 綁定本機的8005端口 ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8005)).sync(); // 異步回調-關閉事件 channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } }
channel初始化器javascript
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //websocket協議自己是基於http協議的,因此這邊也要使用http解編碼器 pipeline.addLast(new HttpServerCodec()); //以塊的方式來寫的處理器 pipeline.addLast(new ChunkedWriteHandler()); //netty是基於分段請求的,HttpObjectAggregator的做用是將請求分段再聚合,參數是聚合字節的最大長度 pipeline.addLast(new HttpObjectAggregator(8192)); // 使用websocket協議 //ws://server:port/context_path //參數指的是contex_path pipeline.addLast(new WebSocketServerProtocolHandler("/world")); //websocket定義了傳遞數據的中frame類型 pipeline.addLast(new TextWebSocketFrameHandler()); } }
WebSocketChannelInitializer
channel初始化器html
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //websocket協議自己是基於http協議的,因此這邊也要使用http解編碼器 pipeline.addLast(new HttpServerCodec()); //以塊的方式來寫的處理器 pipeline.addLast(new ChunkedWriteHandler()); //netty是基於分段請求的,HttpObjectAggregator的做用是將請求分段再聚合,參數是聚合字節的最大長度 pipeline.addLast(new HttpObjectAggregator(8192)); // 使用websocket協議 //ws://server:port/context_path //參數指的是contex_path pipeline.addLast(new WebSocketServerProtocolHandler("/world")); //websocket定義了傳遞數據的中frame類型,這裏使用TextWebSocketFrame並自定義一個handler pipeline.addLast(new TextWebSocketFrameHandler()); } }
public class TextWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { /** * 通道列表 用於存放通道 */ public static CopyOnWriteArrayList<Channel> channelList = new CopyOnWriteArrayList<Channel>(); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { /** * writeAndFlush接收的參數類型是Object類型,可是通常咱們都是要傳入管道中傳輸數據的類型,好比咱們當前的demo * 傳輸的就是TextWebSocketFrame類型的數據 */ System.out.println(msg.text()); // 遍歷通道list,向非當前通道發送消息 channelList.forEach(channel -> { if (channel != ctx.channel()){ channel.writeAndFlush(new TextWebSocketFrame(ctx.channel().id().asShortText()+":" +msg.text())); } else { channel.writeAndFlush(new TextWebSocketFrame("我:" +msg.text())); } } ); } /** * channel add後觸發,向channelList添加新加入的通道 * * @param ctx ctx * @throws Exception 異常 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { channelList.forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame(ctx.channel().id().asShortText()+"上線了"))); channelList.add(ctx.channel()); } /** * channel鏈接斷開時觸發,猜想是TCP斷開時觸發回調 發送鏈接斷開事件 * * @param ctx ctx * @throws Exception 異常 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { channelList.forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame(ctx.channel().id().asShortText()+"下線了"))); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("異常發生"); ctx.close(); }
TextWebSocketFrameHandler
繼承了SimpleChannelInboundHandler
,SimpleChannelInboundHandler
其實是一個ChannelHandlerAdapter
,其方法會在入站的時候被調用。這裏咱們經過重寫handlerAdded
方法,在通道建立後將其加入當channelList中,並向其他通道發送成員上線的提示信息。重寫channelRead0
方法,向全部非當前channel發送讀取到消息。前端
前端
java
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>WebSocket客戶端</title> </head> <body> <script type="text/javascript"> var socket; //若是瀏覽器支持WebSocket if(window.WebSocket){ //參數就是與服務器鏈接的地址 socket = new WebSocket("ws://localhost:8005/world"); //客戶端收到服務器消息的時候就會執行這個回調方法 socket.onmessage = function (event) { var ta = document.getElementById("responseText"); ta.value = ta.value + "n"+event.data; } //鏈接創建的回調函數 socket.onopen = function(event){ var ta = document.getElementById("responseText"); ta.value = "鏈接開啓"; } //鏈接斷掉的回調函數 socket.onclose = function (event) { var ta = document.getElementById("responseText"); ta.value = ta.value +"n"+"鏈接關閉"; } }else{ alert("瀏覽器不支持WebSocket!"); } //發送數據 function send(message){ if(!window.WebSocket){ return; } //當websocket狀態打開 if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("鏈接沒有開啓"); } } </script> <form onsubmit="return false"> <textarea name = "message" style="width: 400px;height: 200px"></textarea> <input type ="button" value="發送數據" onclick="send(this.form.message.value);"> <h3>服務器輸出:</h3> <textarea id ="responseText" style="width: 400px;height: 300px;"></textarea> <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空數據"> </form> </body> </html>