手寫極簡版的Rxjava

本文已受權微信公衆號:鴻洋(hongyangAndroid)在微信公衆號平臺原創首發

你是否是看過了不少分析Rxjava源碼的文章,但依舊沒法在心中勾勒出Rxjava原理的樣貌。是什麼讓咱們閱讀Rxjava源碼變得如此艱難?是Rxjava的代碼封裝,以及各類細節問題的解決。本文我把Rxjava的各類封裝、抽象通通剝去,只專一於基本的事件變換。在理解了事件變換大概是作了件什麼事情時,再去看源碼,考慮一些其它問題就會更加容易。java

說明:這是一篇Rxjava源碼分析的入門文章。旨在讓讀者腦中有個概念Rxjava最主要乾了件什麼事情,幾個經常使用操做符的主要原理。從此再去看其它源碼分析文章或源碼可以更容易理解。所以本文先不去考慮Rxjava源碼中複雜的抽象封裝,線程間通訊,onComplete、onError、dispose等方法,僅專一於「onNext」的最基本調用方式。

項目源碼git


本文目錄:

  1. 手寫Rxjava核心代碼,create,nullMap(核心)操做符
  2. map,observeOn,subscribeOn,flatMap操做符
  3. 響應式編程思想的理解

手寫Rxjava核心代碼,create,nullMap操做符

Create操做符

咱們先來看一個最簡單調用github

MainActivity.java

Observable.create(new Observable<String>() {
            @Override
            public void subscribe(Observer<String> observer) {
                observer.onNext("hello");
                observer.onNext("world");
                observer.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.e("yxj",s);
            }

            @Override
            public void onComplete() {
                Log.e("yxj","onComplete");
            }
        });
複製代碼
Observable.java

public abstract class Observable<T> {

    public abstract void subscribe(Observer<T> observer);

    public static <T> Observable<T> create(Observable<T> observable){
        return observable;
    }

}
複製代碼
Observer.java

public interface Observer<T> {

    void onNext(T t);
    void onComplete();
}
複製代碼
本篇文章我把Observable稱爲「節點」,Observer稱爲「處理者」,一是由於我被觀察者、被觀察者、誰訂閱誰給繞暈了,更重要的是我以爲這個名稱比較符合Rxjava的設計思想。

Observable調用create方法建立一個本身,重寫subscribe方法說:若是 我有一個處理者Observer,我就把「hello」,「world」交給它處理。編程

Observable調用了subscribe方法,真的找到了Observer。因而兌現承諾,完成整個調用邏輯。bash

這裏是「若是」有處理者,須要subscribe方法被調用時,「若是」才成立。Rxjava就是創建在一系列的「若是」(回調)操做上的。微信

「nullMap」操做符(核心)

1.建立一個observable
2.調用空map操做符作變換
3.交給observer處理

MainActivity.java

Observable.create(new Observable<String>() {
            @Override
            public void subscribe(Observer<String> observer) {
                observer.onNext("hello");
                observer.onNext("world");
                observer.onComplete();
            }
        })
        .nullMap()
        .subscribe(new Observer<String>() {
            @Override
            public void onNext(String s) {
                Log.e("yxj",s);
            }

            @Override
            public void onComplete() {
                Log.e("yxj","onComplete");
            }
        });

複製代碼
nullMap()等價於 下面這段代碼
即把上個節點的數據不作任何修改的傳遞給下一節點的map操做
 
.map(new Function<String, String>() {
    @Override
    public String apply(String s) throws Exception {
        return s;
    }
})

複製代碼

"nullMap"操做符在Rxjava源碼裏並不存在,是我方便你們理解Rxjava運行機制寫出來的。 由於nullMap操做是一個 base變換操做,map,flatMap,subscribeOn,observeOn操做符都是在nullMap上修改而來。因此Rxjava的變換的基礎就是nullMap操做符。app

Observable.java
// 這就是Rxjava的變換核心

public Observable<T> nullMap() {

        return new Observable<T>() {
            @Override
            public void subscribe(final Observer<T> observerC) {

                Observer<T> observerB = new Observer<T>() {
                    @Override
                    public void onNext(T t) {
                        observerC.onNext(t);
                    }

                    @Override
                    public void onComplete() {
                        observerC.onComplete();
                    }
                };
                Observable.this.subscribe(observerB);
            }
        };
    }
複製代碼

「nullMap」操做符作了件什麼事情:異步

  1. 上一個節點Observable A調用nullMap(),在內部new一個新的節點Observable B。
  2. 節點B重寫subscribe方法,說"若是"本身有操做者Observer C,就new一個操做者Observer B,而後讓節點A subscribe 操做者B。
  3. 節點A subscribe 操做者B,讓操做者B執行onNext方法。操做者B的onNext方法內部,調用了操做者C的onNext。從而完成了整個調用。

