RxJava源碼分析及版本對比(一)——1.x版本基本使用分析

簡介

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.java

RxJava是使用Java VM實現的響應式編程庫:一個經過使用可觀察序列來編寫異步和基於事件的程序的庫。react

它擴展了觀察者模式以支持數據/事件序列,並添加了容許您以聲明方式組合序列的運算符,同時抽象出對低級線程、線程同步、線程安全和併發數據結構等問題的關注。android

發展歷史

RxJava存在1.x版本和2.x版本,API改動較大,接入方法基本不兼容,但實現思路相似,作了必定的優化,下面是官方對RxJava2的介紹。git

Version 2.x (Javadoc)

  • continued support for Java 6+ & Android 2.3+github

  • performance gains through design changes learned through the 1.x cycle and through Reactive-Streams-Commons research project.編程

  • Java 8 lambda-friendly API安全

  • non-opinionated about source of concurrency (threads, pools, event loops, fibers, actors, etc)微信

  • async or synchronous execution數據結構

  • virtual time and schedulers for parameterized concurrency併發

  • 單一依賴:Reactive-Streams

  • 繼續支持Java 6+和android 2.3+

  • 經過1.x和Reactive-Streams-Commons項目的積累,實現設計變動,提升性能。

  • 友好地支持Java 8 lambda表達式

  • 靈活的處理併發,包括threads, pools, event loops, fibers, actors等

  • 同步或異步操做

  • 爲參數化的併發設計了調度器

源碼地址

既然都點到這篇文章裏了,難道還不下載源碼看看嗎??

項目地址:github.com/ReactiveX/R…

這裏1.x版本和2.x版本在不一樣的分支上

1.x基本使用

public static void main(String[] args) {
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext("test");
                    subscriber.onCompleted();
                }
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext:" + s);
            }
        });
        
    }
複製代碼

拆分一下,分爲兩部分

  1. 構建一個Observable對象
  2. 調用Observable對象的subscribe方法
// 經過Observable的create靜態方法,傳入一個OnSubscribe對象
	// 這個OnSubscribe對象附帶了一個call方法,用於回調
	// 整個create方法返回了一個Observable對象的實例
	Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
		@Override
		public void call(Subscriber<? super String> subscriber) {
			if (!subscriber.isUnsubscribed()) {
				subscriber.onNext("test");
				subscriber.onCompleted();
			}
		}
	});
