本篇文章已受權微信公衆號 YYGeeker
獨家發佈轉載請標明出處java
CSDN學院課程地址react
- RxJava2從入門到精通-初級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-中級篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-進階篇:edu.csdn.net/course/deta…
- RxJava2從入門到精通-源碼分析篇:edu.csdn.net/course/deta…
自定義Operator屬於RxJava的高級用法,能夠本身自定義一些適用於常見應用場景的操做符。實現自定義Operator很簡單,只須要實現RxJava提供的ObservableOperator
接口,實現對應的功能便可,同時,使用lift
操做符將自定義操做符應用到咱們的程序中。下面咱們使用自定義Operator,該操做符的做用是將List
集合轉換成String
類型的輸出編程
一、實現ObservableOperator,建立自定義Operatorbash
public class CustomOperator implements ObservableOperator<String, List<String>> {
@Override
public Observer<? super List<String>> apply(final Observer<? super String> observer) throws Exception {
return new Observer<List<String>>() {
@Override
public void onSubscribe(Disposable d) {
observer.onSubscribe(d);
}
@Override
public void onNext(List<String> strings) {
observer.onNext(strings.toString());
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onComplete() {
observer.onComplete();
}
};
}
}
複製代碼
二、使用lift操做符添加自定義Operator微信
public class Main {
public static void main(String[] args) {
//建立被觀察者
Observable.create(new ObservableOnSubscribe<List<String>>() {
@Override
//默認在主線程裏執行該方法
public void subscribe(@NonNull ObservableEmitter<List<String>> e) throws Exception {
ArrayList<String> list = new ArrayList<>();
list.add("1");
list.add("2");
list.add("3");
list.add("4");
e.onNext(list);
e.onComplete();
}
})
.lift(new CustomOperator())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println("onNext=" + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
}
}
複製代碼
三、輸出結果app
onNext=[1, 2, 3, 4]
onComplete
複製代碼
自定義Transformer表示一個批量操做符的變換器,若是你在不少Observable中使用相同的一系列操做符,好比每次都要使用到map
+take
+doOnNext
等操做,那麼就能夠定義一個通用的Transformer對象,裏面能夠將須要重複用到的操做符打包成Transformer對象,使用compose操做符將Transformer對象應用到咱們的Observable上便可ide
實現自定義Transformer很簡單,只須要實現RxJava提供的ObservableTransformer
接口,實現對應的功能便可,同時,使用compose
操做符將自定義Transformer應用到咱們的程序中。下面咱們使用自定義Transformer,該Transformer的做用是將發射的數據從Integer
轉換成String
,並取2個數據項,同時在發射的時候監聽發射事件,進行輸出的打印函數
一、實現ObservableTransformer,建立自定義Transformer源碼分析
public class CustomTransformer implements ObservableTransformer<Integer, String> {
@Override
public ObservableSource<String> apply(io.reactivex.Observable<Integer> upstream) {
return upstream.take(2).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "序號:" + integer + "發射成功";
}
}).doOnNext(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s + ",準備發射");
}
});
}
}
複製代碼
二、使用compose操做符添加自定義Transformerui
public class Main {
public static void main(String[] args) {
//建立被觀察者
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
})
.compose(new CustomTransformer())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
複製代碼
三、輸出結果
序號:1發射成功,準備發射
序號:1發射成功
序號:2發射成功,準備發射
序號:2發射成功
複製代碼
在安卓開發中,一般咱們也會自定義Transformer來實現咱們經常使用的線程切場景,具體以下
public static <T> ObservableTransformer<T, T> schedulersTransformer() {
return new ObservableTransformer<T, T>() {
@Override
public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
public static <T> FlowableTransformer<T, T> schedulersTransformerForFlowable() {
return new FlowableTransformer<T, T>() {
@Override
public Publisher<T> apply(Flowable<T> upstream) {
return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
}
};
}
複製代碼
自定義Plugin表示自定義插件,自定義插件能夠在RxJavaPlugins中提供的接口中去插入本身的一段代碼操做,相似於面向切面編程,或者理解成Android的Hook。若是你須要在全部的訂閱事件中去插入一段統一的操做,或者是監聽全部訂閱事件發生異常時的回調,均可以使用自定義插件。在實際應用中,目前並未發現有什麼做用
實現自定義Plugin只須要調用RxJavaPlugins提供的set方法便可,下面咱們經過例子輸出Observable和Observer的地址信息,來驗證每次訂閱的時候,回調自定義Plugin的方法中,插件對象和源對象是否爲同一個對象
一、經過設置ObservableSubscribe,每次對Observable操做的時候回調
public class Main {
public static void main(String[] args) {
RxJavaPlugins.setOnObservableAssembly(new CustomObservableAssembly());//任意操做符都有回調
RxJavaPlugins.setOnObservableSubscribe(new CustomObservableSubscribe());//每次subscribe時候有回調
Observable observable = getObservable();
Observer<Integer> observer = getObserver();
System.out.println("main observable.toString:" + observable.toString());
System.out.println("main observer.toString:" + observer.toString());
observable.subscribe(observer);
}
public static Observable getObservable() {
return Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(5);
emitter.onNext(2);
emitter.onNext(3);
}
});
}
public static Observer<Integer> getObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println("onNext=" + integer);
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
@Override
public void onComplete() {
}
};
}
}
複製代碼
二、CustomObservableAssembly
public class CustomObservableAssembly implements Function<Observable, Observable> {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
observable.take(2);
return observable;
}
}
複製代碼
三、CustomObservableSubscribe
public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
@Override
public Observer apply(Observable observable, Observer observer) throws Exception {
System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
return observer;
}
}
複製代碼
四、輸出結果
地址相同說明是同個對象,自定義插件Hook成功
CustomObservableAssembly observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca
main observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca
main observer.toString:com.hensen.rxjavalearning.Chapter7.Chapter7o3.Main$2@3d82c5f3
CustomObservableSubscribe observable.toString:io.reactivex.internal.operators.observable.ObservableCreate@1a93a7ca,observer.toString:com.hensen.rxjavalearning.Chapter7.Chapter7o3.Main$2@3d82c5f3
onNext=5
onNext=2
onNext=3
複製代碼
補充:
能夠經過設置ErrorHandler,發生異常時會回調
RxJavaPlugins.setErrorHandler();
複製代碼
能夠經過設置SchedulerHandler來Hook到對應的schedule
RxJavaPlugins.setIoSchedulerHandler();
RxJavaPlugins.setNewThreadSchedulerHandler();
RxJavaPlugins.setComputationSchedulerHandler();
RxJavaPlugins.setSingleSchedulerHandler();
複製代碼
錯誤演示:
因爲CustomObservableAssembly是在任意操做符操做的時候都會回調,因此在回調裏面是不能夠對observable再進行操做符的操做,不然回調裏面observable的操做符仍是會回調CustomObservableAssembly自身,致使死循環,發生StackOverflowError
public class CustomObservableAssembly implements Function<Observable, Observable> {
@Override
public Observable apply(Observable observable) throws Exception {
System.out.println("CustomObservableAssembly observable.toString:" + observable.toString());
observable.take(2);
return observable;
}
}
複製代碼
因爲CustomObservableSubscribe是在subscribe以後進行的回調,若是在回調裏面對observable進行操做符的操做,這個時候是不會生效的,由於在subscribe以後onNext的函數是不會再處理後面新添的操做符,原理與源碼有關
public class CustomObservableSubscribe implements BiFunction<Observable, Observer, Observer> {
@Override
public Observer apply(Observable observable, Observer observer) throws Exception {
System.out.println("CustomObservableSubscribe observable.toString:" + observable.toString() + ",observer.toString:" + observer.toString());
observable.take(2);
return observer;
}
}
複製代碼