Spring5.2.2 WebFlux之WebClient

    Spring WebFlux爲HTTP請求提供了一個reactive的、非阻塞的WebClient 。客戶端有一個功能性的、流暢的API,其中包含用於聲明性合成的反應式類型,請參閱反應式庫。WebFlux客戶端和服務器依賴相同的非阻塞編解碼器對請求和響應內容進行編碼和解碼。javascript

      內部WebClient 委託給HTTP客戶端庫。默認狀況下,它使用Reactor Netty,內置了對Jetty reactive HttpClient的支持,其餘的能夠經過ClientHttpConnector插入。css

一、配置java

     建立WebClient 的最簡單方法是經過靜態工廠方法之一:react

      WebClient.create()nginx

      WebClient.create(String baseUrl)web

  以上方法使用Reactor Netty HttpClient 和expect的默認設置io.projectreactor.netty:reactor-netty在類路徑上。spring

你也可使用WebClient.builder()有更多方式:json

  • uriBuilderFactory:自定義的UriBuilderFactory 用做基本的URL。swift

  • defaultHeader:每一個請求的頭。ruby

  • defaultCookie:每一個請求的Cookies。

  • defaultRequestConsumer 定製每一個請求。

  • filter:每一個請求的客戶端過濾器。

  • exchangeStrategies: HTTP消息讀取器/編寫器自定義。

  • clientConnector:HTTP客戶端庫設置。

如下示例配置HTTP編解碼器:

