RxJava源碼解析(2) —— Lift

Lift是RxJava中比較核心的操做。RxJava中的絕大多數操做符都是經過lift完成的,而Lift本質上是做了一個相似於中轉的做用,接收前一個Observable的事件,做處理,而後發送給後一個subscriber;java

此次咱們經過查看Map操做的實現,來了解lift的原理。ide

首先仍是讓咱們回顧下subscribe操做的原理:this

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
  
        //合法性校驗
        (省略)  

        //onStart()回調
        subscriber.onStart();

      //調用observable.onSubscribe.call()方法
        try {
         observable.onSubscribe.call(subscriber);
         
            return subscriber;
        } catch (Throwable e) {
          
            //解綁
            return Subscriptions.unsubscribed();
        }
    }
//以上代碼省略部分非核心邏輯    

  由以上代碼能夠知道一個通用的RxJava訂閱操做中:spa

Observable的內部成員變量OnSubscribe的call方法的參數(下面代碼的第3行),就是subscribe方法的參數(下面代碼的第7行),也就是這個Observable訂閱的subscriber。code

Observable的內部成員變量OnSubscribe的call方法的參數(下面代碼的第3行),就是subscribe方法的參數(下面代碼的第7行),也就是這個Observable訂閱的subscriber。orm

Observable的內部成員變量OnSubscribe的call方法的參數(下面代碼的第3行),就是subscribe方法的參數(下面代碼的第7行),也就是這個Observable訂閱的subscriber。對象

 1 Observable.create(new Observable.OnSubscribe<String>() {
 2             @Override
 3             public void call(Subscriber<? super String> subscriber) {
 4                 subscriber.onNext("Hello world");
 5                 subscriber.onCompleted();
 6             }
 7         }).subscribe(new Subscriber<String>() {
 8             @Override
 9             public void onStart() {
10                 super.onStart();
11             }
12             @Override
13             public void onCompleted() {
14             }
15             @Override
16             public void onError(Throwable e) {
17             }
18             @Override
19             public void onNext(String o) {
20                 System.out.println(o);
21             }
22         });

 

回顧了最基本的訂閱操做以後,讓咱們看下Map的操做符的實現:blog

舉個簡單的例子:新建一個originObservable,而後進行map操做,獲取每個String的length,而後把length發送給lastSubscriber事件

Observable originObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Haaaaaaa");
                subscriber.onCompleted();

            }
        });

 

1  originObservable.map(new Func1<String, Integer>() {
2             @Override
3             public Integer call(String string) {
4                 return string.length();
5             }
6         }).subscribe(lastSubscriber);

 

 直接看map的實現:ip

 public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
     }

 

這樣的話,上述的例子實際上就變成了這樣:

 originObservable.lift(new OperatorMap<String, Integer>(new Func1<String, Integer>() {
            @Override
            public Integer call(String string) {
                return string.length();
            }
        })).subscribe(lastSubscriber);

 

而後咱們再看lift的實現:

 public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = operator.call(o);
                    try {
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                     st.onError(e);
                    }
                } catch (Throwable e) {
                    o.onError(e);
                }
            }
        });
    }
//以上代碼省略了部分非主要邏輯

 

首先觀察lift的源碼,咱們能夠發現:

  • Lift內部new了一個Observable,(暫且稱之爲NewLiftObservable)而且做爲Lift方法的返回值。
  • NewLiftObservable的成員變量OnSubscribe的call方法內部,調用operator.call(Subscriber)方法,獲得了一個新的Subscriber。(暫且稱之爲OperatorCallSubscriber

這樣再看咱們再結合咱們的例子,就會發現,訂閱lastSubscriber的原來是Lift內部new出來的那個Observable,即:NewLiftObervable,因此例子也就變成了下面這樣:

 

 new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = operator.call(o);
                    try {
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                     st.onError(e);
                    }
                } catch (Throwable e) {
                    o.onError(e);
                }
            }
        }).subscribe(lastSubscriber);

 

再結合subscribe的原理,咱們能夠知道上述中call方法的o參數,其實就是lastSubscriber對象,

那麼上述operator.call實際操做的也是lastSubscriber對象,即以下面這樣:

 Subscriber<? super T> st = operator.call(lastSubscriber);

 

因此OperatorCallSubscriber是operator調用call方法去操做lastSubscriber生成的。

因此OperatorCallSubscriber是operator調用call方法去操做lastSubscriber生成的。

因此OperatorCallSubscriber是operator調用call方法去操做lastSubscriber生成的。

而後咱們繼續看NewLiftOperator的OnSubscribe的call方法內部還作了哪些事,

調用OperatorCallSubscriber的onStart方法

OperatorCallSubscriber做爲OnSubscribe的call方法的參數傳遞進去。

這裏有一點須要注意:

onSubscribe.call(st);

 

這句代碼中的ObSubscribe不是NewLiftObervable的成員變量,而是originObservable的。由於實際上調用lift方法的是originObservable。(這一點須要想通)

因此originObservable生成的代碼變成了這樣:

Observable originObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> OperatorCallSubscriber) {
                OperatorCallSubscriber.onNext("Haaaaaaa");
                OperatorCallSubscriber.onCompleted();

            }
        });

 

 

其實也就是:

Observable originObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> oprator.call(lastSubscriber)) {
                oprator.call(lastSubscriber).onNext("Haaaaaaa");
                oprator.call(lastSubscriber).onCompleted();

            }
        });

 

對比下沒有調用map操做的結果:

Observable originObservable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> lastSubscriber) {
               lastSubscriber.onNext("Haaaaaaa");
                lastSubscriber.onCompleted();

            }
        });

 

很明顯,能夠發現map操做的左右就是調用operator.call方法把lastSubscriber給包裝了下,

再看下operator哪來的:觀察lift的源碼,很明顯operator是lift的參數,在本例中也就是OperatorMap對象,即map操做符的實現,

那麼看下OperatorMap的源碼吧:

public final class OperatorMap<T, R> implements Operator<R, T> {

    private final Func1<? super T, ? extends R> transformer;

    public OperatorMap(Func1<? super T, ? extends R> transformer) {
        this.transformer = transformer;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o) {
        return new Subscriber<T>(o) {

            @Override
            public void onCompleted() {
                o.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }

            @Override
            public void onNext(T t) {
                try {
                    o.onNext(transformer.call(t));
                } catch (Throwable e) {
                    Exceptions.throwOrReport(e, this, t);
                }
            }

        };
    }

}

 

經過前面的例子,很明顯能夠看到,源碼中的transformer就是咱們使用map的時候傳入的func;

而後再看下OperatorMap的call方法,很簡單,就是把call方法傳入的Subscriber進行了簡單的包裝,而且返回了一個新的Subscriber,結合前面的結論:

OperatorCallSubscriber是operator調用call方法去操做lastSubscriber生成的。

所以

OperatorCallSubscriber實際上只是簡單包裝了lastSubscriber,而且調用在OperatorCallSubscriber的onNext方法的時候,先把參數經過map的func作變換,再傳遞給lastSubscriber的onNext方法。

這樣就完成了map的功能,上述也就是lift的原理的簡單闡述。

相關文章
相關標籤/搜索