RxJava介紹2:基本結構和使用場景

RxJava構成

簡述

RxJava = Observable + Operator + Scheduler + Observer?html

Observable
    .fromArray(1, 2, 3, 4)
    .map { it * 5 }
    .filter { it -> it > 10 }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { println(it) }
複製代碼


截屏2021-05-10 上午11.13.55.png

Observable*

可觀察數據源有哪些特徵,

  • Usually do work when you start or stop listening
  • Synchronous or asynchronous
  • Single item,many items,or empty
  • Terminates with an error or succeeds to completion
  • May never terminate!
  • Just an implemention of the Observer pattern

GOTO 2016 • Exploring RxJava 2 for Android • Jake Whartonjava

哪些可稱爲可觀察數據源

  • View事件(觸摸,點擊..)
  • 網絡請求返回的數據
  • 查詢數據庫返回的數據
  • 手機傳感器通知數據(GPS,電量)
  • 鬧鐘定時器通知...

如何建立建Observable

  • Observable.just("Hello")
  • Observable.fromArray("Hello","Hello2")
  • Observable.fromCallable{"Hello"}
  • Observable.create{ it.onNext("Hello")}
  • Observable.Interval(200)
Observable.create(new ObservableOnSubscribe<String>() {
	@Override
	public void subscribe(ObservableEmitter<String> emitter)  {
		emitter.onNext("Hello");
		emitter.onComplete();
	}
})
複製代碼
Observable.fromCallable(new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "Hello";
    }
});
複製代碼

