本文主要研究一下reactor-netty中TcpClient的newHandler過程java
<dependency> <groupId>io.projectreactor.ipc</groupId> <artifactId>reactor-netty</artifactId> <version>0.7.3.RELEASE</version> </dependency>
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpClient.javareact
/** * @param handler * @param address * @param secure * @param onSetup * * @return a new Mono to connect on subscribe */ protected Mono<NettyContext> newHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler, InetSocketAddress address, boolean secure, Consumer<? super Channel> onSetup) { final BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> targetHandler = null == handler ? ChannelOperations.noopHandler() : handler; return Mono.create(sink -> { SocketAddress remote = address != null ? address : options.getAddress(); ChannelPool pool = null; PoolResources poolResources = options.getPoolResources(); if (poolResources != null) { pool = poolResources.selectOrCreate(remote, options, doHandler(null, sink, secure, remote, null, null), options.getLoopResources().onClient(options.preferNative())); } ContextHandler<SocketChannel> contextHandler = doHandler(targetHandler, sink, secure, remote, pool, onSetup); sink.onCancel(contextHandler); if (pool == null) { Bootstrap b = options.get(); b.remoteAddress(remote); b.handler(contextHandler); contextHandler.setFuture(b.connect()); } else { contextHandler.setFuture(pool.acquire()); } }); }
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/tcp/TcpResources.javagit
public ChannelPool selectOrCreate(SocketAddress address, Supplier<? extends Bootstrap> bootstrap, Consumer<? super Channel> onChannelCreate, EventLoopGroup group) { return defaultPools.selectOrCreate(address, bootstrap, onChannelCreate, group); }
這裏委託給DefaultPoolResources
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/resources/DefaultPoolResources.javagithub
public ChannelPool selectOrCreate(SocketAddress remote, Supplier<? extends Bootstrap> bootstrap, Consumer<? super Channel> onChannelCreate, EventLoopGroup group) { SocketAddress address = remote; for (; ; ) { Pool pool = channelPools.get(remote); if (pool != null) { return pool; } Bootstrap b = bootstrap.get(); if (remote != null) { b = b.remoteAddress(remote); } else { address = b.config() .remoteAddress(); } if (log.isDebugEnabled()) { log.debug("New {} client pool for {}", name, address); } pool = new Pool(b, provider, onChannelCreate, group); if (channelPools.putIfAbsent(address, pool) == null) { return pool; } pool.close(); } }
能夠看到這裏先get,get不到則new一個Pool而後放進channelPools中
final static class Pool extends AtomicBoolean implements ChannelPoolHandler, ChannelPool, ChannelHealthChecker { final ChannelPool pool; final Consumer<? super Channel> onChannelCreate; final EventLoopGroup defaultGroup; final AtomicInteger activeConnections = new AtomicInteger(); final Future<Boolean> HEALTHY; final Future<Boolean> UNHEALTHY; @SuppressWarnings("unchecked") Pool(Bootstrap bootstrap, PoolFactory provider, Consumer<? super Channel> onChannelCreate, EventLoopGroup group) { this.pool = provider.newPool(bootstrap, this, this); this.onChannelCreate = onChannelCreate; this.defaultGroup = group; HEALTHY = group.next() .newSucceededFuture(true); UNHEALTHY = group.next() .newSucceededFuture(false); } @Override public Future<Boolean> isHealthy(Channel channel) { return channel.isActive() ? HEALTHY : UNHEALTHY; } @Override public Future<Channel> acquire() { return pool.acquire(); } @Override public Future<Channel> acquire(Promise<Channel> promise) { return pool.acquire(promise); } @Override public Future<Void> release(Channel channel) { return pool.release(channel); } @Override public Future<Void> release(Channel channel, Promise<Void> promise) { return pool.release(channel, promise); } @Override public void close() { if(compareAndSet(false, true)) { pool.close(); } } @Override public void channelReleased(Channel ch) throws Exception { activeConnections.decrementAndGet(); if (log.isDebugEnabled()) { log.debug("Released {}, now {} active connections", ch.toString(), activeConnections); } } @Override public void channelAcquired(Channel ch) throws Exception { activeConnections.incrementAndGet(); if (log.isDebugEnabled()) { log.debug("Acquired {}, now {} active connections", ch.toString(), activeConnections); } } @Override public void channelCreated(Channel ch) throws Exception { activeConnections.incrementAndGet(); if (log.isDebugEnabled()) { log.debug("Created {}, now {} active connections", ch.toString(), activeConnections); } if (onChannelCreate != null) { onChannelCreate.accept(ch); } } @Override public String toString() { return pool.getClass() .getSimpleName() + "{" + "activeConnections=" + activeConnections + '}'; } }
能夠看到這裏是使用provider.newPool來建立底層的ChannelPool
這裏的provider是個Lambda表達式,SimpleChannelPool::new
interface PoolFactory { ChannelPool newPool(Bootstrap b, ChannelPoolHandler handler, ChannelHealthChecker checker); }
使用的是SimpleChannelPool的Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck這三個參數的構造器
Pool自己則實現了ChannelPoolHandler以及ChannelHealthChecker接口
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.javabootstrap
/** * Creates a new instance. * * @param bootstrap the {@link Bootstrap} that is used for connections * @param handler the {@link ChannelPoolHandler} that will be notified for the different pool actions * @param healthCheck the {@link ChannelHealthChecker} that will be used to check if a {@link Channel} is * still healthy when obtain from the {@link ChannelPool} */ public SimpleChannelPool(Bootstrap bootstrap, final ChannelPoolHandler handler, ChannelHealthChecker healthCheck) { this(bootstrap, handler, healthCheck, true); }
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/ChannelPoolHandler.javapromise
/** * Handler which is called for various actions done by the {@link ChannelPool}. */ public interface ChannelPoolHandler { /** * Called once a {@link Channel} was released by calling {@link ChannelPool#release(Channel)} or * {@link ChannelPool#release(Channel, Promise)}. * * This method will be called by the {@link EventLoop} of the {@link Channel}. */ void channelReleased(Channel ch) throws Exception; /** * Called once a {@link Channel} was acquired by calling {@link ChannelPool#acquire()} or * {@link ChannelPool#acquire(Promise)}. * * This method will be called by the {@link EventLoop} of the {@link Channel}. */ void channelAcquired(Channel ch) throws Exception; /** * Called once a new {@link Channel} is created in the {@link ChannelPool}. * * This method will be called by the {@link EventLoop} of the {@link Channel}. */ void channelCreated(Channel ch) throws Exception; }
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/ChannelHealthChecker.javaasync
/** * Called before a {@link Channel} will be returned via {@link ChannelPool#acquire()} or * {@link ChannelPool#acquire(Promise)}. */ public interface ChannelHealthChecker { /** * {@link ChannelHealthChecker} implementation that checks if {@link Channel#isActive()} returns {@code true}. */ ChannelHealthChecker ACTIVE = new ChannelHealthChecker() { @Override public Future<Boolean> isHealthy(Channel channel) { EventLoop loop = channel.eventLoop(); return channel.isActive()? loop.newSucceededFuture(Boolean.TRUE) : loop.newSucceededFuture(Boolean.FALSE); } }; /** * Check if the given channel is healthy which means it can be used. The returned {@link Future} is notified once * the check is complete. If notified with {@link Boolean#TRUE} it can be used {@link Boolean#FALSE} otherwise. * * This method will be called by the {@link EventLoop} of the {@link Channel}. */ Future<Boolean> isHealthy(Channel channel); }
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.javamaven
/** * Simple {@link ChannelPool} implementation which will create new {@link Channel}s if someone tries to acquire * a {@link Channel} but none is in the pool atm. No limit on the maximal concurrent {@link Channel}s is enforced. * * This implementation uses LIFO order for {@link Channel}s in the {@link ChannelPool}. * */ public class SimpleChannelPool implements ChannelPool { private static final AttributeKey<SimpleChannelPool> POOL_KEY = AttributeKey.newInstance("channelPool"); private static final IllegalStateException FULL_EXCEPTION = ThrowableUtil.unknownStackTrace( new IllegalStateException("ChannelPool full"), SimpleChannelPool.class, "releaseAndOffer(...)"); private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque(); private final ChannelPoolHandler handler; private final ChannelHealthChecker healthCheck; private final Bootstrap bootstrap; private final boolean releaseHealthCheck; private final boolean lastRecentUsed; //...... /** * Poll a {@link Channel} out of the internal storage to reuse it. This will return {@code null} if no * {@link Channel} is ready to be reused. * * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that * implementations of these methods needs to be thread-safe! */ protected Channel pollChannel() { return lastRecentUsed ? deque.pollLast() : deque.pollFirst(); } /** * Offer a {@link Channel} back to the internal storage. This will return {@code true} if the {@link Channel} * could be added, {@code false} otherwise. * * Sub-classes may override {@link #pollChannel()} and {@link #offerChannel(Channel)}. Be aware that * implementations of these methods needs to be thread-safe! */ protected boolean offerChannel(Channel channel) { return deque.offer(channel); } }
SimpleChannelPool使用一個LIFO的Deque來維護Channel
netty-transport-4.1.20.Final-sources.jar!/io/netty/channel/pool/SimpleChannelPool.javatcp
@Override public final Future<Channel> acquire() { return acquire(bootstrap.config().group().next().<Channel>newPromise()); } @Override public Future<Channel> acquire(final Promise<Channel> promise) { checkNotNull(promise, "promise"); return acquireHealthyFromPoolOrNew(promise); } /** * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise. * @param promise the promise to provide acquire result. * @return future for acquiring a channel. */ private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) { try { final Channel ch = pollChannel(); if (ch == null) { // No Channel left in the pool bootstrap a new Channel Bootstrap bs = bootstrap.clone(); bs.attr(POOL_KEY, this); ChannelFuture f = connectChannel(bs); if (f.isDone()) { notifyConnect(f, promise); } else { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { notifyConnect(future, promise); } }); } return promise; } EventLoop loop = ch.eventLoop(); if (loop.inEventLoop()) { doHealthCheck(ch, promise); } else { loop.execute(new Runnable() { @Override public void run() { doHealthCheck(ch, promise); } }); } } catch (Throwable cause) { promise.tryFailure(cause); } return promise; }
注意這裏調用了pollChannel從deque中獲取並進行healthCheck,若是爲null則新創建一個
@Override public final Future<Void> release(Channel channel) { return release(channel, channel.eventLoop().<Void>newPromise()); } @Override public Future<Void> release(final Channel channel, final Promise<Void> promise) { checkNotNull(channel, "channel"); checkNotNull(promise, "promise"); try { EventLoop loop = channel.eventLoop(); if (loop.inEventLoop()) { doReleaseChannel(channel, promise); } else { loop.execute(new Runnable() { @Override public void run() { doReleaseChannel(channel, promise); } }); } } catch (Throwable cause) { closeAndFail(channel, cause, promise); } return promise; } private void doReleaseChannel(Channel channel, Promise<Void> promise) { assert channel.eventLoop().inEventLoop(); // Remove the POOL_KEY attribute from the Channel and check if it was acquired from this pool, if not fail. if (channel.attr(POOL_KEY).getAndSet(null) != this) { closeAndFail(channel, // Better include a stacktrace here as this is an user error. new IllegalArgumentException( "Channel " + channel + " was not acquired from this ChannelPool"), promise); } else { try { if (releaseHealthCheck) { doHealthCheckOnRelease(channel, promise); } else { releaseAndOffer(channel, promise); } } catch (Throwable cause) { closeAndFail(channel, cause, promise); } } } private void doHealthCheckOnRelease(final Channel channel, final Promise<Void> promise) throws Exception { final Future<Boolean> f = healthCheck.isHealthy(channel); if (f.isDone()) { releaseAndOfferIfHealthy(channel, promise, f); } else { f.addListener(new FutureListener<Boolean>() { @Override public void operationComplete(Future<Boolean> future) throws Exception { releaseAndOfferIfHealthy(channel, promise, f); } }); } } /** * Adds the channel back to the pool only if the channel is healthy. * @param channel the channel to put back to the pool * @param promise offer operation promise. * @param future the future that contains information fif channel is healthy or not. * @throws Exception in case when failed to notify handler about release operation. */ private void releaseAndOfferIfHealthy(Channel channel, Promise<Void> promise, Future<Boolean> future) throws Exception { if (future.getNow()) { //channel turns out to be healthy, offering and releasing it. releaseAndOffer(channel, promise); } else { //channel not healthy, just releasing it. handler.channelReleased(channel); promise.setSuccess(null); } } private void releaseAndOffer(Channel channel, Promise<Void> promise) throws Exception { if (offerChannel(channel)) { handler.channelReleased(channel); promise.setSuccess(null); } else { closeAndFail(channel, FULL_EXCEPTION, promise); } }
在release的時候調用offerChannel將Channel放回deque中
使用三個參數的構造器建立的SimpleChannelPool,其releaseHealthCheck值爲true,即釋放的時候進行health check
/** * Create a {@link ContextHandler} for {@link Bootstrap#handler()} * * @param handler user provided in/out handler * @param sink user provided bind handler * @param secure if operation should be secured * @param pool if channel pool * @param onSetup if operation has local setup callback * * @return a new {@link ContextHandler} */ protected ContextHandler<SocketChannel> doHandler(BiFunction<? super NettyInbound, ? super NettyOutbound, ? extends Publisher<Void>> handler, MonoSink<NettyContext> sink, boolean secure, SocketAddress providedAddress, ChannelPool pool, Consumer<? super Channel> onSetup) { return ContextHandler.newClientContext(sink, options, loggingHandler, secure, providedAddress, pool, handler == null ? EMPTY : (ch, c, msg) -> ChannelOperations.bind(ch, handler, c)); }
這裏調用ContextHandler.newClientContext建立了一個ContextHandler<SocketChannel>
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/channel/ContextHandler.javaide
/** * Create a new client context with optional pool support * * @param sink * @param options * @param loggingHandler * @param secure * @param providedAddress * @param channelOpFactory * @param pool * @param <CHANNEL> * * @return a new {@link ContextHandler} for clients */ public static <CHANNEL extends Channel> ContextHandler<CHANNEL> newClientContext( MonoSink<NettyContext> sink, ClientOptions options, LoggingHandler loggingHandler, boolean secure, SocketAddress providedAddress, ChannelPool pool, ChannelOperations.OnNew<CHANNEL> channelOpFactory) { if (pool != null) { return new PooledClientContextHandler<>(channelOpFactory, options, sink, loggingHandler, secure, providedAddress, pool); } return new ClientContextHandler<>(channelOpFactory, options, sink, loggingHandler, secure, providedAddress); }
注意這裏將newHandler的Lambda表達式註冊爲ChannelOperations.OnNew<CHANNEL>的channelOpFactory
第一次調用doHandler的時候pool爲null,建立的是ClientContextHandler;等pool建立好了,第二次調用doHandler的時候,pool不爲null,建立的是PooledClientContextHandler
reactor-netty-0.7.3.RELEASE-sources.jar!/reactor/ipc/netty/channel/PooledClientContextHandler.java
@Override public void fireContextActive(NettyContext context) { if (!fired) { fired = true; if (context != null) { sink.success(context); } else { sink.success(); } } } @Override @SuppressWarnings("unchecked") public void setFuture(Future<?> future) { Objects.requireNonNull(future, "future"); Future<CHANNEL> f; for (; ; ) { f = this.future; if (f == DISPOSED) { if (log.isDebugEnabled()) { log.debug("Cancelled existing channel from pool: {}", pool.toString()); } sink.success(); return; } if (FUTURE.compareAndSet(this, f, future)) { break; } } if (log.isDebugEnabled()) { log.debug("Acquiring existing channel from pool: {} {}", future, pool.toString()); } ((Future<CHANNEL>) future).addListener(this); } final void connectOrAcquire(CHANNEL c) { if (DISPOSED == this.future) { if (log.isDebugEnabled()) { log.debug("Dropping acquisition {} because of {}", "asynchronous user cancellation"); } disposeOperationThenRelease(c); sink.success(); return; } if (!c.isActive()) { log.debug("Immediately aborted pooled channel, re-acquiring new " + "channel: {}", c.toString()); release(c); setFuture(pool.acquire()); return; } ChannelOperationsHandler op = c.pipeline() .get(ChannelOperationsHandler.class); if (op == null) { if (log.isDebugEnabled()) { log.debug("Created new pooled channel: " + c.toString()); } c.closeFuture() .addListener(ff -> release(c)); return; } if (log.isDebugEnabled()) { log.debug("Acquired active channel: " + c.toString()); } if (createOperations(c, null) == null) { setFuture(pool.acquire()); } } public void operationComplete(Future<CHANNEL> future) throws Exception { if (future.isCancelled()) { if (log.isDebugEnabled()) { log.debug("Cancelled {}", future.toString()); } return; } if (DISPOSED == this.future) { if (log.isDebugEnabled()) { log.debug("Dropping acquisition {} because of {}", future, "asynchronous user cancellation"); } if (future.isSuccess()) { disposeOperationThenRelease(future.get()); } sink.success(); return; } if (!future.isSuccess()) { if (future.cause() != null) { fireContextError(future.cause()); } else { fireContextError(new AbortedException("error while acquiring connection")); } return; } CHANNEL c = future.get(); if (c.eventLoop() .inEventLoop()) { connectOrAcquire(c); } else { c.eventLoop() .execute(() -> connectOrAcquire(c)); } }
fireContextActive,setFuture,connectOrAcquire,operationComplete這幾個方法都會調用MonoCreate的success方法來產生數據
reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/Mono.java
/** * Subscribe to this {@link Mono} and request unbounded demand. * <p> * This version doesn't specify any consumption behavior for the events from the * chain, especially no error handling, so other variants should usually be preferred. * * <p> * <img width="500" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/unbounded1.png" alt=""> * <p> * * @return a new {@link Disposable} that can be used to cancel the underlying {@link Subscription} */ public final Disposable subscribe() { if(this instanceof MonoProcessor){ MonoProcessor<T> s = (MonoProcessor<T>)this; s.connect(); return s; } else{ return subscribeWith(new LambdaMonoSubscriber<>(null, null, null, null)); } }
這裏建立的是LambdaMonoSubscriber,最後調用的是MonoCreate的subscribe(actual)方法
reactor-core-3.1.3.RELEASE-sources.jar!/reactor/core/publisher/MonoCreate.java
public void subscribe(CoreSubscriber<? super T> actual) { DefaultMonoSink<T> emitter = new DefaultMonoSink<>(actual); actual.onSubscribe(emitter); try { callback.accept(emitter); } catch (Throwable ex) { emitter.error(Operators.onOperatorError(ex, actual.currentContext())); } }
這裏的actual就是LambdaMonoSubscriber
這裏的callback.accept就是調用newHandler裏頭的Mono.create裏頭的Lambda表達式,也就是mono的sink,觸發創建鏈接發送請求
TcpClient.newHandler返回的是一個Mono,而在subscribe的時候觸發執行MonoCreate的Lambda表達式。
創建好鏈接
)