第一次接觸學習RxJava應該是一兩個月前的事情了,但其中也是斷斷續續,最近又再次去學習RxJava,和當初剛接觸RxJava徹底不是一樣的心情,輕鬆了不少,也感覺到了RxJava的魅力,真是不禁衷感嘆太牛了。目前關於RxJava的文章也不少,我的推薦兩篇扔物線的給 Android 開發者的 RxJava 詳解和大頭鬼Bruce的譯文 深刻淺出RxJava系列。那麼這篇文章經過代碼介紹RxJava中的操做符,以及操做符的使用。固然操做符較多,準備分幾篇文章介紹。若是你想提早學習其餘操做符能夠去GitHub,歡迎star項目。javascript
爲了力求有沒有RxJava基礎都能看懂此文,簡單介紹一下RxJava以及一些名詞。在RxJava開源的Github上是這樣解釋的a library for composing asynchronous and event-based programs using observable sequences for the Java VM。不管多麼複雜的邏輯,均可以保持整潔的代碼格式。java
在RxJava中最重要的就是Observable(被觀察者),subscribe(訂閱),Observer(觀察者)或者Subscriber(訂閱者),Observable也就是數據(事件)源,Subscriber負責接收以及處理數據(事件)。固然要想實現二者通訊,須要有一種機制那就是訂閱。Observer 經過 subscribe() 方法實現訂閱關係,從而 Observable 能夠在須要的時候發出事件來通知 Observer。git
例如張三(觀察者)想看某款新聞軟件的科技信息(被觀察者),因爲科技信息是天天推送或者不定時推送,若是張三一直盯着手機屏幕看而且刷新消息是否是又新的信息,顯然不現實。這時候就能夠經過張三 subscribe(訂閱)科技信息,而實現當有新的科技信息時自動給張三推送消息,在這期間,張三並不須要一直盯着屏幕刷新聞。在咱們平時的認知中實現訂閱應該是張三.subscribe(科技新聞),不過在RxJava代碼中實現訂閱應該寫成科技新聞.subscribe(張三)。github
在RxJava中,有三個事件回調方法,分別是onNext(),OnError(),onCompleted(),onNext()是最終輸出及處理數據的回調,在發射數據過程當中出現錯誤異常會回調OnError()方法,當不會再有新的 onNext() 發出時,須要觸發 onCompleted() 方法做爲標誌。,OnError()和onCompleted()是互斥的。下面舉一個最簡單的例子數組
Observable observable2 = Observable.just("也許當初忙着微笑和哭泣", "忙着追逐天空中的流星", "人理所固然的忘記", "是誰風裏雨裏一直默默守護在原地");
Subscriber subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " )
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ")
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+s )
}
};
observable.subscribe(subscriber);複製代碼
運行後打印信息爲app
onNext: 也許當初忙着微笑和哭泣
onNext: 忙着追逐天空中的流星
onNext: 人理所固然的忘記
onNext: 是誰風裏雨裏一直默默守護在原地
onCompleted:複製代碼
咱們可使用該操做符從零開始建立一個Observable,給這個操做符傳遞一個接受觀察者做爲參數的函數,並調用觀察者的onNext,onError和onCompleted方法。以下async
//被觀察者
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//能夠屢次調用subscriber.onNext("你們好")發射數據
subscriber.onNext("你們好");
subscriber.onNext("我開始學習RxJava");
subscriber.onCompleted();
}
});複製代碼
發送數據須要在毀掉方法call中調用subscriber的onNext(),onNext(T)發送的參數須要和Observable.OnSubscribe
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.getMessage());
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext:"+s);
}
};複製代碼
數據成功發送後,會回調Subscriber的onNext()的方法,其中的參數就是接收到的數據。當onNext()接收數據完畢後會執行onCompleted(),若是中途有環節出現錯誤異常,會執行onError()。如今觀察者和被觀察者都建立完畢了,他們執行還須要一個前提就是訂閱,若是不訂閱,observable並不會發射數據,subscribe也不會接收數據,訂閱代碼以下函數
observable.subscribe(subscriber);複製代碼
執行後輸出信息post
onNext:你們好
onNext:我開始學習RxJava
onCompleted複製代碼
該操做符是將其它種類的對象和數據類型轉換爲Observable,若是當你發射的的數據是同一種類型,而不是混合使用Observables和其它類型的數據,會很是方便。以下建立Observable
Integer[] integers = {1,2, 3, 4};
Observable<Integer> observable=Observable.from(integers);
Subscriber<String> subscriber = new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.getMessage());
}
@Override
public void onNext(Integer i) {
Log.e(TAG, "onNext:"+i);
}
};
observable.subscribe(subscriber);複製代碼
輸出信息爲
onNext:1
onNext:2
onNext:3
onNext:4
onCompleted複製代碼
from操做符能夠轉換Future、Iterable和數組。對於Iterable和數組,產生的Observable會發射Iterable或數組的每一項數據。對於Future,它會發射Future.get()方法返回的單個數據,而且還能夠增長經過: from(Future,timeout, timeUnit)指定超時時間,若是執行的時候Future超時會回調onError()方法。
just將單個數據轉換爲發射那個數據的Observable,Just相似於From,可是From會將數組或Iterable的數據取出而後逐個發射,而Just只是簡單的原樣發射,將數組或Iterable當作單個數據,若是你傳遞null給Just,它會返回一個發射null值的Observable。不要誤認爲它會返回一個空Observable(徹底不發射任何數據的Observable)。對於just能夠接收1到10個數據,返回一個按參數列表順序發射這些數據的Observable。
Observable.just(1 2, 3, 4)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: ");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
});複製代碼
輸出
onNext:1
onNext:2
onNext:3
onNext:4
onCompleted:複製代碼
對於just參數類型能夠是多種,以下,傳入兩個類型數據
Observable.just(0, "one", 6, "two", 8, "three")
.subscribe(new Subscriber<Serializable>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(Serializable serializable) {
Log.e(TAG, "onNext: "+serializable.toString());
}
});複製代碼
則輸出信息
onNext:0
onNext:one
onNext:6
onNext:two
onNext:8
onNext:three
onCompleted:複製代碼
Empty:建立一個不發射任何數據可是正常終止的Observable,此時會回調onCompleted()
Never:建立一個不發射數據也不終止的Observable
Error:建立一個不發射數據以一個錯誤終止的Observable
error操做符須要一個Throwable參數,你的Observable會以此終止。這些操做符默認不在任何特定的調度器上執行,可是empty和error有一個可選參數是Scheduler,若是你傳遞了Scheduler參數,它們會在你指定的調度器上發送通知。
該操做符建立特定整數序列的Observable,它接受兩個參數,一個是範圍的起始值,一個是範圍的數據的數目。若是你將第二個參數設爲0,將致使Observable不發射任何數據(若是設置爲負數,會拋異常)。
Observable.range(1,4)
.subscribe(new Subscriber<Integer>() {
public String TAG="RXJAVA";
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer);
}
});複製代碼
輸出信息
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onCompleted:複製代碼
你能夠在代碼實戰中,更改第二個參數爲負數,或者0,以及將第一個參數更改成你想測試的任意值,去觀察執行日誌幫助理解。
Timer操做符建立一個在給定的時間段以後返回一個特殊值的Observable。它在延遲一段給定的時間後發射一個簡單的數字0 。
Observable.timer(1, TimeUnit.SECONDS)
.subscribe(new Subscriber<Long>() {
public String TAG="RXJAVA";
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(Long integer) {
Log.e(TAG, "onNext:1111111 "+integer);
}
});複製代碼
對於該操做符默認在computation調度器上執行的,若是你想在onNext()回調方法更新UI,須要經過observeOn(AndroidSchedulers.mainThread())設置,不然會調用onError()方法。固然Time人提供的有一個三個參數的方法timer(long,TimeUnit,Scheduler)能夠指定 Scheduler 。
該操做符按固定的時間間隔發射一個無限遞增的整數序列,它接受一個表示時間間隔的參數和一個表示時間單位的參數,固然該操做符合Timer同樣,是在computation調度器上執行的,若想更新UI須要指定Scheduler 爲AndroidSchedulers.mainThread()。
Subscription subscription = Observable.interval(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
tv.append(" " + aLong + " ");
}
});複製代碼
經過上面代碼就會每隔1秒在tv上追加一個數字,而且會永遠執行。若是在某個時刻不想繼續輸出,就須要要解除訂閱。
if (subscription != null && !subscription.isUnsubscribed()) {
subscription.unsubscribe();
}複製代碼
該操做符是重複的發射某個數據序列,而且能夠本身設置重複的次數。當接收到onComplete()會觸發重訂閱再次重複發射數據,當重複發射數據次數到達後執行onCompleted()。
String[] strs = {"也許當初忙着微笑和哭泣", "忙着追逐天空中的流星"};
Observable.from(strs).repeat(2)..subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ");
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+s );
tv1.append("\n" + s);
}
});複製代碼
輸出
onNext: 也許當初忙着微笑和哭泣
onNext: 忙着追逐天空中的流星
onNext: 也許當初忙着微笑和哭泣
onNext: 忙着追逐天空中的流星
onCompleted:複製代碼
直到有觀察者訂閱時才建立Observable,而且爲每一個觀察者建立一個新的Observable,該操做符能保證訂閱執行時數據源是最新的數據。以下正常代碼
String test="舊數據";
Observable observable=Observable.just(test);
Subscriber subscriber=new Subscriber() {
public String TAG="RXJAVA";
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: "+o );
}
};
test="新數據";
observable.subscribe(subscriber);複製代碼
輸出
onNext: 舊數據
onCompleted:複製代碼
經過上面代碼和輸出日誌發現,雖然在後面講數據test更新爲新數據,可是並無生效,要想使用最新的數據就須要使用defer操做符。此時更改使用defer
test="舊數據";
Observable<String> observable=Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
return Observable.just(test);
}
});
Subscriber subscriber=new Subscriber() {
public String TAG="RXJAVA";
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted: " );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onNext(Object o) {
Log.e(TAG, "onNext: "+o );
}
};
test="新數據";
observable.subscribe(subscriber);複製代碼
輸出信息
onNext: 新數據
onCompleted:複製代碼
經過新的打印信息,發現輸出值已是最新的數據。
到這裏,這篇文章暫時就先結束了,若文章有不足或者錯誤的地方,歡迎指正,以防止給其餘讀者錯誤引導。更多的操做符,將在接下來的幾篇文章介紹。