Android異步框架RxJava 1.x系列(二) - 事件及事件序列轉換原理

前言

在介紹 RxJava 1.x 線程調度器以前,首先引入一個重要的概念 - 事件序列轉換。RxJava 提供了對事件序列進行轉換的支持,這是它的核心功能之一。java

 

 

正文

1. 事件序列轉換定義

所謂轉換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不一樣的事件或事件序列,有點相似 Java 1.8 中的流處理。web

2. 事件序列轉換API

首先看一個 map() 的例子:編程

Observable.just("images/logo.png") // 輸入類型 String .map(new Func1<String, Bitmap>() { @Override public Bitmap call(String filePath) { // 參數類型 String return getBitmapFromPath(filePath); // 返回類型 Bitmap } }) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { // 參數類型 Bitmap showBitmap(bitmap); } }); 複製代碼

這裏出現了一個叫 Func1 的類。它和 Action1 很是類似,也是 RxJava 的一個接口,用於包裝含有一個參數的方法。 Func1Action 的區別在於: Func1 包裝的是有返回值的方法。另外,和 ActionX 同樣,FuncX 也有多個,用於不一樣參數個數的方法。同理,FuncXActionX 的區別在 FuncX 包裝的是有返回值的方法。後端

能夠看到,map() 方法將參數中的 String 對象轉換成一個 Bitmap 對象後返回,而在通過 map() 方法後,事件的參數類型也由 String 轉爲了 Bitmap。緩存

這種直接轉換對象並返回的,是最多見的也最容易理解的變換。不過 RxJava 的轉換遠不止這樣,它不只能夠針對事件對象,還能夠針對整個事件隊列,這使得 RxJava 變得很是靈活。數據結構

下面給出幾個示例:多線程

map()

事件對象的直接變換,具體功能上面已經介紹過。它是 RxJava 最經常使用的變換。 map() 的示意圖以下:架構

 

 

flatMap()

這是一個頗有用但很是難理解的變換。首先假設這麼一種需求:假設有一個數據結構『學生』,如今須要打印出一組學生的名字。實現方式很簡單:框架

Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name); } }; Observable.from(students) .map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); } }) .subscribe(subscriber); 複製代碼

若是要打印出每一個學生所須要修的全部課程的名稱呢?需求的區別在於,每一個學生只有一個名字,但卻有多個課程,首先能夠這樣實現:異步

Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } }; Observable.from(students) .subscribe(subscriber); 複製代碼

若是我不想在 Subscriber 中使用 for 循環,而是但願 Subscriber 中直接傳入單個的 Course 對象呢(這對於代碼複用很重要)?用 map() 顯然是不行的,由於 map()一對一的轉化,而如今須要一對多的轉化。問題出現了:怎樣把一個 Student 轉化成多個 Course

這個時候,flatMap() 就派上了用場:

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } }; Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber); 複製代碼

從上面的代碼能夠看出,flatMap()map() 有一個相同點:它也是把傳入的參數轉化以後返回另外一個對象。

flatMap()map() 不一樣的是,flatMap() 返回的是個 Observable 對象,而且這個 Observable 對象並非被直接發送到 Subscriber 的回調方法中。

flatMap() 示意圖以下:

 

 

flatMap() 的原理是這樣的:

  1. 使用傳入的事件對象建立一個 Observable 對象;
  2. 並不當即發送這個 Observable, 而是將它激活,而後開始發送事件;
  3. 將每個建立出來的 Observable 發送的事件,都被匯入同一個 Observable

而這個 Observable 負責將這些事件統一交給 Subscriber 的回調方法。這三個步驟,把事件拆成了兩級,經過一組新建立的 Observable 將初始的對象『鋪平』以後經過統一路徑分發了下去。而這個『鋪平』就是 flatMap() 所謂的 flat

3. 事件序列轉換原理

這些轉換雖然功能各有不一樣,但實質上都是針對事件序列的處理和再發送。而在 RxJava 的內部,它們是基於同一個基礎的轉換方法:lift(Operator)

lift()

首先看一下 lift() 的內部實現(核心代碼):

public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } }); } 複製代碼

