本文主要研究一下webclient的超時時間配置java
好比這樣設置react
SslContext sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); ClientHttpConnector httpConnector = new ReactorClientHttpConnector(options -> { options.sslContext(sslContext); options.option(ChannelOption.SO_TIMEOUT, this.applicationConfig.getHttpClientRequestTimeout()); options.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, this.applicationConfig.getHttpClientConnectTimeout()); options.poolResources(PoolResources.fixed("myPool", this.applicationConfig.getHttpClientMaxPoolSize())); }); return WebClient.builder().clientConnector(httpConnector).defaultHeader("Authorization", "xxxx") .baseUrl(this.config.getBaseURL()).build();
這個SO_TIMEOUT只適用於OIO,對於NIO不適用
ReactorClientHttpConnector connector = new ReactorClientHttpConnector( options -> options.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000) .compression(true) .afterNettyContextInit(ctx -> { ctx.addHandlerLast(new ReadTimeoutHandler(5000, TimeUnit.MILLISECONDS)); })); return WebClient.builder() .clientConnector(connector) .build();
這種方式才適用於NIO
客戶端輸出web
21:25:06.013 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0x7c18870e, L:/127.0.0.1:49812 - R:localhost/127.0.0.1:8080] FLUSH io.netty.handler.timeout.ReadTimeoutException 21:25:10.948 [reactor-http-nio-4] ERROR reactor.ipc.netty.channel.ChannelOperations - [HttpClient] Error processing connection. Requesting close the channel io.netty.handler.timeout.ReadTimeoutException: null 21:25:10.948 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0x7c18870e, L:/127.0.0.1:49812 - R:localhost/127.0.0.1:8080] CLOSE 21:25:10.951 [reactor-http-nio-4] ERROR org.springframework.web.reactive.function.client - onError(io.netty.handler.timeout.ReadTimeoutException) 21:25:10.951 [reactor-http-nio-4] ERROR org.springframework.web.reactive.function.client - io.netty.handler.timeout.ReadTimeoutException: null 21:25:10.953 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0x7c18870e, L:/127.0.0.1:49812 ! R:localhost/127.0.0.1:8080] CLOSE 21:25:10.954 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0x7c18870e, L:/127.0.0.1:49812 ! R:localhost/127.0.0.1:8080] INACTIVE 21:25:10.954 [reactor-http-nio-4] DEBUG reactor.ipc.netty.ReactorNetty - Non Removed handler: [id: 0x7c18870e, L:/127.0.0.1:49812 ! R:localhost/127.0.0.1:8080], context: ReadTimeoutHandler, pipeline: ChannelHandlerContext(ReadTimeoutHandler, [id: 0x7c18870e, L:/127.0.0.1:49812 ! R:localhost/127.0.0.1:8080]) 21:25:10.954 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0x7c18870e, L:/127.0.0.1:49812 ! R:localhost/127.0.0.1:8080] USER_EVENT: [Handler Terminated] io.netty.handler.timeout.ReadTimeoutException Suppressed: java.lang.Exception: #block terminated with an error at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:89) at reactor.core.publisher.Mono.block(Mono.java:1162) at com.example.demo.WebClientTest.testSocketTimeout(WebClientTest.java:329) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) 21:25:10.955 [reactor-http-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperationsHandler - [id: 0x7c18870e, L:/127.0.0.1:49812 ! R:localhost/127.0.0.1:8080] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@498959c2 21:25:10.955 [reactor-http-nio-4] DEBUG reactor.ipc.netty.channel.PooledClientContextHandler - Releasing channel: [id: 0x7c18870e, L:/127.0.0.1:49812 ! R:localhost/127.0.0.1:8080] 21:25:10.955 [reactor-http-nio-4] DEBUG reactor.ipc.netty.resources.DefaultPoolResources - Released [id: 0x7c18870e, L:/127.0.0.1:49812 ! R:localhost/127.0.0.1:8080], now 0 active connections 21:25:10.955 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0x7c18870e, L:/127.0.0.1:49812 ! R:localhost/127.0.0.1:8080] UNREGISTERED
@Test public void testBlockTimeout() throws InterruptedException { Mono<ClientResponse> resp = WebClient.builder() .build().get() .uri("http://localhost:8080/timeout/download") .accept(MediaType.IMAGE_PNG) .exchange() .doOnError(e -> { e.printStackTrace(); }); try{ resp.block(Duration.ofSeconds(5)); }catch (Exception e){ e.printStackTrace(); } TimeUnit.SECONDS.sleep(120); }
使用Mono<ClientResponse>的block(Duration)也能起到效果
輸出以下spring
21:22:12.039 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0xca842ea7, L:/127.0.0.1:49649 - R:localhost/127.0.0.1:8080] FLUSH 21:22:16.728 [main] DEBUG org.springframework.web.reactive.function.client - cancel() java.lang.IllegalStateException: Timeout on blocking read for 5000 MILLISECONDS at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:110) at reactor.core.publisher.Mono.block(Mono.java:1186) at com.example.demo.WebClientTest.testBlockTimeout(WebClientTest.java:305) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) at org.junit.runners.ParentRunner.run(ParentRunner.java:363) at org.junit.runner.JUnitCore.run(JUnitCore.java:137) at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69) at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234) at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) 21:22:16.733 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0xca842ea7, L:/127.0.0.1:49649 - R:localhost/127.0.0.1:8080] CLOSE 21:22:16.737 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0xca842ea7, L:/127.0.0.1:49649 ! R:localhost/127.0.0.1:8080] INACTIVE 21:22:16.740 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0xca842ea7, L:/127.0.0.1:49649 ! R:localhost/127.0.0.1:8080] USER_EVENT: [Handler Terminated] 21:22:16.741 [reactor-http-nio-4] DEBUG reactor.ipc.netty.channel.ChannelOperationsHandler - [id: 0xca842ea7, L:/127.0.0.1:49649 ! R:localhost/127.0.0.1:8080] Disposing context reactor.ipc.netty.channel.PooledClientContextHandler@e3add1a 21:22:16.741 [reactor-http-nio-4] DEBUG reactor.ipc.netty.channel.PooledClientContextHandler - Releasing channel: [id: 0xca842ea7, L:/127.0.0.1:49649 ! R:localhost/127.0.0.1:8080] 21:22:16.741 [reactor-http-nio-4] DEBUG reactor.ipc.netty.resources.DefaultPoolResources - Released [id: 0xca842ea7, L:/127.0.0.1:49649 ! R:localhost/127.0.0.1:8080], now 0 active connections 21:22:16.742 [reactor-http-nio-4] DEBUG reactor.ipc.netty.http.client.HttpClient - [id: 0xca842ea7, L:/127.0.0.1:49649 ! R:localhost/127.0.0.1:8080] UNREGISTERED
好比app
@GetMapping("/download") public ResponseEntity<Resource> resource() throws IOException, InterruptedException { Resource resource = new ClassPathResource("parallel.png"); TimeUnit.SECONDS.sleep(30); return ResponseEntity.ok() .contentType(MediaType.IMAGE_PNG) .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=parallel.png") .body(resource); }
輸出socket
2018-02-14 21:29:35.568 ERROR 9451 --- [ctor-http-nio-2] c.e.demo.exception.ExceptionHandlers : Broken pipe java.io.IOException: Broken pipe at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.8.0_71] at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.8.0_71] at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_71] at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[na:1.8.0_71] at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[na:1.8.0_71] at sun.nio.ch.FileChannelImpl.transferToTrustedChannel(FileChannelImpl.java:515) ~[na:1.8.0_71] at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:611) ~[na:1.8.0_71] at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:145) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:533) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:358) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at reactor.ipc.netty.channel.ChannelOperationsHandler.doWrite(ChannelOperationsHandler.java:294) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE] at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:463) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE] at reactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:183) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:831) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at reactor.ipc.netty.NettyOutbound.lambda$sendFile$1(NettyOutbound.java:232) ~[reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE] at reactor.core.publisher.MonoUsing.subscribe(MonoUsing.java:81) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3008) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxConcatIterable$ConcatIterableSubscriber.onComplete(FluxConcatIterable.java:141) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxConcatIterable.subscribe(FluxConcatIterable.java:60) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3008) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:172) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3008) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:169) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1092) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1092) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:290) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1649) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onSubscribe(MonoIgnoreThen.java:279) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:161) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:803) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1649) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1463) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1337) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3008) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3008) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) [reactor-core-3.1.2.RELEASE.jar:3.1.2.RELEASE] at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:383) [reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE] at reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:359) [reactor-netty-0.7.2.RELEASE.jar:0.7.2.RELEASE] at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.17.Final.jar:4.1.17.Final] at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403) ~[netty-common-4.1.17.Final.jar:4.1.17.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.17.Final.jar:4.1.17.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) ~[netty-common-4.1.17.Final.jar:4.1.17.Final] at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_71]
能夠看到上面兩種方式只是client端本身關閉channel,非reactive的服務端等到要輸出的時候才發現,而後拋出java.io.IOException: Broken pipe
好比oop
@GetMapping("/flux") public Flux<String> fluxReturn() throws InterruptedException { return Flux.just("1","2","3","4") .delayElements(Duration.ofSeconds(5)) .doOnError(e -> { System.out.println("error:"+e.getMessage()); }); }
輸出ui
2018-02-14 21:59:59.956 DEBUG 9836 --- [ctor-http-nio-2] r.i.n.http.server.HttpServerOperations : Increasing pending responses, now 1 2018-02-14 21:59:59.970 DEBUG 9836 --- [ctor-http-nio-2] r.ipc.netty.http.server.HttpServer : [id: 0x85a11fcb, L:/127.0.0.1:8080 - R:/127.0.0.1:51936] READ COMPLETE 2018-02-14 21:59:59.970 DEBUG 9836 --- [ctor-http-nio-2] r.ipc.netty.channel.ChannelOperations : [HttpServer] [id: 0x85a11fcb, L:/127.0.0.1:8080 - R:/127.0.0.1:51936] handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@6df6d7a5 2018-02-14 22:00:04.862 DEBUG 9836 --- [ctor-http-nio-2] r.ipc.netty.http.server.HttpServer : [id: 0x85a11fcb, L:/127.0.0.1:8080 - R:/127.0.0.1:51936] READ COMPLETE 2018-02-14 22:00:04.868 DEBUG 9836 --- [ctor-http-nio-2] r.ipc.netty.http.server.HttpServer : [id: 0x85a11fcb, L:/127.0.0.1:8080 ! R:/127.0.0.1:51936] INACTIVE 2018-02-14 22:00:04.868 DEBUG 9836 --- [ctor-http-nio-2] r.ipc.netty.http.server.HttpServer : [id: 0x85a11fcb, L:/127.0.0.1:8080 ! R:/127.0.0.1:51936] USER_EVENT: [Handler Terminated] 2018-02-14 22:00:04.869 DEBUG 9836 --- [ctor-http-nio-2] r.i.n.channel.ChannelOperationsHandler : [id: 0x85a11fcb, L:/127.0.0.1:8080 ! R:/127.0.0.1:51936] Disposing context reactor.ipc.netty.channel.ServerContextHandler@2a0abf15 2018-02-14 22:00:04.870 DEBUG 9836 --- [ctor-http-nio-2] r.ipc.netty.http.server.HttpServer : [id: 0x85a11fcb, L:/127.0.0.1:8080 ! R:/127.0.0.1:51936] UNREGISTERED
reactive的服務端能夠感知到client的terminate。
SO_TIMEOUT只適用於OIO,對於NIO不適用;使用ReadTimeoutHandler 或者block(Duration)都關閉client端,非reactive的服務端沒法感知Terminated,reactive的服務端能夠感知到Terminatedthis