轉載請註明出處:http://www.wangxinarhat.com/2016/05/01/2016-05-01-rxjava-android-operate2/java
最近比較忙,也沒想好這個文章該怎麼寫下去。可能會比較水,不過作事不能有始無終,因此繼續吧。android
使用場景:製做緩存git
效果圖github
代碼:緩存
緩存管理類網絡
public class DataCache { /** * 讀取磁盤緩存數據 */ public List<ImageInfoBean> readData() { ... } /** * 寫緩存 */ public void writeData(List<ImageInfoBean> list) { ... } /** * 刪除緩存 */ public boolean deleteCache() { ... } }
數據管理類併發
public class Data { private static Data instance; private static final int DATA_SOURCE_MEMORY = 1;//內存 private static final int DATA_SOURCE_DISK = 2;//硬盤 private static final int DATA_SOURCE_NETWORK = 3;//網絡 BehaviorSubject<List<ImageInfoBean>> cache; private int dataSource; private Data() { } public static Data newInstance() { if (instance == null) { instance = new Data(); } return instance; } private void setDataSource(@DataSource int dataSource) { this.dataSource = dataSource; } public String getDataSourceText() { int dataSourceTextRes; switch (dataSource) { case DATA_SOURCE_MEMORY: dataSourceTextRes = R.string.data_source_memory; break; case DATA_SOURCE_DISK: dataSourceTextRes = R.string.data_source_disk; break; case DATA_SOURCE_NETWORK: dataSourceTextRes = R.string.data_source_network; break; default: dataSourceTextRes = R.string.data_source_network; } return BaseApplication.getApplication().getString(dataSourceTextRes); } /** * 請求網絡數據 */ public void loadData() { Network.getGankApi() .getBeauties(80, 1) .map(BeautyResult2Beautise.newInstance()) .doOnNext(new Action1<List<ImageInfoBean>>() { @Override public void call(List<ImageInfoBean> list) { DataCache.newInstance().writeData(list); } }) .subscribe(new Action1<List<ImageInfoBean>>() { @Override public void call(List<ImageInfoBean> list) { cache.onNext(list); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { throwable.printStackTrace(); } }); } /** * 獲取數據 * @param observer * @return */ public Subscription subscribeData(@Nullable Observer<List<ImageInfoBean>> observer) { if (null == cache) { cache = BehaviorSubject.create(); Observable.create(new Observable.OnSubscribe<List<ImageInfoBean>>() { @Override public void call(Subscriber<? super List<ImageInfoBean>> subscriber) { //從緩存獲取數據 List<ImageInfoBean> list = DataCache.newInstance().readData(); if (null == list) { setDataSource(DATA_SOURCE_NETWORK); //請求網絡數據 loadData(); } else { setDataSource(DATA_SOURCE_DISK); subscriber.onNext(list); } } }) .subscribeOn(Schedulers.io()).subscribe(cache); } else { //內存中獲取的數據 setDataSource(DATA_SOURCE_MEMORY); } return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer); } /** * 清空內存 */ public void clearMemoryCache() { cache = null; } /** * 清空內存和硬盤數據 */ public void clearMemoryAndDiskCache() { clearMemoryCache(); DataCache.newInstance().deleteCache(); } }
獲取數據dom
@OnClick(R.id.load) public void onClick() { startTime = System.currentTimeMillis(); swipeRefreshLayout.setRefreshing(true); unsubscribe(); subscription = Data.newInstance() .subscribeData(getObserver()); }
在觀察者中進行獲取數據結果的處理ide
private Observer<List<ImageInfoBean>> getObserver() { if (null == observer) { observer = new Observer<List<ImageInfoBean>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show(); } @Override public void onNext(List<ImageInfoBean> list) { swipeRefreshLayout.setRefreshing(false); int loadingTime = (int) (System.currentTimeMillis() - startTime); dataSituation.setText(getString(R.string.loading_time_and_source, loadingTime, Data.newInstance().getDataSourceText())); adapter.setImages(list); } }; } return observer; }
詳解 函數
Subject能夠當作是一個橋樑或者代理,在RxJava中同時充當了Observer和Observable的角色。由於它是一個Observer,它能夠訂閱一個或多個Observable;又由於它是一個Observable,它能夠轉發它收到(Observe)的數據,也能夠發射新的數據。
使用場景:有的 token 並不是一次性的,而是能夠屢次使用,直到它超時或被銷燬(多數 token 都是這樣的)。
這樣的 token 處理起來比較麻煩:須要把它保存起來,而且在發現它失效的時候要可以自動從新獲取新的 token >並繼續訪問以前因爲 token 失效而失敗的請求。
若是項目中有多處的接口請求都須要這樣的自動修復機制,使用傳統的 Callback 形式須要寫出很是複雜的代碼。
而使用 RxJava ,能夠用 retryWhen() 來輕鬆地處理這樣的問題。
效果圖
Token API準備
因爲找不到足夠簡單的用於示例的 token API,如下API是代碼僞造的
/** * Created by wangxinarhat on 16-4-5. * TokenApi */ public class TokenApi { /** * 獲取Observable * @param auth * @return */ public static Observable<Token> getToken(@NonNull String auth) { return Observable.just(auth).map(new Func1<String, Token>() { @Override public Token call(String s) { try { Thread.sleep(new Random().nextInt(600) + 600); } catch (InterruptedException e) { e.printStackTrace(); } Token token = new Token(); token.token = createToken(); return token; } }); } /** * 隨機生成token * @return */ private static String createToken() { return "token_wangxinarhat_" + System.currentTimeMillis() % 1000; } /** * 根據Token獲取用戶數據 * @param token * @return */ public static Observable<DataInfo> getData(@NonNull Token token) { return Observable.just(token).map(new Func1<Token, DataInfo>() { @Override public DataInfo call(Token token) { try { Thread.sleep(new Random().nextInt(600) + 600); } catch (InterruptedException e) { e.printStackTrace(); } if (token.isInvalid) { throw new IllegalArgumentException("Token is invalid"); } DataInfo dataInfo = new DataInfo(); dataInfo.id = (int) (System.currentTimeMillis() % 1000); dataInfo.name = "USER_" + dataInfo.id; return dataInfo; } }); } }
Token
/** *Token類 */ public class Token { public String token; public boolean isInvalid;//token是否失效 public Token(boolean isInvalid) { this.isInvalid = isInvalid; } public Token() { } }
用戶數據
/** * Created by wangxinarhat on 16-4-5. * 用戶數據 */ public class DataInfo { public int id; public String name; }
操做符的使用
根據token請求數據
@OnClick(R.id.requestBt) void request() { tokenUpdated = false; swipeRefreshLayout.setRefreshing(true); unsubscribe(); final TokenApi tokenApi = new TokenApi(); subscription = Observable.just(null).flatMap(new Func1<Object, rx.Observable<DataInfo>>() { @Override public Observable<DataInfo> call(Object o) { return null == cachedFakeToken.token ? Observable.<DataInfo>error(new NullPointerException("token id null")) : tokenApi.getData(cachedFakeToken); } }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() { @Override public Observable<?> call(Observable<? extends Throwable> observable) { return observable.flatMap(new Func1<Throwable, Observable<?>>() { @Override public Observable<?> call(Throwable throwable) { if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) { return tokenApi.getToken("flat_map") .doOnNext(new Action1<Token>() { @Override public void call(Token token) { tokenUpdated = true; cachedFakeToken.token = token.token; cachedFakeToken.isInvalid = token.isInvalid; } }); } return Observable.just(throwable); } }); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<DataInfo>() { @Override public void call(DataInfo dataInfo) { swipeRefreshLayout.setRefreshing(false); String token = cachedFakeToken.token; if (tokenUpdated) { token += "(" + getString(R.string.updated) + ")"; } tokenTv.setText(String.format(getString(R.string.got_token_and_data), token, dataInfo.id, dataInfo.name)); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { swipeRefreshLayout.setRefreshing(false); Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show(); } }); }
銷燬token
@OnClick(R.id.invalidateTokenBt) void incalidate() { cachedFakeToken.isInvalid = true; Toast.makeText(getActivity(), R.string.token_expired, Toast.LENGTH_SHORT).show(); }
詳解
若是原始Observable遇到錯誤,從新訂閱它指望它能正常終止。
retryWhen操做符不會將原始Observable的onError通知傳遞給觀察者,它會訂閱這個Observable,再給它一次機會無錯誤地完成它的數據序列。
retryWhen老是傳遞onNext通知給觀察者,因爲從新訂閱,可能會形成數據項重複。
不管收到多少次onError通知,無參數版本的retryWhen都會繼續訂閱併發射原始Observable。
接受單個count參數的retryWhen會最多從新訂閱指定的次數,若是次數超了,它不會嘗試再次訂閱,它會把最新的一個onError通知傳遞給它的觀察者。
還有一個版本的retryWhen接受一個謂詞函數做爲參數,這個函數的兩個參數是:重試次數和致使發射onError通知的Throwable。這個函數返回一個布爾值,若是返回true,retryWhen應該再次訂閱和鏡像原始的Observable,若是返回false,retryWhen會將最新的一個onError通知傳遞給它的觀察者。
retryWhen操做符默認在trampoline調度器上執行。
使用場景
實時搜索,若是在EditText中監聽到字符改變就發起請求數據,明顯不合適。
有了Debounce操做符,僅在過了指定的一段時間還沒發射數據時才發射一個數據,Debounce操做符會過濾掉髮射速率過快的數據項,優化網絡請求
效果圖
代碼
配合jakewharton大神的rxbinding使用,獲取可觀察對象
@Override public void onActivityCreated(@Nullable Bundle savedInstanceState) { super.onActivityCreated(savedInstanceState); setLogger(); //使用rxbing給EditText註冊字符改變事件 subscription = RxTextView.textChangeEvents(input) .debounce(500, TimeUnit.MILLISECONDS)//設置發射時間間隔 .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); }
在觀察者中進行結果處理
/** * 獲取觀察者 * @return */ private Observer<? super TextViewTextChangeEvent> getObserver() { return new Observer<TextViewTextChangeEvent>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(TextViewTextChangeEvent textViewTextChangeEvent) { //獲得搜索關鍵字,進行網絡請求 log(String.format("搜索關鍵字 : %s", textViewTextChangeEvent.text().toString())); } }; }
更新adapter數據集
/** * 更新adapter數據集 * * @param logMsg */ private void log(String logMsg) { if (isCurrentlyOnMainThread()) { mLogs.add(0, logMsg + " (main thread) "); mAdapter.notifyDataSetChanged(); } else { mLogs.add(0, logMsg + " (NOT main thread) "); new Handler(Looper.getMainLooper()).post(new Runnable() { @Override public void run() { mAdapter.notifyDataSetChanged(); } }); } }
詳解
Debounce僅在過了一段指定的時間還沒發射數據時才發射一個數據,會根據設置的時間間隔過濾掉髮射速率過快的數據項。
按期收集Observable的數據放進一個數據包裹,而後發射這些數據包裹,而不是一次發射一個值。
這個操做符,我暫時尚未比較好的使用場景,不過既然是能夠按期收集數據,那麼應該能夠作指定時間內點擊次數等之類的統計。
效果圖
代碼
仍是使用jakewharton大神的rxbinding,註冊點擊事件獲取可觀察對象
@Override public void onActivityCreated(@Nullable Bundle savedInstanceState) { super.onActivityCreated(savedInstanceState); setLogger(); subscription = RxView.clicks(btn) .map(new Func1<Void, Integer>() { @Override public Integer call(Void aVoid) { log("點擊一次"); return 1; } }) .buffer(3, TimeUnit.SECONDS)//設置收集數據時間間隔爲3s .observeOn(AndroidSchedulers.mainThread()) .subscribe(getObserver()); }
詳解
Buffer操做符將一個Observable變換爲另外一個,原來的Observable正常發射數據,變換產生的Observable發射這些數據的緩存集合。
還有就是:若是原來的Observable發射了一個onError通知,Buffer會當即傳遞這個通知,而不是首先發射緩存的數據,即便在這以前緩存中包含了原始Observable發射的數據。
由於我也才嘗試使用rx,這篇終於擠出來了,好難。。代碼在這裏。
若是又學到新的使用場景,仍是會再寫。
我是從國內rx大神扔物線,還有github上star數最多的那位哥們兒(kaushikgopal)學習的。由於他們都沒有很詳細的說明操做符的使用,因此纔想寫這個文章。
如想深刻學習,請看大神代碼。