這段代碼實現的功能,簡單來講就是建立了一個新的 Observable 並返回。若是看過上篇博客會發現有些蹊蹺。重溫一下 Observable.subscribe(Subscriber) 的實現(核心代碼):

public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; } 複製代碼

對比一下以上兩段代碼的方法體(忽略返回值),會發現一行突兀的代碼:

Subscriber newSubscriber = operator.call(subscriber);
複製代碼

解釋一下 lift() 方法完成的操做:

  1. 利用 Observable.create() 方法建立一個新的 Observable 對象,加上以前的原始 Observable,已經有兩個 Observable

  2. 建立 Observable 的同時建立一個新的 OnSubscribe 用於發出事件。

  3. 經過 lift() 傳入的 Operator 函數的 call() 方法構造一個新的 Subscriber 對象,並將新 Subscriber 和原始 Subscriber 進行關聯。

  4. 利用這個新 Subscriber 向原始 Observable 進行訂閱,實現事件序列的轉換。

這種實現基於代理模式,經過事件攔截和處理實現事件序列的變換。

Observable 執行了 lift(Operator) 方法以後,會返回一個新的 Observable,這個新的 Observable 會像一個代理同樣,負責接收原始的 Observable 發出的事件,並在處理後發送給 Subscriber

整個過程的思惟導圖以下:

 

 

或者能夠看動圖:

 

 

兩次和屢次的 lift() 同理,以下圖:

 

 

舉一個具體的 Operator 的實現。下面是一個將事件的 Integer 對象轉換成 String 的例子,僅供參考:

observable.lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { // 將事件序列中的 Integer 對象轉換爲 String 對象 return new Subscriber<Integer>() { @Override public void onNext(Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; } }); 複製代碼

學習 lift() 的原理只是爲了更好地理解 RxJava ,從而能夠更好地使用它。然而RxJava 不建議開發者自定義 Operator 來直接使用 lift(),而是儘可能使用已有的 lift() 包裝方法(如 map() flatMap() 等)進行組合。

compose()

除了 lift() 以外,Observable 還有一個轉方法叫作 compose()。它和 lift() 的區別在於,lift() 是針對事件項事件序列的,而 compose() 是針對 Observable 自身進行轉換。

舉個例子,假設在程序中有多個 Observable 都須要應用一組相同的 lift() 進行轉換,一般會這樣寫:

observable1.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);

observable2.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber2);

observable3.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber3);

observable4.lift1()
    .lift2()
    .lift3()
    .lift4()
    .subscribe(subscriber1);
複製代碼

能夠發現有太多重複代碼,代碼重構以下:

private Observable liftAll(Observable observable) { return observable.lift1() .lift2() .lift3() .lift4(); } liftAll(observable1).subscribe(subscriber1); liftAll(observable2).subscribe(subscriber2); liftAll(observable3).subscribe(subscriber3); liftAll(observable4).subscribe(subscriber4); 複製代碼

可讀性、可維護性都提升了。但是 Observable 被一個方法包起來,這種方式對於 Observale 的靈活性進行了限制。怎麼辦?這個時候,就應該用 compose() 來解決了:

public class LiftAllTransformer implements Observable.Transformer<Integer, String> { @Override public Observable<String> call(Observable<Integer> observable) { return observable.lift1() .lift2() .lift3() .lift4(); } } Transformer liftAll = new LiftAllTransformer(); observable1.compose(liftAll).subscribe(subscriber1); observable2.compose(liftAll).subscribe(subscriber2); observable3.compose(liftAll).subscribe(subscriber3); observable4.compose(liftAll).subscribe(subscriber4); 複製代碼

如上,使用 compose() 方法,Observable 能夠利用傳入的 Transformer 對象的 call 方法直接對自身進行處理,而不是被包在方法的裏面。

小結

本文主要介紹了 RxJava 事件及事件序列轉換原理,其中 lift() 方法的使用方法和實現原理是重點、難點。後續將會介紹的 RxJava 線程調度器底層也是基於它實現的。


歡迎關注技術公衆號: 零壹技術棧

 

零壹技術棧

 

本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。

相關文章
相關標籤/搜索