Rxjava1升級Rxjava2過程當中的一些知識總結

rxjava1升級rxjava2過程當中的一些知識總結

背壓(Backpressure)

什麼是背壓(Backpressure)

當被觀察者(Observable)觀察者(Subscriber)在不一樣的線程中工做,而且Observable生產事件的速度比Subscriber消費事件的速度快時,就是將事件積壓在隊列(queue)裏面,當queue裏大小超過必定大小時,就會拋出MissingBackpressureExceptionjava

好比下面的代碼,Observable每1毫秒生產一個事件,但Subscriber每一秒才消費一個事件,因此一運行就會立馬報MissingBackpressureException錯誤git

Observable.interval(1, TimeUnit.MILLISECONDS)
    .observeOn(Schedulers.newThread())
    .subscribe({
        Thread.sleep(1000)
        System.out.println("onNext==$it")
    })

複製代碼

那queue大小超過多少,會報錯呢??,我們直接看rxjava1的源碼github

static {
    int defaultSize = 128;

    // lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
    if (PlatformDependent.isAndroid()) {
        defaultSize = 16;
    }

    // possible system property for overriding
    String sizeFromProperty = System.getProperty("rx.ring-buffer.size"); // also see IndexedRingBuffer
    if (sizeFromProperty != null) {
        try {
            defaultSize = Integer.parseInt(sizeFromProperty);
        } catch (NumberFormatException e) {
            System.err.println("Failed to set 'rx.buffer.size' with value " + sizeFromProperty + " => " + e.getMessage()); // NOPMD
        }
    }

    SIZE = defaultSize;
}
複製代碼

可見若是發現是Android的話,size是16;若是是其它的話就是128緩存

如何改變默認的size呢?

咱們先來看一下observeOn的幾個重載方法bash

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, RxRingBuffer.SIZE);
}
    
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
    return observeOn(scheduler, false, bufferSize);
}
複製代碼

可見 能夠在指定消費線程的時候指定隊列大小;而咱們經常使用的是第一個方法,因此默認的大小就是上面說的16, 咱們能夠使用第二個重載方法指定隊列大小ide

rxjava2的背壓策略BackpressureStrategy

先看看rxjava2的默認隊列大小,源碼以下ui

public abstract class Flowable<T> implements Publisher<T> {
    /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
    
    @CheckReturnValue
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Flowable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public static int bufferSize() {
        return BUFFER_SIZE;
    }
    
    ...
}

複製代碼

Observable 的源碼以下:spa

public abstract class Observable<T> implements ObservableSource<T> {

	@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public static int bufferSize() {
        return Flowable.bufferSize();
    }
	...
}
複製代碼

可見rxjava2FlowableObservable默認大小都是128(rxjava是16),但rxjava2新增的Flowable支持背壓策略BackpressureStrategy, 而Observable不支持線程

背壓策略BackpressureStrategy

  • MISSING: 緩存大小固定, 下游消費跟不上上游生產事件時, 則會報MissingBackpressureException
  • ERROR: 緩存大小固定, 下游消費跟不上上游生產事件,則會報MissingBackpressureException
  • BUFFER: 緩存無限大,上游不斷生產事件,下游不斷消費事件,直到程序崩潰
  • DROP: 緩存大小固定,若是下游消費跟不上上游生產事件,而且緩存已滿時,若是再生產事件,則會丟掉這次生產的事件
  • LATEST: 緩存大小固定,若是下游消費跟不上上游生產事件,則會替換掉最後的事件,保持最新的事件會被消費

MISSING 和 ERROR 我的感受沒什麼差異,感受效果是同樣的code

順便說一句 retrofit2使用的是BackpressureStrategy.LATEST策略

RxJava2的Single、Completable、Maybe

Single 只發射單個數據或錯誤事件

Single 只有 onSuccessonError 事件,onSuccess相似於ObservableFlowableonNext()方法, 用來發射生產的數據,Single只能發射一個數據,當發射多個數據時,則無效;好比

Single.create(SingleOnSubscribe<Int> {
    try {
        it.onSuccess(1)
        it.onSuccess(2)
    } catch (e: Exception) {
        it.onError(e)
    }
}).subscribe({
    System.out.println("==onNext: $it")
}, {
    System.out.println("==onError==")
})

複製代碼

打印結果以下:

==onNext: 1
複製代碼

可見並無打印 「2」 這個數據

注意: 若是在onSuccess回調中報錯,則不會走onError, 而是直接crash

Completable

