The newHandler process of TcpClient in reactor-netty

This article looks at how TcpClient's newHandler works in reactor-netty.

maven

<dependency>
            <groupId>io.projectreactor.ipc</groupId>
            <artifactId>reactor-netty</artifactId>
            <version>0.7.3.RELEASE</version>
        </dependency>

TcpClient.newHandler

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpClient.java

/**
     * @param handler
     * @param address
     * @param secure
     * @param onSetup
     *
     * @return a new Mono to connect on subscribe
     */
    protected Mono<NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
            InetSocketAddress address,
            boolean secure,
            Consumer<? super Channel> onSetup) {

        final BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>>
                targetHandler =
                null == handler ? ChannelOperations.noopHandler() : handler;

        return Mono.create(sink -> {
            SocketAddress remote = address != null ? address : options.getAddress();

            ChannelPool pool = null;

            PoolResources poolResources = options.getPoolResources();
            if (poolResources != null) {
                pool = poolResources.selectOrCreate(remote, options,
                        doHandler(null, sink, secure, remote, null, null),
                        options.getLoopResources().onClient(options.preferNative()));
            }

            ContextHandler<SocketChannel> contextHandler =
                    doHandler(targetHandler, sink, secure, remote, pool, onSetup);
            sink.onCancel(contextHandler);

            if (pool == null) {
                Bootstrap b = options.get();
                b.remoteAddress(remote);
                b.handler(contextHandler);
                contextHandler.setFuture(b.connect());
            }
            else {
                contextHandler.setFuture(pool.acquire());
            }
        });
    }
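  • Mono.create supplies a sink that is used to build the returned Mono<NettyContext>.
  • poolResources.selectOrCreate is called to obtain a ChannelPool.
  • A contextHandler is then created.
  • Finally, contextHandler.setFuture is called with the future that yields the channel: b.connect() when there is no pool, pool.acquire() otherwise.
  • Note that doHandler is called twice here: the first call passes null for the pool parameter (its result is handed to selectOrCreate as the channel-creation callback), and the second call passes the newly obtained pool.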

TcpResources.selectOrCreate

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpResources.java

public ChannelPool selectOrCreate(SocketAddress address,
            Supplier<? extends Bootstrap> bootstrap,
            Consumer<? super Channel> onChannelCreate,
            EventLoopGroup group) {
        return defaultPools.selectOrCreate(address, bootstrap, onChannelCreate, group);
    }
This simply delegates to DefaultPoolResources.

DefaultPoolResources.selectOrCreate

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultPoolResources.java

public ChannelPool selectOrCreate(SocketAddress remote,
            Supplier<? extends Bootstrap> bootstrap,
            Consumer<? super Channel> onChannelCreate,
            EventLoopGroup group) {
        SocketAddress address = remote;
        for (; ; ) {
            Pool pool = channelPools.get(remote);
            if (pool != null) {
                return pool;
            }
            Bootstrap b = bootstrap.get();
            if (remote != null) {
                b = b.remoteAddress(remote);
            }
            else {
                address = b.config()
                          .remoteAddress();
            }
            if (log.isDebugEnabled()) {
                log.debug("New {} client pool for {}", name, address);
            }
            pool = new Pool(b, provider, onChannelCreate, group);
            if (channelPools.putIfAbsent(address, pool) == null) {
                return pool;
            }
            pool.close();
        }
    }
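As shown, this first calls get; if no pool is found, it creates a new Pool and puts it into channelPools. If putIfAbsent loses a race with another thread, the freshly created pool is closed and the loop retries.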
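The get-then-putIfAbsent loop can be sketched with plain JDK types. This is only an illustration: SketchPool and SketchPoolRegistry are hypothetical names, and a String stands in for the SocketAddress key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for DefaultPoolResources.Pool: it only tracks whether it was closed.
final class SketchPool {
    final String address;
    final AtomicBoolean closed = new AtomicBoolean();

    SketchPool(String address) {
        this.address = address;
    }

    void close() {
        closed.set(true);
    }
}

// Hypothetical registry keyed by remote address (a String here for brevity).
final class SketchPoolRegistry {
    private final ConcurrentMap<String, SketchPool> pools = new ConcurrentHashMap<>();

    SketchPool selectOrCreate(String address) {
        for (;;) {
            SketchPool existing = pools.get(address);
            if (existing != null) {
                return existing;                 // fast path: a pool is already registered
            }
            SketchPool created = new SketchPool(address);
            if (pools.putIfAbsent(address, created) == null) {
                return created;                  // this thread won the race
            }
            created.close();                     // another thread won: discard ours and retry
        }
    }
}
```

Creating the pool outside the map and closing the loser keeps the fast path a single lock-free get, at the cost of occasionally building a pool that is thrown away.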

DefaultPoolResources#Pool

final static class Pool extends AtomicBoolean
            implements ChannelPoolHandler, ChannelPool, ChannelHealthChecker {

        final ChannelPool               pool;
        final Consumer<? super Channel> onChannelCreate;
        final EventLoopGroup            defaultGroup;

        final AtomicInteger activeConnections = new AtomicInteger();

        final Future<Boolean> HEALTHY;
        final Future<Boolean> UNHEALTHY;

        @SuppressWarnings("unchecked")
        Pool(Bootstrap bootstrap,
                PoolFactory provider,
                Consumer<? super Channel> onChannelCreate,
                EventLoopGroup group) {
            this.pool = provider.newPool(bootstrap, this, this);
            this.onChannelCreate = onChannelCreate;
            this.defaultGroup = group;
            HEALTHY = group.next()
                           .newSucceededFuture(true);
            UNHEALTHY = group.next()
                             .newSucceededFuture(false);
        }

        @Override
        public Future<Boolean> isHealthy(Channel channel) {
            return channel.isActive() ? HEALTHY : UNHEALTHY;
        }

        @Override
        public Future<Channel> acquire() {
            return pool.acquire();
        }

        @Override
        public Future<Channel> acquire(Promise<Channel> promise) {
            return pool.acquire(promise);
        }

        @Override
        public Future<Void> release(Channel channel) {
            return pool.release(channel);
        }

        @Override
        public Future<Void> release(Channel channel, Promise<Void> promise) {
            return pool.release(channel, promise);
        }

        @Override
        public void close() {
            if(compareAndSet(false, true)) {
                pool.close();
            }
        }

        @Override
        public void channelReleased(Channel ch) throws Exception {
            activeConnections.decrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug("Released {}, now {} active connections",
                        ch.toString(),
                        activeConnections);
            }
        }

        @Override
        public void channelAcquired(Channel ch) throws Exception {
            activeConnections.incrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug("Acquired {}, now {} active connections",
                        ch.toString(),
                        activeConnections);
            }
        }

        @Override
        public void channelCreated(Channel ch) throws Exception {
            activeConnections.incrementAndGet();
            if (log.isDebugEnabled()) {
                log.debug("Created {}, now {} active connections",
                        ch.toString(),
                        activeConnections);
            }
            if (onChannelCreate != null) {
                onChannelCreate.accept(ch);
            }
        }

        @Override
        public String toString() {
            return pool.getClass()
                       .getSimpleName() + "{" + "activeConnections=" + activeConnections + '}';
        }
    }
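As shown, provider.newPool is used to create the underlying ChannelPool.
The provider here is a constructor reference, SimpleChannelPool::new, used as the implementation of the PoolFactory interface: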
interface PoolFactory {

        ChannelPool newPool(Bootstrap b,
                ChannelPoolHandler handler,
                ChannelHealthChecker checker);
    }
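It resolves to the SimpleChannelPool constructor that takes three parameters: Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck.
Pool itself implements both the ChannelPoolHandler and ChannelHealthChecker interfaces, which is why it passes this for both arguments.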
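How a constructor reference can satisfy a single-method factory interface is sketched below. All names (SketchEndpointFactory, SketchEndpoint, SketchFactoryDemo) are hypothetical and unrelated to Netty; only the shape mirrors PoolFactory and the three-argument constructor.

```java
// Hypothetical single-method factory, shaped like PoolFactory above.
interface SketchEndpointFactory {
    SketchEndpoint newEndpoint(String host, int port, boolean secure);
}

// Hypothetical class whose constructor matches the factory method's parameter list.
final class SketchEndpoint {
    final String host;
    final int port;
    final boolean secure;

    SketchEndpoint(String host, int port, boolean secure) {
        this.host = host;
        this.port = port;
        this.secure = secure;
    }

    String describe() {
        return (secure ? "tls://" : "tcp://") + host + ":" + port;
    }
}

final class SketchFactoryDemo {
    // The compiler matches the constructor against newEndpoint(String, int, boolean).
    static final SketchEndpointFactory FACTORY = SketchEndpoint::new;
}
```

Calling SketchFactoryDemo.FACTORY.newEndpoint("localhost", 8080, false) then behaves exactly like calling the constructor directly.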

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java

/**
     * Creates a new instance.
     *
     * @param bootstrap         the {@link Bootstrap} that is used for connections
     * @param handler           the {@link ChannelPoolHandler} that will be notified for the different pool actions
     * @param healthCheck       the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is
     *                          still healthy when obtain from the {@link ChannelPool}
     */
    public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) {
        this(bootstrap, handler, healthCheck, true);
    }

ChannelPoolHandler

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/ChannelPoolHandler.java

/**
 * Handler which is called for various actions done by the {@link ChannelPool}.
 */
public interface ChannelPoolHandler {
    /**
     * Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or
     * {@link ChannelPool#release(Channel, Promise)}.
     *
     * This method will be called by the {@link EventLoop} of the {@link Channel}.
     */
    void channelReleased(Channel ch) throws Exception;

    /**
     * Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or
     * {@link ChannelPool#acquire(Promise)}.
     *
     * This method will be called by the {@link EventLoop} of the {@link Channel}.
     */
    void channelAcquired(Channel ch) throws Exception;

    /**
     * Called once a new {@link Channel} is created in the {@link ChannelPool}.
     *
     * This method will be called by the {@link EventLoop} of the {@link Channel}.
     */
    void channelCreated(Channel ch) throws Exception;
}

ChannelHealthChecker

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/ChannelHealthChecker.java

/**
 * Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or
 * {@link ChannelPool#acquire(Promise)}.
 */
public interface ChannelHealthChecker {

    /**
     * {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}.
     */
    ChannelHealthChecker ACTIVE = new ChannelHealthChecker() {
        @Override
        public Future<Boolean> isHealthy(Channel channel) {
            EventLoop loop = channel.eventLoop();
            return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE);
        }
    };

    /**
     * Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once
     * the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise.
     *
     * This method will be called by the {@link EventLoop} of the {@link Channel}.
     */
    Future<Boolean> isHealthy(Channel channel);
}

SimpleChannelPool

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java

/**
 * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire
 * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced.
 *
 * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}.
 *
 */
public class SimpleChannelPool implements ChannelPool {
    private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool");
    private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace(
            new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)");

    private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
    private final ChannelPoolHandler handler;
    private final ChannelHealthChecker healthCheck;
    private final Bootstrap bootstrap;
    private final boolean releaseHealthCheck;
    private final boolean lastRecentUsed;

    //......
    /**
     * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no
     * {@link Channel} is ready to be reused.
     *
     * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
     * implementations of these methods needs to be thread-safe!
     */
    protected Channel pollChannel() {
        return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
    }

    /**
     * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel}
     * could be added, {@code false} otherwise.
     *
     * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that
     * implementations of these methods needs to be thread-safe!
     */
    protected boolean offerChannel(Channel channel) {
        return deque.offer(channel);
    }
}
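SimpleChannelPool maintains its Channels in a Deque that is used in LIFO order: offerChannel appends at the tail, and pollChannel takes from the tail when lastRecentUsed is true.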
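The effect of the lastRecentUsed flag can be reproduced with a JDK deque. The sketch below is a hypothetical SketchIdleStore that uses ConcurrentLinkedDeque as an assumed substitute for PlatformDependent.newConcurrentDeque().

```java
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical idle store mirroring pollChannel/offerChannel above.
final class SketchIdleStore<T> {
    private final Deque<T> deque = new ConcurrentLinkedDeque<>();
    private final boolean lastRecentUsed;

    SketchIdleStore(boolean lastRecentUsed) {
        this.lastRecentUsed = lastRecentUsed;
    }

    // offer() appends at the tail of the deque.
    boolean offer(T item) {
        return deque.offer(item);
    }

    // Taking from the tail gives LIFO order, taking from the head gives FIFO order.
    // Returns null when nothing is idle.
    T poll() {
        return lastRecentUsed ? deque.pollLast() : deque.pollFirst();
    }
}
```

LIFO order tends to keep a small set of recently used connections busy, while older idle ones sink to the head of the deque.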

SimpleChannelPool.acquire

netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.java

@Override
    public final Future<Channel> acquire() {
        return acquire(bootstrap.config().group().next().<Channel>newPromise());
    }

    @Override
    public Future<Channel> acquire(final Promise<Channel> promise) {
        checkNotNull(promise, "promise");
        return acquireHealthyFromPoolOrNew(promise);
    }

    /**
     * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
     * @param promise the promise to provide acquire result.
     * @return future for acquiring a channel.
     */
    private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
        try {
            final Channel ch = pollChannel();
            if (ch == null) {
                // No Channel left in the pool bootstrap a new Channel
                Bootstrap bs = bootstrap.clone();
                bs.attr(POOL_KEY, this);
                ChannelFuture f = connectChannel(bs);
                if (f.isDone()) {
                    notifyConnect(f, promise);
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            notifyConnect(future, promise);
                        }
                    });
                }
                return promise;
            }
            EventLoop loop = ch.eventLoop();
            if (loop.inEventLoop()) {
                doHealthCheck(ch, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doHealthCheck(ch, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }
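Note that pollChannel is called here to take a Channel from the deque, which then goes through a health check; if pollChannel returns null, a new Channel is created instead.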
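A synchronous, single-threaded sketch of this acquire flow follows. SketchConn and SketchAcquirePool are hypothetical; the real code is asynchronous and runs the health check on the channel's EventLoop. The listing above does not show doHealthCheck, so dropping an unhealthy idle connection and retrying is an assumption of this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical connection with a mutable "active" flag standing in for Channel.isActive().
final class SketchConn {
    final int id;
    boolean active = true;

    SketchConn(int id) {
        this.id = id;
    }
}

// Hypothetical pool: not thread-safe, everything happens on the caller's thread.
final class SketchAcquirePool {
    private final Deque<SketchConn> idle = new ArrayDeque<>();
    private final Supplier<SketchConn> connector;

    SketchAcquirePool(Supplier<SketchConn> connector) {
        this.connector = connector;
    }

    void offer(SketchConn conn) {
        idle.offerLast(conn);
    }

    SketchConn acquire() {
        for (;;) {
            SketchConn conn = idle.pollLast();
            if (conn == null) {
                return connector.get();   // nothing idle: open a new connection
            }
            if (conn.active) {
                return conn;              // idle and healthy: reuse it
            }
            // Assumption: an unhealthy idle connection is dropped and the loop tries again.
        }
    }
}
```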

SimpleChannelPool.release

@Override
    public final Future<Void> release(Channel channel) {
        return release(channel, channel.eventLoop().<Void>newPromise());
    }

    @Override
    public Future<Void> release(final Channel channel, final Promise<Void> promise) {
        checkNotNull(channel, "channel");
        checkNotNull(promise, "promise");
        try {
            EventLoop loop = channel.eventLoop();
            if (loop.inEventLoop()) {
                doReleaseChannel(channel, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doReleaseChannel(channel, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            closeAndFail(channel, cause, promise);
        }
        return promise;
    }

    private void doReleaseChannel(Channel channel, Promise<Void> promise) {
        assert channel.eventLoop().inEventLoop();
        // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail.
        if (channel.attr(POOL_KEY).getAndSet(null) != this) {
            closeAndFail(channel,
                         // Better include a stacktrace here as this is an user error.
                         new IllegalArgumentException(
                                 "Channel " + channel + " was not acquired from this ChannelPool"),
                         promise);
        } else {
            try {
                if (releaseHealthCheck) {
                    doHealthCheckOnRelease(channel, promise);
                } else {
                    releaseAndOffer(channel, promise);
                }
            } catch (Throwable cause) {
                closeAndFail(channel, cause, promise);
            }
        }
    }

    private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception {
        final Future<Boolean> f = healthCheck.isHealthy(channel);
        if (f.isDone()) {
            releaseAndOfferIfHealthy(channel, promise, f);
        } else {
            f.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    releaseAndOfferIfHealthy(channel, promise, f);
                }
            });
        }
    }

    /**
     * Adds the channel back to the pool only if the channel is healthy.
     * @param channel the channel to put back to the pool
     * @param promise offer operation promise.
     * @param future the future that contains information fif channel is healthy or not.
     * @throws Exception in case when failed to notify handler about release operation.
     */
    private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future)
            throws Exception {
        if (future.getNow()) { //channel turns out to be healthy, offering and releasing it.
            releaseAndOffer(channel, promise);
        } else { //channel not healthy, just releasing it.
            handler.channelReleased(channel);
            promise.setSuccess(null);
        }
    }

    private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception {
        if (offerChannel(channel)) {
            handler.channelReleased(channel);
            promise.setSuccess(null);
        } else {
            closeAndFail(channel, FULL_EXCEPTION, promise);
        }
    }
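On release, offerChannel is called to put the Channel back into the deque.
A SimpleChannelPool created with the three-parameter constructor has releaseHealthCheck set to true, so a health check is performed on release and only a healthy Channel is offered back.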
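The release branch can be condensed into a few lines. In this hypothetical SketchReleasePool the caller passes the health result directly in place of healthCheck.isHealthy(channel), and the "pool full" failure path of releaseAndOffer is deliberately left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical pool showing only the decision made on release.
final class SketchReleasePool {
    private final Deque<String> idle = new ArrayDeque<>();
    private final boolean releaseHealthCheck;
    int releasedCount;   // number of channelReleased-style notifications

    SketchReleasePool(boolean releaseHealthCheck) {
        this.releaseHealthCheck = releaseHealthCheck;
    }

    // "healthy" stands in for the result of the health check future.
    void release(String conn, boolean healthy) {
        if (!releaseHealthCheck || healthy) {
            idle.offerLast(conn);   // goes back into the pool for reuse
        }
        releasedCount++;            // the handler is notified in both cases
    }

    int idleCount() {
        return idle.size();
    }
}
```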

TcpClient.doHandler

/**
     * Create a {@link ContextHandler} for {@link Bootstrap#handler()}
     *
     * @param handler user provided in/out handler
     * @param sink user provided bind handler
     * @param secure if operation should be secured
     * @param pool if channel pool
     * @param onSetup if operation has local setup callback
     *
     * @return a new {@link ContextHandler}
     */
    protected ContextHandler<SocketChannel> doHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler,
            MonoSink<NettyContext> sink,
            boolean secure,
            SocketAddress providedAddress,
            ChannelPool pool,
            Consumer<? super Channel> onSetup) {
        return ContextHandler.newClientContext(sink,
                options,
                loggingHandler,
                secure,
                providedAddress,
                pool,
                handler == null ? EMPTY :
                        (ch, c, msg) -> ChannelOperations.bind(ch, handler, c));
    }
Here ContextHandler.newClientContext is called to create a ContextHandler<SocketChannel>.

ContextHandler.newClientContext

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/channel/ContextHandler.java

/**
     * Create a new client context with optional pool support
     *
     * @param sink
     * @param options
     * @param loggingHandler
     * @param secure
     * @param providedAddress
     * @param channelOpFactory
     * @param pool
     * @param <CHANNEL>
     *
     * @return a new {@link ContextHandler} for clients
     */
    public static <CHANNEL extends Channel> ContextHandler<CHANNEL> newClientContext(
            MonoSink<NettyContext> sink,
            ClientOptions options,
            LoggingHandler loggingHandler,
            boolean secure,
            SocketAddress providedAddress,
            ChannelPool pool, ChannelOperations.OnNew<CHANNEL> channelOpFactory) {
        if (pool != null) {
            return new PooledClientContextHandler<>(channelOpFactory,
                    options,
                    sink,
                    loggingHandler,
                    secure,
                    providedAddress,
                    pool);
        }
        return new ClientContextHandler<>(channelOpFactory,
                options,
                sink,
                loggingHandler,
                secure,
                providedAddress);
    }
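Note that the lambda built in doHandler from newHandler's handler is registered here as the channelOpFactory, of type ChannelOperations.OnNew<CHANNEL>.
On the first doHandler call pool is null, so a ClientContextHandler is created; once the pool has been created, the second doHandler call passes a non-null pool, so a PooledClientContextHandler is created.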
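The selection between the two handler types reduces to a null check on the pool. The sketch below uses hypothetical types (SketchHandler and friends) and an Object in place of ChannelPool.

```java
// Hypothetical handler hierarchy mirroring ClientContextHandler / PooledClientContextHandler.
interface SketchHandler {
    String kind();
}

final class SketchPlainHandler implements SketchHandler {
    @Override
    public String kind() {
        return "direct";
    }
}

final class SketchPooledHandler implements SketchHandler {
    final Object pool;

    SketchPooledHandler(Object pool) {
        this.pool = pool;
    }

    @Override
    public String kind() {
        return "pooled";
    }
}

final class SketchHandlers {
    // A non-null pool selects the pooled variant, otherwise the plain one.
    static SketchHandler newClientContext(Object pool) {
        return pool != null ? new SketchPooledHandler(pool) : new SketchPlainHandler();
    }
}
```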

PooledClientContextHandler

reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/channel/PooledClientContextHandler.java

@Override
    public void fireContextActive(NettyContext context) {
        if (!fired) {
            fired = true;
            if (context != null) {
                sink.success(context);
            }
            else {
                sink.success();
            }
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public void setFuture(Future<?> future) {
        Objects.requireNonNull(future, "future");

        Future<CHANNEL> f;
        for (; ; ) {
            f = this.future;

            if (f == DISPOSED) {
                if (log.isDebugEnabled()) {
                    log.debug("Cancelled existing channel from pool: {}",
                            pool.toString());
                }
                sink.success();
                return;
            }

            if (FUTURE.compareAndSet(this, f, future)) {
                break;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Acquiring existing channel from pool: {} {}",
                    future,
                    pool.toString());
        }
        ((Future<CHANNEL>) future).addListener(this);
    }

    final void connectOrAcquire(CHANNEL c) {
        if (DISPOSED == this.future) {
            if (log.isDebugEnabled()) {
                log.debug("Dropping acquisition {} because of {}",
                        "asynchronous user cancellation");
            }
            disposeOperationThenRelease(c);
            sink.success();
            return;
        }

        if (!c.isActive()) {
            log.debug("Immediately aborted pooled channel, re-acquiring new " + "channel: {}",
                    c.toString());
            release(c);
            setFuture(pool.acquire());
            return;
        }

        ChannelOperationsHandler op = c.pipeline()
                                       .get(ChannelOperationsHandler.class);

        if (op == null) {
            if (log.isDebugEnabled()) {
                log.debug("Created new pooled channel: " + c.toString());
            }
            c.closeFuture()
             .addListener(ff -> release(c));
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Acquired active channel: " + c.toString());
        }
        if (createOperations(c, null) == null) {
            setFuture(pool.acquire());
        }
    }

    public void operationComplete(Future<CHANNEL> future) throws Exception {
        if (future.isCancelled()) {
            if (log.isDebugEnabled()) {
                log.debug("Cancelled {}", future.toString());
            }
            return;
        }

        if (DISPOSED == this.future) {
            if (log.isDebugEnabled()) {
                log.debug("Dropping acquisition {} because of {}",
                        future,
                        "asynchronous user cancellation");
            }
            if (future.isSuccess()) {
                disposeOperationThenRelease(future.get());
            }
            sink.success();
            return;
        }

        if (!future.isSuccess()) {
            if (future.cause() != null) {
                fireContextError(future.cause());
            }
            else {
                fireContextError(new AbortedException("error while acquiring connection"));
            }
            return;
        }

        CHANNEL c = future.get();

        if (c.eventLoop()
             .inEventLoop()) {
            connectOrAcquire(c);
        }
        else {
            c.eventLoop()
             .execute(() -> connectOrAcquire(c));
        }
    }
fireContextActive, setFuture, connectOrAcquire, and operationComplete all call the success method of the MonoCreate sink to emit the result.
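The compare-and-set loop in setFuture, which refuses to install a new future once the handler has been disposed, can be sketched as follows. SketchFutureHolder is hypothetical, and it uses an AtomicReference where the original uses a FUTURE field updater; Object stands in for the Netty Future type.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder that swaps in a new future unless it has been disposed.
final class SketchFutureHolder {
    static final Object DISPOSED = new Object();   // sentinel marking cancellation

    private final AtomicReference<Object> future = new AtomicReference<>();

    // Returns true if the future was installed, false if the holder was already disposed.
    boolean setFuture(Object next) {
        Objects.requireNonNull(next, "future");
        for (;;) {
            Object current = future.get();
            if (current == DISPOSED) {
                return false;                      // cancelled: never overwrite the sentinel
            }
            if (future.compareAndSet(current, next)) {
                return true;                       // installed without losing a concurrent update
            }
        }
    }

    void dispose() {
        future.set(DISPOSED);
    }

    Object current() {
        return future.get();
    }
}
```

Re-reading the current value on every iteration is what makes a concurrent dispose visible before the next attempt to swap.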

Mono.subscribe

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/Mono.java

/**
     * Subscribe to this {@link Mono} and request unbounded demand.
     * <p>
     * This version doesn't specify any consumption behavior for the events from the
     * chain, especially no error handling, so other variants should usually be preferred.
     *
     * <p>
     * <img width="500" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/unbounded1.png" alt="">
     * <p>
     *
     * @return a new {@link Disposable} that can be used to cancel the underlying {@link Subscription}
     */
    public final Disposable subscribe() {
        if(this instanceof MonoProcessor){
            MonoProcessor<T> s = (MonoProcessor<T>)this;
            s.connect();
            return s;
        }
        else{
            return subscribeWith(new LambdaMonoSubscriber<>(null, null, null, null));
        }
    }
A LambdaMonoSubscriber is created here; in the end the call reaches MonoCreate's subscribe(actual) method.

reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/MonoCreate.java

public void subscribe(CoreSubscriber<? super T> actual) {
        DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual);

        actual.onSubscribe(emitter);

        try {
            callback.accept(emitter);
        }
        catch (Throwable ex) {
            emitter.error(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }
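Here actual is the LambdaMonoSubscriber.
Here callback.accept invokes the lambda passed to Mono.create inside newHandler, the one that receives the Mono's sink, which triggers establishing the connection and sending the request.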
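The key point, that nothing runs until subscribe is called, can be shown with a drastically simplified stand-in for Mono.create. SketchSink and SketchLazyMono are hypothetical and support only a single success value, with no cancellation or error handling.

```java
import java.util.function.Consumer;

// Hypothetical, minimal counterpart of MonoSink: it can only signal a value.
interface SketchSink<T> {
    void success(T value);
}

// Hypothetical lazy publisher: it stores the callback and runs it once per subscribe call.
final class SketchLazyMono<T> {
    private final Consumer<SketchSink<T>> callback;

    private SketchLazyMono(Consumer<SketchSink<T>> callback) {
        this.callback = callback;
    }

    static <T> SketchLazyMono<T> create(Consumer<SketchSink<T>> callback) {
        return new SketchLazyMono<>(callback);   // nothing is executed yet
    }

    void subscribe(Consumer<? super T> onValue) {
        // Only now is the callback invoked, with a sink that forwards to the subscriber.
        callback.accept(value -> onValue.accept(value));
    }
}
```

In this sketch each subscribe call runs the callback again, which corresponds to the connection being attempted per subscription rather than when the Mono is built.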

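Summary

TcpClient.newHandler returns a Mono, and the lambda passed to Mono.create is executed when that Mono is subscribed to.

  • Inside it, a channelPool is fetched from channelPools or newly created.
  • The lambda from newHandler is registered as the channelOpFactory of ChannelOperations.OnNew and runs after the connection is established, i.e. to send data.
  • Then the channelPool's acquire method is called (which establishes the connection).
  • Finally, when the connection is released, the channel is returned to the channelPool for the corresponding address.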