當被觀察者(Observable)
和觀察者(Subscriber)
在不一樣的線程中工做,而且Observable
生產事件的速度比Subscriber
消費事件的速度快時,就是將事件積壓在隊列(queue)
裏面,當queue裏大小超過必定大小時,就會拋出MissingBackpressureException
java
好比下面的代碼,Observable
每1毫秒生產一個事件,但Subscriber
每一秒才消費一個事件,因此一運行就會立馬報MissingBackpressureException
錯誤git
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe({
Thread.sleep(1000)
System.out.println("onNext==$it")
})
複製代碼
那queue大小超過多少,會報錯呢??,我們直接看rxjava1
的源碼github
static {
int defaultSize = 128;
// lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
if (PlatformDependent.isAndroid()) {
defaultSize = 16;
}
// possible system property for overriding
String sizeFromProperty = System.getProperty("rx.ring-buffer.size"); // also see IndexedRingBuffer
if (sizeFromProperty != null) {
try {
defaultSize = Integer.parseInt(sizeFromProperty);
} catch (NumberFormatException e) {
System.err.println("Failed to set 'rx.buffer.size' with value " + sizeFromProperty + " => " + e.getMessage()); // NOPMD
}
}
SIZE = defaultSize;
}
複製代碼
可見若是發現是Android
的話,size是16;若是是其它的話就是128緩存
咱們先來看一下observeOn的幾個重載方法bash
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, RxRingBuffer.SIZE);
}
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
return observeOn(scheduler, false, bufferSize);
}
複製代碼
可見 能夠在指定消費線程的時候指定隊列大小;而咱們經常使用的是第一個方法,因此默認的大小就是上面說的16, 咱們能夠使用第二個重載方法指定隊列大小ide
BackpressureStrategy
先看看rxjava2
的默認隊列大小,源碼以下ui
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
}
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public static int bufferSize() {
return BUFFER_SIZE;
}
...
}
複製代碼
Observable
的源碼以下:spa
public abstract class Observable<T> implements ObservableSource<T> {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public static int bufferSize() {
return Flowable.bufferSize();
}
...
}
複製代碼
可見rxjava2
的Flowable
、Observable
默認大小都是128(rxjava
是16),但rxjava2
新增的Flowable
支持背壓策略BackpressureStrategy
, 而Observable
不支持線程
BackpressureStrategy
MissingBackpressureException
MissingBackpressureException
MISSING 和 ERROR 我的感受沒什麼差異,感受效果是同樣的code
順便說一句 retrofit2
使用的是BackpressureStrategy.LATEST
策略
Single
只有 onSuccess
和 onError
事件,onSuccess
相似於Observable
或Flowable
的onNext()
方法, 用來發射生產的數據,Single
只能發射一個數據,當發射多個數據時,則無效;好比
Single.create(SingleOnSubscribe<Int> {
try {
it.onSuccess(1)
it.onSuccess(2)
} catch (e: Exception) {
it.onError(e)
}
}).subscribe({
System.out.println("==onNext: $it")
}, {
System.out.println("==onError==")
})
複製代碼
打印結果以下:
==onNext: 1
複製代碼
可見並無打印 「2」 這個數據
注意: 若是在onSuccess回調中報錯,則不會走onError, 而是直接crash
Completable
只有onComplete
和onError
事件,它不發射任何數據給觀察者,只告訴觀察者完成了(onComplete)
或出錯了(onError
)
Completable.create(CompletableOnSubscribe {
try {
//do something success
it.onComplete()
} catch (e: Exception) {
it.onError(e)
}
}).subscribe({
System.out.println("==onComplete==")
}, {
System.out.println("==onError==")
})
複製代碼
注意: 若是在onComplete回調中報錯,則不會走onError, 而是直接crash
Maybe
能夠當作是Single
和Completable
的結合,它有onSuccess
、onComplete
、onError
事件,但相互都是互斥的;同時Maybe
也只能發射一個數據
若是有數據,則調用onSuccess
發射數據給觀察者
若是沒有數據,也沒有出錯,則調用onComplete
,告訴觀察者已完成
若是出錯,則調用onError
,告訴觀察者出錯了
Maybe.create(MaybeOnSubscribe<Int> {
try {
// 若是有數據
it.onSuccess(1)
// 若是不關心數據
// it.onComplete() // 即便你了這個方法,也不會走onComplete回調,由於已經調了onSuccess方法
} catch (e: Exception) {
it.onError(e)
}
}).subscribe({
System.out.println("==onSuccess: $it")
}, {
System.out.println("==onError==")
}, {
System.out.println("==onComplete==")
})
複製代碼
Maybe跟Single同樣,即便發射了多個數據,後面的數據也不會處理
注意: 若是在onSuccess 或 onComplete回調中報錯,則不會走onError, 而是直接crash
在使用rxjava發射數據時,若是發射的數據時null,則會直接走onError事件
FlowableEmitter的onNext實現源碼以下
@Override
public void onNext(T t) {
...
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
...
}
複製代碼
若是必定要發射null數據,但又不想走onError事件,由於它不屬於異常,該怎麼辦呢??
Flowable.create(FlowableOnSubscribe<Optional<Int>> {
try {
it.onNext(Optional.ofNullable(1))
it.onNext(Optional.ofNullable(null))
it.onComplete()
} catch (e: Exception) {
it.onError(e)
}
}, BackpressureStrategy.LATEST)
.subscribe({
if (it.isPresent) {
System.out.println("==Next: ${it.get()}")
} else {
System.out.println("==Next: null")
}
}, {
System.out.println("==onError==")
}, {
System.out.println("==onComplete==")
})
複製代碼
好比map操做符
Flowable.just(1)
.map {
// 好比根據id,去找user, 可能返回null,但又不屬於onError
Optional.ofNullable(findUserById(it))
}
.subscribe({
if (it.isPresent) {
System.out.println("==Next: ${it.get()}")
} else {
System.out.println("==Next: null")
}
}, {
System.out.println("==onError==")
}, {
System.out.println("==onComplete==")
})
複製代碼
注意:在使用Optional的使用不要直接調get()方法,若是是null,調get()方法會報異常;而是先要用isPresent方法判斷
有的時候對於觀察者回調中,老是使用isPresent去判斷,感受有點不方便,因爲是對於rxjava1升級到rxjava2的時候改動比較大,由於rxjava1的onNext容許返回null,這個時候咱們能夠封裝一個BaseSubscribe
abstract class BaseSubscribe<T> : DisposableSubscriber<Optional<T>>() {
override fun onComplete() {
}
override fun onNext(t: Optional<T>) {
if (t.isPresent) {
onOpNext(t.get())
} else {
onOpNext(null)
}
}
abstract fun onOpNext(t: T?)
override fun onError(t: Throwable) {
}
}
複製代碼
而後調用subscribe方法的時候 全都使用BaseSubscribe類
Flowable.just(1)
.map {
// 好比根據id,去找user, 可能返回null
Optional.ofNullable(findUserById(it))
}
.subscribe(object : BaseSubscribe<User>() {
override fun onOpNext(t: User?) {
System.out.println("==Next: ${t ?: null}")
}
})
複製代碼
Optional
包裝以外,其實本身也能夠約定一個數據類型,好比Result<T>
class Result<T> {
var data: T? = null
...
}
複製代碼