背壓(backpressure)
當上下游在不一樣的線程中,經過Observable發射,處理,響應數據流時,若是上游發射數據的速度快於下游接收處理數據的速度,這樣對於那些沒來得及處理的數據就會形成積壓,這些數據既不會丟失,也不會被垃圾回收機制回收,而是存放在一個異步緩存池中,若是緩存池中的數據一直得不處處理,越積越多,最後就會形成內存溢出,這即是響應式編程中的背壓(backpressure)問題。
例如,運行如下代碼:java
public void demo1() {編程
Observable .create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { int i = 0; while (true) { i++; e.onNext(i); } } }) .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { Thread.sleep(5000); System.out.println(integer); } }); }
建立一個可觀察對象Observable在Schedulers.newThread()的線程中不斷髮送數據,而觀察者Observer在Schedulers.newThread()的另外一個線程中每隔5秒接收打印一條數據。
運行後,查看內存使用以下:緩存
因爲上游經過Observable發射數據的速度大於下游經過Consumer接收處理數據的速度,並且上下游分別運行在不一樣的線程中,下游對數據的接收處理不會堵塞上游對數據的發射,形成上游數據積壓,內存不斷增長,最後便會致使內存溢出。異步
Flowable
既然在函數響應式編程中會產生背壓(backpressure)問題,那麼在函數響應式編程中就應該有解決方案。
Rxjava2相對於Rxjava1最大的更新就是把對背壓問題的處理邏輯從Observable中抽取出來產生了新的可觀察對象Flowable。ide
在Rxjava2中,Flowable能夠看作是爲了解決背壓問題,在Observable的基礎上優化後的產物,與Observable不處在同一組觀察者模式下,Observable是ObservableSource/Observer這一組觀察者模式中ObservableSource的典型實現,而Flowable是Publisher與Subscriber這一組觀察者模式中Publisher的典型實現。函數
因此在使用Flowable的時候,可觀察對象再也不是Observable,而是Flowable;觀察者再也不是Observer,而是Subscriber。Flowable與Subscriber之間依然經過subscribe()進行關聯。性能
雖然在Rxjava2中,Flowable是在Observable的基礎上優化後的產物,Observable能解決的問題Flowable也都能解決,可是並不表明Flowable能夠徹底取代Observable,在使用的過程當中,並不能拋棄Observable而只用Flowable。優化
因爲基於Flowable發射的數據流,以及對數據加工處理的各操做符都添加了背壓支持,附加了額外的邏輯,其運行效率要比Observable慢得多。線程
只有在須要處理背壓問題時,才須要使用Flowable。code
因爲只有在上下游運行在不一樣的線程中,且上游發射數據的速度大於下游接收處理數據的速度時,纔會產生背壓問題;因此,若是可以肯定:一、上下游運行在同一個線程中,二、上下游工做在不一樣的線程中,可是下游處理數據的速度不慢於上游發射數據的速度,三、上下游工做在不一樣的線程中,可是數據流中只有一條數據則不會產生背壓問題,就沒有必要使用Flowable,以避免影響性能。