如題,只是爲了實驗,我將全部的客戶端的channel存在靜態變量ChannelGroup實例中,對消息進行羣發。java
固然,若是實際的環境中,我估計要將channel存在緩存數據庫中,具體怎麼作,後面再研究。git
如今,咱們來作此次簡單的實驗:github
源代碼:https://github.com/YangZhouChaoFan/netty-learn/tree/master/netty-start數據庫
1:NettyServerbootstrap
package com.netty.start.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 服務器類. */ public class NettyServer { public void start(int port) throws Exception { //建立接收者的事件循環組 EventLoopGroup parentGroup = new NioEventLoopGroup(); //建立訪問者的事件循環組 EventLoopGroup childGroup = new NioEventLoopGroup(); try { //建立服務器引導程序 ServerBootstrap b = new ServerBootstrap(); //設置消息循環 b.group(parentGroup, childGroup); //設置通道 b.channel(NioServerSocketChannel.class); //配置通道參數:鏈接隊列的鏈接數 b.option(ChannelOption.SO_BACKLOG, 1024); //設置客戶端請求的處理操做 b.childHandler(new ChildChannelHandler()); //綁定端口,並獲取通道io操做的結果 ChannelFuture f = b.bind(port).sync(); //等待服務端監聽端口關閉 f.channel().closeFuture().sync(); } finally { //關閉接收器事件循環 parentGroup.shutdownGracefully(); //關閉訪問者的事件循環 childGroup.shutdownGracefully(); } } }
2:ChildChannelHandlerpromise
package com.netty.start.server; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 客戶端通道處理類. */ public class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel e) throws Exception { ChannelPipeline pipeline = e.pipeline(); // 以("\n")爲結尾分割的 解碼器 pipeline.addLast(new LineBasedFrameDecoder(1024)); // 字符串解碼 和 編碼 pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); //添加消息處理 e.pipeline().addLast(new NettyServerHandler()); } }
3:NettyServerHandler緩存
package com.netty.start.server; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.SocketAddress; /** * 服務器處理類. */ public class NettyServerHandler extends ChannelHandlerAdapter { static private Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); //建立頻道組 public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 鏈接通道. * * @param ctx * @param remoteAddress * @param localAddress * @param promise * @throws Exception */ @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { logger.info(remoteAddress + ":鏈接通道"); super.connect(ctx, remoteAddress, localAddress, promise); } /** * 活躍通道. * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info(ctx.channel().remoteAddress() + ":通道激活"); super.channelActive(ctx); ctx.writeAndFlush("歡迎訪問服務器\r\n"); channels.add(ctx.channel()); } /** * 非活躍通道. * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info(ctx.channel().remoteAddress() + ":通道失效"); super.channelInactive(ctx); channels.remove(ctx.channel()); } /** * 接收消息. * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info(ctx.channel().remoteAddress() + ":" + msg); Channel currentChannel = ctx.channel(); for (Channel channel : channels) { if (channel != currentChannel) { channel.writeAndFlush("[" + currentChannel.remoteAddress() + "]" + msg + "\n"); } } } /** * 接收完畢. * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); } /** * 關閉通道. * * @param ctx * @param promise * @throws Exception */ @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { logger.info(ctx.channel().remoteAddress() + ":關閉通道"); super.close(ctx, promise); } /** * 異常處理. * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.info("異常信息:" + cause.getMessage()); } }
4:NettyClient服務器
package com.netty.start.client; import com.netty.start.server.ChildChannelHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.io.BufferedReader; import java.io.InputStreamReader; /** * 客戶端類. */ public class NettyClient { public void connect(String host, int port) throws Exception { //建立事件循環組 EventLoopGroup group = new NioEventLoopGroup(); try { //建立引導程序 Bootstrap b = new Bootstrap(); //設置消息循環 b.group(group); //設置通道 b.channel(NioSocketChannel.class); //配置通道參數:tcp不延遲 b.option(ChannelOption.TCP_NODELAY, true); //設置通道處理 b.handler(new ChannelHandler()); //發起異步連接,等待輸入參數 Channel channel = b.connect(host, port).sync().channel(); BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while (true) { channel.writeAndFlush(in.readLine() + "\r\n"); } } finally { //關閉 group.shutdownGracefully(); } } }
5:ChannelHandler異步
package com.netty.start.client; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LineBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 通道處理類. */ public class ChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); // 以("\n")爲結尾分割的 解碼器 pipeline.addLast(new LineBasedFrameDecoder(1024)); // 字符串解碼 和 編碼 pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); socketChannel.pipeline().addLast(new NettyClientHandler()); } }
6:NettyClientHandlersocket
package com.netty.start.client; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.SocketAddress; /** * 客戶端處理類. */ public class NettyClientHandler extends ChannelHandlerAdapter { static private Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); /** * 鏈接通道. * * @param ctx * @param remoteAddress * @param localAddress * @param promise * @throws Exception */ @Override public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception { logger.info(remoteAddress + ":鏈接通道"); super.connect(ctx, remoteAddress, localAddress, promise); } /** * 活躍通道. * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info(ctx.channel().remoteAddress() + ":通道激活"); super.channelActive(ctx); } /** * 非活躍通道. * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { logger.info(ctx.channel().remoteAddress() + ":通道失效"); super.channelInactive(ctx); } /** * 接收消息. * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { logger.info(ctx.channel().remoteAddress() + ":" + msg); } /** * 接收完畢. * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); } /** * 關閉通道. * * @param ctx * @param promise * @throws Exception */ @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { super.close(ctx, promise); } /** * 異常處理. * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.info("異常信息:" + cause.getMessage()); } }
7:ServerTest
package com.netty.start.test; import com.netty.start.server.NettyServer; /** * Created by chenhao on 2016/3/17. */ public class ServerTest { public static void main(String[] args) throws Exception { NettyServer server = new NettyServer(); server.start(3000); } }
8:ClientTest
package com.netty.start.test; import com.netty.start.client.NettyClient; /** * Created by chenhao on 2016/3/17. */ public class ClientTest { public static void main(String[] args) throws Exception { NettyClient client = new NettyClient(); client.connect("127.0.0.1", 3000); } }
經過7和8兩個類,啓動測試實例。