在JDK11的新特性:新的HTTP API中,咱們介紹了經過新的HTTP API,咱們能夠發送同步或者異步的請求,並得到的返回的結果。java
今天咱們想探討一下這些同步或者異步請求和響應和reactive streams的關係。react
更多內容請訪問 www.flydean.com
reactive streams的介紹你們能夠參考reactive stream協議詳解,使用reactive streams的目的就是爲了解決發送者和消費者之間的通訊問題,發送者不會發送超出消費者能力的信息。git
咱們再回顧一下reactive streams中的幾個關鍵概念:github
更進一步,若是咱們想要本身實現一個reactive streams,咱們須要作這些事情:api
還記得上篇文章咱們講HTTP API新特性的時候,咱們使用的例子嗎?app
例子中,咱們使用了一個HttpRequest.BodyPublisher,用來構建Post請求,而BodyPublisher就是一個Flow.Publisher:異步
public interface BodyPublisher extends Flow.Publisher<ByteBuffer>
也就是說從BodyPublisher開始,就已經在使用reactive streams了。ide
爲了可以更好的瞭解reactive streams的工做原理,咱們建立幾個wrapper類將Publisher,Subscriber,Subscription包裝起來,輸出相應的日誌。函數
代碼有點多咱們就不一一列出來了,這裏只列一個CustBodyPublisher的具體實現:post
public class CustBodyPublisher implements HttpRequest.BodyPublisher { private final HttpRequest.BodyPublisher bodyPublisher; public CustBodyPublisher(HttpRequest.BodyPublisher bodyPublisher){ this.bodyPublisher=bodyPublisher; } @Override public long contentLength() { long contentLength=bodyPublisher.contentLength(); log.info("contentLength:{}",contentLength); return contentLength; } @Override public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) { log.info("CustBodyPublisher subscribe {}",subscriber); bodyPublisher.subscribe(new CustSubscriber(subscriber)); } }
wrapper類很簡單,經過構造函數傳入要wrapper的類,而後在相應的方法中調用實際wrapper類的方法。
最後,咱們將以前使用的調用HTTP API的例子改造一下:
public void testCustPost() throws IOException, InterruptedException { HttpClient client = HttpClient.newBuilder().build(); HttpRequest.BodyPublisher requestBody = HttpRequest.BodyPublishers .ofString("{ 我是body }"); CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody); HttpRequest postRequest = HttpRequest.newBuilder() .POST(custBodyPublisher) .uri(URI.create("http://www.flydean.com")) .build(); HttpResponse<String> response = client .send(postRequest, HttpResponse.BodyHandlers.ofString()); log.info("response {}",response); }
注意這裏CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody),咱們建立了一個新的wrapper類。
運行它,觀察輸出結果:
[HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - contentLength:14 [HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - CustBodyPublisher subscribe jdk.internal.net.http.Http1Request$FixedContentSubscriber@672776b6 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onSubscribe jdk.internal.net.http.PullPublisher$Subscription@580ce038 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onNext length 14 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onComplete [main] INFO com.flydean.ReactiveHttpUsage - response (POST http://www.flydean.com) 200
能夠看到reactive stream的具體工做流程。首先建立了CustBodyPublisher,而後調用了subscribe方法。
接着CustSubscriber調用onSubscribe建立了Subscription。
每次CustSubscription的request方法都會致使CustSubscriber的onNext方法被調用。
最後當CustSubscription再次請求無結果的時候,CustSubscriber調用onComplete方法結束整個流程。
注意,上面的例子中,咱們wrapper調用的是BodyPublishers.ofString,其實BodyPublishers中內置了多種BodyPublisher的實現。感興趣的朋友能夠自行探索。
本文講解了新的HTTP API中reactive Streams的使用。
本文的例子[https://github.com/ddean2009/
learn-java-base-9-to-20](https://github.com/ddean2009/...
本文做者:flydean程序那些事本文連接:http://www.flydean.com/jdk11-http-api-reactive-streams/
本文來源:flydean的博客
歡迎關注個人公衆號:程序那些事,更多精彩等着您!