WebClient client = WebClient.builder() .exchangeStrategies(builder -> { return builder.codecs(codecConfigurer -> { //... }); }) .build();

    一旦構建,WebClient 實例是不可變的。可是,能夠在不影響原始實例的狀況下克隆它並生成修改後的副本,以下例所示:

WebClient client1 = WebClient.builder() .filter(filterA).filter(filterB).build();
WebClient client2 = client1.mutate() .filter(filterC).filter(filterD).build();
// client1 有filterA, filterB
// client2 有 filterA, filterB, filterC, filterD


1.一、最大內存大小

    Spring WebFlux配置了在編解碼器中緩衝內存數據的限制,以免應用程序內存問題。默認狀況下,它被配置爲256KB,若是這對於你的用例來講還不夠,你將看到如下內容:

org.springframework.core.io.buffer.DataBufferLimitException: Exceeded limit on max bytes to buffer

你可使用如下代碼示例在全部默認編解碼器上配置此限制:

WebClient webClient = WebClient.builder() .exchangeStrategies(builder -> builder.codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(2 * 1024 * 1024) ) ) .build();

1.三、Reactor Netty

    要自定義Reactor Netty設置,只需提供預配置的HttpClient

HttpClient httpClient = HttpClient.create().secure(sslSpec -> ...);
WebClient webClient = WebClient.builder() .clientConnector(new ReactorClientHttpConnector(httpClient)) .build();

Resources

    默認狀況下,HttpClient 參與持有的全局reactor.netty.http.HttpResources,包括事件循環線程和鏈接池。這是推薦的模式,由於固定的共享資源是事件循環併發的首選。在此模式下,全局資源將保持活動狀態,直到進程退出。

        若是服務器與進程同步,則一般不須要顯式關閉。可是,若是服務器能夠在進程內啓動或中止(例如,部署爲WAR的Spring MVC應用程序),則可使用globalResources=true(默認值)聲明一個Spring管理的bean,ReactorResourceFactory 類型爲Reactor Netty global resources,以確保在Spring ApplicationContext 關閉時Reactor Netty全局資源,以下例所示:

@Beanpublic ReactorResourceFactory reactorResourceFactory() { return new ReactorResourceFactory();}

    你也能夠選擇不參與全局 Reactor Netty資源。可是,在這種模式下,確保全部Reactor Netty客戶端和服務端實例使用共享資源的負擔就在你身上,以下例所示:

@Beanpublic ReactorResourceFactory resourceFactory() { ReactorResourceFactory factory = new ReactorResourceFactory();    factory.setUseGlobalResources(false); //建立獨立於全局資源的資源。 return factory;}
@Beanpublic WebClient webClient() {
Function<HttpClient, HttpClient> mapper = client -> { // 進一步自定義 };//將ReactorClientHttpConnector構造方法與資源工廠一塊兒使用。 ClientHttpConnector connector = new ReactorClientHttpConnector(resourceFactory(), mapper);  //連接WebClient.Builder. return WebClient.builder().clientConnector(connector).build(); }

1.四、超時

       要配置鏈接超時:

import io.netty.channel.ChannelOption;
HttpClient httpClient = HttpClient.create() .tcpConfiguration(client -> client.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000));

要配置讀和/或寫超時值:

import io.netty.handler.timeout.ReadTimeoutHandler;import io.netty.handler.timeout.WriteTimeoutHandler;
HttpClient httpClient = HttpClient.create() .tcpConfiguration(client -> client.doOnConnected(conn -> conn .addHandlerLast(new ReadTimeoutHandler(10)) .addHandlerLast(new WriteTimeoutHandler(10))));

1.五、Jetty

      如下示例顯示如何自定義Jetty HttpClient 設置:

HttpClient httpClient = new HttpClient();httpClient.setCookieStore(...);ClientHttpConnector connector = new JettyClientHttpConnector(httpClient);
WebClient webClient = WebClient.builder().clientConnector(connector).build();

默認狀況下,HttpClient 建立本身的資源(ExecutorByteBufferPoolScheduler),這些資源在進程退出或調用stop()以前保持活動狀態。

      你能夠在Jetty客戶端(和服務端)的多個實例之間共享資源,並經過聲明JettyResourceFactory類型的Spring託管bean來確保在Spring ApplicationContext 關閉時關閉資源,以下例所示:

@Beanpublic JettyResourceFactory resourceFactory() { return new JettyResourceFactory();}
@Beanpublic WebClient webClient() {
HttpClient httpClient = new HttpClient(); // 進一步自定義   //將JettyClientHttpConnector構造方法與資源工廠一塊兒使用。 ClientHttpConnector connector = new JettyClientHttpConnector(httpClient, resourceFactory());    //鏈接WebClient.Builder. return WebClient.builder().clientConnector(connector).build(); }

二、retrieve()

    retrieve()方法是獲取響應體並對其進行解碼的最簡單方法。下面的示例演示如何執行此操做:

WebClient client = WebClient.create("https://example.org");
Mono<Person> result = client.get() .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON) .retrieve() .bodyToMono(Person.class);

你還能夠從響應中得到解碼的對象流,以下例所示:

Flux<Quote> result = client.get() .uri("/quotes").accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(Quote.class);

    默認狀況下,帶有4xx或5xx狀態代碼的響應將致使WebClientResponseException 或其HTTP狀態特定的子類之一,如WebClientResponseException.BadRequestWebClientResponseException.NotFound,以及其餘。也可使用onStatus 方法自定義生成的異常,以下例所示:

Mono<Person> result = client.get() .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON) .retrieve() .onStatus(HttpStatus::is4xxClientError, response -> ...) .onStatus(HttpStatus::is5xxServerError, response -> ...) .bodyToMono(Person.class);

使用onStatus 時,若是預期響應包含內容,則onStatus 回調應使用它。不然,內容將被自動清空,以確保資源被釋放。

三、exchange()

     exchange()方法比retrieve 方法提供更多的控制。如下示例至關於retrieve(),但也提供了對ClientResponse的訪問:

Mono<Person> result = client.get() .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON) .exchange() .flatMap(response -> response.bodyToMono(Person.class));

如此的話,你還能夠建立徹底響應:

Mono<ResponseEntity<Person>> result = client.get() .uri("/persons/{id}", id).accept(MediaType.APPLICATION_JSON) .exchange() .flatMap(response -> response.toEntity(Person.class));

      請注意(與retrieve())不一樣,對於exchange(),4xx和5xx響應沒有自動錯誤信號。必須檢查狀態代碼並決定如何繼續。

      使用exchange()時,必須確保始終使用或釋放主體,即便發生異常(請參見使用DataBuffer)。一般,經過調用ClientResponse 上的bodyTo*toEntity*來將body轉換爲所需類型的對象,但也能夠調用releaseBody()放棄正文內容而不使用它,或調用oBodilessEntity()只獲取狀態和頭(同時丟棄正文)。

     最後,還有bodyToMono(Void.class),只有在不須要響應內容時才應使用。若是響應確實包含內容,則鏈接將關閉,而且不會放回池中,由於它不會處於可重用狀態。

四、Request Body

     請求主體能夠從ReactiveAdapterRegistry處理的任何異步類型進行編碼,如Mono 協同程序Deferred ,以下例所示:

Mono<Person> personMono = ... ;
Mono<Void> result = client.post() .uri("/persons/{id}", id) .contentType(MediaType.APPLICATION_JSON) .body(personMono, Person.class) .retrieve() .bodyToMono(Void.class);

還能夠對對象流進行編碼,以下例所示:

Flux<Person> personFlux = ... ;
Mono<Void> result = client.post() .uri("/persons/{id}", id) .contentType(MediaType.APPLICATION_STREAM_JSON) .body(personFlux, Person.class) .retrieve() .bodyToMono(Void.class);

或者,若是可使用下面的body值,則可使用下面的方法:

Person person = ... ;
Mono<Void> result = client.post() .uri("/persons/{id}", id) .contentType(MediaType.APPLICATION_JSON) .bodyValue(person) .retrieve() .bodyToMono(Void.class);

表單數據

      要發送表單數據,能夠提供MultiValueMap<String, String>做爲主體。注意,內容被FormHttpMessageWriter自動設置爲application/x-www-form-urlencoded。下面的示例演示如何使用MultiValueMap<String, String>

MultiValueMap<String, String> formData = ... ;
Mono<Void> result = client.post() .uri("/path", id) .bodyValue(formData) .retrieve() .bodyToMono(Void.class);

你還可使用BodyInserters以聯機方式提供表單數據,以下例所示:

import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post() .uri("/path", id) .body(fromFormData("k1", "v1").with("k2", "v2")) .retrieve() .bodyToMono(Void.class);

Multipart 數據

        要發送多部分數據,你須要提供MultiValueMap<String, ?>其值能夠是表示部件內容的Object 實例,也能夠是表示部件內容和頭的HttpEntity 實例。MultipartBodyBuilder 提供了一個方便的API來準備多部分請求。下面的示例演示如何建立MultiValueMap<String, ?>:

MultipartBodyBuilder builder = new MultipartBodyBuilder();builder.part("fieldPart", "fieldValue");builder.part("filePart1", new FileSystemResource("...logo.png"));builder.part("jsonPart", new Person("Jason"));builder.part("myPart", part); // 來自服務器請求的部分
MultiValueMap<String, HttpEntity<?>> parts = builder.build();

     在大多數狀況下,你沒必要爲每一個部分指定Content-TypeContent-Type是根據選擇序列化它的HttpMessageWriter 自動肯定的,若是是資源,則根據文件擴展名肯定。若有必要,你能夠經過重載的構建器part 方法之一顯式地爲每一個部件提供要使用的MediaType 

      一旦準備好MultiValueMap ,將其傳遞給WebClient 的最簡單方法是經過body 方法,以下例所示:

MultipartBodyBuilder builder = ...;
Mono<Void> result = client.post() .uri("/path", id) .body(builder.build()) .retrieve() .bodyToMono(Void.class);

      若是MultiValueMap 至少包含一個非字符串值(也就是說,application/x-www-form-urlencoded),則無需將Content-Type設置爲multipart/form-data。使用MultipartBodyBuilder時老是這樣,它確保了HttpEntity 封裝器。

      做爲MultipartBodyBuilder的替代,你還能夠經過內置的BodyInserters提供多部份內容,內聯樣式,以下例所示:

import static org.springframework.web.reactive.function.BodyInserters.*;
Mono<Void> result = client.post() .uri("/path", id) .body(fromMultipartData("fieldPart", "value").with("filePart", resource)) .retrieve() .bodyToMono(Void.class);

五、客戶端Filters

       你能夠經過註冊客戶端篩選器(ExchangeFilterFunctionWebClient.Builder爲了攔截和修改請求,以下例所示:

WebClient client = WebClient.builder() .filter((request, next) -> {
ClientRequest filtered = ClientRequest.from(request) .header("foo", "bar") .build();
return next.exchange(filtered); }) .build();

      這能夠用於交叉關注點,例如身份驗證。如下示例使用篩選器經過靜態工廠方法進行基自己份驗證:

import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = WebClient.builder() .filter(basicAuthentication("user", "password")) .build();

     過濾器將全局應用於每一個請求。要更改特定請求的篩選器行爲,能夠向ClientRequest 添加請求屬性,而後鏈中的全部篩選器均可以訪問這些屬性,以下例所示:

WebClient client = WebClient.builder() .filter((request, next) -> { Optional<Object> usr = request.attribute("myAttribute"); // ... }) .build();
client.get().uri("https://example.org/") .attribute("myAttribute", "...") .retrieve() .bodyToMono(Void.class);
}

      你還能夠複製現有的WebClient、插入新的篩選器或刪除已註冊的篩選器。如下示例在索引0處插入基自己份驗證篩選器:

import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
WebClient client = webClient.mutate() .filters(filterList -> { filterList.add(0, basicAuthentication("user", "password")); }) .build();

六、同步使用

     WebClient 能夠同步方式使用,方法是在末尾阻塞,結果是:

Person person = client.get().uri("/person/{id}", i).retrieve() .bodyToMono(Person.class) .block();
List<Person> persons = client.get().uri("/persons").retrieve() .bodyToFlux(Person.class) .collectList() .block();

      可是,若是須要進行多個調用,則更有效的方法是避免對每一個響應單獨進行阻塞,而是等待組合結果:

Mono<Person> personMono = client.get().uri("/person/{id}", personId) .retrieve().bodyToMono(Person.class);
Mono<List<Hobby>> hobbiesMono = client.get().uri("/person/{id}/hobbies", personId) .retrieve().bodyToFlux(Hobby.class).collectList();
Map<String, Object> data = Mono.zip(personMono, hobbiesMono, (person, hobbies) -> { Map<String, String> map = new LinkedHashMap<>(); map.put("person", person); map.put("hobbies", hobbies); return map; }) .block();

     以上只是一個例子。還有許多其餘模式和運算符能夠用來組合一個響應式管道,該管道能夠進行許多遠程調用,可能有些是嵌套的、相互依賴的,直到最後都不會阻塞。

      對於Flux Mono,你永遠沒必要阻塞Spring MVCSpring WebFlux 控制器。只需從controller方法返回獲得的reactive類型,只需在控制器方法中使用掛起函數或返回Flow 

歡迎關注和轉發Spring中文社區(加微信羣,能夠關注後加我微信):

本文分享自微信公衆號 - Spring中文社區(gh_81d233bb13a4)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索