No1:php
RxJava使用html
dependencies{
compile 'io.reactivex:rxjava:1.2.0'
compile 'io.reactivex:rxandroid:1.2.1'
}
1)建立Observer(觀察者)java
Subscriber subscriber = new Subscriber<String>(){ @Override public void onCompleted(){ Log.d(TAG,"onCompleted"); } @Override public void onError(Throwable e){ Log.d(TAG,"onError"); } @Override public void onNext(String s){ Log.d(TAG,"onNext"+s); } @Override public void onStart(){ Log.d(TAG,"onStart"); } }
或者react
Observer<String> observer = new Observer<String>(){ @Override public void onCompleted(){ Log.d(TAG,"onCompleted"); } @Override public void onError(Throwable e){ Log.d(TAG,"onError"); } @Override public void onNext(String s){ Log.d(TAG,"onNext"+s); } }
2)建立Observable(被觀察者)android
Observable observable = Observable.create(new Observable.OnSubscribe<String>){ @Override public void call(Subscriber<? super String> subscriber){ subscriber.onNext("楊影楓"); subscriber.onNext("月媚兒"); subscriber.onCompleted(); } } //或者 Observable observable = Observable.just("楊影楓","月媚兒"); //或者 String[] words = {"楊影楓","月媚兒"}; Observable observable = Observable.from(words);
3)Subscribe(訂閱)網絡
observable.subscribe(subscriber);
No2:併發
RxJava的Subjectide
能夠理解爲Subject=Observal+Observer函數
1)PublishSubject:PublishSubject只會把在訂閱發生的時間點以後來自原始Observable的數據發射給觀察者post
2)BehaviorSubject:當Observer訂閱BehaviorSubject時,它開始發射原始Observable最近發射的數據
3)ReplaySubject:無論Observer什麼時候訂閱ReplaySubject,ReplaySubject均會發射全部來自原始Observable的數據給Observer
4)AsyncSubject:當Observable完成時,AsyncSubject只會發射來自原始Observable的最後一個數據
No3:
建立操做符:
1)interval:建立一個按固定時間間隔發射整數序列的Observable,至關於定時器
2)range:建立發射指定範圍的整數序列的Observable,能夠拿來替代for循環,發射一個範圍內的有序整數序列
3)repeat:建立一個N次重複發射特定數據的Observable
變換操做符:
1)map:經過指定一個Func對象,將Observable轉換爲一個新的Observable對象併發射,觀察者將收到新的Observable處理
2)flatmap:將Observable發射的數據集合變換爲Observable集合,而後將這些Observable發射的數據平坦化地放進一個單獨的Observable
3)cast:cast操做符的做用是強制將Observable發射的全部數據轉換爲指定類型
4)concatMap:和flatMap使用方法相似
5)flatMapIterable:能夠將數據包裝成Iterable,在Iterable中咱們就能夠對數據進行處理了
6)buffer:將源Observable變換爲一個新的Observable,這個新的Observable每次發射一組列表值而不是一個一個發射
7)groupBy:用於分組元素,將源Observable變換成一個發射Observable的新Observable(分組後的)
過濾操做符:
1)filter:對源Observable產生的結果自定義規則進行過濾,只有知足條件的結果纔會提交給訂閱者
2)elementAt:用來返回指定位置的數據
3)distinct:用來去重,其只容許尚未發射過的數據項經過
4)skip:將源Observable發射的數據過濾掉前n項
5)take:只取前n項
6)ignoreElements:忽略全部源Observable產生的結果,只把Observable的onCompleted和onError事件通知給訂閱者
7)throttleFirst:會按期發射這個時間段裏源Observable發射的第一個數據,throttleFirst操做符默認在computation調度器上執行
8)throttleWithTimeOut:經過時間來限流。源Observable每次發射出來的一個數據後就會進行計時。若是在設定好的時間結束前源Observable有新的數據發射出來,這個數據就會被丟棄,同時throttleWithTimeOut從新開始計時。
組合操做符:
1)startWith:會在源Observable發射的數據前面插上一些數據
2)merge:將多個Observable合併到一個Observable中進行發射,merge可能會讓合併的Observable發射的數據交錯
3)concat:將多個Observable發射的數據進行合併發射。concat嚴格按照順序發射數據,前一個Observable沒發射完成時不會發射後一個Observable的數據的
4)zip:合併兩個或者多個Observable發射出的數據項,根據指定的函數變換它們,併發射一個新值
5)combineLastest:當兩個Observable中的任何一個發射了數據時,使用一個函數結合每一個Observable發射的最近數據項,而且基於這個函數的結果發射數據
輔助操做符:
1)delay:讓原始Observable在發射每項數據以前都暫停一段指定的時間段
2)Do系列操做符:包括doOnEach、doOnNext、doOnSubscribe、doOnCompleted、doOnError、doOnTerminate、finallyDo
3)subscribeOn:用於指定Observable自身在哪一個線程上運行
4)observeOn:用來指定Observer所運行的線程,也就是發射出的數據在哪一個線程上使用。通常會指定在主線程中運行,這樣就能夠修改UI
5)timeout:若是原始Observable過了指定的一段時長沒有發射任何數據,timeout操做符會以一個onError通知終止這個Observable,或者繼續執行一個備用的Observable。
錯誤處理操做符:
1)catch:攔截原始Observable的onError通知,將它替換爲其餘數據項或者數據序列,讓產生的Observable可以正常終止或者根本不終止。包括onErrorReturn、onErrorResumeNext、onExceptionResumeNext
2)retry:不會將原始Observable的onError通知傳遞給觀察者,它會訂閱這個Observable,再給它一次機會無錯誤地完成其數據序列。
布爾操做符:
1)all:根據一個函數對源Observable發射的全部數據進行判斷,最終返回的結果就是這個判斷結果。
2)contains:用來判斷Observable所發射的數據是否包含某一個數據
3)isEmpty:用來判斷源Observable是否發射過數據
條件操做符:
1)amb:對於給定兩個或多個Observable,它只發射首先發射數據或通知的那個Observable的全部數據
2)defaultIfEmpty:發射來自原始Observable的數據,若是原始Observable沒有發射數據,就發射一個默認數據
轉換操做符:
1)toList:將發射多項數據且爲每一項數據調用onNext方法的Observable發射的多項數據組合成一個List
2)toSortedList:相似於toList操做符:不一樣的是,它會對產生的列表排序,默認是天然升序
3)toMap:收集原始Observable發射的全部數據項到一個Map(默認是HashMap),而後發射這個Map
No4:
RxJava線程控制:想切換線程,須要使用Scheduler
1)Scheduler.immediate():直接在當前線程運行,它是timeout、timeInterval和timeStamp操做符的默認調度器
2)Schedulers.newThread():老是啓用新線程,並在新線程執行操做
3)Schedulers.io():I/O操做所使用的Scheduler
4)Schedulers.computation():計算所使用的Scheduler
5)Schedulers.trampoline():當咱們想在當前線程執行一個任務時,並非當即時,能夠用.trampoline()將它入列
6)AndroidSchedulers.mainThread():RxAndroid庫中提供的Scheduler,它指定的操做在主線程中運行
No5:
RxJava結合OkHttp訪問網絡
private Observable<String> getObservable(final String ip){ Observable observable = Observable.create(new Observable.OnSubscribe<String>(){ @Override public void call(final Subscriber<? super String> subscriber){ mOkHttpClient = new OkHttpClient(); RequestBody formBody = new FormBody.Builder().add("ip",ip).build(); Request request = new Request.Builder().url("http://ip.taobao.com/service/getIpInfo.php").post(formBody).build(); Call call = mOkHttpClient.newCall(request); call.enqueue(new Callback(){ @Override public void onFailure(Call call,IOException e){ subscriber.onError(new Exception("error")); } @Override public void onResponse(Call call,Response response) throws IOException{ String str = response.body().string(); subscriber.onNext(str); subscriber.onCompleted(); } }); } }); return observable; }
private void postAsynHttp(String size){ getObservable(size).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber<String>(){ @Override public void onCompleted(){ Log.d(TAG,"onCompleted"); } @Override public void onError(Throwable e){ Log.d(TAG,e.getMessage()); } @Override public void onNext(String s){ Log.d(TAG,s); Toast.makeText(getApplicationContext(),"請求成功",Toast.LENGTH_SHORT).show(); } }); }
No6:
No7: