本文主要研究一下NettyConnector的start及shutdownhtml
reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/NettyConnector.javajava
/** * A Netty connector is an inbound/outbound factory sharing configuration but usually no * runtime * (connection...) state at the exception of shared connection pool setups. Subscribing * to the returned {@link Mono} will effectively * create a new stateful "client" or "server" socket depending on the implementation. * It might also be working on top of a socket pool or connection pool as well, but the * state should be safely handled by the pool itself. * <p> * <p>Clients or Receivers will onSubscribe when their connection is established. They * will complete when the unique returned closing {@link Publisher} completes itself or if * the connection is remotely terminated. Calling the returned {@link * Disposable#dispose()} from {@link Mono#subscribe()} will terminate the subscription * and underlying connection from the local peer. * <p> * <p>Servers or Producers will onSubscribe when their socket is bound locally. They will * never complete as many {@link Publisher} close selectors will be expected. Disposing * the returned {@link Mono} will safely call shutdown. * * @param <INBOUND> incoming traffic API such as server request or client response * @param <OUTBOUND> outgoing traffic API such as server response or client request * @author Stephane Maldini * @since 0.6 */ public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> { /** * Prepare a {@link BiFunction} IO handler that will react on a new connected state * each * time * the returned {@link Mono} is subscribed. This {@link NettyConnector} shouldn't assume * any state related to the individual created/cleaned resources. * <p> * The IO handler will return {@link Publisher} to signal when to terminate the * underlying resource channel. * * @param ioHandler the in/out callback returning a closing publisher * * @return a {@link Mono} completing with a {@link Disposable} token to dispose * the active handler (server, client connection...) or failing with the connection * error. */ Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler); /** * Start a Client or Server in a blocking fashion, and wait for it to finish initializing. * The returned {@link BlockingNettyContext} class offers a simplified API around operating * the client/server in a blocking fashion, including to {@link BlockingNettyContext#shutdown() shut it down}. * * @param handler the handler to start the client or server with. * @param <T> * @return a {@link BlockingNettyContext} */ default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>> BlockingNettyContext start(T handler) { return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName()); } /** * Start a Client or Server in a blocking fashion, and wait for it to finish initializing. * The returned {@link BlockingNettyContext} class offers a simplified API around operating * the client/server in a blocking fashion, including to {@link BlockingNettyContext#shutdown() shut it down}. * * @param handler the handler to start the client or server with. * @param timeout wait for Client/Server to start for the specified timeout. * @param <T> * @return a {@link BlockingNettyContext} */ default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>> BlockingNettyContext start(T handler, Duration timeout) { return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName(), timeout); } /** * Start a Client or Server in a fully blocking fashion, not only waiting for it to * initialize but also blocking during the full lifecycle of the client/server. * Since most servers will be long-lived, this is more adapted to running a server * out of a main method, only allowing shutdown of the servers through sigkill. * <p> * Note that a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added * by this method in order to properly disconnect the client/server upon receiving * a sigkill signal. * * @param handler the handler to execute. */ default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>> void startAndAwait(T handler) { startAndAwait(handler, null); } /** * Start a Client or Server in a fully blocking fashion, not only waiting for it to * initialize but also blocking during the full lifecycle of the client/server. * Since most servers will be long-lived, this is more adapted to running a server * out of a main method, only allowing shutdown of the servers through sigkill. * <p> * Note that a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} is added * by this method in order to properly disconnect the client/server upon receiving * a sigkill signal. * * @param handler the handler to execute. * @param onStart an optional callback to be invoked once the client/server has finished * initializing. */ default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>> void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) { BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName()); facade.installShutdownHook(); if (onStart != null) { onStart.accept(facade); } facade.getContext() .onClose() .block(); } }
能夠看到這個類有5個方法,一個newHandler是non-blocking模式的,其餘的幾個start開頭的都是blocking模式的(
duration參數用於指定等待初始化完成的超時時間
),使用的是BlockingNettyContext
newHandler返回的是一個Mono<? extends NettyContext>,在這個mono完成的時候,會本身dispose。
實例以下react
@Test public void testNewHandler() throws InterruptedException { TcpClient client = TcpClient.create("localhost", 9090); Mono<? extends NettyContext> mono = client.newHandler((inbound,outbound) -> { return outbound.sendString(Mono.just("Hello World!")).then(); }); CountDownLatch latch = new CountDownLatch(1); Disposable disposable = mono .doFinally(e -> { System.out.println("finish:"+e); latch.countDown(); }) .subscribe(); latch.await(); System.out.println(disposable.isDisposed()); }
start方法返回的是BlockingNettyContext,用戶能夠調用BlockingNettyContext的shutdown方法來dispose nettyContext,好比api
@Test public void testShutdown(){ TcpClient client = TcpClient.create("localhost", 9090); CountDownLatch latch = new CountDownLatch(1); BlockingNettyContext context = client.start((inbound, outbound) -> { latch.countDown(); return outbound.sendString(Mono.just("hello world")) .then(); }); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } finally { context.shutdown(); } }
reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/tcp/BlockingNettyContext.javaapp
/** * Shut down the {@link NettyContext} and wait for its termination, up to the * {@link #setLifecycleTimeout(Duration) lifecycle timeout}. */ public void shutdown() { if (context.isDisposed()) { return; } removeShutdownHook(); //only applies if not called from the hook's thread context.dispose(); context.onClose() .doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e)) .doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address())) .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms"))) .block(); } /** * Remove a {@link Runtime#removeShutdownHook(Thread) JVM shutdown hook} if one was * {@link #installShutdownHook() installed} by this {@link BlockingNettyContext}. * * @return true if there was a hook and it was removed, false otherwise. */ public boolean removeShutdownHook() { if (this.shutdownHook != null && Thread.currentThread() != this.shutdownHook) { Thread sdh = this.shutdownHook; this.shutdownHook = null; return Runtime.getRuntime().removeShutdownHook(sdh); } return false; }
這裏的shutdown主要是移除當前的shutdownHook,而後dispose nettyContext
startAndAwait方法調用了BlockingNettyContext的installShutdownHook來進行關閉
reactor-netty-0.7.6.RELEASE-sources.jar!/reactor/ipc/netty/tcp/BlockingNettyContext.javasocket
/** * Install a {@link Runtime#addShutdownHook(Thread) JVM shutdown hook} that will * shutdown this {@link BlockingNettyContext} if the JVM is terminated externally. * <p> * The hook is removed if shutdown manually, and subsequent calls to this method are * no-op. */ public void installShutdownHook() { //don't return the hook to discourage uninstalling it externally if (this.shutdownHook != null) { return; } this.shutdownHook = new Thread(this::shutdownFromJVM); Runtime.getRuntime().addShutdownHook(this.shutdownHook); } protected void shutdownFromJVM() { if (context.isDisposed()) { return; } final String hookDesc = Thread.currentThread().toString(); context.dispose(); context.onClose() .doOnError(e -> LOG.error("Stopped {} on {} with an error {} from JVM hook {}", description, context.address(), e, hookDesc)) .doOnTerminate(() -> LOG.info("Stopped {} on {} from JVM hook {}", description, context.address(), hookDesc)) .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms"))) .block(); }
在shutdownHook裏頭註冊了shutdownFromJVM方法,用於關閉NettyContext。
實例tcp
@Test public void testStartAndAwait(){ TcpClient client = TcpClient.create("localhost", 9090); client.startAndAwait((inbound, outbound) -> { return outbound.sendString(Mono.just("hello world")) .then(); }); }
NettyConnector提供了non-blocking及blocking兩種使用方式,non-blocking的話,使用newHandler返回一個Mono<? extends NettyContext>,在它會在完成的時候,本身dispose nettyContext;blocking的話,startAndAwait方法會自動幫你註冊shutdownHook來dispose nettyContext,而start方法則返回BlockingNettyContext,容許調用shutdown方法來dispose nettyContext。this