本文主要研究一下WebClient的LoadBalance支持html
@Configuration public class WebClientConfig { @Autowired private LoadBalancerExchangeFilterFunction lbFunction; @Bean public WebClient webClient(){ return WebClient.builder() .filter(lbFunction) .build(); } }
@Component public class DepartmentService { @Autowired WebClient webClient; public Flux<Department> getDepartmentsByOrgId(Long orgId) { return webClient .get() .uri("http://department-service/organization/{orgId}",orgId) .retrieve() .bodyToFlux(Department.class); } }
@Autowired DepartmentService departmentService; @GetMapping("/departments") public Flux<Department> getDepartmentsById(Long orgId){ return departmentService.getDepartmentsByOrgId(orgId); }
[ { "id": 1, "name": "department 1", "employees": [] } ]
2018-04-29 13:09:15 ERROR [organization-service,,,] Failed to handle request [GET http://localhost:8092/flux/departments?orgId=1] io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /172.16.205.106:8091 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_151] at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_151] at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:325) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final] at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:633) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar:4.1.23.Final] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151] Caused by: java.net.ConnectException: Connection refused ... 10 common frames omitted
2018-04-29 13:12:08 ERROR [organization-service,,,] Failed to handle request [GET http://localhost:8092/flux/departments?orgId=1] java.lang.IllegalArgumentException: instance can not be null at org.springframework.util.Assert.notNull(Assert.java:193) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE] at org.springframework.cloud.netflix.ribbon.RibbonLoadBalancerClient.reconstructURI(RibbonLoadBalancerClient.java:53) ~[spring-cloud-netflix-ribbon-2.0.0.RC1.jar:2.0.0.RC1] at org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerExchangeFilterFunction.filter(LoadBalancerExchangeFilterFunction.java:34) ~[spring-cloud-commons-2.0.0.RC1.jar:2.0.0.RC1] at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$andThen$1(ExchangeFilterFunction.java:56) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE] at org.springframework.web.reactive.function.client.ExchangeFilterFunction.lambda$apply$2(ExchangeFilterFunction.java:67) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE] at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.exchange(DefaultWebClient.java:320) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE] at org.springframework.web.reactive.function.client.DefaultWebClient$DefaultRequestBodyUriSpec.retrieve(DefaultWebClient.java:367) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE] at com.example.service.DepartmentService.getDepartmentsByOrgId(DepartmentService.java:24) ~[classes/:na] at com.example.controller.FluxDemoController.getDepartmentsById(FluxDemoController.java:23) ~[classes/:na] at sun.reflect.GeneratedMethodAccessor82.invoke(Unknown Source) ~[na:na] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151] at org.springframework.web.reactive.result.method.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:243) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE] at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:138) ~[spring-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:323) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoDefaultIfEmpty.subscribe(MonoDefaultIfEmpty.java:37) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:148) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:271) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:803) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onNext(ScopePassingSpanSubscriber.java:81) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:156) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.request(ScopePassingSpanSubscriber.java:69) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1444) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1318) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onSubscribe(ScopePassingSpanSubscriber.java:63) [spring-cloud-sleuth-core-2.0.0.RC1.jar:2.0.0.RC1] at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:418) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:210) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.FluxLift.subscribe(FluxLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) [reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1073) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:241) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:323) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:1630) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoZip$ZipInner.onSubscribe(MonoZip.java:312) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoZip.subscribe(MonoZip.java:128) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoSubscriberContext.subscribe(MonoSubscriberContext.java:47) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoPeek.subscribe(MonoPeek.java:71) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:3080) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:167) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:70) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.core.publisher.MonoLift.subscribe(MonoLift.java:46) ~[reactor-core-3.1.6.RELEASE.jar:3.1.6.RELEASE] at reactor.ipc.netty.channel.ChannelOperations.applyHandler(ChannelOperations.java:381) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE] at reactor.ipc.netty.http.server.HttpServerOperations.onHandlerStart(HttpServerOperations.java:397) ~[reactor-netty-0.7.6.RELEASE.jar:0.7.6.RELEASE] at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) ~[netty-common-4.1.23.Final.jar:4.1.23.Final] at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.23.Final.jar:4.1.23.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463) ~[netty-transport-4.1.23.Final.jar:4.1.23.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886) ~[netty-common-4.1.23.Final.jar:4.1.23.Final] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_151]
spring-cloud-commons-2.0.0.RC1-sources.jar!/org/springframework/cloud/client/loadbalancer/reactive/LoadBalancerExchangeFilterFunction.javajava
public class LoadBalancerExchangeFilterFunction implements ExchangeFilterFunction { private final LoadBalancerClient loadBalancerClient; public LoadBalancerExchangeFilterFunction(LoadBalancerClient loadBalancerClient) { this.loadBalancerClient = loadBalancerClient; } @Override public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) { URI originalUrl = request.url(); String serviceId = originalUrl.getHost(); Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUrl); //TODO: reactive lb client ServiceInstance instance = this.loadBalancerClient.choose(serviceId); URI uri = this.loadBalancerClient.reconstructURI(instance, originalUrl); ClientRequest newRequest = ClientRequest.method(request.method(), uri) .headers(headers -> headers.addAll(request.headers())) .cookies(cookies -> cookies.addAll(request.cookies())) .attributes(attributes -> attributes.putAll(request.attributes())) .body(request.body()) .build(); return next.exchange(newRequest); } }
對於webclient來講,在這個filterChain中使用了LoadBalancerExchangeFilterFunction,能夠看到使用了LoadBalancerExchangeFilterFunction的filter方法裏頭,對原來的request進行了包裝,使用loadBalancerClient根據服務ID進行服務發現選取可用的服務地址,而後替換原來的uri,構形成新的request傳遞到下一個filter
spring-webflux-5.0.5.RELEASE-sources.jar!/org/springframework/web/reactive/function/client/DefaultWebClientBuilder.javareact
class DefaultWebClientBuilder implements WebClient.Builder { @Nullable private List<ExchangeFilterFunction> filters; @Override public WebClient.Builder filter(ExchangeFilterFunction filter) { Assert.notNull(filter, "ExchangeFilterFunction must not be null"); initFilters().add(filter); return this; } @Override public WebClient build() { ExchangeFunction exchange = initExchangeFunction(); ExchangeFunction filteredExchange = (this.filters != null ? this.filters.stream() .reduce(ExchangeFilterFunction::andThen) .map(filter -> filter.apply(exchange)) .orElse(exchange) : exchange); return new DefaultWebClient(filteredExchange, initUriBuilderFactory(), unmodifiableCopy(this.defaultHeaders), unmodifiableCopy(this.defaultCookies), new DefaultWebClientBuilder(this)); }
能夠看到調用webClient的filter就會往filters添加,以後在build的時候,利用ExchangeFilterFunction::andThen構造一個ExchangeFunction,傳遞給DefaultWebClient的構造器
spring-webflux-5.0.5.RELEASE-sources.jar!/org/springframework/web/reactive/function/client/ExchangeFilterFunction.javaweb
/** * Apply this filter to the given request and exchange function. * <p>The given {@linkplain ExchangeFunction exchange function} represents the next entity * in the chain, and can be {@linkplain ExchangeFunction#exchange(ClientRequest) invoked} * in order to proceed to the exchange, or not invoked to block the chain. * @param request the request * @param next the next exchange function in the chain * @return the filtered response */ Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next); /** * Return a composed filter function that first applies this filter, and then applies the * {@code after} filter. * @param after the filter to apply after this filter is applied * @return a composed filter that first applies this function and then applies the * {@code after} function */ default ExchangeFilterFunction andThen(ExchangeFilterFunction after) { Assert.notNull(after, "'after' must not be null"); return (request, next) -> { ExchangeFunction nextExchange = exchangeRequest -> after.filter(exchangeRequest, next); return filter(request, nextExchange); }; } /** * Apply this filter to the given exchange function, resulting in a filtered exchange function. * @param exchange the exchange function to filter * @return the filtered exchange function */ default ExchangeFunction apply(ExchangeFunction exchange) { Assert.notNull(exchange, "'exchange' must not be null"); return request -> this.filter(request, exchange); }
ExchangeFilterFunction經過andThen添加filter鏈(
裏頭的lambda是實現filter這個函數式方法
),最後經過apply轉換爲ExchangeFunction
webClient的loadBalanced支持比restTemplate更爲簡潔和清晰,直接使用filter模式,經過loadBalancerClient獲取服務地址,替換uri再傳遞給下一個filter。若是loadBalancerClient沒能得到到服務地址的話,則RibbonLoadBalancerClient.reconstructURI方法會報錯java.lang.IllegalArgumentException: instance can not be null。另外因爲註冊中心的信息可能有延遲,由於也可能存在Connection refused的異常。spring