JDK11的新特性:HTTP API和reactive streams

簡介

JDK11的新特性:新的HTTP API中,咱們介紹了經過新的HTTP API,咱們能夠發送同步或者異步的請求,並得到的返回的結果。java

今天咱們想探討一下這些同步或者異步請求和響應和reactive streams的關係。react

更多內容請訪問 www.flydean.com

怎麼在java中使用reactive streams

reactive streams的介紹你們能夠參考reactive stream協議詳解,使用reactive streams的目的就是爲了解決發送者和消費者之間的通訊問題,發送者不會發送超出消費者能力的信息。git

咱們再回顧一下reactive streams中的幾個關鍵概念:github

  • Publisher 負責產生消息或者事件,並提供了一個subscribed接口來和Subscriber進行鏈接。
  • Subscriber 用來subscribe一個Publisher,並提供了onNext方法來處理新的消息,onError來處理異常,onComplete提供給Publisher調用來結束監聽。
  • Subscription 負責鏈接Publisher和Subscriber,能夠用來請求消息或者取消收聽。

更進一步,若是咱們想要本身實現一個reactive streams,咱們須要作這些事情:api

  1. 建立Publisher和Subscriber
  • 建立Publisher和Subscriber。
  • 調用Publisher.subscribe(Subscriber)創建Publisher和Subscriber之間的鏈接。
  • Publisher建立一個Subscription,並調用Subscriber.onSubscription(Subscription)方法。
  • Subscriber將Subscription保存起來,供後面使用。
  1. 發送和接收信息
  • Subscriber調用Subscription.request(n) 方法請求n個消息。
  • Publisher調用Subscriber.onNext(item) 將請求的消息發送給Subscriber。
  • 按照須要重複上訴過程。
  1. 取消或者結束
  • Publisher調用Subscriber.OnError(err) 或者 Subscriber.onComplete()方法。
  • Subscriber調用Subscription.cancel()方法。

POST請求的例子

還記得上篇文章咱們講HTTP API新特性的時候,咱們使用的例子嗎?app

例子中,咱們使用了一個HttpRequest.BodyPublisher,用來構建Post請求,而BodyPublisher就是一個Flow.Publisher:異步

public interface BodyPublisher extends Flow.Publisher<ByteBuffer>

也就是說從BodyPublisher開始,就已經在使用reactive streams了。ide

爲了可以更好的瞭解reactive streams的工做原理,咱們建立幾個wrapper類將Publisher,Subscriber,Subscription包裝起來,輸出相應的日誌。函數

代碼有點多咱們就不一一列出來了,這裏只列一個CustBodyPublisher的具體實現:post

public class CustBodyPublisher implements HttpRequest.BodyPublisher {

    private final HttpRequest.BodyPublisher bodyPublisher;

    public CustBodyPublisher(HttpRequest.BodyPublisher bodyPublisher){
        this.bodyPublisher=bodyPublisher;
    }
    @Override
    public long contentLength() {
        long contentLength=bodyPublisher.contentLength();
        log.info("contentLength:{}",contentLength);
        return contentLength;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        log.info("CustBodyPublisher subscribe {}",subscriber);
        bodyPublisher.subscribe(new CustSubscriber(subscriber));
    }
}

wrapper類很簡單,經過構造函數傳入要wrapper的類,而後在相應的方法中調用實際wrapper類的方法。

最後,咱們將以前使用的調用HTTP API的例子改造一下:

public void testCustPost() throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder().build();

        HttpRequest.BodyPublisher requestBody = HttpRequest.BodyPublishers
                .ofString("{ 我是body }");
        CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody);
        HttpRequest postRequest = HttpRequest.newBuilder()
                .POST(custBodyPublisher)
                .uri(URI.create("http://www.flydean.com"))
                .build();

        HttpResponse<String> response = client
                .send(postRequest, HttpResponse.BodyHandlers.ofString());

        log.info("response {}",response);
    }

注意這裏CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody),咱們建立了一個新的wrapper類。

運行它,觀察輸出結果:

[HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - contentLength:14
[HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - CustBodyPublisher subscribe jdk.internal.net.http.Http1Request$FixedContentSubscriber@672776b6
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onSubscribe jdk.internal.net.http.PullPublisher$Subscription@580ce038
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onNext length 14
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1
[HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onComplete
[main] INFO com.flydean.ReactiveHttpUsage - response (POST http://www.flydean.com) 200

能夠看到reactive stream的具體工做流程。首先建立了CustBodyPublisher,而後調用了subscribe方法。

接着CustSubscriber調用onSubscribe建立了Subscription。

每次CustSubscription的request方法都會致使CustSubscriber的onNext方法被調用。

最後當CustSubscription再次請求無結果的時候,CustSubscriber調用onComplete方法結束整個流程。

注意,上面的例子中,咱們wrapper調用的是BodyPublishers.ofString,其實BodyPublishers中內置了多種BodyPublisher的實現。感興趣的朋友能夠自行探索。

總結

本文講解了新的HTTP API中reactive Streams的使用。

本文的例子[https://github.com/ddean2009/
learn-java-base-9-to-20](https://github.com/ddean2009/...

本文做者:flydean程序那些事

本文連接:http://www.flydean.com/jdk11-http-api-reactive-streams/

本文來源:flydean的博客

歡迎關注個人公衆號:程序那些事,更多精彩等着您!

相關文章
相關標籤/搜索