Rxjava基礎

  如今不少Android App的開發開始使用Rxjava,可是Rxjava以學習曲線陡峭著稱,入門有些困難。通過一段時間的學習和使用,這裏來介紹一下我對Rxjava的理解。java

  說到Rxjava首先須要瞭解的兩個東西,一個是Observable(被觀察者,事件源)和 Subscriber(觀察者,是 Observer的子類)。Observable發出一系列事件,Subscriber處理這些事件。首先來看一個基本的例子,咱們如何建立並使用Observable。app

Observable.create(new Observable.OnSubscribe<String>() { 
	@Override public void call(Subscriber<? super String> subscriber) { 
		subscriber.onNext("hello"); 
		} 
	}).subscribe(new Subscriber<String>()
	{ 
		@Override public void onCompleted() {
		} 
		@Override public void onError(Throwable e) { 
		} 
		@Override public void onNext(String s) { 
			Log.d("rx-info", s); 
		} 
	});

  建立Observable的最基本的方法是經過Observable.create() 來進行,當有Subscriber經過Observable.subscribe() 方法進行訂閱以後Observable就會發射事件,注意必需要有訂閱者訂閱纔會發射事件。發射的方式是經過調用 Observable中的 OnSubsribe 類型的成員來實現(每一個Observable有一個final OnSubscribe<T> onSubscribe 成員,該成員是一個接口,後面詳細說),在 Onsubsribe類型成員中調用 call() 方法,注意,這個call方法的參數就是 Observable.subscribe() 方法傳入的 Subsriber實例。總的一句話就是在Obsubscribe 的 call方法中執行訂閱者(Subscriber)的三個方法 onNext(), onCompleted() 和 onError()。ide

  一開始就是一堆 Observable , Subscriber,subscribe() , OnSubscribe 估計看得頭暈,所以咱們須要先來對這些東西有一個瞭解。這裏只列出一個幫助理解的大概。函數

public class Observable<T> {

	  final OnSubscribe<T> onSubscribe;
	  protected Observable(OnSubscribe<T> f) {
              this.onSubscribe = f;
          }

       public final static <T> Observable<T> create(OnSubscribe<T> f) {
            return new Observable<T>(hook.onCreate(f));
        }

       public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
            // cover for generics insanity
        }
	
	  public final Subscription subscribe(Subscriber<? super T> subscriber) {
              return Observable.subscribe(subscriber, this);
          }

       public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
       }
}
public interface Action1<T> extends Action {
    void call(T t);
}
public interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}
public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

public abstract class Subscriber<T> implements Observer<T>, Subscription {
	//...
}

  經過上面的代碼幫助理清楚 Observable, Observer, Subscriber, OnSubsriber, subscribe() 之間的關係。這裏額外提一下 Observable.subscribe() 方法有多個重載:post

Subscription    subscribe()
Subscription    subscribe(Action1<? super  T> onNext)
Subscription    subscribe(Action1<? super  T> onNext, Action1< java.lang .Throwable> onError)
Subscription    subscribe(Action1<? super  T> onNext, Action1< java.lang .Throwable> onError, Action0 onComplete)
Subscription    subscribe(Observer<? super  T> observer)
Subscription    subscribe(Subscriber<? super  T> subscriber)

  其它的ActionX 和 FuncX 請你們自行去查閱定義。學習

  介紹了基本的建立Observable和 Observable是怎麼發射事件的以後,來介紹一下Rxjava的Operator和Operator的原理。this

  Rxjava的Operator常見的有map, flatMap, concat, merge之類的。這裏就不介紹Operator的使用了,介紹一下其原理。介紹原理仍是來看源碼,以map爲例。spa

  首先看一下使用map的例子:.net

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("hello");
    }
})
.map(new Func1<String, String>() {
    @Override
    public String call(String s) {
        return s + "word";
    }
})
.subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(String s) {
        Log.d("info-rx", s);
    }
});

  繼續來看 map的定義:代理

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }

  簡單說一下Func1,其中的T表示傳入的參數類型,R表示方法返回的參數類型。Operator的操做原理最核心的就是lift的實現。

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                try {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    try {
                        // new Subscriber created and being subscribed with so 'onStart' it
                        st.onStart();
                        onSubscribe.call(st);
                    } catch (Throwable e) {
                        // localized capture of errors rather than it skipping all operators 
                        // and ending up in the try/catch of the subscribe method which then
                        // prevents onErrorResumeNext and other similar approaches to error handling
                        Exceptions.throwIfFatal(e);
                        st.onError(e);
                    }
                } catch (Throwable e) {
                    Exceptions.throwIfFatal(e);
                    // if the lift function failed all we can do is pass the error to the final Subscriber
                    // as we don't have the operator available to us
                    o.onError(e);
                }
            }
        });
    }

  lift方法看起來太過複雜,稍做簡化:

