eg:短期內連續點擊按鈕
public static void throttleFirst() { Button button = null; RxView.clicks(button) .throttleFirst(1, TimeUnit.SECONDS) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { //跳轉 } }); }
RxTextView.textChanges源碼分析
採用skip(1)緣由:跳過 第1次請求 = 初始輸入框的空字符狀態
public static void search() { EditText editText = null; RxTextView.textChanges(editText) .debounce(1, TimeUnit.SECONDS) .skip(1) .subscribe(new Consumer<CharSequence>() { @Override public void accept(CharSequence charSequence) throws Exception { //開始搜索 } }); }
<?xml version="1.0" encoding="utf-8"?> <LinearLayout xmlns:android="http://schemas.android.com/apk/res/android" android:orientation="vertical" android:layout_width="match_parent" android:layout_height="match_parent"> <EditText android:id="@+id/name" android:layout_width="wrap_content" android:layout_height="wrap_content" android:hint="請填寫姓名" /> <EditText android:id="@+id/age" android:layout_width="wrap_content" android:layout_height="wrap_content" android:hint="請填寫年齡" /> <EditText android:id="@+id/job" android:layout_width="wrap_content" android:layout_height="wrap_content" android:hint="請填寫職業" /> <Button android:id="@+id/list" android:layout_width="wrap_content" android:layout_height="wrap_content" android:text="提交" android:enabled="false" /> </LinearLayout>
Observable<CharSequence> nameObservable = RxTextView.textChanges(name).skip(1); Observable<CharSequence> ageObservable = RxTextView.textChanges(age).skip(1); Observable<CharSequence> jobObservable = RxTextView.textChanges(job).skip(1); /* * 經過combineLatest()合併事件 & 聯合判斷 **/ Observable.combineLatest(nameObservable,ageObservable,jobObservable,new Function3<CharSequence, CharSequence, CharSequence,Boolean>() { @Override public Boolean apply(@NonNull CharSequence name, @NonNull CharSequence age, @NonNull CharSequence job) throws Exception { //1. 姓名 boolean isUserNameValid = !TextUtils.isEmpty(name) // 2. 年齡信息 boolean isUserAgeValid = !TextUtils.isEmpty(age); // 3. 職業信息 boolean isUserJobValid = !TextUtils.isEmpty(job) ; return isUserNameValid && isUserAgeValid && isUserJobValid; } }).subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean s) throws Exception { Log.e(TAG, "提交按鈕是否可點擊: " + s); list.setEnabled(s); } });
/ 用於存放最終展現的數據 String result = "數據源來自 = " ; /* * 設置第1個Observable:經過網絡獲取數據 * 此處僅做網絡請求的模擬 **/ Observable<String> network = Observable.just("網絡"); /* * 設置第2個Observable:經過本地文件獲取數據 * 此處僅做本地文件請求的模擬 **/ Observable<String> file = Observable.just("本地文件"); /* * 經過merge()合併事件 & 同時發送事件 **/ Observable.merge(network, file) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String value) { Log.d(TAG, "數據源有: "+ value ); result += value + "+"; } @Override public void onError(Throwable e) { Log.d(TAG, "對Error事件做出響應"); } // 接收合併事件後,統一展現 @Override public void onComplete() { Log.d(TAG, "獲取數據完成"); Log.d(TAG, result ); } });
// 該2變量用於模擬內存緩存 & 磁盤緩存中的數據 String memoryCache = null; String diskCache = "從磁盤緩存中獲取數據"; /* * 設置第1個Observable:檢查內存緩存是否有該數據的緩存 **/ Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // 先判斷內存緩存有無數據 if (memoryCache != null) { // 如有該數據,則發送 emitter.onNext(memoryCache); } else { // 若無該數據,則直接發送結束事件 emitter.onComplete(); } } }); /* * 設置第2個Observable:檢查磁盤緩存是否有該數據的緩存 **/ Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // 先判斷磁盤緩存有無數據 if (diskCache != null) { // 如有該數據,則發送 emitter.onNext(diskCache); } else { // 若無該數據,則直接發送結束事件 emitter.onComplete(); } } }); /* * 設置第3個Observable:經過網絡獲取數據 **/ Observable<String> network = Observable.just("從網絡中獲取數據"); // 此處僅做網絡請求的模擬 /* * 經過concat() 和 firstElement()操做符實現緩存功能 **/ // 1. 經過concat()合併memory、disk、network 3個被觀察者的事件(即檢查內存緩存、磁盤緩存 & 發送網絡請求) // 並將它們按順序串聯成隊列 Observable.concat(memory, disk, network) // 2. 經過firstElement(),從串聯隊列中取出併發送第1個有效事件(Next事件),即依次判斷檢查memory、disk、network .firstElement() // 即本例的邏輯爲: // a. firstElement()取出第1個事件 = memory,即先判斷內存緩存中有無數據緩存;因爲memoryCache = null,即內存緩存中無數據,因此發送結束事件(視爲無效事件) // b. firstElement()繼續取出第2個事件 = disk,即判斷磁盤緩存中有無數據緩存:因爲diskCache ≠ null,即磁盤緩存中有數據,因此發送Next事件(有效事件) // c. 即firstElement()已發出第1個有效事件(disk事件),因此中止判斷。 // 3. 觀察者訂閱 .subscribe(new Consumer<String>() { @Override public void accept( String s) throws Exception { Log.d(TAG,"最終獲取的數據來源 = "+ s); } });
Disposable mDisposable; //開啓輪詢 public void autoLoop() { if (mDisposable == null || mDisposable.isDisposed()) { Observable.interval(0, 5, TimeUnit.SECONDS) .subscribeOn(Schedulers.computation()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { } }); } } //關閉輪詢 public void stopLoop() { if (mDisposable != null && !mDisposable.isDisposed()) { mDisposable.dispose(); mDisposable = null; } }
//一段時間以後再作一些事情
public void timer() { Observable.timer(5, TimeUnit.SECONDS) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { } }); }
public static void countDown(final int count) { Observable.intervalRange(0, count, 0, 1, TimeUnit.SECONDS) .map(new Function<Long, Long>() { @Override public Long apply(Long aLong) throws Exception { return count - aLong; } }) .subscribe(new Observer<Long>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "onSubscribe"); } @Override public void onNext(Long value) { Log.d(TAG, "onNext: value = " + value); } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.d(TAG, "onComplete"); } }); }