前言
在介紹 RxJava 1.x
線程調度器以前,首先引入一個重要的概念 - 事件序列轉換。RxJava
提供了對事件序列進行轉換的支持,這是它的核心功能之一。java
![](http://static.javashuo.com/static/loading.gif)
正文
1. 事件序列轉換定義
所謂轉換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不一樣的事件或事件序列,有點相似 Java 1.8
中的流處理。web
2. 事件序列轉換API
首先看一個 map()
的例子:編程
Observable.just("images/logo.png") // 輸入類型 String .map(new Func1<String, Bitmap>() { @Override public Bitmap call(String filePath) { // 參數類型 String return getBitmapFromPath(filePath); // 返回類型 Bitmap } }) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { // 參數類型 Bitmap showBitmap(bitmap); } }); 複製代碼
這裏出現了一個叫 Func1
的類。它和 Action1
很是類似,也是 RxJava
的一個接口,用於包裝含有一個參數的方法。 Func1
和 Action
的區別在於: Func1
包裝的是有返回值的方法。另外,和 ActionX
同樣,FuncX
也有多個,用於不一樣參數個數的方法。同理,FuncX
和 ActionX
的區別在 FuncX
包裝的是有返回值的方法。後端
能夠看到,map() 方法將參數中的 String 對象轉換成一個 Bitmap 對象後返回,而在通過 map() 方法後,事件的參數類型也由 String 轉爲了 Bitmap。緩存
這種直接轉換對象並返回的,是最多見的也最容易理解的變換。不過 RxJava
的轉換遠不止這樣,它不只能夠針對事件對象,還能夠針對整個事件隊列,這使得 RxJava
變得很是靈活。數據結構
下面給出幾個示例:多線程
map()
事件對象的直接變換,具體功能上面已經介紹過。它是 RxJava
最經常使用的變換。 map()
的示意圖以下:架構
![](http://static.javashuo.com/static/loading.gif)
flatMap()
這是一個頗有用但很是難理解的變換。首先假設這麼一種需求:假設有一個數據結構『學生』,如今須要打印出一組學生的名字。實現方式很簡單:框架
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name); } }; Observable.from(students) .map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); } }) .subscribe(subscriber); 複製代碼
若是要打印出每一個學生所須要修的全部課程的名稱呢?需求的區別在於,每一個學生只有一個名字,但卻有多個課程,首先能夠這樣實現:異步
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } }; Observable.from(students) .subscribe(subscriber); 複製代碼
若是我不想在 Subscriber
中使用 for
循環,而是但願 Subscriber
中直接傳入單個的 Course
對象呢(這對於代碼複用很重要)?用 map()
顯然是不行的,由於 map()
是一對一的轉化,而如今須要一對多的轉化。問題出現了:怎樣把一個 Student
轉化成多個 Course
?
這個時候,flatMap()
就派上了用場:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } }; Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber); 複製代碼
從上面的代碼能夠看出,flatMap()
和 map()
有一個相同點:它也是把傳入的參數轉化以後返回另外一個對象。
flatMap()
和 map()
不一樣的是,flatMap()
返回的是個 Observable
對象,而且這個 Observable
對象並非被直接發送到 Subscriber
的回調方法中。
flatMap()
示意圖以下:
![](http://static.javashuo.com/static/loading.gif)
flatMap()
的原理是這樣的:
- 使用傳入的事件對象建立一個
Observable
對象; - 並不當即發送這個
Observable
, 而是將它激活,而後開始發送事件; - 將每個建立出來的
Observable
發送的事件,都被匯入同一個Observable
。
而這個 Observable
負責將這些事件統一交給 Subscriber
的回調方法。這三個步驟,把事件拆成了兩級,經過一組新建立的 Observable
將初始的對象『鋪平』以後經過統一路徑分發了下去。而這個『鋪平』就是 flatMap()
所謂的 flat
。
3. 事件序列轉換原理
這些轉換雖然功能各有不一樣,但實質上都是針對事件序列的處理和再發送。而在 RxJava
的內部,它們是基於同一個基礎的轉換方法:lift(Operator)
。
lift()
首先看一下 lift()
的內部實現(核心代碼):
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } }); } 複製代碼
這段代碼實現的功能,簡單來講就是建立了一個新的 Observable
並返回。若是看過上篇博客會發現有些蹊蹺。重溫一下 Observable.subscribe(Subscriber)
的實現(核心代碼):
public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; } 複製代碼
對比一下以上兩段代碼的方法體(忽略返回值),會發現一行突兀的代碼:
Subscriber newSubscriber = operator.call(subscriber);
複製代碼
解釋一下 lift()
方法完成的操做:
-
利用
Observable.create()
方法建立一個新的Observable
對象,加上以前的原始Observable
,已經有兩個Observable
。 -
建立
Observable
的同時建立一個新的OnSubscribe
用於發出事件。 -
經過
lift()
傳入的Operator
函數的call()
方法構造一個新的Subscriber
對象,並將新Subscriber
和原始Subscriber
進行關聯。 -
利用這個新
Subscriber
向原始Observable
進行訂閱,實現事件序列的轉換。
這種實現基於代理模式,經過事件攔截和處理實現事件序列的變換。
在 Observable
執行了 lift(Operator)
方法以後,會返回一個新的 Observable
,這個新的 Observable
會像一個代理同樣,負責接收原始的 Observable
發出的事件,並在處理後發送給 Subscriber
。
整個過程的思惟導圖以下:
![](http://static.javashuo.com/static/loading.gif)
或者能夠看動圖:
![](http://static.javashuo.com/static/loading.gif)
兩次和屢次的 lift()
同理,以下圖:
![](http://static.javashuo.com/static/loading.gif)
舉一個具體的 Operator
的實現。下面是一個將事件的 Integer
對象轉換成 String
的例子,僅供參考:
observable.lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { // 將事件序列中的 Integer 對象轉換爲 String 對象 return new Subscriber<Integer>() { @Override public void onNext(Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; } }); 複製代碼
學習 lift() 的原理只是爲了更好地理解 RxJava ,從而能夠更好地使用它。然而RxJava 不建議開發者自定義 Operator 來直接使用 lift(),而是儘可能使用已有的 lift() 包裝方法(如 map() flatMap() 等)進行組合。
compose()
除了 lift()
以外,Observable
還有一個轉方法叫作 compose()
。它和 lift()
的區別在於,lift()
是針對事件項和事件序列的,而 compose()
是針對 Observable
自身進行轉換。
舉個例子,假設在程序中有多個 Observable
都須要應用一組相同的 lift()
進行轉換,一般會這樣寫:
observable1.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
observable2.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber2);
observable3.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber3);
observable4.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
複製代碼
能夠發現有太多重複代碼,代碼重構以下:
private Observable liftAll(Observable observable) { return observable.lift1() .lift2() .lift3() .lift4(); } liftAll(observable1).subscribe(subscriber1); liftAll(observable2).subscribe(subscriber2); liftAll(observable3).subscribe(subscriber3); liftAll(observable4).subscribe(subscriber4); 複製代碼
可讀性、可維護性都提升了。但是 Observable
被一個方法包起來,這種方式對於 Observale
的靈活性進行了限制。怎麼辦?這個時候,就應該用 compose()
來解決了:
public class LiftAllTransformer implements Observable.Transformer<Integer, String> { @Override public Observable<String> call(Observable<Integer> observable) { return observable.lift1() .lift2() .lift3() .lift4(); } } Transformer liftAll = new LiftAllTransformer(); observable1.compose(liftAll).subscribe(subscriber1); observable2.compose(liftAll).subscribe(subscriber2); observable3.compose(liftAll).subscribe(subscriber3); observable4.compose(liftAll).subscribe(subscriber4); 複製代碼
如上,使用 compose()
方法,Observable
能夠利用傳入的 Transformer
對象的 call
方法直接對自身進行處理,而不是被包在方法的裏面。
小結
本文主要介紹了 RxJava
事件及事件序列轉換原理,其中 lift()
方法的使用方法和實現原理是重點、難點。後續將會介紹的 RxJava
線程調度器底層也是基於它實現的。
歡迎關注技術公衆號: 零壹技術棧
![零壹技術棧](http://static.javashuo.com/static/loading.gif)
本賬號將持續分享後端技術乾貨,包括虛擬機基礎,多線程編程,高性能框架,異步、緩存和消息中間件,分佈式和微服務,架構學習和進階等學習資料和文章。