上節初步瞭解了RxJava,本節主要講講RxJava的操做符。官方文檔中能夠看到操做符特別的多,一樣也是RxJava比較重要的內容,在我看來靈活使用RxJava離不開對它的操做符的理解。那本節就根據圖書《RxJava Essentials》上操做符的描述,根據操做符類型分類分別介紹一些操做符的使用。另外本節引用了RxJava-Android-Samples一些實例更好理解操做在實際開發中如何使用。git
take程序員
發射事件流中的前n個事件
Observable.just(1,2,3,4,5,6,7,8,9,10) //只取前4個事件 1,2,3,4 .take(4) .subscribe(...)
takeLastgithub
發射事件流中的後n個事件
Observable.just(1,2,3,4,5,6,7,8,9,10) //只取4個事件 7,8,9,10 .takeLast(4) .subscribe(...)
distinct編程
過濾事件流重複發射的事件
Observable.just(1,2,3,3,2,3,2,4,5,4,5,5) //只取5個事件 1,2,3,4,5 .distinct() .subscribe(...)
distinctUntilChange數組
過濾同樣的事件直到事件發生變化才進行發射
Observable.just(1,2,2,3,3,4,4,4,5,5) //只取5個事件 1,2,3,4,5 .distinct() .subscribe(...)
repeat緩存
重複發射事件流
Observable.just(1,2,3) .repeat(3) .subscribe(...) //訂閱獲取 1,2,3,1,2,3,1,2,3
first併發
發射首個事件
Observable.just(1,2,3) .first() .subscribe(...) //訂閱獲取 1
lastide
發射最後一個事件
Observable.just(1,2,3) .last() .subscribe(...) //訂閱獲取 1
skipLast函數式編程
跳過發射事件流後n個事件
Observable.just(1,2,3,4,5,6,7,8,9,10) .skipLast(4) .subscribe(...) //訂閱獲取 1,2,3,4,5,6
timeout函數
不發射超出指定時間內外的事件
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); deplyTime();//延遲 subscriber.onNext(4); subscriber.onNext(5); } }) .timeout(2, TimeUnit.SECONDS) .subscribe(...) //訂閱獲取 1,2,3
sample
發射在指定時間內事件流中的最後一個事件
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); deplyTime(); subscriber.onNext(4); subscriber.onNext(5); deplyTime(); subscriber.onNext(6); } }) .sample(2,TimeUnit.SECONDS) .subscribe(...) //訂閱獲取 3,5,6
throttleFirst
發射在指定時間內事件流中的第一個事件
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onNext(3); deplyTime(); subscriber.onNext(4); subscriber.onNext(5); deplyTime(); subscriber.onNext(6); } }) .throttleFirst(2,TimeUnit.SECONDS) .subscribe(...) //訂閱獲取 1,4,6
debounce
在計時時間內事件流沒有產生新事件則發射當前事件,如有新事件產生則從新計時
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); deplyTime();//延時兩秒 subscriber.onNext(3); subscriber.onNext(4); subscriber.onNext(5); deplyTime(); deplyTime(); subscriber.onNext(3); subscriber.onNext(4); deplyTimeLittle();//延時一秒 subscriber.onNext(5); subscriber.onNext(4); deplyTimeLittle(); subscriber.onNext(7); deplyTime(); subscriber.onNext(6); } }) .debounce(2,TimeUnit.SECONDS) .subscribe(...) //訂閱獲取 2,5,7,6
實例應用:監聽EditTextView字符變化,在輸入字符後400ms內無變化則訂閱字符串事件
_subscription = RxTextView.textChangeEvents(_inputSearchText) .debounce(400, TimeUnit.MILLISECONDS)// default Scheduler is Computation .filter(changes -> isNotNullOrEmpty(_inputSearchText.getText().toString())) .observeOn(AndroidSchedulers.mainThread()) .subscribe(_getSearchObserver());
flatMap
faltMap用法和map相似但flatMap返回的是Observable<T>對象,另外flatMap支持無序,最後訂閱的事件流並不必定和原來的序列保持一致。
concatMap
concatMap用法和flatMap同樣,只是concatMap可以保證事件流保持原來的序列。
flatMapIterable
flatMapIterable和faltMap相似,但返回類型是Iterable
switchMap
scan
獲取當前事件和後一個事件作特殊處理返回同類型事件,主要應用對事件的包裝。
groupBy
對事件進行分類訂閱,根據自定義篩選規則對事件流分類,經過GroupedObservable.getKey()區分處理事件。
Observable.just(1,2,3,4,5,6,7,8,9,10) .groupBy(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer % 2 == 0; } }).subscribe(new Action1<GroupedObservable<Boolean, Integer>>() { @Override public void call(GroupedObservable<Boolean, Integer> booleanIntegerGroupedObservable) { if(booleanIntegerGroupedObservable.getKey()){ //True booleanIntegerGroupedObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("Subscriber true",integer + "\n"); } }); }else{ //False booleanIntegerGroupedObservable.subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { Log.i("Subscriber false",integer + "\n"); } }); } } }); //訂閱結果 1,3,5,7,9由false處理,2,4,6,8,10由true處理
buffer
將事件流組裝爲數組發射,大小由Buffer決定
Observable.just(1,2,3,4,5,6,7,8,9,10) .buffer(3) .subscribe(...) //訂閱獲取 [1, 2, 3],[4, 5, 6],[7, 8, 9],[10]
實例應用:點擊事件,buffer作定時組裝數組,計時時間內無新事件產生則組裝當前數組發射
RxView.clickEvents(_tapBtn) .map(onClickEvent -> { Timber.d("--------- GOT A TAP"); _log("GOT A TAP"); return 1; }) .buffer(2, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(...);
window
window操做符會在時間間隔內緩存結果,相似於buffer緩存一個list集合,區別在於window將這個結果集合封裝成了observable
bservable.interval(1,TimeUnit.SECONDS) .take(10) .window(3,TimeUnit.SECONDS) .subscribe(...)
cast
設置事件的指定類型
merge
合併事件流,用法和groupBy偏偏相反
Observable.merge(Observable.just(2,3),Observable.just(3,5)) .subscribe(...); //訂閱獲取 2,3,3,5
zip
整合多個事件流將事件結果整合處理再發射事件
Observable.zip(Observable.just(1, 2, 3), Observable.just(1, 2, 3), new Func2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }) .subscribe(...) //訂閱結果 2,4,6
實戰應用:兩個http請求併發,等待兩個結果返回再處理結果實現屢次請求一次處理
Observable.zip( service.getUserPhoto(id), service.getPhotoMetadata(id), (photo, metadata) -> createPhotoWithData(photo, metadata)) .subscribe(photoWithData -> showPhoto(photoWithData));
join
join一樣是將多個事件流結果合併統一處理,當join可控制每一個事件流結果生命週期,,在每一個結果的生命週期內,能夠與另外一個Observable產生的結果按照必定的規則進行合併。
combineLatest
CombineLatest操做符行爲相似於zip,可是隻有當原始的Observable中的每個都發射了一條數據時zip才發射數據。CombineLatest則在原始的Observable中任意一個發射了數據時發射一條數據。當原始Observables的任何一個發射了一條數據時,CombineLatest使用一個函數結合它們最近發射的數據,而後發射這個函數的返回值。
本節一些地方引用的例子使用了函數式編程寫法,這是Java SE 8中一個重要特性。這裏稍微作一個簡短的Lambda介紹。
Lambda表達式是Java SE 8中一個重要的新特性。lambda表達式容許你經過表達式來代替功能接口。 lambda表達式就和方法同樣,它提供了一個正常的參數列表和一個使用這些參數的主體(body,能夠是一個表達式或一個代碼塊)。Lambda表達式還加強了集合庫。
lambda的確讓Java代碼緊湊簡潔,可並行處理集合例如filter、map、reduce等並行函數。但目前看來據我所知使用Lambda的程序員不是大多數,Lambda也下降了代碼的可讀性在開發企業項目不易於維護開發,但不妨先學習瞭解。
在學習RxJava的過程當中我對於RxJava有了本身的理解,找了一張來自泡在網上的日子的一張圖。整個RxJava被訂閱的過程是一個Subscription能夠比喻成工廠生產商品到消費者的過程,產品最終是否能夠到達消費者手中由Subscription決定,observable能夠理解成未加工過的產品(至少在爲被Subscriber消費以前是的),Operations爲整個生產線上每一條流水線。Schers爲一個車間,它表明着Operation在哪車間裏運做。Observable經過流水線的傳遞最終到達消費者Subscer手中。這個過程就像是給一個初始的產品模型在生產製做過程中不斷加工製做組裝,最終達到消費者手中是一個製做加工所想要的商品。這就是我對RxJava的理解,可能有點誤差但大體上和其思想比較接近。
其實關於RxJava的資料有不少,我主要是以學習分享爲目的來講說本身學習RxJava所感所想,但願更和你們一塊兒討論學習進步。