Completable 只有onCompleteonError事件,它不發射任何數據給觀察者,只告訴觀察者完成了(onComplete)或出錯了(onError)

Completable.create(CompletableOnSubscribe {
    try {
        //do something success
        it.onComplete()
    } catch (e: Exception) {
        it.onError(e)
    }
}).subscribe({
    System.out.println("==onComplete==")
}, {
    System.out.println("==onError==")
})

複製代碼

注意: 若是在onComplete回調中報錯,則不會走onError, 而是直接crash

Maybe

Maybe 能夠當作是SingleCompletable的結合,它有onSuccessonCompleteonError事件,但相互都是互斥的;同時Maybe也只能發射一個數據

若是有數據,則調用onSuccess發射數據給觀察者

若是沒有數據,也沒有出錯,則調用onComplete,告訴觀察者已完成

若是出錯,則調用onError,告訴觀察者出錯了

Maybe.create(MaybeOnSubscribe<Int> {
    try {
        // 若是有數據
        it.onSuccess(1)
        // 若是不關心數據
        // it.onComplete() // 即便你了這個方法,也不會走onComplete回調,由於已經調了onSuccess方法
    } catch (e: Exception) {
        it.onError(e)
    }
}).subscribe({
    System.out.println("==onSuccess: $it")
}, {
    System.out.println("==onError==")
}, {
    System.out.println("==onComplete==")
})

複製代碼

Maybe跟Single同樣,即便發射了多個數據,後面的數據也不會處理

注意: 若是在onSuccess 或 onComplete回調中報錯,則不會走onError, 而是直接crash

如何處理Rxjava2不支持null的狀況

在使用rxjava發射數據時,若是發射的數據時null,則會直接走onError事件

FlowableEmitter的onNext實現源碼以下

@Override
public void onNext(T t) {
    ...
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    ...
}

複製代碼

若是必定要發射null數據,但又不想走onError事件,由於它不屬於異常,該怎麼辦呢??

發射Optional包裝過的數據

1. 在使用create方法建立Flowable的時候指定Optional

Flowable.create(FlowableOnSubscribe<Optional<Int>> {
    try {
        it.onNext(Optional.ofNullable(1))
        it.onNext(Optional.ofNullable(null))
        it.onComplete()
    } catch (e: Exception) {
        it.onError(e)
    }
}, BackpressureStrategy.LATEST)
        .subscribe({
            if (it.isPresent) {
                System.out.println("==Next: ${it.get()}")
            } else {
                System.out.println("==Next: null")
            }
        }, {
            System.out.println("==onError==")
        }, {
            System.out.println("==onComplete==")
        })
複製代碼

2. 使用rxjava2其它操做符的時候指定Optional

好比map操做符

Flowable.just(1)
    .map {
        // 好比根據id,去找user, 可能返回null,但又不屬於onError
        Optional.ofNullable(findUserById(it))
    }
    .subscribe({
        if (it.isPresent) {
            System.out.println("==Next: ${it.get()}")
        } else {
            System.out.println("==Next: null")
        }
    }, {
        System.out.println("==onError==")
    }, {
        System.out.println("==onComplete==")
    })

複製代碼

注意:在使用Optional的使用不要直接調get()方法,若是是null,調get()方法會報異常;而是先要用isPresent方法判斷

有的時候對於觀察者回調中,老是使用isPresent去判斷,感受有點不方便,因爲是對於rxjava1升級到rxjava2的時候改動比較大,由於rxjava1的onNext容許返回null,這個時候咱們能夠封裝一個BaseSubscribe

abstract class BaseSubscribe<T> : DisposableSubscriber<Optional<T>>() {

    override fun onComplete() {

    }

    override fun onNext(t: Optional<T>) {
        if (t.isPresent) {
            onOpNext(t.get())
        } else {
            onOpNext(null)
        }
    }

    abstract fun onOpNext(t: T?)

    override fun onError(t: Throwable) {

    }
}

複製代碼

而後調用subscribe方法的時候 全都使用BaseSubscribe類

Flowable.just(1)
	.map {
	    // 好比根據id,去找user, 可能返回null
	    Optional.ofNullable(findUserById(it))
	}
	.subscribe(object : BaseSubscribe<User>() {
	    override fun onOpNext(t: User?) {
	        System.out.println("==Next: ${t ?: null}")
	    }
	})

複製代碼

除了用Optional包裝以外,其實本身也能夠約定一個數據類型,好比Result<T>

class Result<T> {
    var data: T? = null
    ...
}

複製代碼
相關文章
相關標籤/搜索