public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
    return new Observable<R>(...);
}

  lift方法實際是產生一個新的 Observable。在map()調用以後,咱們操做的就是新的Observable對象,咱們能夠把它取名爲Observable$2,咱們這裏調用subscribe就是Observable$2.subscribe,繼續看到subscribe裏,重要的幾個調用:

hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);

  注意,這裏的observable是Observable$2!!也就是說,這裏的onSubscribe是,lift中定義的!!

  回過頭來看lift方法中建立新Observable的過程:

return new Observable<R>(new OnSubscribe<R>() {
    @Override
    public void call(Subscriber<? super R> o) {
        try {
            Subscriber<? super T> st = hook.onLift(operator).call(o);
            try {
                // new Subscriber created and being subscribed with so 'onStart' it
                st.onStart();
                onSubscribe.call(st); //請注意我!! 這個onSubscribe是原始的OnSubScribe對象!!
            } catch (Throwable e) {
                // localized capture of errors rather than it skipping all operators 
                // and ending up in the try/catch of the subscribe method which then
                // prevents onErrorResumeNext and other similar approaches to error handling
                if (e instanceof OnErrorNotImplementedException) {
                    throw (OnErrorNotImplementedException) e;
                }
                st.onError(e);
            }
        } catch (Throwable e) {
            if (e instanceof OnErrorNotImplementedException) {
                throw (OnErrorNotImplementedException) e;
            }
            // if the lift function failed all we can do is pass the error to the final Subscriber
            // as we don't have the operator available to us
            o.onError(e);
        }
    }
});

  必定必定要注意這段函數執行的上下文!,這段函數中的onSubscribe對象指向的是外部類,也就是第一個Observable的onSubScribe!而不是Observable$2中的onSubscribe,接下來看:

Subscriber<? super T> st = hook.onLift(operator).call(o);

  這行代碼,就是定義operator,生成一個通過operator操做過的Subscriber,看下OperatorMap這個類中的call方法: 

@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
    return new Subscriber<T>(o) {

        @Override
        public void onCompleted() {
            o.onCompleted();
        }

        @Override
        public void onError(Throwable e) {
            o.onError(e);
        }

        @Override
        public void onNext(T t) {
            try {
                o.onNext(transformer.call(t));
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(OnErrorThrowable.addValueAsLastCause(e, t));
            }
        }

    };
}

  沒錯,對傳入的Subscriber作了一個代理,把轉換後的值傳入。這樣就生成了一個代理的Subscriber,最後咱們最外層的OnSubscribe對象對咱們代理的Subscriber進行了調用: 

 @Override
public void call(Subscriber<? super String> subscriber) {
    //此處的subscriber就是被map包裹(wrapper)後的對象。
    subscriber.onNext("hello");
}

  而後這個subscriber傳入到內部,鏈式的通知,最後通知到咱們在subscribe函數中定義的對象。

  分析lift的原理,其實仍是回到了一開始介紹的Observable必需要有訂閱者進行訂閱才能發射事件。lift方法會產生一個新的Observable,而且這個Observable位於原始Observable和後面的Subsriber之間,所以lift方法也須要提供一個新的Subscriber來使得新產生的Observable發射事件,這個新的Subsriber就是對事件鏈後方的Subscriber就行包裝作一個代理。

  詳細使用Rxjava可參見:

  1. 給 Android 開發者的 RxJava 詳解

  2.Rxjava使用基礎合集

相關文章
相關標籤/搜索