RxJava中操做符到底作了什麼?

    RxJava今年完全火了一把,其中最牛逼之處就是操做符了,之前只知道怎麼用,這幾天看了看源碼,大體的弄清楚了操做符的工做過程,今天分享給你們。若是有什麼不對地方,請你們多多指教。java

    今天咱們已filter爲例,看代碼:app

Integer[] datas={1,2,3,4,5,6,7,8,9,10};
Observable.from(datas)
        .filter(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer>=5;
            }
        })
        .subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                mText.append(integer.toString()+",");
            }
        });

     一個很簡單的小例子,用過濾操做符 filter 找出大於等於5的數字。咱們點進去看看源碼中filter作了什麼ide

public final Observable<T> filter(Func1<? super T, Boolean> predicate) {
            return create(new OnSubscribeFilter<T>(this, predicate));
        }

    調用了create()方法,等等咱們何時是否是也用過create() 方法,咱們在建立Observable時候也用過create()方法,原來建立了一個新的Observable返回出去了,那豈不是說咱們的訂閱者其實訂閱的是這個新的Observable,咱們繼續往下看create方法,create方法須要的參數是一個OnSubscribe對象,那咱們能夠肯定
OnSubscribeFilter是OnSubscribe的一個實現類,咱們點進去看看。學習

public final class OnSubscribeFilter<T> implements OnSubscribe<T> {
    
        final Observable<T> source;
    
        final Func1<? super T, Boolean> predicate;
    
        public OnSubscribeFilter(Observable<T> source, Func1<? super T, Boolean> predicate) {
            this.source = source;
            this.predicate = predicate;
        }

    果真不出咱們所料,OnSubscribeFilter是OnSubscribe的實現類,咱們看他的構造方法,傳遞了兩個參數,第一個參數Observable對象,一個Func1,其中第一個參數就是咱們咱們本身建立的那個Observable,第二個參數使咱們在外面寫的Func1,而後保存了起來。咱們都知道在subscribe()訂閱的時候,OnSubscribe的call()方法。咱們看看OnSubscribeFilter的call()方法都幹了些什麼測試

@Override
        public void call(final Subscriber<? super T> child) {
            FilterSubscriber<T> parent = new FilterSubscriber<T>(child, predicate);
            child.add(parent);
            source.unsafeSubscribe(parent);
        }
&ensp;&ensp;&ensp;&ensp;出現了一個FilterSubscriber,什麼鬼玩意兒,咱們看看他是什麼鬼
static final class FilterSubscriber<T> extends Subscriber<T> {

        final Subscriber<? super T> actual;

        final Func1<? super T, Boolean> predicate;

        boolean done;

        public FilterSubscriber(Subscriber<? super T> actual, Func1<? super T, Boolean> predicate) {
            this.actual = actual;
            this.predicate = predicate;
            request(0);
        }

        @Override
        public void onNext(T t) {
            boolean result;

            try {
                result = predicate.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }

            if (result) {
                actual.onNext(t);
            } else {
                request(1);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;

            actual.onError(e);
        }


        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }
        @Override
        public void setProducer(Producer p) {
            super.setProducer(p);
            actual.setProducer(p);
        }
    }
}

    一個Subscriber的子類,咱們看他的構造方法,兩個參數,一個Subscriber一個Func1,咱們在建立對象時候Subscriber對象是咱們真正的從外界傳過來的觀察者,Func1呢使咱們建立OnSubscribeFilter時候傳遞進來的對象,也就是咱們在外界定義的Func1。
    回過頭來咱們繼續看OnSubscribeFilter的call方法。咱們看到source.unsafeSubscribe(parent),source是咱們原來外界的Observable,他訂閱了FilterSubscriber對象。咱們在他的onNext方法中看到他根據func1.call(t)的返回值來判斷是否讓咱們外界的真正的觀察者調用onNext方法。
    看到這裏有沒有恍然大悟,啥?我都不知道你在說啥,額,那咱們總體的屢屢。this

    咱們外界的代碼,在subscribe()時候,Subscriber並非訂閱了咱們本身寫的Observable,Subscriber訂閱的是filter方法返回的那個新的Observable對象,因此訂閱時候會調用OnSubscribeFilter的call方法,OnSubscribeFilter纔是咱們訂閱的被觀察者的onSubscribe對象,在OnSubscribeFilter的call()方法中,咱們讓咱們包裝的FilterSubscriber訂閱咱們原來的被觀察者,也就是咱們在外界生成的那個Observable。咱們在外界的Observable的onSubscribe對象的call方法中獲得的觀察者是FilterSubscriber對象,咱們調用的onNext會回調到FilterSubscriber的onNext方法中。在FilterSubscriber的onNext方法中咱們根據咱們傳遞的Func1來判斷是否要回調真正的Subscriber的onNext方法,在爲true的時候咱們纔回調咱們外界的觀察者的onNext方法,也就起到了過濾的做用。這就是Filter的整個的流程。spa

    咱們來測試下咱們的小結論:code

Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    Log.e("call:subscriber", "" + subscriber.getClass().getCanonicalName());
                    subscriber.onNext(5);
                }
            }).filter(new Func1<Integer, Boolean>() {
                @Override
                public Boolean call(Integer integer) {
                    return integer > 0;
                }
            }).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    
                }
            });

結果

不知道你們看明白沒有,很是願意和你們一塊兒討論,一塊兒學習,歡迎留言對象

相關文章
相關標籤/搜索