RxJava簡析

rxjava文檔地址https://mcxiaoke.gitbooks.io/rxdocs/content/ 這個是中文版的
javascript

android studio 添加依賴  implementation 'io.reactivex.rxjava3:rxjava:3.0.4'php

首先,打印helloworld:css

public void hello(String args){ Flowable.fromArray(args).subscribe(s -> System.out.println("hello " + s + "!"));}

跟之前其餘語言不大同樣,看上去很麻煩,咱們一步步來看java

Flowable.fromArray(args)

這個方法最重要的就是裏面的最後一句react

new FlowableFromArray<>(items)

果真FlowableFromArray是Flowable的子類,因此真正的實如今子類裏面android

Flowable.fromArray(args).subscribe

 subscribe進到裏面的是git

public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError, @NonNull Action onComplete) { Objects.requireNonNull(onNext, "onNext is null"); Objects.requireNonNull(onError, "onError is null"); Objects.requireNonNull(onComplete, "onComplete is null");
LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);
return ls;}

看上去最重要的就是這兩句了typescript

LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);

先進到subscribe(ls)中,發現這句數組

subscribeActual(flowableSubscriber)

跳進去發現是個抽象方法,那麼實現確定在子類啦,進到子類FlowableFromArray微信

@Overridepublic void subscribeActual(Subscriber<? super T> s) { if (s instanceof ConditionalSubscriber) { s.onSubscribe(new ArrayConditionalSubscription<>( (ConditionalSubscriber<? super T>)s, array)); } else { s.onSubscribe(new ArraySubscription<>(s, array)); }}

跳進去又發現onSubscribe是個抽象方法,那麼實現方法在哪呢,對啦,就是以前看到的LambdaSubscriber

public void onSubscribe(Subscription s) { if (SubscriptionHelper.setOnce(this, s)) { try { onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.cancel(); onError(ex); } }}

這個onSubscribe.accept(this)跳過去就是接口Consumer的accept方法了

因此一開始的helloworld代碼也能夠改爲

FlowableFromArray flowableFromArray = new FlowableFromArray(new String[]{args});flowableFromArray.subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { System.out.println("hello " + s + "!"); }});

是否是很麻煩,饒了一大圈,不要緊,咱們繼續往下看

這裏給出一些名詞的翻譯

  • Reactive 直譯爲反應性的,有活性的,根據上下文通常翻譯爲反應式、響應式

  • Iterable 可迭代對象,支持以迭代器的形式遍歷,許多語言中都存在這個概念

  • Observable 可觀察對象,在Rx中定義爲更強大的Iterable,在觀察者模式中是被觀察的對象,一旦數據產生或發生變化,會經過某種方式通知觀察者或訂閱者

  • Observer 觀察者對象,監聽Observable發射的數據並作出響應,Subscriber是它的一個特殊實現

  • emit 直譯爲發射,發佈,發出,含義是Observable在數據產生或變化時發送通知給Observer,調用Observer對應的方法,文章裏一概譯爲發射

  • items 直譯爲項目,條目,在Rx裏是指Observable發射的數據項,文章裏一概譯爲數據,數據項

下面是經常使用的操做符列表:

  1. 建立操做 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer

  2. 變換操做 Buffer, FlatMap, GroupBy, Map, Scan和Window

  3. 過濾操做 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast

  4. 組合操做 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip

  5. 錯誤處理 Catch和Retry

  6. 輔助操做 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using

  7. 條件和布爾操做 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile

  8. 算術和集合操做 Average, Concat, Count, Max, Min, Reduce, Sum

  9. 轉換操做 To

  10. 鏈接操做 Connect, Publish, RefCount, Replay

  11. 反壓操做,用於增長特殊的流程控制策略的操做符

下面咱們來看第一個操做符:Create

Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); }
@Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
@Override public void onCompleted() { System.out.println("Sequence complete."); } });

咱們一塊兒來看源碼

