Rxjava
因爲其基於事件流的鏈式調用、邏輯簡潔 & 使用簡單的特色,深受各大 Android
開發者的歡迎。若是還不瞭解RxJava,請看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程java
RxJava
的基本使用,但願你們會喜歡。
- 本系列文章主要基於
Rxjava 2.0
- 接下來的時間,我將持續推出
Android
中Rxjava 2.0
的一系列文章,包括原理、操做符、應用場景、背壓等等 ,有興趣能夠繼續關注Carson_Ho的安卓開發筆記!!
此處簡單介紹RxJava
react
若還不瞭解RxJava,請看文章:Android:這是一篇 清晰 & 易懂的Rxjava 入門教程android
RxJava
的基本使用,更深刻的RxJava
使用請繼續關注Carson_Ho的RxJava系列Rxjava
的使用方式有兩種:
Rxjava
的原理 & 使用,主要用於演示說明Observable
)& 生產事件// 1. 建立被觀察者 Observable 對象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// create() 是 RxJava 最基本的創造事件序列的方法
// 此處傳入了一個 OnSubscribe 對象參數
// 當 Observable 被訂閱時,OnSubscribe 的 call() 方法會自動被調用,即事件序列就會依照設定依次被觸發
// 即觀察者會依次調用對應事件的複寫方法從而響應事件
// 從而實現被觀察者調用了觀察者的回調方法 & 由被觀察者向觀察者的事件傳遞,即觀察者模式
// 2. 在複寫的subscribe()裏定義須要發送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 經過 ObservableEmitter類對象產生事件並通知觀察者
// ObservableEmitter類介紹
// a. 定義:事件發射器
// b. 做用:定義須要發送的事件 & 向觀察者發送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
<--擴展:RxJava 提供了其餘方法用於 建立被觀察者對象Observable -->
// 方法1:just(T...):直接將傳入的參數依次發送出來
Observable observable = Observable.just("A", "B", "C");
// 將會依次調用:
// onNext("A");
// onNext("B");
// onNext("C");
// onCompleted();
// 方法2:from(T[]) / from(Iterable<? extends T>) : 將傳入的數組 / Iterable 拆分紅具體對象後,依次發送出來
String[] words = {"A", "B", "C"};
Observable observable = Observable.from(words);
// 將會依次調用:
// onNext("A");
// onNext("B");
// onNext("C");
// onCompleted();
複製代碼
Observer
)並 定義響應事件的行爲Next
事件、Complete
事件 & Error
事件。具體以下:<--方式1:採用Observer 接口 -->
// 1. 建立觀察者 (Observer )對象
Observer<Integer> observer = new Observer<Integer>() {
// 2. 建立對象時經過對應複寫對應事件方法 從而 響應對應事件
// 觀察者接收事件前,默認最早調用複寫 onSubscribe()
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe鏈接");
}
// 當被觀察者生產Next事件 & 觀察者接收到時,會調用該複寫方法 進行響應
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件做出響應" + value);
}
// 當被觀察者生產Error事件& 觀察者接收到時,會調用該複寫方法 進行響應
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件做出響應");
}
// 當被觀察者生產Complete事件& 觀察者接收到時,會調用該複寫方法 進行響應
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件做出響應");
}
};
<--方式2:採用Subscriber 抽象類 -->
// 說明:Subscriber類 = RxJava 內置的一個實現了 Observer 的抽象類,對 Observer 接口進行了擴展
// 1. 建立觀察者 (Observer )對象
Subscriber<String> subscriber = new Subscriber<Integer>() {
// 2. 建立對象時經過對應複寫對應事件方法 從而 響應對應事件
// 觀察者接收事件前,默認最早調用複寫 onSubscribe()
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "開始採用subscribe鏈接");
}
// 當被觀察者生產Next事件 & 觀察者接收到時,會調用該複寫方法 進行響應
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件做出響應" + value);
}
// 當被觀察者生產Error事件& 觀察者接收到時,會調用該複寫方法 進行響應
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件做出響應");
}
// 當被觀察者生產Complete事件& 觀察者接收到時,會調用該複寫方法 進行響應
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件做出響應");
}
};
<--特別注意:2種方法的區別,即Subscriber 抽象類與Observer 接口的區別 -->
// 相同點:兩者基本使用方式徹底一致(實質上,在RxJava的 subscribe 過程當中,Observer老是會先被轉換成Subscriber再使用)
// 不一樣點:Subscriber抽象類對 Observer 接口進行了擴展,新增了兩個方法:
// 1. onStart():在還未響應事件前調用,用於作一些初始化工做
// 2. unsubscribe():用於取消訂閱。在該方法被調用後,觀察者將再也不接收 & 響應事件
// 調用該方法前,先使用 isUnsubscribed() 判斷狀態,肯定被觀察者Observable是否還持有觀察者Subscriber的引用,若是引用不能及時釋放,就會出現內存泄露
複製代碼
Subscribe
)鏈接觀察者和被觀察者observable.subscribe(observer);
// 或者 observable.subscribe(subscriber);
複製代碼
<-- Observable.subscribe(Subscriber) 的內部實現 -->
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
// 步驟1中 觀察者 subscriber抽象類複寫的方法,用於初始化工做
onSubscribe.call(subscriber);
// 經過該調用,從而回調觀察者中的對應方法從而響應被觀察者生產的事件
// 從而實現被觀察者調用了觀察者的回調方法 & 由被觀察者向觀察者的事件傳遞,即觀察者模式
// 同時也看出:Observable只是生產事件,真正的發送事件是在它被訂閱的時候,即當 subscribe() 方法執行時
}
複製代碼
Rxjava
的原理 & 使用RxJava
基於事件流的鏈式調用// RxJava的鏈式操做
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 建立被觀察者 & 生產事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 經過經過訂閱(subscribe)鏈接觀察者和被觀察者
// 3. 建立觀察者 & 定義響應事件的行爲
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe鏈接");
}
// 默認最早調用複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"做出響應" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件做出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件做出響應");
}
});
}
}
注:總體方法調用順序:觀察者.onSubscribe()> 被觀察者.subscribe()> 觀察者.onNext()>觀察者.onComplete()
複製代碼
這種 基於事件流的鏈式調用,使得RxJava
:git
更重要的是,隨着程序邏輯的複雜性提升,它依然可以保持簡潔 & 優雅。因此,通常建議使用這種基於事件流的鏈式調用方式實現RxJava
。github
RxJava 2.x
提供了多個函數式接口 ,用於實現簡便式的觀察者模式。具體以下: 數組
以 Consumer
爲例:實現簡便式的觀察者模式bash
Observable.just("hello").subscribe(new Consumer<String>() {
// 每次接收到Observable的事件都會調用Consumer.accept()
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
複製代碼
我將用一個實際工程實例來演示 Rxjava
的使用微信
步驟1:加入依賴ide
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
複製代碼
步驟2:直接在MainActivity.java
中實現下述步驟函數
(Observable )
& 生產事件(Observer )
並 定義響應事件的行爲(Subscribe)
鏈接觀察者和被觀察者public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 步驟1:建立被觀察者 Observable & 生產事件
// 即 顧客入飯店 - 坐下餐桌 - 點菜
// 1. 建立被觀察者 Observable 對象
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 2. 在複寫的subscribe()裏定義須要發送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 經過 ObservableEmitter類對象產生事件並通知觀察者
// ObservableEmitter類介紹
// a. 定義:事件發射器
// b. 做用:定義須要發送的事件 & 向觀察者發送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
// 步驟2:建立觀察者 Observer 並 定義響應事件行爲
// 即 開廚房 - 肯定對應菜式
Observer<Integer> observer = new Observer<Integer>() {
// 經過複寫對應方法來 響應 被觀察者
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe鏈接");
}
// 默認最早調用複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"做出響應" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件做出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件做出響應");
}
};
// 步驟3:經過訂閱(subscribe)鏈接觀察者和被觀察者
// 即 顧客找到服務員 - 點菜 - 服務員下單到廚房 - 廚房烹調
observable.subscribe(observer);
複製代碼
public class MainActivity extends AppCompatActivity {
private static final String TAG = "Rxjava";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// RxJava的流式操做
Observable.create(new ObservableOnSubscribe<Integer>() {
// 1. 建立被觀察者 & 生產事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
// 2. 經過經過訂閱(subscribe)鏈接觀察者和被觀察者
// 3. 建立觀察者 & 定義響應事件的行爲
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe鏈接");
}
// 默認最早調用複寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"做出響應" );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件做出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件做出響應");
}
});
}
}
複製代碼
測試效果 實現效果同上
Demo 下載地址 Carson_Ho的Github地址 = RxJava2系列:基礎使用
喜歡的麻煩點個
star
!
public final Disposable subscribe() {}
// 表示觀察者不對被觀察者發送的事件做出任何響應(但被觀察者仍是能夠繼續發送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示觀察者只對被觀察者發送的Next事件做出響應
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示觀察者只對被觀察者發送的Next事件 & Error事件做出響應
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示觀察者只對被觀察者發送的Next事件、Error事件 & Complete事件做出響應
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示觀察者只對被觀察者發送的Next事件、Error事件 、Complete事件 & onSubscribe事件做出響應
public final void subscribe(Observer<? super T> observer) {}
// 表示觀察者對被觀察者發送的任何事件都做出響應
複製代碼
// 主要在觀察者 Observer中 實現
Observer<Integer> observer = new Observer<Integer>() {
// 1. 定義Disposable類變量
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始採用subscribe鏈接");
// 2. 對Disposable類變量賦值
mDisposable = d;
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "對Next事件"+ value +"做出響應" );
if (value == 2) {
// 設置在接收到第二個事件後切斷觀察者和被觀察者的鏈接
mDisposable.dispose();
Log.d(TAG, "已經切斷了鏈接:" + mDisposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件做出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件做出響應");
}
};
複製代碼
至此,關於RxJava
的基礎使用講解完畢。
Rxjava
的基礎使用,更多實例應用請看文章:Android RxJava實際應用教學:你該何時使用RxJava?Android
中 Rxjava 2.0
的一系列文章,包括原理、操做符、應用場景、背壓等等 ,有興趣能夠繼續關注Carson_Ho的安卓開發筆記!!