來源:Carson_Ho-簡書java
角色 | 做用 | 類比 |
---|---|---|
被觀察者(Observable) | 產生事件 | 顧客 |
觀察者(Observer) | 接收事件,並給出響應動做 | 廚房 |
訂閱(Subscribe) | 鏈接 被觀察者 & 觀察者 | 服務員 |
事件(Event) | 被觀察者 & 觀察者 溝通的載體 | 菜式 |
implementation 'io.reactivex.rxjava2:rxjava:2.2.19' implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
//建立被觀察者,產生事件 public Observable<Integer> createObservable() { Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }); return observable; }
//建立觀察者 public Observer<Integer> createObserver() { Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "onSubscribe 鏈接"); } @Override public void onNext(Integer value) { Log.v("lanjiabinRx", "onNext " + value + " 事件"); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "onError 事件"); } @Override public void onComplete() { Log.v("lanjiabinRx", "onComplete 事件"); } }; return observer; }
//觀察者訂閱被觀察者 public void createSubscribe() { createObservable().subscribe(createObserver()); }
createSubscribe();
//鏈式調用 public void chainCall() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "onSubscribe 鏈接"); } @Override public void onNext(Integer value) { Log.v("lanjiabinRx", "onNext " + value + " 事件"); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "onError 事件"); } @Override public void onComplete() { Log.v("lanjiabinRx", "onComplete 事件"); } }); }
即觀察者 沒法繼續 接收 被觀察者的事件,但被觀察者仍是能夠繼續發送事件react
Disposable mDisposable; //1.定義 @Override public void onSubscribe(Disposable d) { mDisposable=d; //2.賦值 Log.v("lanjiabinRx", "onSubscribe 鏈接"); } @Override public void onNext(Integer value) { if (value==2) mDisposable.dispose(); //3.在第二個next事件斷開鏈接 Log.v("lanjiabinRx", "onNext " + value + " 事件"); }
最基礎的建立android
//1.create public void chainCall() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext(1); e.onNext(2); e.onNext(3); e.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "onSubscribe 鏈接"); } @Override public void onNext(Integer value) { Log.v("lanjiabinRx", "onNext " + value + " 事件"); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "onError 事件"); } @Override public void onComplete() { Log.v("lanjiabinRx", "onComplete 事件"); } }); }
//2.just public void justDo() { Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d("lanjiabinRx", "開始採用subscribe鏈接"); } @Override public void onNext(Integer integer) { Log.d("lanjiabinRx", "接受的事件 onNext =" + integer); } @Override public void onError(Throwable e) { Log.d("lanjiabinRx", "接受的事件 onError =" + e.getMessage()); } @Override public void onComplete() { Log.d("lanjiabinRx", "接受的事件 onComplete"); } }); }
結果:
編程
應用場景:
1.快速建立 被觀察者對象(Observable) & 發送10個以上事件(數組形式)
2.數組元素遍歷數組
//3.fromArray public void fromArrayDo() { Integer[] items = {0, 1, 2, 3, 4}; Observable.fromArray(items).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "開始採用subscribe鏈接"); } @Override public void onNext(Integer integer) { Log.v("lanjiabinRx", "接受的事件 onNext =" + integer); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage()); } @Override public void onComplete() { Log.v("lanjiabinRx", "接受的事件 onComplete"); } }); }
結果:ide
應用場景:
1.快速建立 被觀察者對象(Observable) & 發送10個以上事件(集合形式)
2.集合元素遍歷測試
//4.fromIterable public void fromIterableDo(){ List<Integer> list = new ArrayList<>(); list.add(1); list.add(2); list.add(3); Observable.fromIterable(list).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "開始採用subscribe鏈接"); } @Override public void onNext(Integer integer) { Log.v("lanjiabinRx", "接受的事件 onNext =" + integer); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage()); } @Override public void onComplete() { Log.v("lanjiabinRx", "接受的事件 onComplete"); } }); }
結果:線程
// 下列方法通常用於測試使用 <-- empty() --> // 該方法建立的被觀察者對象發送事件的特色:僅發送Complete事件,直接通知完成 Observable observable1=Observable.empty(); // 即觀察者接收後會直接調用onCompleted() <-- error() --> // 該方法建立的被觀察者對象發送事件的特色:僅發送Error事件,直接通知異常 // 可自定義異常 Observable observable2=Observable.error(new RuntimeException()) // 即觀察者接收後會直接調用onError() <-- never() --> // 該方法建立的被觀察者對象發送事件的特色:不發送任何事件 Observable observable3=Observable.never(); // 即觀察者接收後什麼都不調用
應用場景:
動態建立被觀察者對象(Observable) & 獲取最新的Observable對象數據3d
//5.defer Integer i = 10; //第一次賦值 public void deferDo() { Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() { @Override public ObservableSource<? extends Integer> call() throws Exception { return Observable.just(i); } }); i = 15; //第二次賦值 observable.subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "開始採用subscribe鏈接"); } @Override public void onNext(Integer integer) { Log.v("lanjiabinRx", "接受的事件 onNext =" + integer); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage()); } @Override public void onComplete() { Log.v("lanjiabinRx", "接受的事件 onComplete"); } }); }
結果:獲得最新賦值的數字,說明取到了最新的數據code
應用場景:
延遲指定事件,發送一個0,通常用於檢測
//6.timer public void timerDo() { // 注:timer操做符默認運行在一個新線程上 // 也可自定義線程調度器(第3個參數):timer(long,TimeUnit,Scheduler) //TimeUnit.SECONDS延遲2s後,發送一個0 /** * timer(long delay, TimeUnit unit) * delay 數值 * unit 單位 * 下面就是 2數值,單位爲秒,因此是2秒 * */ Observable.timer(2, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "開始採用subscribe鏈接"); } @Override public void onNext(Long aLong) { /* * 獲得的結果爲0 通常用於檢測 * */ Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage()); } @Override public void onComplete() { Log.v("lanjiabinRx", "接受的事件 onComplete"); } }); }
結果:
//7.interval public void intervalDo() { /** * 從0開始遞增 * * @param initialDelay (Long) * 初始延遲時間(第一次延遲時間) * @param period (Long) * 後續數字發射之間的時間間隔(一個週期時間) * @param unit * 時間單位 * */ Observable.interval(3, 2, TimeUnit.SECONDS).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "開始採用subscribe鏈接"); } @Override public void onNext(Long aLong) { Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage()); } @Override public void onComplete() { Log.v("lanjiabinRx", "接受的事件 onComplete"); } }); }
結果:
//8.intervalRange public void intervalRangeDo() { /** * * @param start 起始值 * @param count 總共要發送的值的數量,若是爲零,則運算符將在初始延遲後發出onComplete * @param initialDelay 發出第一個值(開始)以前的初始延遲 * @param period 後續值之間的時間段 * @param unit 時間單位 * */ Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "開始採用subscribe鏈接"); } @Override public void onNext(Long aLong) { Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage()); } @Override public void onComplete() { Log.v("lanjiabinRx", "接受的事件 onComplete"); } }); }
結果:3-12 經歷10個數
//9.range public void rangeDo(){ /** * @param start * 序列中第一個Integer的值 * @param count * 要生成的順序整數的數量 * */ Observable.range(3,5).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "開始採用subscribe鏈接"); } @Override public void onNext(Integer integer) { Log.v("lanjiabinRx", "接受的事件 onNext =" + integer); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage()); } @Override public void onComplete() { Log.v("lanjiabinRx", "接受的事件 onComplete"); } }); }
結果:
//10.rangeLong public void rangeLongDo(){ /** * @param start * Long類型,序列中第一個Integer的值 * @param count * Long類型,要生成的順序整數的數量 * */ Observable.rangeLong(3,8).subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.v("lanjiabinRx", "開始採用subscribe鏈接"); } @Override public void onNext(Long aLong) { Log.v("lanjiabinRx", "接受的事件 onNext =" + aLong); } @Override public void onError(Throwable e) { Log.v("lanjiabinRx", "接受的事件 onError =" + e.getMessage()); } @Override public void onComplete() { Log.v("lanjiabinRx", "接受的事件 onComplete"); } }); }
結果:
編程中咱們會遇到多少挫折?表放棄,沙漠盡頭必是綠洲。