RxJava源碼解析(1) —— subscribe

RxJava是一款實現異步的庫,因爲具備比較高的抽象性,使用它能夠讓咱們的項目代碼邏輯更加的清晰,從而更利於維護。異步

今天咱們分析下RxJava的最基礎的部分:subscribe()的實現。ide

首先看RxJava最基本的使用方式:this

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("Hello world");
                subscriber.onCompleted();
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onStart() {
                super.onStart();
            }
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String o) {
                System.out.println(o);
            }
        });

上述代碼很簡單,就是經過Observable.create新建了一個Observable 而後 訂閱(subscribe) 一個Subscriber。spa

下面咱們就來看看subscribe的內部究竟做了哪些事?code

 public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

 

private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
  
        //合法性校驗
        if (subscriber == null) {
            throw new IllegalArgumentException("observer can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
        }
        //onStart()回調
        subscriber.onStart();
//包裝成SafeSubscriber if (!(subscriber instanceof SafeSubscriber)) { subscriber = new SafeSubscriber<T>(subscriber); }
//調用observable.onSubscribe.call()方法 try { hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); //返回subscriber return hook.onSubscribeReturn(subscriber); } catch (Throwable e) { Exceptions.throwIfFatal(e); try { subscriber.onError(hook.onSubscribeError(e)); } catch (Throwable e2) { Exceptions.throwIfFatal(e2); RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2); hook.onSubscribeError(r); throw r; } //解綁 return Subscriptions.unsubscribed(); } }

由上可知最關鍵的代碼在這一句:server

 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);

而觀察源碼能夠知道:
 public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
        // pass-thru by default
        return onSubscribe;
    }

因此上述代碼等價於:
observable.onSubscribe.call(subscriber);實際上就是把subscriber做爲onSubscribe.call()方法的參數傳了進去,而這裏的onSubscribe就是Observale.create的時候new出來的那個,因此在subscribe的過程當中會依次調用subscriber.onNext  和 onComplete方法。至此,就很明確了,訂閱subscribe的過程就是把最終的subscriber做爲Observable內部onSubscribe的call方法參數傳入,最終在onSubscribe的call方法內部不斷的調用subscriber的回調方法。這是典型的觀察者模式。