Java編程方法論-Spring WebFlux篇 Reactor-Netty下TcpServer的功能實現 1

前言

本系列爲本人Java編程方法論 響應式解讀系列的Webflux部分,現分享出來,前置知識Rxjava2 ,Reactor的相關解讀已經錄製分享視頻,併發布在b站,地址以下:java

Rxjava源碼解讀與分享:https://www.bilibili.com/video/av34537840react

Reactor源碼解讀與分享:https://www.bilibili.com/video/av35326911web

NIO源碼解讀相關視頻分享: https://www.bilibili.com/video/av43230997編程

NIO源碼解讀視頻相關配套文章:bootstrap

BIO到NIO源碼的一些事兒之BIO服務器

BIO到NIO源碼的一些事兒之NIO 上併發

BIO到NIO源碼的一些事兒之NIO 中app

BIO到NIO源碼的一些事兒之NIO 下 之 Selectoride

BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 上oop

BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 下

Java編程方法論-Spring WebFlux篇 01 爲何須要Spring WebFlux 上

Java編程方法論-Spring WebFlux篇 01 爲何須要Spring WebFlux 下

Java編程方法論-Spring WebFlux篇 Reactor-Netty下HttpServer 的封裝

其中,Rxjava與Reactor做爲本人書中內容將不對外開放,你們感興趣能夠花點時間來觀看視頻,本人對着兩個庫進行了全面完全細緻的解讀,包括其中的設計理念和相關的方法論,也但願你們能夠留言糾正我其中的錯誤。

本書主要針對Netty服務器來說,因此讀者應具有有關Netty的基本知識和應用技能。接下來,咱們將對Reactor-netty從設計到實現的細節一一探究,讓你們真的從中學習到好的封裝設計理念。本書在寫時所參考的最新版本是Reactor-netty 0.7.8.Release這個版本,但如今已有0.8版本,並且0.70.8版本在源碼細節有不小的變更,這點給你們提醒下。我會針對0.8版本進行全新的解讀並在將來出版的書中進行展現。

TcpServer的功能實現

這裏,咱們會首先解讀Reactor Netty是如何針對NettyBootstrapChildHandler進行封裝以及響應式拓展等一些細節探究。接着,咱們會涉及到HttpHandler的引入,以此來對接咱們上層web服務。

針對Bootstrap的ChildHandler的封裝

由於這是咱們切入自定義邏輯的地方,因此,咱們首先來關注下與其相關的ChannelHandler,以及前文並未提到的,服務器究竟是如何啓動以及如何經過響應式來作到優雅的關閉,首先咱們會接觸關閉服務器的設定。

ChannelHandler引入與使用響應式優雅關閉服務器

咱們再回到reactor.ipc.netty.http.server.HttpServer#HttpServer這個構造器中,由上一章咱們知道請求是HTTP層面的(應用層),必須依賴於TCP的鏈接實現,因此這裏就要有一個TCPServer的實現,其實就是ChannelPipeline的操做。

//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
    HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
            ...
    this.options = serverOptionsBuilder.build();
    this.server = new TcpBridgeServer(this.options);
}
複製代碼

這裏的話在DiscardServer Demo中,TCPServer咱們主要針對childHandler的內容的封裝,也就是以下內容:

b.group(bossGroup, workerGroup)
    ...
 .childHandler(new ChannelInitializer<SocketChannel>() { 
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new DiscardServerHandler());
            }
        })
    ...
複製代碼

childHandler到底表明什麼類型,咱們能夠在io.netty.bootstrap.ServerBootstrap找到其相關定義:

//io.netty.bootstrap.ServerBootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

    private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;

    public ServerBootstrap() { }

    private ServerBootstrap(ServerBootstrap bootstrap) {
        super(bootstrap);
        childGroup = bootstrap.childGroup;
        childHandler = bootstrap.childHandler;
        synchronized (bootstrap.childOptions) {
            childOptions.putAll(bootstrap.childOptions);
        }
        synchronized (bootstrap.childAttrs) {
            childAttrs.putAll(bootstrap.childAttrs);
        }
    }
...
}
複製代碼

由字段定義可知,childHandler表明的是ChannelHandler,顧名思義,是關於Channel的一個處理類,這裏經過查看其定義可知它是用來攔截處理Channel中的I/O事件,並經過Channel下的ChannelPipeline將處理後的事件轉發到其下一個處理程序中。 那這裏如何實現DiscardServer Demo中的b.childHandler(xxx)行爲,經過DiscardServer Demo咱們能夠知道,咱們最關注的實際上是ch.pipeline().addLast(new DiscardServerHandler());中的DiscardServerHandler實現,可是咱們發現,這個核心語句是包含在ChannelInitializer內,其繼承了ChannelInboundHandlerAdapter,它的最頂層的父類接口就是ChannelHandler,也就對應了io.netty.bootstrap.ServerBootstrap在執行b.childHandler(xxx)方法時,其須要傳入ChannelHandler類型的設定。這裏就能夠分拆成兩步來作,一個是b.childHandler(xxx)行爲包裝,一個是此ChannelHandler的定義拓展實現。 那麼,爲了API的通用性,咱們先來看Netty的客戶端的創建的一個Demo(摘自本人RPC項目的一段代碼):

