本篇文章已受權微信公衆號 YYGeeker
獨家發佈轉載請標明出處bash
CSDN學院課程地址微信
- RxJava2從入門到精通-初級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-中級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-進階篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-源碼分析篇:edu.csdn.net/course/deta…
Observable是最基本的響應類型,但不支持背壓,基本上適用大多數的應用場景ide
有關背壓的概念等,都會在下一章介紹源碼分析
public static void observable() {
//建立被觀察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
//默認在主線程裏執行該方法
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帥");
e.onNext("你值得擁有");
e.onNext("取消關注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
})
//將被觀察者切換到子線程
.subscribeOn(Schedulers.io())
//將觀察者切換到主線程 須要在Android環境下運行
//.observeOn(AndroidSchedulers.mainThread())
//建立觀察者並訂閱
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError=" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
複製代碼
輸出spa
onNext=俊俊俊很帥
onNext=你值得擁有
onNext=取消關注
onNext=但仍是要保持微笑
onComplete
複製代碼
Flowable和Observable的使用基本相同,只不過Observable不支持背壓,而Flowable支持背壓。但須要注意的是,使用Flowable的時候,必須調用Subscription的requsest方法請求,否則上游是不會發射數據的.net
public static void flowable() {
//建立被觀察者
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> e) throws Exception {
e.onNext("俊俊俊很帥");
e.onNext("你值得擁有");
e.onNext("取消關注");
e.onNext("但仍是要保持微笑");
e.onComplete();
}
}, BackpressureStrategy.DROP)
//將被觀察者切換到子線程
.subscribeOn(Schedulers.io())
//將觀察者切換到主線程 須要在Android環境下運行
//.observeOn(AndroidSchedulers.mainThread())
//建立觀察者並訂閱
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(2);
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable t) {
System.out.println("onError=" + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
複製代碼
輸出線程
onNext=俊俊俊很帥
onNext=你值得擁有
複製代碼
Single只發射一個元素,發射onSuccess
或onError
方法,因此沒有complete方法,不像Observable或者Flowable,數據發射完成以後,須要調用complete告訴下游已經完成code
public static void single() {
//建立被觀察者
Single.create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(SingleEmitter<String> e) throws Exception {
e.onSuccess("success");
}
})
//將被觀察者切換到子線程
.subscribeOn(Schedulers.io())
//將觀察者切換到主線程 須要在Android環境下運行
//.observeOn(AndroidSchedulers.mainThread())
//建立觀察者並訂閱
.subscribe(new SingleObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
System.out.println("onSuccess=" + s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError=" + e.getMessage());
}
});
}
複製代碼
輸出server
onSuccess=success
複製代碼
Completable不會發射數據,只會給下游發送一個信號。回調onComplete
或onError
方法ip
public static void completable() {
//建立被觀察者
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter e) throws Exception {
e.onComplete();
}
})
//將被觀察者切換到子線程
.subscribeOn(Schedulers.io())
//將觀察者切換到主線程 須要在Android環境下運行
//.observeOn(AndroidSchedulers.mainThread())
//建立觀察者並訂閱
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable e) {
System.out.println("onError=" + e.getMessage());
}
});
}
複製代碼
輸出
onComplete
複製代碼
Maybe是Single和Completable的結合,須要注意的是onSuccess和onComplete方法只會執行其中一個,這不一樣於Observable和Flowable最後是以onComplete()結尾
public static void maybe() {
//建立被觀察者
Maybe.create(new MaybeOnSubscribe<String>() {
@Override
public void subscribe(MaybeEmitter<String> e) throws Exception {
e.onSuccess("success");
e.onComplete();
}
})
//將被觀察者切換到子線程
.subscribeOn(Schedulers.io())
//將觀察者切換到主線程 須要在Android環境下運行
//.observeOn(AndroidSchedulers.mainThread())
//建立觀察者並訂閱
.subscribe(new MaybeObserver<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(String s) {
System.out.println("onSuccess=" + s);
}
@Override
public void onError(Throwable e) {
System.out.println("onError=" + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
複製代碼
輸出
onSuccess=success
複製代碼