Stream你們應該都很熟悉了,java8中爲全部的集合類都引入了Stream的概念。優雅的鏈式操做,流式處理邏輯,相信用過的人都會愛不釋手。java
每一個數據流都有一個生產者一個消費者。生產者負責產生數據,而消費者負責消費數據。若是是同步系統,生產一個消費一個沒什麼問題。可是若是在異步系統中,就會產生問題。react
由於生產者沒法感知消費者的狀態,不知道消費者究竟是繁忙狀態仍是空閒狀態,是否有能力去消費更多的數據。編程
通常來講數據隊列的長度都是有限的,即便沒有作限制,可是系統的內存也是有限的。當太多的數據沒有被消費的話,會致使內存溢出或者數據得不到即便處理的問題。異步
這時候就須要back-pressure了。編程語言
若是消息接收方消息處理不過來,則能夠通知消息發送方,告知其正在承受壓力,須要下降負載。back-pressure是一種消息反饋機制,從而使系統得以優雅地響應負載, 而不是在負載下崩潰。工具
而reactive stream的目的就是用來管理異步服務的流數據交換,並可以讓接收方自主決定接受數據的頻率。back-pressure就是reactive stream中不可或缺的一部分。測試
更多內容請訪問 www.flydean.com
上面咱們講到了reactive stream的做用,你們應該對reactive stream有了一個基本的瞭解。這裏咱們再給reactive stream作一個定義:spa
reactive stream就是一個異步stream處理的標準,它的特色就是非阻塞的back pressure。code
reactive stream只是一個標準,它定義了實現非阻塞的back pressure的最小區間的接口,方法和協議。blog
因此reactive stream其實有不少種實現的,不只僅是java可使用reactive stream,其餘的編程語言也能夠。
reactive stream只是定義了最基本的功能,各大實如今實現了基本功能的同時能夠自由擴展。
目前reactive stream最新的java版本是1.0.3,是在2019年8月23發佈的。它包含了java API,協議定義文件,測試工具集合和具體的實現例子。
在介紹java版本的reactive stream以前,咱們先回顧一下reactive stream須要作哪些事情:
爲了實現這4個功能,reactive stream定義了4個接口,Publisher,Subscriber,Subscription,Processor。這四個接口其實是一個觀察者模式的實現。接下來咱們詳細來分析一下各個接口的做用和約定。
先看下Publisher的定義:
public interface Publisher<T> { public void subscribe(Subscriber<? super T> s); }
Publisher就是用來生成消息的。它定義了一個subscribe方法,傳入一個Subscriber。這個方法用來將Publisher和Subscriber進行鏈接。
一個Publisher能夠鏈接多個Subscriber。
每次調用subscribe創建鏈接,都會建立一個新的Subscription,Subscription和subscriber是一一對應的。
一個Subscriber只可以subscribe一次Publisher。
若是subscribe失敗或者被拒絕,則會出發Subscriber.onError(Throwable)方法。
先看下Subscriber的定義:
public interface Subscriber<T> { public void onSubscribe(Subscription s); public void onNext(T t); public void onError(Throwable t); public void onComplete(); }
Subscriber就是消息的接收者。
在Publisher和Subscriber創建鏈接的時候會觸發onSubscribe(Subscription s)方法。
當調用Subscription.request(long)方法時,onNext(T t)會被觸發,根據request請求參數的大小,onNext會被觸發一次或者屢次。
在發生異常或者結束時會觸發onError(Throwable t)或者onComplete()方法。
先看下Subscription的定義:
public interface Subscription { public void request(long n); public void cancel(); }
Subscription表明着一對一的Subscriber和Publisher之間的Subscribe關係。
request(long n)意思是向publisher請求多少個events,這會觸發Subscriber.onNext方法。
cancel()則是請求Publisher中止發送信息,並清除資源。
先看下Processor的定義:
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }
Processor便是Subscriber又是Publisher,它表明着一種處理狀態。
在JDK中java.util.concurrent.Flow就是reactive stream語義的一種實現。
Flow從JDK9就開始有了。咱們看下它的結構:
從上圖咱們能夠看到在JDK中Flow是一個final class,而Subscriber,Publisher,Subscription,Processor都是它的內部類。
咱們會在後面的文章中繼續講解JDK中Flow的使用。敬請期待。
reactive stream的出現有效的解決了異步系統中的背壓問題。只不過reactive stream只是一個接口標準或者說是一種協議,具體的實現還須要本身去實現。
本文做者:flydean程序那些事本文連接:http://www.flydean.com/reactive-stream-protocol/
本文來源:flydean的博客
歡迎關注個人公衆號:程序那些事,更多精彩等着您!