首先是Observable的create方法

public final static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f));}

這裏沒什麼,就是返回建立一個Observable對象,可是要注意裏面的參數OnSubscribe

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity}
public interface Action1<T> extends Action { void call(T t);}

這個參數是一個接口,它的父類裏有個抽象待實現的方法call,並且call方法被傳了Subscriber進去

咱們來看Subscriber這個類,原來是個接口,並且它的父類Observer有三個很重要的方法

public interface Observer<T> { void onCompleted(); void onError(Throwable e); void onNext(T t);}

第一個create方法算是完成了,咱們能夠拆分來看

Observable<Integer> integerObservable = Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 0; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } }});

第二個方法subscribe,它的參數也是Subscriber,即intergerObservable.subscribe(Subscriber)

因此咱們就看出來了,Observable這個被觀察者先是經過call增長一系列的監聽,而後經過subscribe訂閱監聽。這樣,當call裏的內容開始執行後,觸發監聽回調

下面我要放大招了,我把源碼簡化了一下

public interface MyOnSubscribe { void call(MySubscriber subscriber);}
public interface MySubscriber { void onNext();
void onCompleted();
void onError();}
public class MyObservable {
MyOnSubscribe onSubscribe;
public MyObservable(MyOnSubscribe onSubscribe) { this.onSubscribe = onSubscribe; }
public final static MyObservable create(MyOnSubscribe onSubscribe) { return new MyObservable(onSubscribe); }
public final void subscribe(MySubscriber subscriber) { onSubscribe.call(subscriber); }}

測試代碼

public void hello() { MyObservable.create(new MyOnSubscribe() { @Override public void call(MySubscriber subscriber) { try { for (int i = 0; i < 5; i++) { subscriber.onNext(); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(); } } }).subscribe(new MySubscriber() { @Override public void onNext() { System.out.println(1); }
@Override public void onCompleted() { System.out.println("onCompleted"); }
@Override public void onError() { System.out.println("onError"); } }); }

獲得的結果是同樣的。因此說,代碼萬變不離其中,只要靈活運用接口,接口就是用來監聽的

第二個操做符from

Integer[] items = {0, 1, 2, 3, 4, 5};Observable.from(items).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});

先看Observable的from方法

public final static <T> Observable<T> from(T[] array) { return from(Arrays.asList(array));}

其實就是把數組轉成list,可是再點from進去就很重要

public final static <T> Observable<T> from(Iterable<? extends T> iterable) { return create(new OnSubscribeFromIterable<T>(iterable));}public OnSubscribeFromIterable(Iterable<? extends T> iterable) { if (iterable == null) { throw new NullPointerException("iterable must not be null"); } this.is = iterable;}

OnSubscribeFromIterable是繼承自OnSubscribe的,因此後面調的call方法,其實是調的OnSubscribeFromIterable裏的call方法,咱們來看一下

@Overridepublic void call(final Subscriber<? super T> o) { final Iterator<? extends T> it = is.iterator(); if (!it.hasNext() && !o.isUnsubscribed()) o.onCompleted(); else  o.setProducer(new IterableProducer<T>(o, it));}

真相大白了,在這裏作了迭代。還有一個操做符just,其實底層裏面調的就是from,只不過還限制了參數個數,並且參數類型必須相同,感受用處不大

第三個操做符repeat

Observable.just(1, 2).repeat(4).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});

repeat點進去是OnSubcribRedo.repeat,緊追着count這個參數,會看到一個RedoFinite類

public static final class RedoFinite implements Func1<Observable<? extends Notification<?>>, Observable<?>> { private final long count;
public RedoFinite(long count) { this.count = count; }
@Override public Observable<?> call(Observable<? extends Notification<?>> ts) { return ts.map(new Func1<Notification<?>, Notification<?>>() {
int num=0; @Override public Notification<?> call(Notification<?> terminalNotification) { if(count == 0) { return terminalNotification; } num++; if(num <= count) { return Notification.createOnNext(num); } else { return terminalNotification; } } }).dematerialize(); }}

