RxJava源碼分析及版本對比(二)——2.x版本基本使用分析與對比1.x版本

在基本使用上(無背壓) 2.x版本和1.x版本極爲類似,針對擴展性作了優化java

一樣的方法再盤他一遍!bash

2.x基本使用

Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            if (!e.isDisposed()) {
                e.onNext("1");
                e.onComplete();
            }
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("onSubscribe");
            d.dispose();
        }
        @Override
        public void onNext(String value) {
            System.out.println("onNext:" + value);
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
            System.out.println("onCompleted");
        }
    });
複製代碼

一樣的拆分一下,分爲兩部分
微信

  1. 構建一個Observable對象
  2. 調用Observable對象的subscribe方法
// 經過Observable的create靜態方法,傳入一個ObservableOnSubscribe對象
	// 這個OnSubscribe對象附帶了一個call方法,用於回調
	// 整個create方法返回了一個Observable對象的實例
	
	// 這裏對於1.x版本的區別是
	// Observable.OnSubscribe->ObservableOnSubscribe
	// Observer-> ObservableEmitter
	// subscriber.isUnsubscribed->e.isDisposed
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            if (!e.isDisposed()) {
                e.onNext("1");
                e.onComplete();
            }
        }
    });
    
    // 調用observable的subscribe方法,傳入一個Observer對象
    // 這個Observer對象實現了onSubscribe方法,onNext方法,onError方法,onComplete方法。
    
    // 這裏對比1.x版本的區別是
    // Observer接口增長了void onSubscribe(@NonNull Disposable d);方法須要實現
    // 主動設置狀態d.dispose();增長了靈活性
    observable.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("onSubscribe");
            d.dispose();
        }
        @Override
        public void onNext(String value) {
            System.out.println("onNext:" + value);
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onComplete() {
            System.out.println("onCompleted");
        }
    });
複製代碼

上面的程序打印出的效果是:
onSubscribe
ide

d.dispose()註釋掉優化

上面的程序打印出的效果是:
onSubscribe
onNext:1
onCompleted
ui

2.x源碼解析

1.一樣的,從從Observable.create方法開始this

// 在1.x版本中,Observable.create方法是棄用的
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
複製代碼

ObservableCreate的構造方法spa

final ObservableOnSubscribe<T> source;
	// 對局部變量賦值,保存,等待回調調用
	public ObservableCreate(ObservableOnSubscribe<T> source) {
		this.source = source;
	}
複製代碼

與1.x版本的實現方式大體相同,細節部分有些改變code

這裏經過RxJavaPlugins.onAssembly方法,返回了一個ObservableCreate對象,該對象繼承與Observable,構造方法中傳入了一個ObservableOnSubscribe實例,賦值給局部變量,等待被回調。該實例就是咱們在基本使用的例子中建立的對象,而且實現了subscribe(ObservableEmitter<String> e)方法。cdn

回顧1.x版本,思路也是保存一個咱們本身實現的實例到局部變量,等待被回調。能夠說二者在基本使用這種狀況下沒有本質區別。

2.再來看看咱們的subscribe(ObservableEmitter<String> e)方法是如何被回調的

從第二部分的observable.subscribe()方法入手分析

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
    	// 省略一些判斷及異常捕捉。
       ...
       subscribeActual(observer);
       ...
    }
複製代碼

這裏的 subscribeActual是一個抽象方法,根據1中分析能夠得知,調用者observable是一個ObservableCreate對象,咱們進入該類查找subscribeActual(observer)方法。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        // 構造了一個CreateEmitter對象,這個對象實現了Emitter、Disposable接口
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 調用傳入observer的onSubscribe方法,這裏的observer就是咱們在基本使用第2步中構造出的observer實例,回調咱們本身實現的onSubscribe,傳入parent能夠進行對d.dispose()方法進行控制,標記是否已經進行了處理。
        observer.onSubscribe(parent);

        try {
            // 調用第1步中保存的source,是一個咱們本身實現ObservableOnSubscribe對象,在這裏被回調。
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
複製代碼

再來看一下爲何經過基本使用例子裏面的e.onNext("1");e.onComplete();就調用到了咱們在限免實現Observer裏面的的方法

@Override
    public void onNext(T t) {
        // 判空 
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        // 調用了observer.onNext
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                // 調用了observer.onComplete();

                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
複製代碼

這樣下來,就經過subscribeActual(observer)就實現了第1步和第2步的關聯

1)在內部先調用observer.onSubscribe(parent)
進入咱們本身的邏輯執行了System.out.println("onSubscribe")
2)再調用了source.subscribe(parent)
進入咱們本身的邏輯執行了

if (!e.isDisposed()) {
		e.onNext("1");
		e.onComplete();
	}
複製代碼

接着執行了

@Override
	public void onNext(String value) {
		System.out.println("onNext:" + value);
	}
	
	@Override
	public void onComplete() {
		System.out.println("onCompleted");
	}
複製代碼

將咱們的代碼鏈接了起來。

總結

其實在2.x版本的基本使用中,有這麼幾個對象:

  • Observable 可觀察的,也能夠說是一個被觀察對象,在構造的時候傳入一個ObservableOnSubscribe對象實現subscribe(ObservableEmitter<String> e)方法等待被回調

  • Observer 觀察者,被觀察的對象。在Observable調用subscribe(new Observer)方法的時候傳入一個觀察者,經過該方法,先調用了本身的onSubscribe方法,能夠進行Disposable的標記,再回調了等待被回調的被觀察者Observable的subscribe方法

  • Disposable 可處理的,對當前的流程進行標記,是否放棄操做

  • ObservableOnSubscribe 在構造被觀察對象時傳入的實例,實現subscribe方法等待被回調

  • Emiter 這裏用到的是CreateEmitter,實現了ObservableOnSubscribe對象與Observer關聯,至關因而一個橋的調用。

關注微信公衆號,最新技術乾貨實時推送

image
相關文章
相關標籤/搜索