Android進階系列之第三方庫知識點整理。java
知識點總結,整理也是學習的過程,若有錯誤,歡迎批評指出。web
第一篇:Rxjava2(一)、基礎概念及使用
第二篇:Rxjava2(二)、五種觀察者模式及背壓數組
終於到操做符了,我以爲rxjava2
如此好用,絕對少不了操做符的功勞,下面這張圖你就簡單的掃一眼,別慌,咱們慢慢啃。併發
上一篇講了,rxjava
有五種觀察者建立模式,其中Observable
和Flowable
差很少,只是Flowable
支持背壓,而其它三種,都是簡化版的Observable
,因此,本篇以Observable
方式來說操做符的使用。app
Observable
源碼ide
Observable
是一個抽象類,繼承ObservableSource
函數
ObservableSource
:post
這類操做符,建立直接返回Observable
學習
create
是最經常使用的一個操做符,該操做符的參數中產生的emitter
發射器,經過onNext
不斷給下游發送數據,也能夠發送onComplete
、onError
事件給下游。fetch
須要發送給下游的數據,就經過emitter.onNext()給下游發送。
當發送了
onComplete
或者onError
事件後,下游中止接收剩下的onNext
事件
示意圖:
方法:
static <T> Observable<T> create(ObservableOnSubscribe<T> source)
複製代碼
demo:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
// .....
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: s=" + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
});
複製代碼
結果:
這類操做符,直接將一個數組
、集合
拆分紅單個ObJect數據依次發送給下游,也能夠直接發送Object數據。
轉換一個或多個 Object
數據,依次將這些數據發射到下游。
最多接收十個
Object
參數。
示意圖:
方法:
A : 最多隻接收十個參數。
Demo:
Observable.just("A", "B", "C", "D")
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
複製代碼
結果:
直接傳入一個數組數據,操做符將數組裏面的元素按前後順序依次發送給下游,能夠發送十個以上的數據。
示意圖:
方法:
static <T> Observable<T> fromArray(T... items)
複製代碼
Demo:
String[] data = new String[]{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"};
Observable.fromArray(data)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "fromArray--accept: s=" + s
);
}
});
複製代碼
結果:
直接傳入一個集合數據,操做符將集合裏面的元素按前後順序依次發送給下游,能夠發送十個以上的數據。
示意圖:
方法:
static <T> Observable<T> fromIterable(Iterable<? extends T> source)
複製代碼
Demo:
List<String> mData = new ArrayList<>();
mData.add("A");
mData.add("B");
mData.add("C");
Observable.fromIterable(mData)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "fromIterable--accept: s=" + s);
}
});
複製代碼
結果:
快速建立一個被觀察者對象,連續發送一個指定開始和總數的事件序列
當即發送,無延時
示意圖:
方法:
static Observable<Integer> range(final int start, final int count)
複製代碼
Demo:
// 從3開始發送,直到發送了十個數據中止。
Observable.range(3, 10).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "range--accept: integer=" + integer);
}
});
複製代碼
結果:
快速建立一個被觀察者,延遲必定時間後再每隔指定的一個時間發送一個事件(從0開始的整數)給下游。
發送數據從0開始,依次+1整數遞增
延遲時間能夠爲0,重載方法不設置默認使用第二個參數數值。
示意圖:
方法:
// initialDelay:發射第一個值須要等待時間
// period:後續每隔多少秒發射一個值
// unit:前兩個參數的時間單位
Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
// 兩參方法
public static Observable<Long> interval(long period, TimeUnit unit) {
// 第一個參數和第二個參數一致,即延遲period後再每隔period秒發送一個事件。
// 默認使用 Schedulers.computation()
return interval(period, period, unit, Schedulers.computation());
}
複製代碼
示意圖:
方法:
// initialDelay:發射第一個值須要等待時間
// period:後續每隔多少秒發射一個值
// unit:前兩個參數的時間單位
// scheduler:等待發生併發出項目的調度程序
static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
複製代碼
demo:
// 延遲2秒後發送一個事件,後續每隔五秒發送一個事件
Observable.interval(2, 5, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: aLong=" + aLong);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: error" + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
複製代碼
快速建立1個被觀察者對象,每隔指定時間發送1個事件,能夠指定事件發送開始的值和總的值。
示意圖:
方法:
// start:範圍起始值
// count:要發出的值的總數,若是爲零,則操做員在初始延遲後發出onComplete。
// initialDelay:發出第一個值(開始)以前的初始延遲
// period:後續值之間的時間段
// unit:前面時間參數單位
static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
複製代碼
demo:
// 第一個延遲三秒後發送int值2,後續每隔1秒累加發送給下游,一共發送10個數據。
Observable.intervalRange(2, 10, 3, 1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: aLong=" + aLong);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: error" + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
複製代碼
建立一個Observable
對象,被觀察者邏輯真正執行的時機是在其被訂閱的時候。
當下遊訂閱後,上游纔開始處理邏輯。
示意圖:
方法:
//
static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
複製代碼
demo:
String[] mStrings = new String[]{"A", "B", "C", "D"};
Observable observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
// 上游發送mStrings數組
return Observable.fromArray(mStrings);
}
});
// 在訂閱以前,將數組數據改變
mStrings = new String[]{"defer,訂閱時候才建立"};
// 訂閱
observable.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
複製代碼
建立一個被觀察者對象,上游延時指定的時間後發送一個事件到下游。
發送的數值爲Long型的0
示意圖:
方法:
// delay:發射單個數據以前的延時
// unit:前者時間單位
// scheduler:指定的調度程序 (默認爲Schedulers.computation())
static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
複製代碼
demo:
public void timer() {
// 延遲5秒後發送Long型值0到下游,可指定Schedulers,默認Schedulers.computation()
Observable.timer(5, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.d(TAG, "accept: aLong=" + aLong);
}
});
}
複製代碼
結果:
對上游發送的每個事件都進行指定的函數處理,從而變換成另外一個事件再發送給下游。
常使用場景:用做數據類型轉換
示意圖:
方法:
// R:輸出類型
// mapper:應用於ObservableSource發出的每一個項目的函數
final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
複製代碼
demo:
public void map() {
// 經過just發送整型數值一、二、3
Observable.just(1, 2, 3).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
// 經過Map操做符對上游的數據進行函數處理,再轉換成指定的事件發送給下游
return integer + "變換";
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
複製代碼
將一個發送事件的上游Observable變換爲多個發送事件的Observables,而後將它們發射的事件單獨作處理後再合併放進一個單獨的Observable裏發送給下游。
示意圖:
能夠看到上游發送了三個事件(注意顏色),中間對每一個事假數據進行處理後(每個圓變成兩個矩形),再合併成包含六個矩形事件的Observable對象發送給下游,注意矩形顏色,他是無規律,無序的,並非嚴格按照上游發送的順序來發送給下游。
方法:
final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
複製代碼
demo:
public void flatMap() {
// 被觀察者經過just發送整型數值一、二、3
Observable.just(1, 2, 3).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
// 對收到的數值再進行函數處理。
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("變換後的數據" + integer);
}
// 將函數處理後的數據,在包裝成一個Observable對象發送給下游。
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
複製代碼
同flatMap
同樣的功能,只是flatMap
不能保證轉換後發送給下游事件的時序,concatMap轉換後能嚴格按照上游發送的順序再發送給下游。
示意圖:
同
flatMap
同樣,重點注意顏色,轉換後顏色和上游發送的順序一致,有序發送
方法:
final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int prefetch)
複製代碼
demo:
public void concatMap() {
// 被觀察者經過just發送整型數值一、二、3
Observable.just(1, 2, 3).concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
// 對收到的數值再進行函數處理。
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("變換後的數據" + integer);
}
// 將函數處理後的數據,在包裝成一個Observable對象發送給下游。
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
複製代碼
組合多個被觀察者一塊兒發送數據,合併後 按發送順序串行執行
組合的被觀察者數量要求小於等於4個,從提供的方法參數裏面能夠得知。
示意圖:
方法:
public static <T> Observable<T> concat(
ObservableSource<? extends T> source1, ObservableSource<? extends T> source2,
ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
複製代碼
demo:
public void concat() {
// 用just操做符建立三個Observable對象
Observable<String> observable1 = Observable.just("1", "2");
Observable<String> observable2 = Observable.just("A", "B", "C");
Observable<String> observable3 = Observable.just("hello", "rxjava");
// 使用concat操做符合並三個Observable對象,並將合併後的數據順序(串行)發送給下游
Observable.concat(observable1
, observable2, observable3)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
複製代碼
同concat
同樣,組合多個被觀察者一塊兒發送數據,合併後 按發送順序串行執行
concatArray對組合的被觀察者對象沒有個數限制,能夠大於4個。
示意圖:
上游發送的是一個組合的觀察者數組,沒有數量限制(注意顏色)
轉換後串行發送(顏色和上游發送順序對應)
方法:
static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
複製代碼
demo:
public void concatArray() {
Observable<String> observable1 = Observable.just("1", "2");
Observable<String> observable2 = Observable.just("A", "B", "C");
Observable<String> observable3 = Observable.just("D", "E");
Observable<String> observable4 = Observable.just("F");
Observable<String> observable5 = Observable.just("G");
Observable.concatArray(observable1, observable2, observable3, observable4, observable5)
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: s=" + s);
}
});
}
複製代碼
使用concat操做符時,若是遇到其中一個被觀察者發出onError
事件則會立刻終止其餘被觀察者的事件,若是但願onError
事件推遲到其餘被觀察者都結束後才觸發,可使用對應的concatDelayError。
方法:
public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources) {
ObjectHelper.requireNonNull(sources, "sources is null");
return concatDelayError(fromIterable(sources));
}
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources) {
return concatDelayError(sources, bufferSize(), true);
}
public static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch, boolean tillTheEnd)
複製代碼
demo:
public void concatArrayDelayError() {
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
emitter.onNext("B");
emitter.onNext("C");
emitter.onError(new NullPointerException(""));
emitter.onNext("D");
}
});
Observable.concatArrayDelayError(observable, Observable.just("E", "F"))
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: ");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: s="+s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: e" + e.getMessage(), e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
}
複製代碼
結果:
能夠看到,第一個observable發送到c後,手動拋出一個錯誤,可是並滅有影響到Observable.just("E", "F")的執行,咱們依舊打印出了 E,F兩個參數後纔去執行咱們手動拋出的NullPointerException錯誤
。。。。
操做符這部份內容比較多,先整理這部分,後面會對其餘操做符再作整理。