這裏就看到了,有個num++和num<=count判斷,就知道是怎麼重複的了

第4個操做符Map和flapMap

這兩個變換操做符可謂很是重要,常常用到,我寫了4個例子,請仔細區別,就能夠知道它們到底作了什麼

Student student1 = new Student("stark", new Course[]{new Course("Chinese"), new Course("English")});Student student2 = new Student("adam", new Course[]{new Course("Math"), new Course("Physical")});
Student[] students = new Student[]{student1, student2};Observable.from(students).subscribe(new Action1<Student>() { @Override public void call(Student student) { System.out.println(student.getName()); }});
System.out.println("-------------");
Observable.from(students).map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); }}).subscribe(new Action1<String>() { @Override public void call(String name) { System.out.println(name); }});
System.out.println("-------------");
Observable.from(students).map(new Func1<Student, Course[]>() { @Override public Course[] call(Student student) { return student.getCourses(); }}).subscribe(new Action1<Course[]>() { @Override public void call(Course[] courses) { System.out.println(courses[0].getName()); System.out.println(courses[1].getName()); }});
System.out.println("-------------");
Observable.from(students).flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); }}).subscribe(new Action1<Course>() { @Override public void call(Course course) { System.out.println(course.getName()); }});
輸出:starkadam-------------starkadam-------------ChineseEnglishMathPhysical-------------ChineseEnglishMathPhysical

若是你仔細看代碼,就會發現map就是一對一的轉換,flatMap是一對多的轉換,轉換的先後類型在方法Func1中已經標的很清楚。例子:Func1(Student,String)就表明傳參是Student,返回類型是String,具體的實如今call裏面student.getName()

map和flatMap能夠看做是將咱們常常用到的嵌套循環for(i){for(j)...}...給解耦了,看起來更清楚一些,中間能夠插入更多的操做

源碼裏面的實現就是迭代,沒什麼好說

第5個操做符filter:

Observable.just(1,2,3,4,5).filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer integer) { return integer<4; }}).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});

先過濾再循環輸出

第6個組合操做符and/then/when

implementation 'io.reactivex:rxjava-joins:0.22.0'
Observable<String> just1 = Observable.just("A", "B");Observable<Integer> just2 = Observable.just(1, 2, 3);Pattern2<String, Integer> pattern = JoinObservable.from(just1).and(just2);Plan0<String> plan = pattern.then(new Func2<String, Integer, String>() { @Override public String call(String s, Integer integer) { return s + integer; }});JoinObservable.when(plan).toObservable().subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); }});
輸出:A1B2

第7個組合操做符merge:

Observable<Integer> odds = Observable.just(1, 3, 5);Observable<Integer> evens = Observable.just(2, 4, 6);Observable.merge(odds,evens).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});
輸出:135246

第8個操做符doOnNext:

Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() { @Override public void call(Integer integer) { if (integer > 1) { throw new RuntimeException("item exceeds maximum value"); } }}).subscribe(new Subscriber<Integer>() { @Override public void onCompleted() { System.out.println("onCompleted"); }
@Override public void onError(Throwable e) { System.out.println("onError"); }
@Override public void onNext(Integer integer) { System.out.println("next:" + integer); }});
輸出:next:1onError

第9個操做符SubscribeOn(Scheduler):即申明在哪一個調度器工做

第10個:android例子:

Observable.from(new String[]{"one", "two", "three", "four", "five"}) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } });

大體瞭解了rxjava的使用和基本原理以後,在後續的使用中遇到不懂的再看文檔https://mcxiaoke.gitbooks.io/rxdocs/content/,還有必定要看源碼,而後本身親自嘗試,才能加深理解

歡迎關注個人微信公衆號:安卓圈

本文分享自微信公衆號 - 安卓圈(gh_df75572d44e4)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索