每一個Android開發者,都是愛RxJava的,簡潔線程切換和多網絡請求合併,再配合Retrofit,簡直是APP開發的福音。不知不覺,RxJava一路走來,已經更新到第三大版本了。不像RxJava 2對RxJava 1那麼殘忍,RxJava 3對RxJava 2的兼容性仍是挺好的,目前並無作出很大的更改。RxJava2到2020年12月31號再也不提供支持,錯誤的會同時在2.x和3.x修復,但新功能只會在3.x上添加。java
同時,但願經過本文,能知道垃圾箱顏色分類。react
做爲嚐鮮,趕忙品嚐吧。git
implementation "io.reactivex.rxjava3:rxjava:3.0.0-RC0"
複製代碼
很差意思哦,還沒看到RxAndroid出3.0,這就很尷尬了...github
在RxJava,數據以流的方式組織。也就是說,Rxjava包括一個源的數據流,數據流後跟着消費者的零個到多個消費數據流步驟。windows
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
複製代碼
在上文代碼中,對於operator2來講,在它前面叫作上流,在它後面的叫作下流。憋住,別笑,真的是下流來的。緩存
在RxJava的文檔中,emission, emits, item, event, signal, data and message都被認爲在數據流中被傳遞的數據對象。bash
當數據流經過異步的步驟執行時,這些步驟的執行速度可能不一致。也就是說上流數據發送太快,下流沒有足夠的能力去處理。爲了不這種狀況,通常要麼緩存上流的數據,要麼拋棄數據。但這種處理方式,有時會帶來很大的問題。爲此,RxJava帶來了backpressure的概念。背壓是一種流量的控制步驟,在不知道上流還有多少數據的情形下控制內存的使用,表示它們還能處理多少數據。網絡
支持背壓的有Flowable類,不支持背壓的有Observable,Single, Maybe and Completable類。併發
對於咱們Android開發來講,最喜歡的就是它簡潔切換線程的操做。RxJava經過調度器來方便線程的切換。app
在不一樣平臺還有不一樣的調度器,例如Android的主線程:AndroidSchedulers.mainThread()
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
複製代碼
在 RxJava 3 能夠發現有如下幾個基類(跟RxJava 2是一致的吧):
下文關於操做符內容太多了
等須要了,再來查閱
下班時間仍是好好護髮吧
指定觀察者的線程,例如在Android訪問網絡後,數據須要主線程消費,那麼將觀察者的線程切換到主線就須要ObserveOn操做符。每次指定一次都會生效。
指定被觀察者的線程,即數據源發生的線程。例如在Android訪問網絡時,須要將線程切換到子線程。屢次指定只有第一次有效。
數據源(Observable)每發送一次數據,就調用一次。
數據源每次調用onNext() 以前都會先回調該方法。
數據源每次調用onError() 以前會回調該方法。
數據源每次調用onComplete() 以前會回調該方法
數據源每次調用onSubscribe() 以後會回調該方法
數據源每次調用dispose() 以後會回調該方法
其餘的見官網吧,不難
主要講對數據源進行選擇和過濾的經常使用操做符
能夠做用於Flowable,Observable,表示源發射數據前,跳過多少個。例以下面跳過前四個:
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skip(4)
.subscribe(System.out::print);
打印結果:5678910
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.skipLast(4)
.subscribe(System.out::print);
打印結果:1 2 3 4 5 6
複製代碼
skipLast(n)操做表示從流的尾部跳過n個元素。
可做用於Flowable,Observable。在Android開發,一般爲了防止用戶重複點擊而設置標記位,而經過RxJava的debounce操做符能夠有效達到該效果。在規定時間內,用戶重複點擊只有最後一次有效,
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(1_500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2_000);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));
打印:A D E onComplete
複製代碼
上文代碼中,數據源以必定的時間間隔發送A,B,C,D,E。操做符debounce的時間設爲1秒,發送A後1.5秒並無發射其餘數據,因此A能成功發射。發射B後,在1秒以內,又發射了C和D,在D以後的2秒才發射E,全部B、C都失效,只有D有效;而E以後已經沒有其餘數據流了,全部E有效。
可做用於Flowable,Observable,去掉數據源重複的數據。
Observable.just(2, 3, 4, 4, 2, 1)
.distinct()
.subscribe(System.out::print);
// 打印:2 3 4 1
Observable.just(1, 1, 2, 1, 2, 3, 3, 4)
.distinctUntilChanged()
.subscribe(System.out::print);
//打印:1 2 1 2 3 4
複製代碼
distinctUntilChanged()去掉相鄰重複數據。
可做用於Flowable,Observable,從數據源獲取指定位置的元素,從0開始。
Observable.just(2,4,3,1,5,8)
.elementAt(0)
.subscribe(integer ->
Log.d("TAG","elmentAt->"+integer));
打印:2
Observable<String> source = Observable.just("Kirk", "Spock", "Chekov", "Sulu");
Single<String> element = source.elementAtOrError(4);
element.subscribe(
name -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
打印:onSuccess will not be printed!
複製代碼
elementAtOrError:指定元素的位置超過數據長度,則發射異常。
可做用於 Flowable,Observable,Maybe,Single。在filter中返回表示發射該元素,返回false表示過濾該數據。
Observable.just(1, 2, 3, 4, 5, 6)
.filter(x -> x % 2 == 0)
.subscribe(System.out::print);
打印:2 4 6
複製代碼
做用於 Flowable,Observable。發射數據源第一個數據,若是沒有則發送默認值。
Observable<String> source = Observable.just("A", "B", "C");
Single<String> firstOrDefault = source.first("D");
firstOrDefault.subscribe(System.out::println);
打印:A
Observable<String> emptySource = Observable.empty();
Single<String> firstOrError = emptySource.firstOrError();
firstOrError.subscribe(
element -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
打印:onError: java.util.NoSuchElementException
複製代碼
和firstElement的區別是first返回的是Single,而firstElement返回Maybe。firstOrError在沒有數據會返回異常。
last、lastElement、lastOrError與fist、firstElement、firstOrError相對應。
Observable<String> source = Observable.just("A", "B", "C");
Single<String> lastOrDefault = source.last("D");
lastOrDefault.subscribe(System.out::println);
//打印:C
Observable<String> source = Observable.just("A", "B", "C");
Maybe<String> last = source.lastElement();
last.subscribe(System.out::println);
//打印:C
Observable<String> emptySource = Observable.empty();
Single<String> lastOrError = emptySource.lastOrError();
lastOrError.subscribe(
element -> System.out.println("onSuccess will not be printed!"),
error -> System.out.println("onError: " + error));
// 打印:onError: java.util.NoSuchElementException
複製代碼
ignoreElements 做用於Flowable、Observable。ignoreElement做用於Maybe、Single。二者都是忽略掉數據,返回完成或者錯誤時間。
Single<Long> source = Single.timer(1, TimeUnit.SECONDS);
Completable completable = source.ignoreElement();
completable.doOnComplete(() -> System.out.println("Done!"))
.blockingAwait();
// 1秒後打印:Donde!
Observable<Long> source = Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS);
Completable completable = source.ignoreElements();
completable.doOnComplete(() -> System.out.println("Done!"))
.blockingAwait();
// 五秒後打印:Done!
複製代碼
做用於Flowable、Observable、Maybe、過濾掉類型。
Observable<Number> numbers = Observable.just(1, 4.0, 3, 2.71, 2f, 7);
Observable<Integer> integers = numbers.ofType(Integer.class);
integers.subscribe((Integer x) -> System.out.print(x+" "));
//打印:1 3 7
複製代碼
做用於Flowable、Observable,在一個週期內發射最新的數據。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.sample(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print("onComplete"));
// 打印: C D onComplete
複製代碼
與debounce的區別是,sample是以時間爲週期的發射,一秒又一秒內的最新數據。而debounce是最後一個有效數據開始。
做用於Flowable、Observable。throttleLast與smaple一致,而throttleFirst是指定週期內第一個數據。throttleWithTimeout與debounce一致。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(800);
emitter.onNext("D");
Thread.sleep(600);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.throttleFirst(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print(" onComplete"));
//打印:A D onComplete
source.subscribeOn(Schedulers.io())
.throttleLast(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.print(" onComplete"));
// 打印:C D onComplete
複製代碼
之因此拿出來單獨說,我看不懂官網的解釋。而後看別人的文章:throttleFirst+throttleLast的組合?開玩笑的吧。我的理解是:若是源的第一個數據總會被髮射,而後開始週期計時,此時的效果就會跟throttleLast一致。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(500);
emitter.onNext("B");
Thread.sleep(200);
emitter.onNext("C");
Thread.sleep(200);
emitter.onNext("D");
Thread.sleep(400);
emitter.onNext("E");
Thread.sleep(400);
emitter.onNext("F");
Thread.sleep(400);
emitter.onNext("G");
Thread.sleep(2000);
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.throttleLatest(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> Log.e("RxJava",item),
Throwable::printStackTrace,
() -> Log.e("RxJava","finished"));
複製代碼
打印結果:
做用於Flowable、Observable,take發射前n個元素;takeLast發射後n個元素。
Observable<Integer> source = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
source.take(4)
.subscribe(System.out::print);
//打印:1 2 3 4
source.takeLast(4)
.subscribe(System.out::println);
//打印:7 8 9 10
複製代碼
做用於Flowable、Observable、Maybe、Single、Completabl。後一個數據發射未在前一個元素髮射後規定時間內發射則返回超時異常。
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(800);
emitter.onNext("B");
Thread.sleep(400);
emitter.onNext("C");
Thread.sleep(1200);
emitter.onNext("D");
emitter.onComplete();
});
source.timeout(1, TimeUnit.SECONDS)
.subscribe(
item -> System.out.println("onNext: " + item),
error -> System.out.println("onError: " + error),
() -> System.out.println("onComplete will not be printed!"));
// 打印:
// onNext: A
// onNext: B
// onNext: C
// onError: java.util.concurrent.TimeoutException:
The source did not signal an event for 1 seconds
and has been terminated.
複製代碼
經過鏈接操做符,將多個被觀察數據(數據源)鏈接在一塊兒。
可做用於Flowable、Observable。將指定數據源合併在另外數據源的開頭。
Observable<String> names = Observable.just("Spock", "McCoy");
Observable<String> otherNames = Observable.just("Git", "Code","8");
names.startWith(otherNames).subscribe(item -> Log.d(TAG,item));
//打印:
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Spock
RxJava: McCo
複製代碼
可做用全部數據源類型,用於合併多個數據源到一個數據源。
Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code","8");
Observable.merge(names,otherNames).subscribe(name -> Log.d(TAG,name));
//也能夠是
//names.mergeWith(otherNames).subscribe(name -> Log.d(TAG,name));
//打印:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8
複製代碼
merge在合併數據源時,若是一個合併發生異常後會當即調用觀察者的onError方法,並中止合併。可經過mergeDelayError操做符,將發生的異常留到最後處理。
Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code","8");
Observable<String> error = Observable.error(
new NullPointerException("Error!"));
Observable.mergeDelayError(names,error,otherNames).subscribe(
name -> Log.d(TAG,name), e->Log.d(TAG,e.getMessage()));
//打印:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Error!
複製代碼
可做用於Flowable、Observable、Maybe、Single。將多個數據源的數據一個一個的合併在一塊兒哇。當其中一個數據源發射完事件以後,若其餘數據源還有數據未發射完畢,也會中止。
Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code", "8");
names.zipWith(otherNames, (first, last) -> first + "-" + last)
.subscribe(item -> Log.d(TAG, item));
//打印:
RxJava: Hello-Git
RxJava: world-Code
複製代碼
可做用於Flowable, Observable。在結合不一樣數據源時,發射速度快的數據源最新item與較慢的相結合。 以下時間線,Observable-1發射速率快,發射了65,Observable-2才發射了C, 那麼二者結合就是C5。
一個發射多個小數據源的數據源,這些小數據源發射數據的時間發生重複時,取最新的數據源。
變化數據源的數據,並轉化爲新的數據源。
做用於Flowable、Observable。指將數據源拆解含有長度爲n的list的多個數據源,不夠n的成爲一個數據源。
Observable.range(0, 10)
.buffer(4)
.subscribe((List<Integer> buffer) -> System.out.println(buffer));
// 打印:
// [0, 1, 2, 3]
// [4, 5, 6, 7]
// [8, 9]
複製代碼
做用於Flowable、Observable、Maybe、Single。將數據元素轉型成其餘類型,轉型失敗會拋出異常。
Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);
numbers.filter((Number x) -> Integer.class.isInstance(x))
.cast(Integer.class)
.subscribe((Integer x) -> System.out.println(x));
// prints:
// 1
// 7
// 12
// 5
複製代碼
做用於Flowable、Observable、Maybe。將數據源的元素做用於指定函數後,將函數的返回值有序的存在新的數據源。
Observable.range(0, 5)
.concatMap(i -> {
long delay = Math.round(Math.random() * 2);
return Observable.timer(delay, TimeUnit.SECONDS).map(n -> i);
})
.blockingSubscribe(System.out::print);
// prints 01234
複製代碼
與concatMap做用相同,只是將過程發送的全部錯誤延遲到最後處理。
Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.concatMapDelayError(x -> {
if (x.equals(1L)) return Observable.error(new IOException("Something went wrong!"));
else return Observable.just(x, x * x);
})
.blockingSubscribe(
x -> System.out.println("onNext: " + x),
error -> System.out.println("onError: " + error.getMessage()));
// prints:
// onNext: 2
// onNext: 4
// onNext: 3
// onNext: 9
// onError: Something went wrong!
複製代碼
做用於Flowable、Observable。與contactMap相似,不過應用於函數後,返回的是CompletableSource。訂閱一次並在全部CompletableSource對象完成時返回一個Completable對象。
Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletable(x -> {
return Completable.timer(x, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
});
completable.doOnComplete(() -> System.out.println("Info: Processing of all items completed"))
.blockingAwait();
// prints:
// Info: Processing of item "2" completed
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Info: Processing of all items completed
複製代碼
與concatMapCompletable做用相同,只是將過程發送的全部錯誤延遲到最後處理。
Observable<Integer> source = Observable.just(2, 1, 3);
Completable completable = source.concatMapCompletableDelayError(x -> {
if (x.equals(2)) {
return Completable.error(new IOException("Processing of item \"" + x + "\" failed!"));
} else {
return Completable.timer(1, TimeUnit.SECONDS)
.doOnComplete(() -> System.out.println("Info: Processing of item \"" + x + "\" completed"));
}
});
completable.doOnError(error -> System.out.println("Error: " + error.getMessage()))
.onErrorComplete()
.blockingAwait();
// prints:
// Info: Processing of item "1" completed
// Info: Processing of item "3" completed
// Error: Processing of item "2" failed!
複製代碼
做用於Flowable、Observable、Maybe、Single。與contactMap相似,只是contactMap的數據發射是有序的,而flatMap是無序的。
Observable.just("A", "B", "C")
.flatMap(a -> {
return Observable.intervalRange(1, 3, 0, 1, TimeUnit.SECONDS)
.map(b -> '(' + a + ", " + b + ')');
})
.blockingSubscribe(System.out::println);
// prints (not necessarily in this order):
// (A, 1)
// (C, 1)
// (B, 1)
// (A, 2)
// (C, 2)
// (B, 2)
// (A, 3)
// (C, 3)
// (B, 3)
複製代碼
太多了,減小篇幅,你們感興趣本身查閱官網吧。功能與flatMap和contactMap相似。
做用於Maybe、Single,將其轉化爲Flowable,或Observable。
Single<Double> source = Single.just(2.0);
Flowable<Double> flowable = source.flattenAsFlowable(x -> {
return List.of(x, Math.pow(x, 2), Math.pow(x, 3));
});
flowable.subscribe(x -> System.out.println("onNext: " + x));
// prints:
// onNext: 2.0
// onNext: 4.0
// onNext: 8.0
複製代碼
做用於Flowable、Observable。根據必定的規則對數據源進行分組。
Observable<String> animals = Observable.just(
"Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");
animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
.concatMapSingle(Observable::toList)
.subscribe(System.out::println);
// prints:
// [TIGER, TURTLE]
// [ELEPHANT]
// [CAT, CHAMELEON]
// [FROG, FISH, FLAMINGO]
複製代碼
做用於Flowable、Observable。對數據進行相關聯操做,例如聚合等。
Observable.just(5, 3, 8, 1, 7)
.scan(0, (partialSum, x) -> partialSum + x)
.subscribe(System.out::println);
// prints:
// 0
// 5
// 8
// 16
// 17
// 24
複製代碼
對數據源發射出來的數據進行收集,按照指定的數量進行分組,以組的形式從新發射。
Observable.range(1, 4)
// Create windows containing at most 2 items, and skip 3 items before starting a new window.
.window(2)
.flatMapSingle(window -> {
return window.map(String::valueOf)
.reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
})
.subscribe(System.out::println);
// prints:
// [1, 2]
// [3, 4]
複製代碼
做用於Flowable、Observable、Maybe、Single。但調用數據源的onError函數後會回到該函數,可對錯誤進行處理,而後返回值,會調用觀察者onNext()繼續執行,執行完調用onComplete()函數結束全部事件的發射。
Single.just("2A")
.map(v -> Integer.parseInt(v, 10))
.onErrorReturn(error -> {
if (error instanceof NumberFormatException) return 0;
else throw new IllegalArgumentException();
})
.subscribe(
System.out::println,
error -> System.err.println("onError should not be printed!"));
// prints 0
複製代碼
與onErrorReturn相似,onErrorReturnItem不對錯誤進行處理,直接返回一個值。
Single.just("2A")
.map(v -> Integer.parseInt(v, 10))
.onErrorReturnItem(0)
.subscribe(
System.out::println,
error -> System.err.println("onError should not be printed!"));
// prints 0
複製代碼
可做用於Flowable、Observable、Maybe。onErrorReturn發生異常時,回調onComplete()函數後再也不往下執行,而onExceptionResumeNext則是要在處理異常的時候返回一個數據源,而後繼續執行,若是返回null,則調用觀察者的onError()函數。
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new NullPointerException());
e.onNext(4);
})
.onErrorResumeNext(throwable -> {
Log.d(TAG, "onErrorResumeNext ");
return Observable.just(4);
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete ");
}
});
複製代碼
結果:
onExceptionResumeNext操做符也是相似的,只是捕獲Exception。可做用於全部的數據源,當發生錯誤時,數據源重複發射item,直到沒有異常或者達到所指定的次數。
boolean first=true;
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
if (first){
first=false;
e.onError(new NullPointerException());
}
})
.retry(9)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete ");
}
});
複製代碼
結果:
做用於Flowable、Observable、Maybe。與retry相似,但發生異常時,返回值是false表示繼續執行(重複發射數據),true再也不執行,但會調用onError方法。
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onError(new NullPointerException());
e.onNext(3);
e.onComplete();
})
.retryUntil(() -> true)
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe ");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError ");
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete ");
}
});
複製代碼
結果:
retryWhen與此相似,但其判斷標準不是BooleanSupplier對象的getAsBoolean()函數的返回值。而是返回的 Observable或Flowable是否會發射異常事件。太多操做符太累了,看得心好累。仍是根據實際開發須要查閱文檔纔是正確的姿式。本文只是RxJava冰山一角,更多請參閱官網。同時不建議立馬在項目上實踐,給它點時間報bug。
若是你看到了這,點個贊,收下我雙膝。若是文章有誤,幫忙指正,謝謝大佬們。