Java編程方法論-Spring WebFlux篇 Reactor-Netty下HttpServer 的封裝

前言

本系列爲本人Java編程方法論 響應式解讀系列的Webflux部分,現分享出來,前置知識Rxjava2 ,Reactor的相關解讀已經錄製分享視頻,併發布在b站,地址以下:java

Rxjava源碼解讀與分享:www.bilibili.com/video/av345…react

Reactor源碼解讀與分享:www.bilibili.com/video/av353…web

NIO源碼解讀相關視頻分享: www.bilibili.com/video/av432…編程

NIO源碼解讀視頻相關配套文章:bootstrap

BIO到NIO源碼的一些事兒之BIO服務器

BIO到NIO源碼的一些事兒之NIO 上多線程

BIO到NIO源碼的一些事兒之NIO 中併發

BIO到NIO源碼的一些事兒之NIO 下 之 Selectorapp

BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 上socket

BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 下

Java編程方法論-Spring WebFlux篇 01 爲何須要Spring WebFlux 上

Java編程方法論-Spring WebFlux篇 01 爲何須要Spring WebFlux 下

其中,Rxjava與Reactor做爲本人書中內容將不對外開放,你們感興趣能夠花點時間來觀看視頻,本人對着兩個庫進行了全面完全細緻的解讀,包括其中的設計理念和相關的方法論,也但願你們能夠留言糾正我其中的錯誤。

HttpServer 的封裝

本書主要針對Netty服務器來說,因此讀者應具有有關Netty的基本知識和應用技能。接下來,咱們將對Reactor-netty從設計到實現的細節一一探究,讓你們真的從中學習到好的封裝設計理念。本書在寫時所參考的最新版本是Reactor-netty 0.7.8.Release這個版本,但如今已有0.8版本,並且0.70.8版本在源碼細節有不小的變更,這點給你們提醒下。我會針對0.8版本進行全新的解讀。

HttpServer 的引入

咱們由上一章可知Tomcat使用Connector來接收和響應鏈接請求,這裏,對於Netty來說,若是咱們想讓其作爲一個web服務器,咱們先來看一個Netty常見的一個用法(這裏摘自官方文檔一個例子DiscardServer Demo):

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/** * 丟棄任何進入的數據 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(new DiscardServerHandler());
                 }
             })
             .option(ChannelOption.SO_BACKLOG, 128)          // (5)
             .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

            // 綁定端口,開始接收進來的鏈接
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 等待服務器 socket 關閉 。
            // 在這個例子中,這不會發生,但你能夠優雅地關閉你的服務器。
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}
複製代碼
  1. NioEventLoopGroup 是用來處理I/O操做的多線程事件循環器,Netty 提供了許多不一樣的 EventLoopGroup 的實現用來處理不一樣的傳輸。在這個例子中咱們實現了一個服務端的應用,所以會有2個 NioEventLoopGroup 會被使用。第一個常常被叫作BossGroup,用來接收進來的鏈接。第二個常常被叫作WorkerGroup,用來處理已經被接收的鏈接,一旦BossGroup接收到鏈接,就會把鏈接信息註冊到WorkerGroup上。如何知道多少個線程已經被使用,如何映射到已經建立的 Channel上都須要依賴於 EventLoopGroup 的實現,而且能夠經過構造函數來配置他們的關係。
  2. ServerBootstrap 是一個啓動 NIO 服務的輔助啓動類。你能夠在這個服務中直接使用 Channel,可是這會是一個複雜的處理過程,在不少狀況下你並不須要這樣作。
  3. 這裏咱們經過指定使用 NioServerSocketChannel來舉例說明一個新的 Channel 如何接收傳進來的鏈接。
  4. 這裏的事件處理類常常會被用來處理一個最近已經接收的 ChannelChannelInitializer 是一個特殊的處理類,目的是幫助使用者配置一個新的 Channel。 使用其對應的ChannelPipeline 來加入你的服務邏輯處理(這裏是DiscardServerHandler)。當你的程序變的複雜時,可能你會增長更多的處理類到 pipline 上,而後提取這些匿名類到最頂層的類上(匿名類即ChannelInitializer實例咱們能夠將其當作是一個代理模式的設計,相似於ReactorSubscriber的設計實現,一層又一層的包裝,最後獲得一個咱們須要的一個能夠層層處理的Subscriber)。
  5. 你能夠設置這裏指定的 Channel 實現的配置參數。若是咱們寫一個TCP/IP 的服務端,咱們能夠設置 socket 的參數選項,如tcpNoDelaykeepAlive。請參考 ChannelOptionChannelConfig實現的接口文檔來對ChannelOption 的有一個大概的認識。
  6. 接着咱們來看 option()childOption() :option() 是提供給NioServerSocketChannel用來接收進來的鏈接。childOption() 是提供給由父管道 ServerChannel 接收到的鏈接,在這個例子中也是 NioServerSocketChannel
  7. 剩下的就是綁定端口而後啓動服務。這裏咱們在服務器上綁定了其 8080 端口。固然如今你能夠屢次調用 bind() 方法(基於不一樣綁定地址)。

針對bootstrap的option的封裝

在看了常見的Netty的一個服務器建立用法以後,咱們來看Reactor Netty給咱們提供的Http服務器的一個封裝:reactor.ipc.netty.http.server.HttpServer。由上面DiscardServer Demo可知,首先是定義一個服務器,方便設定一些條件對其進行配置,而後啓動的話是調用其run方法啓動,爲作到更好的可配置性,這裏使用了建造器模式,以便咱們自定義或直接使用默認配置(有些是必須配置,不然會拋出異常,這也是咱們這裏面所設定的內容之一):

//reactor.ipc.netty.http.server.HttpServer.Builder
public static final class Builder {
    private String bindAddress = null;
    private int port = 8080;
    private Supplier<InetSocketAddress> listenAddress = () -> new InetSocketAddress(NetUtil.LOCALHOST, port);
    private Consumer<? super HttpServerOptions.Builder> options;

    private Builder() {
    }
    ...
    public final Builder port(int port) {
        this.port = port;
        return this;
    }

    /** * The options for the server, including bind address and port. * * @param options the options for the server, including bind address and port. * @return {@code this} */
    public final Builder options(Consumer<? super HttpServerOptions.Builder> options) {
        this.options = Objects.requireNonNull(options, "options");
        return this;
    }

    public HttpServer build() {
        return new HttpServer(this);
    }
}
複製代碼

