A Look at the Two Modes of reactor-netty's PoolResources

This article looks at the two modes of reactor-netty's PoolResources: elastic and fixed.

LoopResources and PoolResources

TcpResources is a utility class that can be used to create loopResources and poolResources.

loopResources

This mainly creates a NioEventLoopGroup together with the workerCount NioEventLoop instances under that group. Two parameters are involved: the worker thread count and the selector thread count.

  • DEFAULT_IO_WORKER_COUNT: if the system property reactor.ipc.netty.workerCount is set, that value is used; otherwise it is Math.max(Runtime.getRuntime().availableProcessors(), 4)
  • DEFAULT_IO_SELECT_COUNT: if the system property reactor.ipc.netty.selectCount is set, that value is used; otherwise it is -1, meaning there is no dedicated selector thread
  • DEFAULT_MAX_PENDING_TASKS: the capacity of the NioEventLoop taskQueue, Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE))
  • NioEventLoop extends SingleThreadEventLoop, which in turn extends SingleThreadEventExecutor. The executor it delegates to is a ThreadPerTaskExecutor, the rejectHandler is RejectedExecutionHandlers.reject(), and the default taskQueue created by SingleThreadEventExecutor is a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE
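
The default-resolution rules in the list above can be replayed with plain JDK calls. This is only a sketch: the class LoopDefaultsSketch and its method names are hypothetical, and Integer.getInteger stands in for Netty's SystemPropertyUtil.getInt, so it is not the library's own code.

```java
// Sketch only: replays the documented default rules with JDK calls.
// LoopDefaultsSketch and its methods are hypothetical names, not library API.
final class LoopDefaultsSketch {

    // Worker count: the system property wins, otherwise max(availableProcessors, 4).
    static int workerCount() {
        int fallback = Math.max(Runtime.getRuntime().availableProcessors(), 4);
        return Integer.parseInt(
                System.getProperty("reactor.ipc.netty.workerCount", "" + fallback));
    }

    // Selector count: the system property wins, otherwise -1 (no selector thread).
    static int selectCount() {
        return Integer.parseInt(
                System.getProperty("reactor.ipc.netty.selectCount", "" + -1));
    }

    // Task queue capacity: never below 16, unbounded (Integer.MAX_VALUE) by default.
    // Assumption: Integer.getInteger replaces Netty's SystemPropertyUtil.getInt here.
    static int maxPendingTasks() {
        return Math.max(16,
                Integer.getInteger("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        System.out.println("workerCount     = " + workerCount());
        System.out.println("selectCount     = " + selectCount());
        System.out.println("maxPendingTasks = " + maxPendingTasks());
    }
}
```

Running it with -Dreactor.ipc.netty.workerCount=2 shows that an explicit property overrides the processor-based fallback, even when it is lower than 4.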

poolResources

This mainly creates channelPools, of type ConcurrentMap<SocketAddress, Pool>. The focus here is on its two modes, elastic and fixed.

DefaultPoolResources

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultPoolResources.java

It implements the ChannelPool interface from netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/ChannelPool.java. The key methods are:
@Override
        public Future<Channel> acquire() {
            return acquire(defaultGroup.next().newPromise());
        }

        @Override
        public Future<Channel> acquire(Promise<Channel> promise) {
            return pool.acquire(promise).addListener(this);
        }

        @Override
        public Future<Void> release(Channel channel) {
            return pool.release(channel);
        }

        @Override
        public Future<Void> release(Channel channel, Promise<Void> promise) {
            return pool.release(channel, promise);
        }

        @Override
        public void close() {
            if(compareAndSet(false, true)) {
                pool.close();
            }
        }
These methods mostly delegate to a concrete pool, whose implementations are SimpleChannelPool and FixedChannelPool.

PoolResources.elastic(SimpleChannelPool)

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.java

/**
     * Create an uncapped {@link PoolResources} to provide automatically for {@link
     * ChannelPool}.
     * <p>An elastic {@link PoolResources} will never wait before opening a new
     * connection. The reuse window is limited but it cannot starve an undetermined volume
     * of clients using it.
     *
     * @param name the channel pool map name
     *
     * @return a new {@link PoolResources} to provide automatically for {@link
     * ChannelPool}
     */
    static PoolResources elastic(String name) {
        return new DefaultPoolResources(name, SimpleChannelPool::new);
    }
This is the method used by default during TcpClient.create: the pool is a SimpleChannelPool, wrapped in a DefaultPoolResources.

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpResources.java

static <T extends TcpResources> T create(T previous,
            LoopResources loops,
            PoolResources pools,
            String name,
            BiFunction<LoopResources, PoolResources, T> onNew) {
        if (previous == null) {
            loops = loops == null ? LoopResources.create("reactor-" + name) : loops;
            pools = pools == null ? PoolResources.elastic(name) : pools;
        }
        else {
            loops = loops == null ? previous.defaultLoops : loops;
            pools = pools == null ? previous.defaultPools : pools;
        }
        return onNew.apply(loops, pools);
    }

SimpleChannelPool

netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java

/**
 * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
 * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
 *
 * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}.
 *
 */
public class SimpleChannelPool implements ChannelPool {

    @Override
    public final Future<Channel> acquire() {
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        checkNotNull(promise, "promise");
        return acquireHealthyFromPoolOrNew(promise);
    }

    /**
     * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
     * @param promise the promise to provide acquire result.
     * @return future for acquiring a channel.
     */
    private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
        try {
            final Channel ch = pollChannel();
            if (ch == null) {
                // No Channel left in the pool bootstrap a new Channel
                Bootstrap bs = bootstrap.clone();
                bs.attr(POOL_KEY, this);
                ChannelFuture f = connectChannel(bs);
                if (f.isDone()) {
                    notifyConnect(f, promise);
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            notifyConnect(future, promise);
                        }
                    });
                }
                return promise;
            }
            EventLoop loop = ch.eventLoop();
            if (loop.inEventLoop()) {
                doHealthCheck(ch, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doHealthCheck(ch, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

    @Override
    public final Future<Void> release(Channel channel) {
        return release(channel, channel.eventLoop().<Void>newPromise());
    }

    @Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        checkNotNull(channel, "channel");
        checkNotNull(promise, "promise");
        try {
            EventLoop loop = channel.eventLoop();
            if (loop.inEventLoop()) {
                doReleaseChannel(channel, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doReleaseChannel(channel, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            closeAndFail(channel, cause, promise);
        }
        return promise;
    }

    @Override
    public void close() {
        for (;;) {
            Channel channel = pollChannel();
            if (channel == null) {
                break;
            }
            channel.close();
        }
    }

    //......
}
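If no pooled connection is available, this pool implementation creates a new one, with no upper limit. Connections are kept in a Deque used in LIFO order, and each connection taken from the pool is health-checked before it is handed out.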
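
The acquire behavior described above (take the most recently released connection, health-check it, otherwise create a new one) can be sketched without Netty. LifoPoolSketch, its factory and its healthy predicate are hypothetical names introduced for illustration; the real SimpleChannelPool works with Channel, Bootstrap and promises, and runs the health check on the channel's event loop.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Sketch only: a synchronous model of an uncapped LIFO pool with a health check.
// This is not Netty's SimpleChannelPool; all names here are hypothetical.
final class LifoPoolSketch<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;   // stands in for "bootstrap a new Channel"
    private final Predicate<T> healthy;  // stands in for the channel health checker
    private int created;

    LifoPoolSketch(Supplier<T> factory, Predicate<T> healthy) {
        this.factory = factory;
        this.healthy = healthy;
    }

    // Take the most recently released item; skip unhealthy ones; create if none is left.
    synchronized T acquire() {
        T candidate;
        while ((candidate = idle.pollLast()) != null) {
            if (healthy.test(candidate)) {
                return candidate;
            }
            // An unhealthy item is dropped and the next idle one is tried.
        }
        created++; // no limit is enforced on how many items are created
        return factory.get();
    }

    // Released items go to the tail, so the next acquire sees them first (LIFO).
    synchronized void release(T item) {
        idle.offerLast(item);
    }

    synchronized int createdCount() {
        return created;
    }
}
```

Because nothing caps createdCount, a burst of acquires with no releases creates one new item per call, which is the property that distinguishes elastic mode from fixed mode.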

PoolResources.fixed(FixedChannelPool)

reactor-netty-0.7.5.RELEASE-sources.jar!/reactor/ipc/netty/resources/PoolResources.java

/**
     * Default max connection, if -1 will never wait to acquire before opening new
     * connection in an unbounded fashion. Fallback to
     * available number of processors.
     */
    int DEFAULT_POOL_MAX_CONNECTION =
            Integer.parseInt(System.getProperty("reactor.ipc.netty.pool.maxConnections",
            "" + Math.max(Runtime.getRuntime()
                        .availableProcessors(), 8) * 2));

    /**
     * Default acquisition timeout before error. If -1 will never wait to
     * acquire before opening new
     * connection in an unbounded fashion. Fallback to
     * available number of processors.
     */
    long DEFAULT_POOL_ACQUIRE_TIMEOUT = Long.parseLong(System.getProperty(
            "reactor.ipc.netty.pool.acquireTimeout",
            "" + 45000));

    /**
     * Create a capped {@link PoolResources} to provide automatically for {@link
     * ChannelPool}.
     * <p>A Fixed {@link PoolResources} will open up to the given max number of
     * processors observed by this jvm (minimum 4).
     * Further connections will be pending acquisition indefinitely.
     *
     * @param name the channel pool map name
     *
     * @return a new {@link PoolResources} to provide automatically for {@link
     * ChannelPool}
     */
    static PoolResources fixed(String name) {
        return fixed(name, DEFAULT_POOL_MAX_CONNECTION);
    }

    /**
     * Create a capped {@link PoolResources} to provide automatically for {@link
     * ChannelPool}.
     * <p>A Fixed {@link PoolResources} will open up to the given max connection value.
     * Further connections will be pending acquisition indefinitely.
     *
     * @param name the channel pool map name
     * @param maxConnections the maximum number of connections before starting pending
     * acquisition on existing ones
     *
     * @return a new {@link PoolResources} to provide automatically for {@link
     * ChannelPool}
     */
    static PoolResources fixed(String name, int maxConnections) {
        return fixed(name, maxConnections, DEFAULT_POOL_ACQUIRE_TIMEOUT);
    }

    /**
     * Create a capped {@link PoolResources} to provide automatically for {@link
     * ChannelPool}.
     * <p>A Fixed {@link PoolResources} will open up to the given max connection value.
     * Further connections will be pending acquisition indefinitely.
     *
     * @param name the channel pool map name
     * @param maxConnections the maximum number of connections before starting pending
     * @param acquireTimeout the maximum time in millis to wait for aquiring
     *
     * @return a new {@link PoolResources} to provide automatically for {@link
     * ChannelPool}
     */
    static PoolResources fixed(String name, int maxConnections, long acquireTimeout) {
        if (maxConnections == -1) {
            return elastic(name);
        }
        if (maxConnections <= 0) {
            throw new IllegalArgumentException("Max Connections value must be strictly " + "positive");
        }
        if (acquireTimeout != -1L && acquireTimeout < 0) {
            throw new IllegalArgumentException("Acquire Timeout value must " + "be " + "positive");
        }
        return new DefaultPoolResources(name,
                (bootstrap, handler, checker) -> new FixedChannelPool(bootstrap,
                        handler,
                        checker,
                        FixedChannelPool.AcquireTimeoutAction.FAIL,
                        acquireTimeout,
                        maxConnections,
                        Integer.MAX_VALUE
                        ));
    }
The fixed overload that is ultimately called takes three parameters: name, maxConnections, and acquireTimeout. As the code shows, it creates a FixedChannelPool.
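
The default value and the argument checks in the listing above can be exercised in isolation. The sketch below copies only the decision logic; PoolModeSketch, Mode and resolve are hypothetical names, and no pool is actually created.

```java
// Sketch only: reproduces the default and the argument checks of the
// three-argument fixed(...) shown above. All names here are hypothetical.
final class PoolModeSketch {
    enum Mode { ELASTIC, FIXED }

    // Default cap: the system property wins, otherwise max(availableProcessors, 8) * 2.
    static int defaultMaxConnections() {
        int fallback = Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2;
        return Integer.parseInt(
                System.getProperty("reactor.ipc.netty.pool.maxConnections", "" + fallback));
    }

    // Same order of checks as fixed(name, maxConnections, acquireTimeout).
    static Mode resolve(int maxConnections, long acquireTimeout) {
        if (maxConnections == -1) {
            return Mode.ELASTIC; // -1 falls back to the uncapped pool
        }
        if (maxConnections <= 0) {
            throw new IllegalArgumentException("Max Connections value must be strictly positive");
        }
        if (acquireTimeout != -1L && acquireTimeout < 0) {
            throw new IllegalArgumentException("Acquire Timeout value must be positive");
        }
        return Mode.FIXED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(defaultMaxConnections(), 45000L));
        System.out.println(resolve(-1, 45000L));
    }
}
```

Note the order of the checks: -1 is tested before the "strictly positive" rule, which is why it selects elastic mode instead of raising an exception.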

FixedChannelPool

netty-transport-4.1.22.Final-sources.jar!/io/netty/channel/pool/FixedChannelPool.java

/**
 * {@link ChannelPool} implementation that takes another {@link ChannelPool} implementation and enforce a maximum
 * number of concurrent connections.
 */
public class FixedChannelPool extends SimpleChannelPool {

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        try {
            if (executor.inEventLoop()) {
                acquire0(promise);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        acquire0(promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.setFailure(cause);
        }
        return promise;
    }

    private void acquire0(final Promise<Channel> promise) {
        assert executor.inEventLoop();

        if (closed) {
            promise.setFailure(POOL_CLOSED_ON_ACQUIRE_EXCEPTION);
            return;
        }
        if (acquiredChannelCount < maxConnections) {
            assert acquiredChannelCount >= 0;

            // We need to create a new promise as we need to ensure the AcquireListener runs in the correct
            // EventLoop
            Promise<Channel> p = executor.newPromise();
            AcquireListener l = new AcquireListener(promise);
            l.acquired();
            p.addListener(l);
            super.acquire(p);
        } else {
            if (pendingAcquireCount >= maxPendingAcquires) {
                promise.setFailure(FULL_EXCEPTION);
            } else {
                AcquireTask task = new AcquireTask(promise);
                if (pendingAcquireQueue.offer(task)) {
                    ++pendingAcquireCount;

                    if (timeoutTask != null) {
                        task.timeoutFuture = executor.schedule(timeoutTask, acquireTimeoutNanos, TimeUnit.NANOSECONDS);
                    }
                } else {
                    promise.setFailure(FULL_EXCEPTION);
                }
            }

            assert pendingAcquireCount > 0;
        }
    }

    @Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        final Promise<Void> p = executor.newPromise();
        super.release(channel, p.addListener(new FutureListener<Void>() {

            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                assert executor.inEventLoop();

                if (closed) {
                    // Since the pool is closed, we have no choice but to close the channel
                    channel.close();
                    promise.setFailure(POOL_CLOSED_ON_RELEASE_EXCEPTION);
                    return;
                }

                if (future.isSuccess()) {
                    decrementAndRunTaskQueue();
                    promise.setSuccess(null);
                } else {
                    Throwable cause = future.cause();
                    // Check if the exception was not because of we passed the Channel to the wrong pool.
                    if (!(cause instanceof IllegalArgumentException)) {
                        decrementAndRunTaskQueue();
                    }
                    promise.setFailure(future.cause());
                }
            }
        }));
        return promise;
    }

    @Override
    public void close() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                if (!closed) {
                    closed = true;
                    for (;;) {
                        AcquireTask task = pendingAcquireQueue.poll();
                        if (task == null) {
                            break;
                        }
                        ScheduledFuture<?> f = task.timeoutFuture;
                        if (f != null) {
                            f.cancel(false);
                        }
                        task.promise.setFailure(new ClosedChannelException());
                    }
                    acquiredChannelCount = 0;
                    pendingAcquireCount = 0;
                    FixedChannelPool.super.close();
                }
            }
        });
    }
    //......
}

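In acquire, if the calling thread is not the pool's event loop, acquire0 is submitted to the executor as a task and waits in its queue. In principle this could fill the event loop's taskQueue, but its capacity is Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE)), which defaults to Integer.MAX_VALUE.

FixedChannelPool extends SimpleChannelPool and overrides acquire, release, and close. It restricts how connections are acquired, mainly through the following parameters:
  • maxConnections

This value is read from the system property reactor.ipc.netty.pool.maxConnections (-1 means unlimited, which falls back to elastic mode). If the property is not set, the default is Math.max(Runtime.getRuntime().availableProcessors(), 8) * 2, that is, twice the larger of the processor count and 8.

  • acquireTimeout

This value is read from the system property reactor.ipc.netty.pool.acquireTimeout (according to the Javadoc quoted above, -1 means never wait). If the property is not set, the default is 45000 milliseconds.

  • maxPendingAcquires

This is set to Integer.MAX_VALUE here.

  • AcquireTimeoutAction

This is set to FixedChannelPool.AcquireTimeoutAction.FAIL here, so timeoutTask is: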

timeoutTask = new TimeoutTask() {
                    @Override
                    public void onTimeout(AcquireTask task) {
                        // Fail the promise as we timed out.
                        task.promise.setFailure(TIMEOUT_EXCEPTION);
                    }
                };
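Once the number of acquired connections reaches maxConnections, further acquires are placed in pendingAcquireQueue to wait for a connection. Before a request is queued, the pool checks whether the number of waiters has already reached maxPendingAcquires; if so, it fails with FULL_EXCEPTION (Too many outstanding acquire operations). Because maxPendingAcquires is Integer.MAX_VALUE here, that exception does not occur in practice. A queued request is also subject to acquireTimeout: if it has waited in pendingAcquireQueue for acquireTimeout without obtaining a connection, it fails with TIMEOUT_EXCEPTION (Acquire operation took longer then configured maximum time).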
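
The three outcomes described above (immediate acquire, rejection when the wait queue is full, failure after the timeout) can be modeled with JDK types. This is a simplified sketch, not FixedChannelPool: BoundedAcquireSketch is a hypothetical class, connections are plain integer ids, synchronized methods replace the event loop, and a released connection is handed straight to the oldest waiter instead of going back through the underlying pool.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch only: models maxConnections, maxPendingAcquires and acquireTimeout.
// Hypothetical class; it does not reproduce FixedChannelPool's internals.
final class BoundedAcquireSketch {
    private final int maxConnections;
    private final int maxPendingAcquires;
    private final long acquireTimeoutMillis;
    private final ScheduledExecutorService timer; // owned and shut down by the caller
    private final Queue<CompletableFuture<Integer>> pending = new ArrayDeque<>();
    private int acquired;
    private int nextId;

    BoundedAcquireSketch(int maxConnections, int maxPendingAcquires,
                         long acquireTimeoutMillis, ScheduledExecutorService timer) {
        this.maxConnections = maxConnections;
        this.maxPendingAcquires = maxPendingAcquires;
        this.acquireTimeoutMillis = acquireTimeoutMillis;
        this.timer = timer;
    }

    synchronized CompletableFuture<Integer> acquire() {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        if (acquired < maxConnections) {
            acquired++;                      // below the cap: hand out a connection now
            promise.complete(++nextId);
        } else if (pending.size() >= maxPendingAcquires) {
            promise.completeExceptionally(   // wait queue full: fail immediately
                    new IllegalStateException("Too many outstanding acquire operations"));
        } else {
            pending.add(promise);            // queue the request and arm its timeout
            timer.schedule(() -> onTimeout(promise), acquireTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        return promise;
    }

    // Fails the request only if it is still waiting (the FAIL timeout action).
    private synchronized void onTimeout(CompletableFuture<Integer> promise) {
        if (pending.remove(promise)) {
            promise.completeExceptionally(new TimeoutException("acquire timed out"));
        }
    }

    // Simplification: pass the released connection directly to the oldest waiter.
    synchronized void release(int connectionId) {
        CompletableFuture<Integer> next = pending.poll();
        if (next == null) {
            acquired--;
        } else {
            next.complete(connectionId);
        }
    }
}
```

With maxPendingAcquires set to Integer.MAX_VALUE, as reactor-netty does, the rejection branch is effectively unreachable and the timeout becomes the only way a waiting acquire can fail.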

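Summary

By default, the PoolResources created for a TcpClient uses elastic mode: the pool implementation is SimpleChannelPool, which keeps channels in a Deque used in LIFO order and opens a new connection whenever none can be taken from the pool. The practical upper limit is therefore the number of file descriptors the operating system allows the process to open; beyond that, the client fails with SocketException: Too many open files. PoolResources also offers a fixed mode backed by FixedChannelPool, which caps the number of pooled connections and bounds how long an acquire may wait, so that connection creation cannot exhaust memory or run into SocketException: Too many open files.

Note that in fixed mode, setting reactor.ipc.netty.pool.maxConnections to -1 falls back to elastic mode.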
