本文主要研究下reactor-netty的PoolResources的兩種模式elastic及fixed。html
TcpResources是個工具類,能夠用來建立loopResources和poolResources。java
主要是建立NioEventLoopGroup,以及該group下面的workerCount個NioEventLoop(這裏涉及兩個參數,一個是worker thread count,一個是selector thread count)react
主要是建立channelPools,類型是ConcurrentMap<SocketAddress, Pool>,這裏主要研究下它的兩種模式elastic及fixedlinux
reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultPoolResources.javabootstrap
它實現了netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/ChannelPool.java的接口,重點看以下的幾個方法:
@Override public Future<Channel> acquire() { return acquire(defaultGroup.next().newPromise()); } @Override public Future<Channel> acquire(Promise<Channel> promise) { return pool.acquire(promise).addListener(this); } @Override public Future<Void> release(Channel channel) { return pool.release(channel); } @Override public Future<Void> release(Channel channel, Promise<Void> promise) { return pool.release(channel, promise); } @Override public void close() { if(compareAndSet(false, true)) { pool.close(); } }
這裏的幾個接口基本是委託爲具體的pool來進行操做,其實現主要有SimpleChannelPool及FixedChannelPool。
SimpleChannelPool
)reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.javasegmentfault
/** * Create an uncapped {@link PoolResources} to provide automatically for {@link * ChannelPool}. * <p>An elastic {@link PoolResources} will never wait before opening a new * connection. The reuse window is limited but it cannot starve an undetermined volume * of clients using it. * * @param name the channel pool map name * * @return a new {@link PoolResources} to provide automatically for {@link * ChannelPool} */ static PoolResources elastic(String name) { return new DefaultPoolResources(name, SimpleChannelPool::new); }
這個是TcpClient.create過程當中,默認使用的方法,默認使用的是SimpleChannelPool,建立的是DefaultPoolResources
reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpResources.javapromise
static <T extends TcpResources> T create(T previous, LoopResources loops, PoolResources pools, String name, BiFunction<LoopResources, PoolResources, T> onNew) { if (previous == null) { loops = loops == null ? LoopResources.create("reactor-" + name) : loops; pools = pools == null ? PoolResources.elastic(name) : pools; } else { loops = loops == null ? previous.defaultLoops : loops; pools = pools == null ? previous.defaultPools : pools; } return onNew.apply(loops, pools); }
netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java服務器
/** * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced. * * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}. * */ public class SimpleChannelPool implements ChannelPool { @Override public final Future<Channel> acquire() { return acquire(bootstrap.config().group().next().<Channel>newPromise()); } @Override public Future<Channel> acquire(final Promise<Channel> promise) { checkNotNull(promise, "promise"); return acquireHealthyFromPoolOrNew(promise); } /** * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise. * @param promise the promise to provide acquire result. * @return future for acquiring a channel. */ private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) { try { final Channel ch = pollChannel(); if (ch == null) { // No Channel left in the pool bootstrap a new Channel Bootstrap bs = bootstrap.clone(); bs.attr(POOL_KEY, this); ChannelFuture f = connectChannel(bs); if (f.isDone()) { notifyConnect(f, promise); } else { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { notifyConnect(future, promise); } }); } return promise; } EventLoop loop = ch.eventLoop(); if (loop.inEventLoop()) { doHealthCheck(ch, promise); } else { loop.execute(new Runnable() { @Override public void run() { doHealthCheck(ch, promise); } }); } } catch (Throwable cause) { promise.tryFailure(cause); } return promise; } @Override public final Future<Void> release(Channel channel) { return release(channel, channel.eventLoop().<Void>newPromise()); } @Override public Future<Void> release(final Channel channel, final Promise<Void> promise) { checkNotNull(channel, "channel"); checkNotNull(promise, "promise"); try { EventLoop loop = channel.eventLoop(); if (loop.inEventLoop()) { doReleaseChannel(channel, promise); } else { loop.execute(new Runnable() { @Override public void run() { doReleaseChannel(channel, promise); } }); } } catch (Throwable cause) { closeAndFail(channel, cause, promise); } return promise; } @Override public void close() { for (;;) { Channel channel = pollChannel(); if (channel == null) { break; } channel.close(); } } //...... }
這個鏈接池的實現若是沒有鏈接則會建立一個(沒有限制
),取出鏈接(鏈接池使用一個LIFO的Deque來維護Channel
)的時候會檢測鏈接的有效性。
FixedChannelPool
)reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.javaapp
/** * Default max connection, if -1 will never wait to acquire before opening new * connection in an unbounded fashion. Fallback to * available number of processors. */ int DEFAULT_POOL_MAX_CONNECTION = Integer.parseInt(System.getProperty("reactor.ipc.netty.pool.maxConnections", "" + Math.max(Runtime.getRuntime() .availableProcessors(), 8) * 2)); /** * Default acquisition timeout before error. If -1 will never wait to * acquire before opening new * connection in an unbounded fashion. Fallback to * available number of processors. */ long DEFAULT_POOL_ACQUIRE_TIMEOUT = Long.parseLong(System.getProperty( "reactor.ipc.netty.pool.acquireTimeout", "" + 45000)); /** * Create a capped {@link PoolResources} to provide automatically for {@link * ChannelPool}. * <p>A Fixed {@link PoolResources} will open up to the given max number of * processors observed by this jvm (minimum 4). * Further connections will be pending acquisition indefinitely. * * @param name the channel pool map name * * @return a new {@link PoolResources} to provide automatically for {@link * ChannelPool} */ static PoolResources fixed(String name) { return fixed(name, DEFAULT_POOL_MAX_CONNECTION); } /** * Create a capped {@link PoolResources} to provide automatically for {@link * ChannelPool}. * <p>A Fixed {@link PoolResources} will open up to the given max connection value. * Further connections will be pending acquisition indefinitely. * * @param name the channel pool map name * @param maxConnections the maximum number of connections before starting pending * acquisition on existing ones * * @return a new {@link PoolResources} to provide automatically for {@link * ChannelPool} */ static PoolResources fixed(String name, int maxConnections) { return fixed(name, maxConnections, DEFAULT_POOL_ACQUIRE_TIMEOUT); } /** * Create a capped {@link PoolResources} to provide automatically for {@link * ChannelPool}. * <p>A Fixed {@link PoolResources} will open up to the given max connection value. * Further connections will be pending acquisition indefinitely. * * @param name the channel pool map name * @param maxConnections the maximum number of connections before starting pending * @param acquireTimeout the maximum time in millis to wait for aquiring * * @return a new {@link PoolResources} to provide automatically for {@link * ChannelPool} */ static PoolResources fixed(String name, int maxConnections, long acquireTimeout) { if (maxConnections == -1) { return elastic(name); } if (maxConnections <= 0) { throw new IllegalArgumentException("Max Connections value must be strictly " + "positive"); } if (acquireTimeout != -1L && acquireTimeout < 0) { throw new IllegalArgumentException("Acquire Timeout value must " + "be " + "positive"); } return new DefaultPoolResources(name, (bootstrap, handler, checker) -> new FixedChannelPool(bootstrap, handler, checker, FixedChannelPool.AcquireTimeoutAction.FAIL, acquireTimeout, maxConnections, Integer.MAX_VALUE )); }
最後調用的fixed方法有三個參數,一個是name,一個是maxConnections,一個是acquireTimeout。能夠看到這裏建立的是FixedChannelPool。
netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/FixedChannelPool.javajvm
/** * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum * number of concurrent connections. */ public class FixedChannelPool extends SimpleChannelPool { @Override public Future<Channel> acquire(final Promise<Channel> promise) { try { if (executor.inEventLoop()) { acquire0(promise); } else { executor.execute(new Runnable() { @Override public void run() { acquire0(promise); } }); } } catch (Throwable cause) { promise.setFailure(cause); } return promise; } private void acquire0(final Promise<Channel> promise) { assert executor.inEventLoop(); if (closed) { promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION); return; } if (acquiredChannelCount < maxConnections) { assert acquiredChannelCount >= 0; // We need to create a new promise as we need to ensure the AcquireListener runs in the correct // EventLoop Promise<Channel> p = executor.newPromise(); AcquireListener l = new AcquireListener(promise); l.acquired(); p.addListener(l); super.acquire(p); } else { if (pendingAcquireCount >= maxPendingAcquires) { promise.setFailure(FULL_EXCEPTION); } else { AcquireTask task = new AcquireTask(promise); if (pendingAcquireQueue.offer(task)) { ++pendingAcquireCount; if (timeoutTask != null) { task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS); } } else { promise.setFailure(FULL_EXCEPTION); } } assert pendingAcquireCount > 0; } } @Override public Future<Void> release(final Channel channel, final Promise<Void> promise) { ObjectUtil.checkNotNull(promise, "promise"); final Promise<Void> p = executor.newPromise(); super.release(channel, p.addListener(new FutureListener<Void>() { @Override public void operationComplete(Future<Void> future) throws Exception { assert executor.inEventLoop(); if (closed) { // Since the pool is closed, we have no choice but to close the channel channel.close(); promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION); return; } if (future.isSuccess()) { decrementAndRunTaskQueue(); promise.setSuccess(null); } else { Throwable cause = future.cause(); // Check if the exception was not because of we passed the Channel to the wrong pool. if (!(cause instanceof IllegalArgumentException)) { decrementAndRunTaskQueue(); } promise.setFailure(future.cause()); } } })); return promise; } @Override public void close() { executor.execute(new Runnable() { @Override public void run() { if (!closed) { closed = true; for (;;) { AcquireTask task = pendingAcquireQueue.poll(); if (task == null) { break; } ScheduledFuture<?> f = task.timeoutFuture; if (f != null) { f.cancel(false); } task.promise.setFailure(new ClosedChannelException()); } acquiredChannelCount = 0; pendingAcquireCount = 0; FixedChannelPool.super.close(); } } }); } //...... }
這裏的acquire,若是當前線程不是在eventLoop中,則放入隊列中等待執行acquire0,這裏可能撐爆eventLoop的taskQueue,不過其隊列大小的值取Math.max(16,SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)),默認是Integer.MAX_VALUE。
FixedChannelPool繼承了SimpleChannelPool,並重寫了acquire、release、close方法。它對獲取鏈接進行了限制,主要有以下幾個參數:
該值先從系統變量reactor.ipc.netty.pool.maxConnections取(若是設置爲-1,表示無限制,回到elastic模式
),若是沒有設置,則取Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2,即核數與8的最大值的2倍。
該值先從系統變量reactor.ipc.netty.pool.acquireTimeout取(若是設置爲-1,表示當即執行不等待
),若是沒有設置,則爲45000毫秒
這裏設置的是Integer.MAX_VALUE
這裏設置爲FixedChannelPool.AcquireTimeoutAction.FAIL,即timeoutTask爲
timeoutTask = new TimeoutTask() { @Override public void onTimeout(AcquireTask task) { // Fail the promise as we timed out. task.promise.setFailure(TIMEOUT_EXCEPTION); } };
若是當前鏈接超過maxConnections,則進入pendingAcquireQueue等待獲取鏈接,而在進入pendingAcquireQueue以前,若是當前等待數量超過了maxPendingAcquires,則返回FULL_EXCEPTION(Too many outstanding acquire operations
),這裏設置的是Integer.MAX_VALUE,因此不會有這個異常。進入pendingAcquireQueue以後,還有一個acquireTimeout參數,即進入pendingAcquireQueue等待acquireTimeout時間,若是尚未獲取到鏈接則返回TIMEOUT_EXCEPTION(Acquire operation took longer then configured maximum time
)。
默認TcpClient建立的PoolResources使用的是elastic模式,即鏈接池的實現是SimpleChannelPool,默認使用一個LIFO的Deque來維護Channel,若是從鏈接池取不到鏈接則會建立新的鏈接,上限應該是系統設置的可以打開的文件資源數量,超過則報SocketException: Too many open files。PoolResources還提供了FixedChannelPool實現,使用的是fixed模式,即限定了鏈接池最大鏈接數及最大等待超時,避免鏈接建立數量過多撐爆內存或者報SocketException: Too many open files異常。
注意,對於fixed模式,若是reactor.ipc.netty.pool.maxConnections設置爲-1,則回退到elastic模式。