本系列爲本人Java編程方法論 響應式解讀系列的Webflux
部分,現分享出來,前置知識Rxjava2 ,Reactor的相關解讀已經錄製分享視頻,併發布在b站,地址以下:java
Rxjava源碼解讀與分享:https://www.bilibili.com/video/av34537840react
Reactor源碼解讀與分享:https://www.bilibili.com/video/av35326911web
NIO源碼解讀相關視頻分享: https://www.bilibili.com/video/av43230997編程
NIO源碼解讀視頻相關配套文章:bootstrap
BIO到NIO源碼的一些事兒之NIO 下 之 Selectoride
BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 上oop
BIO到NIO源碼的一些事兒之NIO 下 Buffer解讀 下
Java編程方法論-Spring WebFlux篇 01 爲何須要Spring WebFlux 上
Java編程方法論-Spring WebFlux篇 01 爲何須要Spring WebFlux 下
Java編程方法論-Spring WebFlux篇 Reactor-Netty下HttpServer 的封裝
其中,Rxjava與Reactor做爲本人書中內容將不對外開放,你們感興趣能夠花點時間來觀看視頻,本人對着兩個庫進行了全面完全細緻的解讀,包括其中的設計理念和相關的方法論,也但願你們能夠留言糾正我其中的錯誤。
本書主要針對Netty
服務器來說,因此讀者應具有有關Netty
的基本知識和應用技能。接下來,咱們將對Reactor-netty
從設計到實現的細節一一探究,讓你們真的從中學習到好的封裝設計理念。本書在寫時所參考的最新版本是Reactor-netty 0.7.8.Release
這個版本,但如今已有0.8
版本,並且0.7
與0.8
版本在源碼細節有不小的變更,這點給你們提醒下。我會針對0.8
版本進行全新的解讀並在將來出版的書中進行展現。
這裏,咱們會首先解讀Reactor Netty
是如何針對Netty
中Bootstrap
的ChildHandler
進行封裝以及響應式拓展等一些細節探究。接着,咱們會涉及到HttpHandler
的引入,以此來對接咱們上層web服務。
由於這是咱們切入自定義邏輯的地方,因此,咱們首先來關注下與其相關的ChannelHandler
,以及前文並未提到的,服務器究竟是如何啓動以及如何經過響應式來作到優雅的關閉,首先咱們會接觸關閉服務器的設定。
咱們再回到reactor.ipc.netty.http.server.HttpServer#HttpServer
這個構造器中,由上一章咱們知道請求是HTTP
層面的(應用層),必須依賴於TCP
的鏈接實現,因此這裏就要有一個TCPServer
的實現,其實就是Channel
上Pipeline
的操做。
//reactor.ipc.netty.http.server.HttpServer#HttpServer
private HttpServer(HttpServer.Builder builder) {
HttpServerOptions.Builder serverOptionsBuilder = HttpServerOptions.builder();
...
this.options = serverOptionsBuilder.build();
this.server = new TcpBridgeServer(this.options);
}
複製代碼
這裏的話在DiscardServer Demo
中,TCPServer
咱們主要針對childHandler
的內容的封裝,也就是以下內容:
b.group(bossGroup, workerGroup)
...
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
...
複製代碼
那childHandler
到底表明什麼類型,咱們能夠在io.netty.bootstrap.ServerBootstrap
找到其相關定義:
//io.netty.bootstrap.ServerBootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
public ServerBootstrap() { }
private ServerBootstrap(ServerBootstrap bootstrap) {
super(bootstrap);
childGroup = bootstrap.childGroup;
childHandler = bootstrap.childHandler;
synchronized (bootstrap.childOptions) {
childOptions.putAll(bootstrap.childOptions);
}
synchronized (bootstrap.childAttrs) {
childAttrs.putAll(bootstrap.childAttrs);
}
}
...
}
複製代碼
由字段定義可知,childHandler
表明的是ChannelHandler
,顧名思義,是關於Channel
的一個處理類,這裏經過查看其定義可知它是用來攔截處理Channel
中的I/O
事件,並經過Channel
下的ChannelPipeline
將處理後的事件轉發到其下一個處理程序中。 那這裏如何實現DiscardServer Demo
中的b.childHandler(xxx)
行爲,經過DiscardServer Demo
咱們能夠知道,咱們最關注的實際上是ch.pipeline().addLast(new DiscardServerHandler());
中的DiscardServerHandler
實現,可是咱們發現,這個核心語句是包含在ChannelInitializer
內,其繼承了ChannelInboundHandlerAdapter
,它的最頂層的父類接口就是ChannelHandler
,也就對應了io.netty.bootstrap.ServerBootstrap
在執行b.childHandler(xxx)
方法時,其須要傳入ChannelHandler
類型的設定。這裏就能夠分拆成兩步來作,一個是b.childHandler(xxx)
行爲包裝,一個是此ChannelHandler
的定義拓展實現。 那麼,爲了API
的通用性,咱們先來看Netty的客戶端的創建的一個Demo(摘自本人RPC項目的一段代碼):
private Channel createNewConChannel() {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class)
.group(new NioEventLoopGroup(1))
.handler(new ChannelInitializer<Channel>() {
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new RpcDecoder(10 * 1024 * 1024))
.addLast(new RpcEncoder())
.addLast(new RpcClientHandler())
;
}
});
try {
final ChannelFuture f =
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
.option(ChannelOption.TCP_NODELAY, true)
.connect(ip, port).sync(); // <1>
f.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
LOGGER.info("Connect success {} ", f);
}
});
final Channel channel = f.channel();
channel.closeFuture().addListener((ChannelFutureListener) future -> LOGGER.info("Channel Close {} {}", ip, port));
return channel;
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
複製代碼
將Netty
的客戶端與服務端的創建進行對比,咱們能夠發現b.childHandler(xxx)
與相應的啓動(Server
端的話是serverBootstrap.bind(port).sync();
,客戶端的話是上述Demo中<1>
處的內容)均可以抽取出來做爲一個接口來進行功能的聚合,而後和相應的Server
(如TcpServer
)或Client
(如TcpClient
)進行其特有的實現。在Reactor Netty
內的話,就是定義一個reactor.ipc.netty.NettyConnector
接口,除了作到上述的功能以外,爲了適配響應式的理念,也進行了響應式的設計。即在netty
客戶端與服務端在啓動時,能夠保存其狀態,以及提供結束的對外接口方法,這種在響應式中能夠很優雅的實現。接下來,咱們來看此reactor.ipc.netty.NettyConnector
的接口定義:
//reactor.ipc.netty.NettyConnector
public interface NettyConnector<INBOUND extends NettyInbound, OUTBOUND extends NettyOutbound> {
Mono<? extends NettyContext> newHandler(BiFunction<? super INBOUND, ? super OUTBOUND, ? extends Publisher<Void>> ioHandler);
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
BlockingNettyContext start(T handler) {
return new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
}
}
...
default <T extends BiFunction<INBOUND, OUTBOUND, ? extends Publisher<Void>>>
void startAndAwait(T handler, @Nullable Consumer<BlockingNettyContext> onStart) {
BlockingNettyContext facade = new BlockingNettyContext(newHandler(handler), getClass().getSimpleName());
facade.installShutdownHook();
if (onStart != null) {
onStart.accept(facade);
}
facade.getContext()
.onClose()
.block();
}
複製代碼
其中,newHandler
能夠是咱們上層web處理,裏面包含了INBOUND, OUTBOUND
,具體的話就是request,response
,後面會專門來涉及到這點。 接着就是提供了一個啓動方法start
,其內建立了一個BlockingNettyContext
實例,而邏輯的核心就在其構造方法內,就是要將配置好的服務器啓動,整個啓動過程仍是放在newHandler(handler)
中,其返回的Mono<? extends NettyContext>
中的NettyContext
類型元素是管理io.netty.channel.Channel
上下文信息的一個對象,這個對象更多的是一些無狀態的操做,並不會對此對象作什麼樣的改變,也是經過對此對象的一個Mono<? extends NettyContext>
包裝而後經過block
產生訂閱,來作到sync()
的效果,經過,經過block
產生訂閱後返回的NettyContext
對象,可使中斷關閉服務器的操做也能夠作到更優雅:
public class BlockingNettyContext {
private static final Logger LOG = Loggers.getLogger(BlockingNettyContext.class);
private final NettyContext context;
private final String description;
private Duration lifecycleTimeout;
private Thread shutdownHook;
public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description) {
this(contextAsync, description, Duration.ofSeconds(45));
}
public BlockingNettyContext(Mono<? extends NettyContext> contextAsync, String description, Duration lifecycleTimeout) {
this.description = description;
this.lifecycleTimeout = lifecycleTimeout;
this.context = contextAsync
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be started within " + lifecycleTimeout.toMillis() + "ms")))
.doOnNext(ctx -> LOG.info("Started {} on {}", description, ctx.address()))
.block();
}
...
/** * Shut down the {@link NettyContext} and wait for its termination, up to the * {@link #setLifecycleTimeout(Duration) lifecycle timeout}. */
public void shutdown() {
if (context.isDisposed()) {
return;
}
removeShutdownHook(); //only applies if not called from the hook's thread
context.dispose();
context.onClose()
.doOnError(e -> LOG.error("Stopped {} on {} with an error {}", description, context.address(), e))
.doOnTerminate(() -> LOG.info("Stopped {} on {}", description, context.address()))
.timeout(lifecycleTimeout, Mono.error(new TimeoutException(description + " couldn't be stopped within " + lifecycleTimeout.toMillis() + "ms")))
.block();
}
...
}
複製代碼
這裏,咱們來接觸下在Reactor中並無深刻接觸的blockXXX()
操做,其實整個邏輯仍是比較簡單的,這裏拿reactor.core.publisher.Mono#block()
來說,就是獲取並返回這個下發的元素:
//reactor.core.publisher.Mono#block()
@Nullable
public T block() {
BlockingMonoSubscriber<T> subscriber = new BlockingMonoSubscriber<>();
onLastAssembly(this).subscribe(Operators.toCoreSubscriber(subscriber));
return subscriber.blockingGet();
}
//reactor.core.publisher.BlockingMonoSubscriber
final class BlockingMonoSubscriber<T> extends BlockingSingleSubscriber<T> {
@Override
public void onNext(T t) {
if (value == null) {
value = t;
countDown();
}
}
@Override
public void onError(Throwable t) {
if (value == null) {
error = t;
}
countDown();
}
}
//reactor.core.publisher.BlockingSingleSubscriber
abstract class BlockingSingleSubscriber<T> extends CountDownLatch implements InnerConsumer<T>, Disposable {
T value;
Throwable error;
Subscription s;
volatile boolean cancelled;
BlockingSingleSubscriber() {
super(1);
}
...
@Nullable
final T blockingGet() {
if (Schedulers.isInNonBlockingThread()) {
throw new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread " + Thread.currentThread().getName());
}
if (getCount() != 0) {
try {
await();
}
catch (InterruptedException ex) {
dispose();
throw Exceptions.propagate(ex);
}
}
Throwable e = error;
if (e != null) {
RuntimeException re = Exceptions.propagate(e);
//this is ok, as re is always a new non-singleton instance
re.addSuppressed(new Exception("#block terminated with an error"));
throw re;
}
return value;
}
...
@Override
public final void onComplete() {
countDown();
}
}
複製代碼
能夠看到,此處使用的CountDownLatch
的一個特性,在元素下發賦值以後,等待數值減1,這裏恰好也就這一個限定(由super(1)
定義),解除所調用的blockingGet
中的等待,獲得所需的值,這裏,爲了保證block()
的語義,其onComplete
方法也調用了countDown();
,即當上遊爲Mono<Void>
時,作到匹配。