private Channel createNewConChannel() {
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.channel(NioSocketChannel.class)
                .group(new NioEventLoopGroup(1))
                .handler(new ChannelInitializer<Channel>() {
                    protected void initChannel(Channel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
                        .addLast(new RpcDecoder(10 * 1024 * 1024))
                        .addLast(new RpcEncoder())
                        .addLast(new RpcClientHandler())
                        ;
                    }
                });
    try {
        final ChannelFuture f =
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
                  .option(ChannelOption.TCP_NODELAY, true)
                  .connect(ip, port).sync(); // <1>
        f.addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                LOGGER.info("Connect success {} ", f);
            }
        });
        final Channel channel = f.channel();
        channel.closeFuture().addListener((ChannelFutureListener) future -> LOGGER.info("Channel Close {} {}", ip, port));
        return channel;
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return null;
}
複製代碼

Netty的客戶端與服務端的創建進行對比,咱們能夠發現b.childHandler(xxx)與相應的啓動(Server端的話是serverBootstrap.bind(port).sync();,客戶端的話是上述Demo中<1>處的內容)均可以抽取出來做爲一個接口來進行功能的聚合,而後和相應的Server(如TcpServer)或Client(如TcpClient)進行其特有的實現。在Reactor Netty內的話,就是定義一個reactor.ipc.netty.NettyConnector接口,除了作到上述的功能以外,爲了適配響應式的理念,也進行了響應式的設計。即在netty客戶端與服務端在啓動時,能夠保存其狀態,以及提供結束的對外接口方法,這種在響應式中能夠很優雅的實現。接下來,咱們來看此reactor.ipc.netty.NettyConnector的接口定義:

//reactor.ipc.netty.NettyConnector
public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> {

Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler);
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
BlockingNettyContext start(T handler) {
    return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
}
}
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) {
    BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());

    facade.installShutdownHook();

    if (onStart != null) {
        onStart.accept(facade);
    }

    facade.getContext()
            .onClose()
            .block();
}
複製代碼

其中,newHandler能夠是咱們上層web處理,裏面包含了INBOUND, OUTBOUND,具體的話就是request,response,後面會專門來涉及到這點。 接着就是提供了一個啓動方法start,其內建立了一個BlockingNettyContext實例,而邏輯的核心就在其構造方法內,就是要將配置好的服務器啓動,整個啓動過程仍是放在newHandler(handler)中,其返回的Mono<? extends NettyContext>中的NettyContext類型元素是管理io.netty.channel.Channel上下文信息的一個對象,這個對象更多的是一些無狀態的操做,並不會對此對象作什麼樣的改變,也是經過對此對象的一個Mono<? extends NettyContext>包裝而後經過block產生訂閱,來作到sync()的效果,經過,經過block產生訂閱後返回的NettyContext對象,可使中斷關閉服務器的操做也能夠作到更優雅:

public class BlockingNettyContext {

	private static final Logger LOG = Loggers.getLogger(BlockingNettyContext.class);

	private final NettyContext context;
	private final String description;

	private Duration lifecycleTimeout;
	private Thread shutdownHook;

	public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description) {
		this(contextAsync, description, Duration.ofSeconds(45));
	}

	public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description, Duration lifecycleTimeout) {
		this.description = description;
		this.lifecycleTimeout = lifecycleTimeout;
		this.context = contextAsync
				.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be started within " + lifecycleTimeout.toMillis() + "ms")))
				.doOnNext(ctx -> LOG.info("Started {} on {}", description, ctx.address()))
				.block();
	}
    ...
    /** * Shut down the {@link NettyContext} and wait for its termination, up to the * {@link #setLifecycleTimeout(Duration) lifecycle timeout}. */
	public void shutdown() {
		if (context.isDisposed()) {
			return;
		}

		removeShutdownHook(); //only applies if not called from the hook's thread

		context.dispose();
		context.onClose()
		       .doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e))
		       .doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address()))
		       .timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
		       .block();
	}

	...
}
複製代碼

這裏,咱們來接觸下在Reactor中並無深刻接觸的blockXXX()操做,其實整個邏輯仍是比較簡單的,這裏拿reactor.core.publisher.Mono#block()來說,就是獲取並返回這個下發的元素:

//reactor.core.publisher.Mono#block()
@Nullable
public T block() {
    BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
    onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
    return subscriber.blockingGet();
}

//reactor.core.publisher.BlockingMonoSubscriber
final class BlockingMonoSubscriber<T> extends BlockingSingleSubscriber<T> {

	@Override
	public void onNext(T t) {
		if (value == null) {
			value = t;
			countDown();
		}
	}

	@Override
	public void onError(Throwable t) {
		if (value == null) {
			error = t;
		}
		countDown();
	}
}
//reactor.core.publisher.BlockingSingleSubscriber
abstract class BlockingSingleSubscriber<T> extends CountDownLatch implements InnerConsumer<T>, Disposable {

T         value;
Throwable error;

Subscription s;

volatile boolean cancelled;

BlockingSingleSubscriber() {
super(1);
}
...
@Nullable
final T blockingGet() {
if (Schedulers.isInNonBlockingThread()) {
    throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
}
if (getCount() != 0) {
    try {
        await();
    }
    catch (InterruptedException ex) {
        dispose();
        throw Exceptions.propagate(ex);
    }
}

Throwable e = error;
if (e != null) {
    RuntimeException re = Exceptions.propagate(e);
    //this is ok, as re is always a new non-singleton instance
    re.addSuppressed(new Exception("#block terminated with an error"));
    throw re;
}
return value;
}

...
@Override
public final void onComplete() {
    countDown();
}
}
複製代碼

能夠看到,此處使用的CountDownLatch的一個特性,在元素下發賦值以後,等待數值減1,這裏恰好也就這一個限定(由super(1)定義),解除所調用的blockingGet中的等待,獲得所需的值,這裏,爲了保證block()的語義,其onComplete方法也調用了countDown();,即當上遊爲Mono<Void>時,作到匹配。

相關文章
相關標籤/搜索