本系列爲本人Java編程方法論 響應式解讀系列的Webflux
部分,現分享出來,前置知識Rxjava2 ,Reactor的相關解讀已經錄製分享視頻,併發布在b站,地址以下:java
Rxjava源碼解讀與分享:www.bilibili.com/video/av345…react
Reactor源碼解讀與分享:www.bilibili.com/video/av353…web
NIO源碼解讀相關視頻分享: www.bilibili.com/video/av432…編程
NIO源碼解讀視頻相關配套文章:bootstrap
BIO到NIO源碼的一些事兒之NIO 下 之 Selectorapp
BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 上socket
BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 下
Java編程方法論-Spring WebFlux篇 01 爲何須要Spring WebFlux 上
Java編程方法論-Spring WebFlux篇 01 爲何須要Spring WebFlux 下
其中,Rxjava與Reactor做爲本人書中內容將不對外開放,你們感興趣能夠花點時間來觀看視頻,本人對着兩個庫進行了全面完全細緻的解讀,包括其中的設計理念和相關的方法論,也但願你們能夠留言糾正我其中的錯誤。
本書主要針對Netty
服務器來說,因此讀者應具有有關Netty
的基本知識和應用技能。接下來,咱們將對Reactor-netty
從設計到實現的細節一一探究,讓你們真的從中學習到好的封裝設計理念。本書在寫時所參考的最新版本是Reactor-netty 0.7.8.Release
這個版本,但如今已有0.8
版本,並且0.7
與0.8
版本在源碼細節有不小的變更,這點給你們提醒下。我會針對0.8
版本進行全新的解讀。
咱們由上一章可知Tomcat使用Connector
來接收和響應鏈接請求,這裏,對於Netty
來說,若是咱們想讓其作爲一個web
服務器,咱們先來看一個Netty
常見的一個用法(這裏摘自官方文檔一個例子DiscardServer Demo
):
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/** * 丟棄任何進入的數據 */
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// 綁定端口,開始接收進來的鏈接
ChannelFuture f = b.bind(port).sync(); // (7)
// 等待服務器 socket 關閉 。
// 在這個例子中,這不會發生,但你能夠優雅地關閉你的服務器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
複製代碼
NioEventLoopGroup
是用來處理I/O
操做的多線程事件循環器,Netty
提供了許多不一樣的 EventLoopGroup
的實現用來處理不一樣的傳輸。在這個例子中咱們實現了一個服務端的應用,所以會有2個 NioEventLoopGroup
會被使用。第一個常常被叫作BossGroup
,用來接收進來的鏈接。第二個常常被叫作WorkerGroup
,用來處理已經被接收的鏈接,一旦BossGroup
接收到鏈接,就會把鏈接信息註冊到WorkerGroup
上。如何知道多少個線程已經被使用,如何映射到已經建立的 Channel
上都須要依賴於 EventLoopGroup
的實現,而且能夠經過構造函數來配置他們的關係。ServerBootstrap
是一個啓動 NIO
服務的輔助啓動類。你能夠在這個服務中直接使用 Channel
,可是這會是一個複雜的處理過程,在不少狀況下你並不須要這樣作。NioServerSocketChannel
來舉例說明一個新的 Channel
如何接收傳進來的鏈接。Channel
。ChannelInitializer
是一個特殊的處理類,目的是幫助使用者配置一個新的 Channel
。 使用其對應的ChannelPipeline
來加入你的服務邏輯處理(這裏是DiscardServerHandler
)。當你的程序變的複雜時,可能你會增長更多的處理類到 pipline
上,而後提取這些匿名類到最頂層的類上(匿名類即ChannelInitializer
實例咱們能夠將其當作是一個代理模式的設計,相似於Reactor
中Subscriber
的設計實現,一層又一層的包裝,最後獲得一個咱們須要的一個能夠層層處理的Subscriber
)。Channel
實現的配置參數。若是咱們寫一個TCP/IP
的服務端,咱們能夠設置 socket
的參數選項,如tcpNoDelay
和 keepAlive
。請參考 ChannelOption
和ChannelConfig
實現的接口文檔來對ChannelOption
的有一個大概的認識。option()
和 childOption()
:option()
是提供給NioServerSocketChannel
用來接收進來的鏈接。childOption()
是提供給由父管道 ServerChannel
接收到的鏈接,在這個例子中也是 NioServerSocketChannel
。8080
端口。固然如今你能夠屢次調用 bind()
方法(基於不一樣綁定地址)。在看了常見的Netty
的一個服務器建立用法以後,咱們來看Reactor Netty
給咱們提供的Http服務器的一個封裝:reactor.ipc.netty.http.server.HttpServer
。由上面DiscardServer Demo
可知,首先是定義一個服務器,方便設定一些條件對其進行配置,而後啓動的話是調用其run
方法啓動,爲作到更好的可配置性,這裏使用了建造器模式,以便咱們自定義或直接使用默認配置(有些是必須配置,不然會拋出異常,這也是咱們這裏面所設定的內容之一):
//reactor.ipc.netty.http.server.HttpServer.Builder
public static final class Builder {
private String bindAddress = null;
private int port = 8080;
private Supplier<InetSocketAddress> listenAddress = () -> new InetSocketAddress(NetUtil.LOCALHOST, port);
private Consumer<? super HttpServerOptions.Builder> options;
private Builder() {
}
...
public final Builder port(int port) {
this.port = port;
return this;
}
/** * The options for the server, including bind address and port. * * @param options the options for the server, including bind address and port. * @return {@code this} */
public final Builder options(Consumer<? super HttpServerOptions.Builder> options) {
this.options = Objects.requireNonNull(options, "options");
return this;
}
public HttpServer build() {
return new HttpServer(this);
}
}
複製代碼
能夠看到,此處的HttpServer.Builder#options
是一個函數式動做Consumer
,其傳入的參數是HttpServerOptions.Builder
,在HttpServerOptions.Builder
內能夠針對咱們在DiscardServer Demo
中的bootstrap.option
進行一系列的默認配置或者自行調控配置,咱們的對於option
的自定義設置主要仍是針對於ServerBootstrap#childOption
。由於在reactor.ipc.netty.options.ServerOptions.Builder#option
這個方法中,有對它的父類reactor.ipc.netty.options.NettyOptions.Builder#option
進行了相應的重寫:
//reactor.ipc.netty.options.ServerOptions.Builder
public static class Builder<BUILDER extends Builder<BUILDER>> extends NettyOptions.Builder<ServerBootstrap, ServerOptions, BUILDER>{...}
//reactor.ipc.netty.options.ServerOptions.Builder#option
/** * Set a {@link ChannelOption} value for low level connection settings like * SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote * peer. * * @param key the option key * @param <T> the option type * @return {@code this} * @see ServerBootstrap#childOption(ChannelOption, Object) */
@Override
public final <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.childOption(key, value);
return get();
}
//reactor.ipc.netty.options.NettyOptions.Builder#option
/** * Set a {@link ChannelOption} value for low level connection settings like * SO_TIMEOUT or SO_KEEPALIVE. This will apply to each new channel from remote * peer. * * @param key the option key * @param value the option value * @param <T> the option type * @return {@code this} * @see Bootstrap#option(ChannelOption, Object) */
public <T> BUILDER option(ChannelOption<T> key, T value) {
this.bootstrapTemplate.option(key, value);
return get();
}
複製代碼
這是咱們須要注意的地方。而後,咱們再回到reactor.ipc.netty.http.server.HttpServer.Builder
,從其build
這個方法可知,其返回一個HttpServer
實例,經過對所傳入的HttpServer.Builder
實例的options
進行判斷,接着,就是對bootstrap.group
的判斷,由於要使用構造器配置的話,首先得獲取到ServerBootstrap
,因此要先判斷是否有可用EventLoopGroup
,這個咱們是能夠自行設定的,這裏設定一次,bossGroup
和workerGroup
可能都會調用這一個,這點要注意下(loopResources
源碼註釋已經講的很明確了):
//reactor.ipc.netty.http.server.HttpServer.Builder#build
public HttpServer build() {
return new HttpServer(this);
}
//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
if (Objects.isNull(builder.options)) {
if (Objects.isNull(builder.bindAddress)) {
serverOptionsBuilder.listenAddress(builder.listenAddress.get());
}
else {
serverOptionsBuilder.host(builder.bindAddress).port(builder.port);
}
}
else {
builder.options.accept(serverOptionsBuilder);
}
if (!serverOptionsBuilder.isLoopAvailable()) {
serverOptionsBuilder.loopResources(HttpResources.get());
}
this.options = serverOptionsBuilder.build();
this.server = new TcpBridgeServer(this.options);
}
//reactor.ipc.netty.options.NettyOptions.Builder
public static abstract class Builder<BOOTSTRAP extends AbstractBootstrap<BOOTSTRAP, ?>, SO extends NettyOptions<BOOTSTRAP, SO>, BUILDER extends Builder<BOOTSTRAP, SO, BUILDER>> implements Supplier<BUILDER> {
...
/** * Provide a shared {@link EventLoopGroup} each Connector handler. * * @param eventLoopGroup an eventLoopGroup to share * @return {@code this} */
public final BUILDER eventLoopGroup(EventLoopGroup eventLoopGroup) {
Objects.requireNonNull(eventLoopGroup, "eventLoopGroup");
return loopResources(preferNative -> eventLoopGroup);
}
/** * Provide an {@link EventLoopGroup} supplier. * Note that server might call it twice for both their selection and io loops. * * @param channelResources a selector accepting native runtime expectation and * returning an eventLoopGroup * @return {@code this} */
public final BUILDER loopResources(LoopResources channelResources) {
this.loopResources = Objects.requireNonNull(channelResources, "loopResources");
return get();
}
public final boolean isLoopAvailable() {
return this.loopResources != null;
}
...
}
複製代碼
能夠看到,這個類是Supplier
實現,其是一個對象提取器,即屬於一個函數式動做對象,適合用於懶加載的場景。這裏的LoopResources
也是一個函數式接口(@FunctionalInterface
),其設計的初衷就是爲io.netty.channel.Channel
的工廠方法服務的:
//reactor.ipc.netty.resources.LoopResources
@FunctionalInterface
public interface LoopResources extends Disposable {
/** * Default worker thread count, fallback to available processor */
int DEFAULT_IO_WORKER_COUNT = Integer.parseInt(System.getProperty(
"reactor.ipc.netty.workerCount",
"" + Math.max(Runtime.getRuntime()
.availableProcessors(), 4)));
/** * Default selector thread count, fallback to -1 (no selector thread) */
int DEFAULT_IO_SELECT_COUNT = Integer.parseInt(System.getProperty(
"reactor.ipc.netty.selectCount",
"" + -1));
/** * Create a simple {@link LoopResources} to provide automatically for {@link * EventLoopGroup} and {@link Channel} factories * * @param prefix the event loop thread name prefix * * @return a new {@link LoopResources} to provide automatically for {@link * EventLoopGroup} and {@link Channel} factories */
static LoopResources create(String prefix) {
return new DefaultLoopResources(prefix, DEFAULT_IO_SELECT_COUNT,
DEFAULT_IO_WORKER_COUNT,
true);
}
static LoopResources create(String prefix, int selectCount, int workerCount, boolean daemon) {
...
return new DefaultLoopResources(prefix, selectCount, workerCount, daemon);
}
...
/** * Callback for server {@link EventLoopGroup} creation. * * @param useNative should use native group if current {@link #preferNative()} is also * true * * @return a new {@link EventLoopGroup} */
EventLoopGroup onServer(boolean useNative);
...
}
複製代碼
咱們在自定義的時候,能夠藉助此類的靜態方法create
方法來快速建立一個LoopResources
實例。另外經過LoopResources
的函數式特性,能夠作到懶加載(將咱們想要實現的業務藏到一個方法內),即,只有在使用的時候纔會生成所須要的對象實例,即在使用reactor.ipc.netty.options.NettyOptions.Builder#loopResources(LoopResources channelResources)
方法時,可進行loopResources(true -> new NioEventLoopGroup())
,即在拿到LoopResources
實例後,只有調用其onServer
方法,才能拿到EventLoopGroup
。這樣就能夠大大節省內存資源,提升性能。
至此,咱們將由netty
的普通使用到HttpServer
的封裝完成經過本章給你們展現出來,目的也是告訴你們這個東西是怎麼來的,基於什麼樣的目的,接下來,咱們會依照這個思路一步步給你們揭開Reactor-netty
的面紗以及其與Spring webflux
是如何對接設計的。