同時在Android使用場景中:react

  • RxView.clicks(view) : Observable
  • login(@Body LoginReqParams) : Observable
  • RxPermissions.request(READ_PHONE_STATE) : Observable
  • Operator 操做符

    介紹

    ReactiveX的每種特定語言實現都實現了一組操做符。大體分爲如下分類:
    數據庫

    • Creating Observables : create,just,from
    • Transforming Observables :Map,FlatMap
    • Filtering Observables:Filter,Debouncee,Distinct,Skip
    • Combining Observables : Merge,Zip,CombineLatest
    • Error Handling Operators: Catch,Retry
    • Observable Utility Operators: Delay,TimeInterval,ObserveOn,SubscribeOn
    • Conditional and Boolean Operators : All,Contains,TkeWhile
    • Mathematical and Aggregate Operators : Average,Count,Max
    • Connectable Observable Operators : Connect,Replay
    • Operators to Convert Observables : To

    map

    image.png

    fliter

    image.png

    Scheduler 線程調度

    subscribeOn

    指定訂閱過程運行的線程,同時控制了Observable,Observer。
    咱們先看一段代碼運行結果。windows

    println("in main:${Thread.currentThread()}")
    Observable.create<Int> {
            println("in create:${Thread.currentThread()}");
            it.onNext(1) }
    	//.subscribeOn(Schedulers.newThread())
        .subscribe { println("in next :${Thread.currentThread()} $it") }
    
    //運行結果
    in main:Thread[main,5,main]
    in create:Thread[main,5,main]
    in next  :Thread[main,5,main] 5
    複製代碼

    加上subscribeOn以後,發現subscribeOn的上游和下游運行線程都發生了變化。markdown

    println("in main:${Thread.currentThread()}")
    Observable.create<Int> {
            println("in create:${Thread.currentThread()}");
            it.onNext(1) }
    	.subscribeOn(Schedulers.newThread())
        .subscribe { println("in next :${Thread.currentThread()} $it") }
    
    //運行結果
    in main:Thread[main,5,main]
    in create:Thread[RxNewThreadScheduler-1,5,main]
    in next  :Thread[RxNewThreadScheduler-1,5,main] 5
    複製代碼

    image.png

    observeOn

    指定觀察者所在線程。
    咱們先看一段代碼運行結果。網絡

    println("in main:${Thread.currentThread()}")
    Observable.create<Int> {
            println("in create:${Thread.currentThread()}");
            it.onNext(1) }
    	//.observeOn(Schedulers.newThread())
        .subscribe { println("int next :${Thread.currentThread()} $it") }
    
    //運行結果
    in main:Thread[main,5,main]
    in create:Thread[main,5,main]
    in next  :Thread[main,5,main] 5
    複製代碼

    加上observeOn以後,發現observeOn下游運行線程都發生了變化。app

    println("in main:${Thread.currentThread()}")
    Observable.create<Int> {
            println("in create:${Thread.currentThread()}");
            it.onNext(1) }
    	.observeOn(Schedulers.newThread())
        .subscribe { println("in next :${Thread.currentThread()} $it") }
    //運行結果
    in main:Thread[main,5,main]
    in create:Thread[main,5,main]
    in next  :Thread[RxNewThreadScheduler-1,5,main] 5
    複製代碼


    image.png
    特別的是:
    image.png async

    Observer 觀察者

    ide

    APP中使用場景

    列舉目前咱們App中使用到RxJava的場景。

    網絡請求:Retrofit+RxJava

    @POST("/content/user/info")
    Observable<BaseResponse<UserInfo>> userInfo_Ob(@Body UserInfoReq req);
    複製代碼

    這裏有一段咱們使用操做符的代碼,app初始化時同步服務端數據

    //app初始化時同步服務端數據,
    fun checkRequiredObservable(): Observable<PersonRequiredInfo> {
        	//後續須要的我的信息
            var personRequiredInfo = PersonRequiredInfo()
            val allObservables = arrayListOf(
                    ...
                	//一系列獨立的請求
                    appTabSetting(),//底部tab配置
                	userInfoObservable(),//用戶信息
                    queryBindConfig(),//用戶綁定帳號信息
                    syncBabyInfoObservable(userId) //同步寶寶信息
                    ...
            )
             var zipObservable =  Observable.zip(allObservables) { 
                 return@zip personRequiredInfo 
             }
            return apolloAppConfig()//先獲取apollo配置信息,同時保存配置
                    .flatMap {zipObservable } //並行上面一系列請求
                    .onErrorReturn { personRequiredInfo }
                    .doOnNext {registerUserSuperProperties()}
                    .compose(RxHelper.io2MainThread())
    }
    private fun syncBabyInfoObservable(): Observable<Any> {
            return getBabyInfoObservable() //先獲取服務端寶寶信息
                .flatMap { updateBabyInfoObservable() } //本地與服務端比較,若是須要上傳則上傳
                .flatMap { getBabyInfoObservable().map { Any() } }//再次獲取同步後的寶寶信息
                .onErrorReturn { Any()}
    }
    複製代碼

    現有接口情況的狀況下,很難想象,若是沒有RxJava我該如何組合這些請求。

    截屏2021-05-10 下午4.27.31.png

    防抖(debounce)與節流(throttle)

    debounce

    Returns an Observable that mirrors the source ObservableSource, except that it drops items emitted by the source ObservableSource that are followed by newer items before a timeout value expires. The timer resets on each emission.

    throttleFirst

    Returns an Observable that emits only the first item emitted by the source ObservableSource during sequential time windows of a specified duration.

    View重複點擊

    RxView.clicks(container)
        .throttleFirst(800,TimeUnit.MILLISECONDS)
        .subscribe { onclick() }
    複製代碼

    搜索防抖

    RxTextView.textChanges(etSearch)
        .debounce(searchDebounceTime, TimeUnit.MILLISECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe { goSearch(chars) }
    複製代碼

    頁面刷新頻率控制

    class RefreshDebounce {
        private var rxEmitter: ObservableEmitter<Long>? = null
        private var observable = Observable
            .create<Long> { rxEmitter = it }
            .debounce(1000L, TimeUnit.MILLISECONDS)
    
        constructor(consumer: (Long) -> Unit? ){
            observable.subscribe { consumer.invoke(it) }
        }
        fun callRefresh() = rxEmitter?.onNext(System.currentTimeMillis())
    }
    
    //使用
    var refreshDebounce = RefreshDebounce { println("refresh:$it") }
    refreshDebounce.callRefresh()
    複製代碼

    定時/延時執行

    private void startCountDown15Min() {
        countDown15MinDisposable = Observable.timer(900, TimeUnit.SECONDS)
               subscribe(aLong -> Log.i(TAG, "倒計時時間到"));
    }
    
    private void cancelCountDown15Min() {
        if (countDown15MinDisposable != null && !countDown15MinDisposable.isDisposed()) {
            countDown15MinDisposable.dispose();
        }
    }
    
    複製代碼

    系統權限申請

    new RxPermissions(activity)
    	.request(Manifest.permission.READ_PHONE_STATE)
        .subscribe(aBoolean -> Log.d(TAG,"result:"+aBoolean)
    複製代碼
相關文章
相關標籤/搜索