請注意2中的」若是「。意味着,當節點B中的subscribe方法沒有被調用的時候,2,3步驟都不會執行(他們都是回調),沒有Observer B,節點A也不會調用subscribe方法。 接下來分兩種狀況:ide

  • 節點B調用了subscribe方法,則執行2,3,完成整個流程。
  • 節點B調用nullMap,重新走一遍1,2,3步驟,至關於節點B把任務交給了下一個節點C。

概況一下就是:源碼分析

Observable每調用一次操做符,其實就是建立一個新的Observable。新Observable內部經過subscribe方法「逆向的」與上一Observable關聯。在新Observable中的new出來的Observer內的onNext方法中作了和下一個Observer之間的關聯。

github上有nullMap詳細註釋版的代碼


map,observeOn,subscribeOn,flatMap操做符

接下來讓咱們看看這4個操做符,僅僅是在nullMap中作了小改動而已。 操做符源碼

map操做符

Observable.java

public <R> Observable<R> map(final Function<T, R> function) {

        return new Observable<R>() {
            @Override
            public void subscribe(final Observer<R> observer1) {
                Observable.this.subscribe(new Observer<T>() {
                    @Override
                    public void onNext(T t) {
                        R r = function.apply(t); // 僅僅在這裏加了變換操做
                        observer1.onNext(r);
                    }

                    @Override
                    public void onComplete() {
                        observer1.onComplete();
                    }
                });
            }
        };
    }
複製代碼

和「nullMap」相比,僅僅加了一行代碼function.apply() 方法的調用。

observeOn操做符

Observable.java

public Observable<T> observeOn() {
        return new Observable<T>() {
            @Override
            public void subscribe(final Observer<T> observer) {
                Observable.this.subscribe(new Observer<T>() {
                    @Override
                    public void onNext(final T t) {
	        	//模擬切換到主線程(一般上個節點是運行在子線程的狀況)
                        handler.post(new Runnable() {
                            @Override
                            public void run() {
                                observer.onNext(t);
                            }
                        });
                    }

                    @Override
                    public void onComplete() {
                        
                    }
                });
            }
        };
    }
複製代碼

與「nullMap」相比,修改了最內部的onNext方法執行所在的線程。Rxjava源碼會更加靈活,observerOn方法參數讓你能夠指定切換到的線程,其實就是傳入了一個線程調度器,用於指定observer.onNext()方法要在哪一個線程執行。原理是同樣的。我這裏就簡寫,直接寫了切換到主線程,這你確定能看明白。

subscribeOn操做符

Observable.java

public Observable<T> subscribeOn() {
        return new Observable<T>() {
            @Override
            public void subscribe(final Observer<T> observer) {
                
                new Thread() {
                    @Override
                    public void run() {
                    // 這裏簡寫了,沒有new Observer作中轉,github上有完整代碼
                        Observable.this.subscribe(observer);
                    }
                }.start();
            }
        };
    }
複製代碼

將上一個節點切換到新的線程,修改了Observable.this.subscribe()運行的線程,Observable.this指的是調用subscribeOn()的Observable,即上一個節點。所以subscribeOn操做符修改了上一個節點的運行所在的線程

flatMap操做符

public <R> Observable<R> flatMap(final Function<T, Observable<R>> function) {

        return new Observable<R>() {
            @Override
            public void subscribe(final Observer<R> observer) {
                Observable.this.subscribe(new Observer<T>() {
                    @Override
                    public void onNext(T t) {
                        try {
                            Observable<R> observable = function.apply(t);
                            observable.subscribe(observer);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }
        };

    }
複製代碼

flatmap和map極爲類似,只不過function.apply()的返回值是一個Observable。

Observable是一個節點,既能夠用來封裝異步操做,也能夠用來封裝同步操做(封裝同步操做 == map操做符)。因此這樣就能夠很方便的寫出一個 耗時1操做 —> 耗時2操做 —> 耗時3操做...的操做

到這裏相信你們已經對Rxjava怎樣運行,幾個常見的操做符內部基本原理有了初步的理解,本文的目的就已經達到了。在以後看Rxjava源碼或者其它分析文章時,就能少受各類變換的干擾。接下來就能夠思考Rxjava是如何對各個Observable作封裝,線程之間如何通訊,onComplete、onError、dispose等方法如何實現了。

響應式編程思想的理解

響應式編程是一種面向數據流和變化傳播的編程範式。

直接看這句話其實不太容易理解。讓咱們換個說法,實際編程中是什麼會干擾咱們,使咱們沒法專一於數據流和變化傳播呢?答案是:異步,它會讓咱們的代碼造成嵌套,不夠順序化。

由於異步,咱們的業務邏輯會寫成回調嵌套的形式,致使過一段時間看本身代碼看不懂,語義化不強,不是按着順序一個節點一個節點的往下執行的。

Rxjava將全部的業務操做變成一步一步,每一步無論你是同步、異步,通通用一個節點包裹起來,節點與節點之間是同步調用的關係。如此,整個代碼的節點都是按順序執行的。

限於做者我的水平有限,本文部分表述不免有不對之處,請留言指出,相互交流。
相關文章
相關標籤/搜索