Netty FixedChannelPool

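More and more applications use Netty as a high-performance asynchronous communication framework on the server side. On the client side, most use cases need only a single connection to the server for sending and receiving messages; examples in which a client opens multiple connections to the same server are much rarer.
The simplest implementation is a for loop that creates several NioEventLoopGroup instances to talk to the server. That still leaves the question of what to do when the client has to talk to several servers.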

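In fact, Netty has shipped a connection pool abstraction, ChannelPool, since the 4.0 line (the io.netty.channel.pool package first appeared in 4.0.28.Final). It covers both cases: talking to multiple servers and keeping a pool of connections to a single server.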

Server code

First we write the server, which is used to test the client's connection pool. The server needs no special handling; the code is as follows.

package tk.yuqibit.nio.pool;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

public class NettyServer {

    public void run(final int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // Shared by all channels. Creating it inside initChannel would allocate
        // a new executor group for every accepted connection and never shut it down.
        final EventExecutorGroup handlerGroup = new DefaultEventExecutorGroup(8);
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // Messages are framed by the "$_" delimiter.
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter))
                                    .addLast(new StringDecoder()).addLast(new StringEncoder())
                                    // Run the business handler off the I/O threads.
                                    .addLast(handlerGroup, new NettyServerHandler());
                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind, then block until the server channel is closed.
            ChannelFuture future = b.bind(port).sync();
            future.channel().closeFuture().sync();
        } finally {
            handlerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            try {
                port = Integer.parseInt(args[0]);
            } catch (NumberFormatException e) {
                // Keep the default port if the argument is not a number.
                e.printStackTrace();
            }
        }
        new NettyServer().run(port);
    }
}

The server handler prints the string sent by the client and replies with another string.

package tk.yuqibit.nio.pool;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by YuQi on 2017/7/31.
 */
public class NettyServerHandler extends SimpleChannelInboundHandler<Object> {

    // Counts messages across all channels.
    static AtomicInteger count = new AtomicInteger(1);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActived");
        super.channelActive(ctx);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // StringDecoder upstream has already turned the frame into a String.
        String body = (String) msg;
        System.out.println(count.getAndIncrement() + ":" + body);
        // The reply carries the "$_" delimiter so the client can frame it.
        ctx.writeAndFlush("Welcome to Netty.$_");
    }
}
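Both sides append "$_" to every message and put a DelimiterBasedFrameDecoder first in the pipeline. The sketch below is a stdlib-only model of what delimiter framing does with a stream that arrives in arbitrary chunks. It is not Netty's implementation: the class name DelimiterFramingSketch and its feed method are made up for illustration, and it works on characters rather than on a ByteBuf.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of delimiter framing; not Netty's DelimiterBasedFrameDecoder. */
final class DelimiterFramingSketch {
    private static final String DELIMITER = "$_";

    // Characters received so far that are not yet terminated by a delimiter.
    private final StringBuilder buffer = new StringBuilder();

    /** Feeds one received chunk and returns every frame it completes, delimiter stripped. */
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> frames = new ArrayList<>();
        int end;
        while ((end = buffer.indexOf(DELIMITER)) >= 0) {
            frames.add(buffer.substring(0, end));
            buffer.delete(0, end + DELIMITER.length());
        }
        return frames;
    }

    public static void main(String[] args) {
        DelimiterFramingSketch framer = new DelimiterFramingSketch();
        // TCP may split or merge writes, so chunks need not line up with messages.
        System.out.println(framer.feed("Hello Net"));           // prints []
        System.out.println(framer.feed("ty.$_Hello Netty.$_")); // prints [Hello Netty., Hello Netty.]
    }
}
```

This is why the handlers in this article see whole strings such as "Hello Netty." without the trailing "$_", however the bytes were split on the wire.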

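Client code

The client code below is the key part. First implement your own ChannelPoolHandler. The important method is channelCreated, which adds the channel handlers to the pipeline when a connection is created.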

package tk.yuqibit.nio.pool;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Created by YuQi on 2017/7/31.
 */
public class NettyChannelPoolHandler implements ChannelPoolHandler {

    @Override
    public void channelReleased(Channel ch) throws Exception {
        System.out.println("channelReleased. Channel ID: " + ch.id());
    }

    @Override
    public void channelAcquired(Channel ch) throws Exception {
        System.out.println("channelAcquired. Channel ID: " + ch.id());
    }

    @Override
    public void channelCreated(Channel ch) throws Exception {
        // Called once per new connection: configure the socket and build the pipeline here.
        ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes());
        System.out.println("channelCreated. Channel ID: " + ch.id());
        SocketChannel channel = (SocketChannel) ch;
        channel.config().setKeepAlive(true);
        channel.config().setTcpNoDelay(true);
        channel.pipeline()
                .addLast(new DelimiterBasedFrameDecoder(1024, delimiter))
                .addLast(new StringDecoder())
                .addLast(new StringEncoder())
                .addLast(new NettyClientHander());
    }
}

The client handler prints the server's response.

package tk.yuqibit.nio.pool;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.atomic.AtomicInteger;

public class NettyClientHander extends ChannelInboundHandlerAdapter {

    // Counts responses across all pooled channels.
    static AtomicInteger count = new AtomicInteger(1);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(count.getAndIncrement() + ":" + msg);
    }
}

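The client implements the connection pool. ChannelPoolMap can be used to connect to multiple servers; this example uses FixedChannelPool to build a pool with at most 2 connections to a single server. The main function acquires channels from the pool and sends ten messages.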

package tk.yuqibit.nio.pool;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelPoolMap;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.pool.SimpleChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

import java.net.InetSocketAddress;

/**
 * Created by YuQi on 2017/7/31.
 */
public class NettyPoolClient {
    final EventLoopGroup group = new NioEventLoopGroup();
    final Bootstrap strap = new Bootstrap();
    InetSocketAddress addr1 = new InetSocketAddress("127.0.0.1", 8080);
    InetSocketAddress addr2 = new InetSocketAddress("10.0.0.11", 8888);

    ChannelPoolMap<InetSocketAddress, SimpleChannelPool> poolMap;

    public void build() throws Exception {
        strap.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true);

        poolMap = new AbstractChannelPoolMap<InetSocketAddress, SimpleChannelPool>() {
            @Override
            protected SimpleChannelPool newPool(InetSocketAddress key) {
                // One pool per remote address, with at most 2 connections each.
                return new FixedChannelPool(strap.remoteAddress(key), new NettyChannelPoolHandler(), 2);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        NettyPoolClient client = new NettyPoolClient();
        client.build();
        final String ECHO_REQ = "Hello Netty.$_";
        for (int i = 0; i < 10; i++) {
            // depending on when you use addr1 or addr2 you will get different pools.
            final SimpleChannelPool pool = client.poolMap.get(client.addr1);
            Future<Channel> f = pool.acquire();
            f.addListener((FutureListener<Channel>) f1 -> {
                if (f1.isSuccess()) {
                    Channel ch = f1.getNow();
                    ch.writeAndFlush(ECHO_REQ);
                    // Release back to pool
                    pool.release(ch);
                }
            });
        }
    }
}
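The client looks up its pool by address on every iteration, yet newPool runs only the first time a given key is requested. The sketch below models that lazy, one-pool-per-key lookup with the standard library only. PoolMapSketch is a hypothetical name, and a String stands in for a real pool; it is not the AbstractChannelPoolMap implementation.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Illustrative model of a lazily populated pool map; not Netty's AbstractChannelPoolMap. */
final class PoolMapSketch<K, P> {
    private final Map<K, P> pools = new ConcurrentHashMap<>();
    private final Function<K, P> newPool;

    PoolMapSketch(Function<K, P> newPool) {
        this.newPool = newPool;
    }

    /** Returns the pool for the key, creating it on first use only. */
    P get(K key) {
        return pools.computeIfAbsent(key, newPool);
    }

    public static void main(String[] args) {
        AtomicInteger poolsCreated = new AtomicInteger();
        // A String stands in for a real pool object in this sketch.
        PoolMapSketch<InetSocketAddress, String> poolMap = new PoolMapSketch<>(
                addr -> "pool-" + poolsCreated.incrementAndGet() + " for port " + addr.getPort());

        InetSocketAddress addr1 = new InetSocketAddress("127.0.0.1", 8080);
        InetSocketAddress addr2 = new InetSocketAddress("10.0.0.11", 8888);
        for (int i = 0; i < 10; i++) {
            poolMap.get(addr1); // the same pool is returned on every iteration
        }
        poolMap.get(addr2);     // a second key gets its own pool
        System.out.println(poolsCreated.get()); // prints 2
    }
}
```

Keying by InetSocketAddress works because two instances with the same address and port are equal, so ten lookups with addr1 share one pool.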

 

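Output

Start the server first, then the client; the for loop sends 10 messages to the server.
The server output is shown below: two channels in total were established with the server, and 10 messages were received.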

channelActived
channelActived
1:Hello Netty.
2:Hello Netty.
3:Hello Netty.
4:Hello Netty.
5:Hello Netty.
6:Hello Netty.
7:Hello Netty.
8:Hello Netty.
9:Hello Netty.
10:Hello Netty.

The client output is shown below. channelCreated is called only twice; everything else is acquiring connections from the pool and releasing them back.

channelCreated. Channel ID: ea8504a8
channelCreated. Channel ID: 77c8857b
channelReleased. Channel ID: ea8504a8
channelReleased. Channel ID: 77c8857b
channelAcquired. Channel ID: ea8504a8
channelAcquired. Channel ID: 77c8857b
channelReleased. Channel ID: ea8504a8
channelReleased. Channel ID: 77c8857b
channelAcquired. Channel ID: 77c8857b
channelAcquired. Channel ID: ea8504a8
channelReleased. Channel ID: ea8504a8
channelAcquired. Channel ID: ea8504a8
channelReleased. Channel ID: 77c8857b
channelReleased. Channel ID: ea8504a8
channelAcquired. Channel ID: 77c8857b
channelAcquired. Channel ID: ea8504a8
channelReleased. Channel ID: 77c8857b
channelAcquired. Channel ID: 77c8857b
channelReleased. Channel ID: ea8504a8
channelReleased. Channel ID: 77c8857b
1:Welcome to Netty.
2:Welcome to Netty.
3:Welcome to Netty.
4:Welcome to Netty.
5:Welcome to Netty.
6:Welcome to Netty.
7:Welcome to Netty.
8:Welcome to Netty.
9:Welcome to Netty.
10:Welcome to Netty.
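The pattern in this log (two creations, then only acquire and release) follows from the limit of 2: once two connections are handed out, further acquire calls wait until one is released. The sketch below reproduces that behavior with the standard library only. FixedPoolSketch and its methods are hypothetical names, and connections are created synchronously here, whereas Netty connects asynchronously; treat it as a model of the idea rather than of FixedChannelPool's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Illustrative bounded pool; not Netty's FixedChannelPool. */
final class FixedPoolSketch<T> {
    private final Supplier<T> factory;
    private final int maxConnections;
    private final Deque<T> idle = new ArrayDeque<>();
    private final Deque<CompletableFuture<T>> pending = new ArrayDeque<>();
    private int acquired; // connections currently handed out
    private int created;  // connections ever created

    FixedPoolSketch(Supplier<T> factory, int maxConnections) {
        this.factory = factory;
        this.maxConnections = maxConnections;
    }

    /** Completes at once while below the limit; otherwise waits for a release. */
    synchronized CompletableFuture<T> acquire() {
        if (acquired >= maxConnections) {
            CompletableFuture<T> waiter = new CompletableFuture<>();
            pending.addLast(waiter);
            return waiter;
        }
        acquired++;
        T conn = idle.pollLast();
        if (conn == null) {
            conn = factory.get();
            created++;
        }
        return CompletableFuture.completedFuture(conn);
    }

    /** Hands the connection to the oldest waiter, or parks it as idle. */
    void release(T conn) {
        CompletableFuture<T> waiter;
        synchronized (this) {
            waiter = pending.pollFirst();
            if (waiter == null) {
                acquired--;
                idle.addLast(conn);
                return;
            }
        }
        // Complete outside the lock so callbacks can safely call back into the pool.
        waiter.complete(conn);
    }

    synchronized int created() {
        return created;
    }

    public static void main(String[] args) {
        AtomicInteger ids = new AtomicInteger();
        FixedPoolSketch<String> pool =
                new FixedPoolSketch<>(() -> "conn-" + ids.incrementAndGet(), 2);

        // Ten acquisitions up front: two complete immediately, eight wait.
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            futures.add(pool.acquire());
        }
        // Each "send" ends with a release, which wakes the next waiter.
        for (CompletableFuture<String> f : futures) {
            f.thenAccept(pool::release);
        }
        System.out.println(pool.created()); // prints 2
    }
}
```

All ten requests are served, but only two connections are ever created, matching the two channelCreated lines above.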