複製代碼
// 調用observable的subscribe方法,傳入一個Observer對象,
    // 這個Observer對象附帶了三個回調方法
    // 經過這裏subscribe方法調用上面Observable.OnSubscribe對象中的call方法
    // 再在call方法中的subscriber對象調用這裏Observer中的onNext,onCompleted方法
    observable.subscribe(new Observer<String>() {
        @Override
        public void onCompleted() {
            System.out.println("onCompleted");
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(String s) {
            System.out.println("onNext:" + s);
        }
    });

複製代碼

上面的程序打印出的效果是:
onNext:test
onCompleted

1.x源碼解析

從Observable.create方法開始

@Deprecated
    public static <T> Observable<T> create(OnSubscribe<T> f) {
        // 接收一個OnSubscribe參數,調用構造函數返回一個Observable
        // 1)、先看Observable的構造函數作了什麼
        // 2)、再看OnSubscribe類是怎樣的
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
複製代碼

Observable的構造函數

// Obserbable類中存在一個final變量是OnSubscribe類型的
    final OnSubscribe<T> onSubscribe;

    // 構造方法將傳入的OnSubscribe對象賦值給局部變量onSubscribe保存起來
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
複製代碼

再來看OnSubscribe類,其實就是一個包含有一個call方法的類,在基本使用的第2步,調用observable對象subscribe方法時觸發。

/** * 這裏的OnSubscribe是一個接口,繼承子Action1接口 * * 在Observable.subscribe方法被調用的時候執行 * 這裏的subscribe是第2部分,若是印象模糊能夠回頭看一下上面的基本使用部分 * @param <T> the output value type */
    public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }
    
	/** * 一個call方法,observable對象subscribe方法時觸發 * 用上面的基本使用的例子,這裏的call攜帶的範型T就是Subscriber的實例subscriber * 上面call方法就是調用subscribe中的onNext方法和onCompleted * 能夠看出這裏的onNext和onCompleted就是基本使用例子第二步的subscribe方法傳入Observer對象的方法 * * 繼續跟Action和 Action繼承的Function都沒有實現 * @param <T> the first argument type */
    public interface Action1<T> extends Action {
    	void call(T t);
    }
複製代碼

能夠看出例子第1步實例Observable.OnSubscribe對象的時候實現了一個call方法,call方法傳入的參數就是Subscriber實例,方法內調用的onNext方法和onCompleted方法執行了第2步中Observer實例的對應方法,因此這裏能夠猜想Observer是一個接口,Subscriber實現了Observer。咱們來看一下這兩個類。

/** * 很熟悉的三連,本身實現,完成業務 */
	public interface Observer<T> {
		void onCompleted();
		void onError(Throwable e);
		void onNext(T t);
	}
	
	/** * 和咱們的猜想一致,Subscriber實現了Observer */
	public abstract class Subscriber<T> implements Observer<T>, Subscription {
		...
	}
複製代碼

到這裏應該對總體有一個理解了,咱們再來看一下observable.subscribe方法,結束後在進行總結。

public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        // 這裏我門傳入的是new Observer,直接進入下面部分
        return subscribe(new ObserverSubscriber<T>(observer));
    }
    
    ...
    
	public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
	}
	
	static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
		...
		// 方法比較長,這裏只分析重要部分
		// 這裏的RxJavaHooks.onObservableStart方法返回了observable.onSubscribe
		// 再調用observable類中的局部變量onSubscribe的call方法,具體的實現就是咱們再例子中實現的call方法,調用了subscriber.onNext和subscriber.onCompleted
		RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
		return RxJavaHooks.onObservableReturn(subscriber);
	}
	
	// RxJavaHooks.onObservableStart分析
	public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            // 會進入該方法,最後返回的就是Observable構造函數中存儲的局部變量onSubscribe,不要將這個call和我門本身實現的call方法弄混了
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }
複製代碼

其實分析到這裏就結束了,將整個調用鏈鏈接了起來

咱們再來看一下關於RxJavaHooks.onObservableStart分析,

分析這裏的返回值就是Observable構造函數中存儲的局部變量onSubscribe。

對RxJavaHooks.onObservableStart方法進行斷點調試,進入f!=null 判斷。

// Func2是一個接口,咱們繼續使用斷點調試查看具體的實現
	public interface Func2<T1, T2, R> extends Function {
		R call(T1 t1, T2 t2);
	}
複製代碼
// 經過斷點調試找到call方法的實現
	onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
		@Override
		public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
			return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
            }
        };
複製代碼
// 返回了第二個參數
    @Deprecated
    public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass through by default
        return onSubscribe;
    }
複製代碼

最後返回了第二個參數,就是observable.onSubscribe。

1.x總結

第一步:在調用create方法構造了一個Observable對象,而且在Observable對象的構造方法中,將局部變量onSubscribe賦值,該onSubscribe實現了call方法等待被回調,call方法中提供了一個subscriber實例,該實例實現了Observer,有onCompleted,onError,onNext方法能夠進行調用。

第二步:執行Observable實例的subscribe方法,傳入了Observer對象,實現了onCompleted,onError,onNext方法。再經過RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);方法回調到了第一步中OnSubscribe的call方法,這裏的subscriber方法就是第二步中實例的Observer。

總體就是在第一步中寫了一個回調,等待第二步subscribe方法調起。第二步中的Observer實現了一個回調的三個方法,供第一步中的回調函數內調用。

關注微信公衆號,最新技術乾貨實時推送

image
相關文章
相關標籤/搜索