RxJava = Observable + Operator + Scheduler + Observer?html
Observable
.fromArray(1, 2, 3, 4)
.map { it * 5 }
.filter { it -> it > 10 }
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe { println(it) }
複製代碼
GOTO 2016 • Exploring RxJava 2 for Android • Jake Whartonjava
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) {
emitter.onNext("Hello");
emitter.onComplete();
}
})
複製代碼
Observable.fromCallable(new Callable<String>() {
@Override
public String call() throws Exception {
return "Hello";
}
});
複製代碼
同時在Android使用場景中:react
ReactiveX的每種特定語言實現都實現了一組操做符。大體分爲如下分類:
數據庫
指定訂閱過程運行的線程,同時控制了Observable,Observer。
咱們先看一段代碼運行結果。windows
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
//.subscribeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//運行結果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[main,5,main] 5
複製代碼
加上subscribeOn
以後,發現subscribeOn的上游和下游運行線程都發生了變化。markdown
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.subscribeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//運行結果
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 5
複製代碼
指定觀察者所在線程。
咱們先看一段代碼運行結果。網絡
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
//.observeOn(Schedulers.newThread())
.subscribe { println("int next :${Thread.currentThread()} $it") }
//運行結果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[main,5,main] 5
複製代碼
加上observeOn
以後,發現observeOn下游運行線程都發生了變化。app
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.observeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//運行結果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 5
複製代碼
@POST("/content/user/info")
Observable<BaseResponse<UserInfo>> userInfo_Ob(@Body UserInfoReq req);
複製代碼
這裏有一段咱們使用操做符的代碼,app初始化時同步服務端數據
//app初始化時同步服務端數據,
fun checkRequiredObservable(): Observable<PersonRequiredInfo> {
//後續須要的我的信息
var personRequiredInfo = PersonRequiredInfo()
val allObservables = arrayListOf(
...
//一系列獨立的請求
appTabSetting(),//底部tab配置
userInfoObservable(),//用戶信息
queryBindConfig(),//用戶綁定帳號信息
syncBabyInfoObservable(userId) //同步寶寶信息
...
)
var zipObservable = Observable.zip(allObservables) {
return@zip personRequiredInfo
}
return apolloAppConfig()//先獲取apollo配置信息,同時保存配置
.flatMap {zipObservable } //並行上面一系列請求
.onErrorReturn { personRequiredInfo }
.doOnNext {registerUserSuperProperties()}
.compose(RxHelper.io2MainThread())
}
private fun syncBabyInfoObservable(): Observable<Any> {
return getBabyInfoObservable() //先獲取服務端寶寶信息
.flatMap { updateBabyInfoObservable() } //本地與服務端比較,若是須要上傳則上傳
.flatMap { getBabyInfoObservable().map { Any() } }//再次獲取同步後的寶寶信息
.onErrorReturn { Any()}
}
複製代碼
現有接口情況的狀況下,很難想象,若是沒有RxJava我該如何組合這些請求。
debounce
Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on each emission.
throttleFirst
Returns an Observable that emits only the first item emitted by the source ObservableSource during sequential time windows of a specified duration.
RxView.clicks(container)
.throttleFirst(800,TimeUnit.MILLISECONDS)
.subscribe { onclick() }
複製代碼
RxTextView.textChanges(etSearch)
.debounce(searchDebounceTime, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe { goSearch(chars) }
複製代碼
class RefreshDebounce {
private var rxEmitter: ObservableEmitter<Long>? = null
private var observable = Observable
.create<Long> { rxEmitter = it }
.debounce(1000L, TimeUnit.MILLISECONDS)
constructor(consumer: (Long) -> Unit? ){
observable.subscribe { consumer.invoke(it) }
}
fun callRefresh() = rxEmitter?.onNext(System.currentTimeMillis())
}
//使用
var refreshDebounce = RefreshDebounce { println("refresh:$it") }
refreshDebounce.callRefresh()
複製代碼
private void startCountDown15Min() {
countDown15MinDisposable = Observable.timer(900, TimeUnit.SECONDS)
subscribe(aLong -> Log.i(TAG, "倒計時時間到"));
}
private void cancelCountDown15Min() {
if (countDown15MinDisposable != null && !countDown15MinDisposable.isDisposed()) {
countDown15MinDisposable.dispose();
}
}
複製代碼
new RxPermissions(activity)
.request(Manifest.permission.READ_PHONE_STATE)
.subscribe(aBoolean -> Log.d(TAG,"result:"+aBoolean)
複製代碼