能夠看到,此處的HttpServer.Builder#options是一個函數式動做Consumer,其傳入的參數是HttpServerOptions.Builder,在HttpServerOptions.Builder內能夠針對咱們在DiscardServer Demo中的bootstrap.option進行一系列的默認配置或者自行調控配置,咱們的對於option的自定義設置主要仍是針對於ServerBootstrap#childOption。由於在reactor.ipc.netty.options.ServerOptions.Builder#option這個方法中,有對它的父類reactor.ipc.netty.options.NettyOptions.Builder#option進行了相應的重寫:

//reactor.ipc.netty.options.ServerOptions.Builder
public static class Builder<BUILDER extends Builder<BUILDER>> extends NettyOptions.Builder<ServerBootstrap, ServerOptions, BUILDER>{...}
	
//reactor.ipc.netty.options.ServerOptions.Builder#option
/** * Set a {@link ChannelOption} value for low level connection settings like * SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote * peer. * * @param key the option key * @param <T> the option type * @return {@code this} * @see ServerBootstrap#childOption(ChannelOption, Object) */
@Override
public final <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.childOption(key, value);
return get();
}
//reactor.ipc.netty.options.NettyOptions.Builder#option
/** * Set a {@link ChannelOption} value for low level connection settings like * SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote * peer. * * @param key the option key * @param value the option value * @param <T> the option type * @return {@code this} * @see Bootstrap#option(ChannelOption, Object) */
public <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.option(key, value);
return get();
}
複製代碼

這是咱們須要注意的地方。而後,咱們再回到reactor.ipc.netty.http.server.HttpServer.Builder,從其build這個方法可知,其返回一個HttpServer實例,經過對所傳入的HttpServer.Builder實例的options進行判斷,接着,就是對bootstrap.group的判斷,由於要使用構造器配置的話,首先得獲取到ServerBootstrap,因此要先判斷是否有可用EventLoopGroup,這個咱們是能夠自行設定的,這裏設定一次,bossGroupworkerGroup可能都會調用這一個,這點要注意下(loopResources源碼註釋已經講的很明確了):

//reactor.ipc.netty.http.server.HttpServer.Builder#build
public HttpServer build() {
    return new HttpServer(this);
}
//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
    HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
    if (Objects.isNull(builder.options)) {
        if (Objects.isNull(builder.bindAddress)) {
            serverOptionsBuilder.listenAddress(builder.listenAddress.get());
        }
        else {
            serverOptionsBuilder.host(builder.bindAddress).port(builder.port);
        }
    }
    else {
        builder.options.accept(serverOptionsBuilder);
    }
    if (!serverOptionsBuilder.isLoopAvailable()) {
        serverOptionsBuilder.loopResources(HttpResources.get());
    }
    this.options = serverOptionsBuilder.build();
    this.server = new TcpBridgeServer(this.options);
}
//reactor.ipc.netty.options.NettyOptions.Builder
public static abstract class Builder<BOOTSTRAP extends AbstractBootstrap<BOOTSTRAP, ?>, SO extends NettyOptions<BOOTSTRAP, SO>, BUILDER extends Builder<BOOTSTRAP, SO, BUILDER>> implements Supplier<BUILDER> {
    ...
/** * Provide a shared {@link EventLoopGroup} each Connector handler. * * @param eventLoopGroup an eventLoopGroup to share * @return {@code this} */
public final BUILDER eventLoopGroup(EventLoopGroup eventLoopGroup) {
Objects.requireNonNull(eventLoopGroup, "eventLoopGroup");
return loopResources(preferNative -> eventLoopGroup);
}
/** * Provide an {@link EventLoopGroup} supplier. * Note that server might call it twice for both their selection and io loops. * * @param channelResources a selector accepting native runtime expectation and * returning an eventLoopGroup * @return {@code this} */
public final BUILDER loopResources(LoopResources channelResources) {
this.loopResources = Objects.requireNonNull(channelResources, "loopResources");
return get();
}

public final boolean isLoopAvailable() {
return this.loopResources != null;
}
...
}
複製代碼

能夠看到,這個類是Supplier實現,其是一個對象提取器,即屬於一個函數式動做對象,適合用於懶加載的場景。這裏的LoopResources也是一個函數式接口(@FunctionalInterface),其設計的初衷就是爲io.netty.channel.Channel的工廠方法服務的:

//reactor.ipc.netty.resources.LoopResources
@FunctionalInterface
public interface LoopResources extends Disposable {

/** * Default worker thread count, fallback to available processor */
int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
    "reactor.ipc.netty.workerCount",
    "" + Math.max(Runtime.getRuntime()
                .availableProcessors(), 4)));
/** * Default selector thread count, fallback to -1 (no selector thread) */
int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty(
    "reactor.ipc.netty.selectCount",
    "" + -1));
/** * Create a simple {@link LoopResources} to provide automatically for {@link * EventLoopGroup} and {@link Channel} factories * * @param prefix the event loop thread name prefix * * @return a new {@link LoopResources} to provide automatically for {@link * EventLoopGroup} and {@link Channel} factories */
static LoopResources create(String prefix) {
return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT,
        DEFAULT_IO_WORKER_COUNT,
        true);
}
static LoopResources create(String prefix, int selectCount, int workerCount, boolean daemon) {
		...
		return new DefaultLoopResources(prefix, selectCount, workerCount, daemon);
	}
...
/** * Callback for server {@link EventLoopGroup} creation. * * @param useNative should use native group if current {@link #preferNative()} is also * true * * @return a new {@link EventLoopGroup} */
EventLoopGroup onServer(boolean useNative);
...
}
複製代碼

咱們在自定義的時候,能夠藉助此類的靜態方法create方法來快速建立一個LoopResources實例。另外經過LoopResources的函數式特性,能夠作到懶加載(將咱們想要實現的業務藏到一個方法內),即,只有在使用的時候纔會生成所須要的對象實例,即在使用reactor.ipc.netty.options.NettyOptions.Builder#loopResources(LoopResources channelResources)方法時,可進行loopResources(true -> new NioEventLoopGroup()),即在拿到LoopResources實例後,只有調用其onServer方法,才能拿到EventLoopGroup。這樣就能夠大大節省內存資源,提升性能。

小結

至此,咱們將由netty的普通使用到HttpServer的封裝完成經過本章給你們展現出來,目的也是告訴你們這個東西是怎麼來的,基於什麼樣的目的,接下來,咱們會依照這個思路一步步給你們揭開Reactor-netty的面紗以及其與Spring webflux是如何對接設計的。

相關文章
相關標籤/搜索