什麼是RxJava? GitHub給出的介紹是:RxJava是ReactiveX(Reactive Extensions)的Java VM實現:用於經過使用可觀察序列來編寫異步和基於事件的程序的庫。java
在個人理解中RxJava主要能夠實現異步任務,和事件總線的功能,這也是RxJava的厲害之處。git
RxJava的GitHub地址:github
關於RxJava的使用詳解,該篇文章會介紹更加詳細,並且主要偏向於對RxJava的使用。數據結構
建立一個觀察者,有兩種方式實現:建立Observer和Subscriber。併發
Observer:app
Observer<String> observer = new Observer<String>() {
@Override
public void onError(Throwable e) {
Log.i("test", "onError");
}
@Override
public void onComplete() {
Log.i("test", "onComplete");
}
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i("test", "onSubscribe");
}
@Override
public void onNext(String s) {
Log.i("test", "onNext----->" + s);
}
};
複製代碼
Subscriber:異步
Subscriber subscriber = new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
Log.i("test", "onSubscribe");
}
@Override
public void onNext(Object o) {
Log.i("test", "onNext");
}
@Override
public void onError(Throwable t) {
Log.i("test", "onError");
}
@Override
public void onComplete() {
Log.i("test", "onComplete");
}
}
複製代碼
建立被觀察者,而且須要和觀察者訂閱起來,在RxJava中的被觀察者是Observable使用subscribe方法訂閱。ide
建立被觀察者有三種方式能夠實現:函數
1)使用Observable.create建立
Observable.create(new ObservableOnSubscribe<String>(){
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("你好");
}
}).subscribe(observer);//訂閱
複製代碼
在建立方式須要在subscribe方法裏,手動調用Observer的onNext、onError、onComplete方法,而onSubscribe方法是自動調用的。
2)Observable.just 可使用Observable.just方式來建立一個Observable
Observable.just("你好","hello world").subscribe(observer);
複製代碼
使用Observable.just建立,而後subscribe訂閱,這種方式會自動調用onSubscribe、onNext、onError和onComplete方法。
3)Observable.fromArray
使用Observable.fromArray來建立一個Observable對象。
String[] quotations = {"熱愛祖國", "熱愛人民"};
Observable.fromArray(quotations).subscribe(observer);
複製代碼
使用Observable.fromArray建立,而後訂閱,和Observable.just同樣,會自動調用觀察者的方法。
在上面咱們已經建立了一個觀察,建立一個觀察者,其中包括四個方法:onError、onComplete、onSubscribe和onNext。
那麼這幾個方法都表示什麼呢?
onSubscribe:被觀察者訂閱觀察者的時候,就會觸發該方法。
onCompleted:事件隊列完結。RxJava 不只把每一個事件單獨處理,還會把它們看作一個隊列。RxJava 規定,當不會再有新的 onNext 發出時,須要觸發 onCompleted方法做爲標誌。
onError: 事件隊列異常。在事件處理過程當中出異常時,onError方法會被觸發,同時隊列自動終止,不容許再有事件發出。
onNext:表示普通事件,能夠在該方法作一些業務邏輯處理。
操做符包括普通的操做符、變換操做符、過濾操做符、組合操做符、輔助操做符、錯誤處理操做符、條件操做符、布爾操做符和轉換操做符。
使用,如:
Observable.intervalRange(0,6,0,3,TimeUnit.SECONDS).create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("666");
}
}).subscribe(observer);
複製代碼
intervalRange該操做符是用於延遲執行、而且按期執行。
map:指定一個Function對象,將Observable轉換爲一個新的Observable併發射。
Observable.just("你好","hello world").map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s;
}
}).subscribe(observer);
複製代碼
flatMap、cast:
Observable.just("你好","hello world").flatMap(new Function<String, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull String s) throws Exception {
return Observable.just(s);
}
}).cast(String.class).subscribe(observer);
複製代碼
flatMap將Observable發射的數據集合變換爲Observable集合,而後將這些Observable發射的數據平坦地放進一個單獨的Observable,而cast則強制將Observable發射的全部數據轉換爲指定類型。
buffer操做符功能:
一、能一次性集齊多個結果到列表中,訂閱後自動清空相應結果,直到徹底清除
二、 也能夠週期性的集齊多個結果到列表中,訂閱後自動清空相應結果,直到徹底清除
Observable
.range(0,5)
.buffer(2)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull List<Integer> integers) {
Log.i("test","----------------->onNext:" + integers);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
複製代碼
Observable
.just("你好","hello world","我愛我家")
.buffer(3)
.subscribe(new Observer<List<String>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull List<String> strings) {
Log.i("test",""+strings);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
複製代碼
除了以上的變換操做符,還有groupBy操做符,進行分組操做。
過濾操做符包括filter、skip、take、element等等。
filter:對Observable產生的結果自定義規則進行過濾,只有知足條件的結果才提交給訂閱者。
Observable
.just("你好","hello world","我愛我家")
.filter(new Predicate<String>() {
@Override
public boolean test(@NonNull String s) throws Exception {
Log.i("test",""+s);
return s.equals("你好");
}
}).subscribe(observer);
複製代碼
distinct:去重
Observable
.just("你好","hello world","我愛我家","我愛我家")
.distinct()
.subscribe(observer);
複製代碼
skip:過濾掉前n項
Observable
.just("你好","hello world","我愛我家","我愛我家")
.skip(2)
.subscribe(observer);
複製代碼
take:取前n項
Observable
.just("你好","hello world","我愛我家","我愛我家")
.take(2)
.subscribe(observer);
複製代碼
throttleWithTimeout:若是在限定的時間內,源Observable有新的數據發射出來,該數據就會被丟棄,同時throttleWithTimeout從新開始計時,若是每次都是在計時結束前發射數據,那麼這個限流就會走向極端(只會發射最後一個數據)
Observable
.just("你好","hello world","我愛我家","我愛我家")
.throttleWithTimeout(200, TimeUnit.MILLISECONDS)
.subscribe(observer);
複製代碼
組合操做符包括:merge、startWidth、concat、jion、switch和zip等等。
merge:將多個Observable合併到一個Observable中進行發射。
Observer<String> observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i("test", "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i("test", "onNext--->" + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.i("test", "onError");
}
@Override
public void onComplete() {
Log.i("test", "onComplete");
}
};
Observable<String> observable1 = Observable.just("你好", "hello World");
Observable<String> observable2 = Observable.just("new obj", "mergeobj");
Observable.merge(observable1, observable2).subscribe(observer);
複製代碼
concat:將多個Observable發射的數據合併發射,其具備嚴格的順序,發射順序具備隊列的特色。前一個數據沒有發射完成不會發射後一個數據。
Observable<String> observable1 = Observable.just("你好", "hello World");
Observable<String> observable2 = Observable.just("new obj", "mergeobj");
Observable.concat(observable1, observable2).subscribe(observer);
複製代碼
除了以上的組合操做符,還有zip、combineLastest等。
zip:合併兩個或者多個Obserable發射出的數據項,根據指定的函數變換它們,併發射一個新值。
輔助操做符包括DO、delay、observeOn、timeout、timeInterval、timestamp、subscribeOn、meterialize和to等。
delay:延遲執行發射數據
Observable<String> observable1 = Observable.just("你好", "hello World");
Observable<String> observable2 = Observable.just("new obj", "mergeobj");
Observable.concat(observable1, observable2).delay(5, TimeUnit.SECONDS).subscribe(observer);
複製代碼
subscribeOn:指定Obserable自身在那個線程上運行。
observeOn:指定Obserable發射出的數據在那個線程運行。
其餘的操做符讀者能夠自行實踐。
在rxjava中,錯誤操做符包括catch和retry。
catch可以攔截原始Observable的onError通知,將它替換爲其餘數據項或者數據序列,讓產生的Observable可以正常終止或者根本不終止。 catch實現分爲三個不一樣的操做符:
一、onErrorReturn:返回原有Observable行爲的備用Observable, 備用的Observable忽略原有的Observable的onError調用,即不會將錯誤傳遞給觀察者,而是發射一個特殊的項,以及調用觀察者的onCompleted。
二、onErrorResumeNext:跟onErrorReturn同樣返回備用的Observable,不會調用原有的Observable的onError,它會發射備用的Observable數據。
三、onExceptionResumeNext:若是onError收到的Throwable不是一個Exception,它會將錯誤傳遞給觀察者的onError方法,不會使用備用的Observable。
retry:不會將原有的Observable的onError通知傳遞給觀察者,這會訂閱這個Observable,再給它一次機會無錯誤地完成其數據序列,而它總會傳遞onNext通知給觀察者。該操做符有可能形成數據重複,由於從新訂閱。若是超過了從新訂閱的次數,就不會再次訂閱了,而是把最新的一個onError通知傳遞給觀察者。
條件操做符包括:defaultEmpty、skipUntil、amb、skipWhile、takeUtil、takeWhile
defaultEmpty:若是原有的Observable沒有發射數據,就發射一個默認的數據。
skipUntil:訂閱原始的Observable,可是忽略它的發射物,直到第二個Observable發射了一項數據那一刻,它開始發射原始Observable。
布爾操做符包括:all、isEmpty、contains、exists和sequenceEqual。
關於條件操做符和布爾操做符,讀者能夠關注《RxJava操做符(08-條件和布爾操做) 》這篇文章,文章地址:
轉換操做符可以將Observable轉換爲另外一個對象或者數據結構,其中轉換操做符包括:toMap、toMultiMap、toList、toSortedList、nest和getIterator等。
toMap:將原始的Observable發射的全部數據項集合到一個Map中,而後發射該Map。
String s1 = "你好";
String s2 = "hello world";
String s3 = "lalala";
Observable.just(s1,s2,s3).toMap(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
return s;
}
}).subscribe(new SingleObserver<Map<String, String>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull Map<String, String> stringStringMap) {
Log.i("test",""+stringStringMap);
}
@Override
public void onError(@NonNull Throwable e) {
}
});
複製代碼
toMultiMap:相似於toMap,不一樣的地方在於map的value是一個集合。
toList:將發射的數據組成一個List。
String s1 = "你好";
String s2 = "hello world";
String s3 = "lalala";
Observable.just(s1,s2,s3).toList().subscribe(new SingleObserver<List<String>>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onSuccess(@NonNull List<String> strings) {
Log.i("test",""+strings);
}
@Override
public void onError(@NonNull Throwable e) {
}
});
複製代碼
關於其餘操做符,讀者能夠參考《RxJava操做符大全》這篇文章。
前面講解輔助操做符的時候,咱們知道使用subscribeOn能夠指定Obserable自身在那個線程上運行。使用observeOn能夠指定Obserable發射出的數據在那個線程運行。RxJava默認線程是在調用subcribe方法的線程上進行回調,可是若是想切換線程,就須要使用Scheduler。
在RxJava中內置瞭如下幾個Scheduler:
一、Scheduler.immediate():運行在當前線程,是timeout、timestamp和timeInterval操做符的默認調度器。
二、Scheduler.io():I/O操做使用的Scheduler。
三、Scheduler.newThread():開啓一個新的線程執行操做。
2和3的區別就是:2的內部實現了一個無數量上限的線程池,重用空閒的線程,所以2具備更高的效率。
四、Scheduler.trampoline():能夠將任務經過trampoline方法加入隊列,該調度器會按順序處理隊列的任務,是repeat和retry操做符的默認調度器。
五、Scheduler.computation():計算所使用的調度器,它具備固定的線程池,大小爲cpu核數,注意不要將io操做放到computation中,不然io操做的等待時間會浪費cpu。該調度器是buffer、delay、sample、debounce、interval和skip的默認調度器。
六、AndroidSchedulers.mainThread():表示在主線程中運行,該調度器是RxAndroid提供的。
Observable.just("你好","hello world")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
複製代碼
寫到這裏,關於RxJava的知識基本講解完了,相信讀者讀完該文,也懂得使用RxJava了,接下來我更新RxJava結合Retrofix和OkHttp的使用,敬請關注,謝謝!