rxjava文檔地址https://mcxiaoke.gitbooks.io/rxdocs/content/ 這個是中文版的
javascript
android studio 添加依賴 implementation 'io.reactivex.rxjava3:rxjava:3.0.4'php
首先,打印helloworld:css
public void hello(String args){ Flowable.fromArray(args).subscribe(s -> System.out.println("hello " + s + "!"));}
跟之前其餘語言不大同樣,看上去很麻煩,咱們一步步來看java
Flowable.fromArray(args)
這個方法最重要的就是裏面的最後一句react
new FlowableFromArray<>(items)
果真FlowableFromArray是Flowable的子類,因此真正的實如今子類裏面android
Flowable.fromArray(args).subscribe
subscribe進到裏面的是git
public final Disposable subscribe( Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) { Objects.requireNonNull(onNext, "onNext is null"); Objects.requireNonNull(onError, "onError is null"); Objects.requireNonNull(onComplete, "onComplete is null");
LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);
return ls;}
看上去最重要的就是這兩句了typescript
LambdaSubscriber<T> ls = new LambdaSubscriber<>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
subscribe(ls);
先進到subscribe(ls)中,發現這句數組
subscribeActual(flowableSubscriber)
跳進去發現是個抽象方法,那麼實現確定在子類啦,進到子類FlowableFromArray微信
public void subscribeActual(Subscriber<? super T> s) { if (s instanceof ConditionalSubscriber) { s.onSubscribe(new ArrayConditionalSubscription<>( (ConditionalSubscriber<? super T>)s, array)); } else { s.onSubscribe(new ArraySubscription<>(s, array)); }}
跳進去又發現onSubscribe是個抽象方法,那麼實現方法在哪呢,對啦,就是以前看到的LambdaSubscriber
public void onSubscribe(Subscription s) { if (SubscriptionHelper.setOnce(this, s)) { try { onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.cancel(); onError(ex); } }}
這個onSubscribe.accept(this)跳過去就是接口Consumer的accept方法了
因此一開始的helloworld代碼也能夠改爲
FlowableFromArray flowableFromArray = new FlowableFromArray(new String[]{args});flowableFromArray.subscribe(new Consumer<String>() { public void accept(String s) throws Throwable { System.out.println("hello " + s + "!"); }});
是否是很麻煩,饒了一大圈,不要緊,咱們繼續往下看
這裏給出一些名詞的翻譯
Reactive 直譯爲反應性的,有活性的,根據上下文通常翻譯爲反應式、響應式
Iterable 可迭代對象,支持以迭代器的形式遍歷,許多語言中都存在這個概念
Observable 可觀察對象,在Rx中定義爲更強大的Iterable,在觀察者模式中是被觀察的對象,一旦數據產生或發生變化,會經過某種方式通知觀察者或訂閱者
Observer 觀察者對象,監聽Observable發射的數據並作出響應,Subscriber是它的一個特殊實現
emit 直譯爲發射,發佈,發出,含義是Observable在數據產生或變化時發送通知給Observer,調用Observer對應的方法,文章裏一概譯爲發射
items 直譯爲項目,條目,在Rx裏是指Observable發射的數據項,文章裏一概譯爲數據,數據項
下面是經常使用的操做符列表:
建立操做 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
變換操做 Buffer, FlatMap, GroupBy, Map, Scan和Window
過濾操做 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, TakeLast
組合操做 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, Zip
錯誤處理 Catch和Retry
輔助操做 Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, Using
條件和布爾操做 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, TakeWhile
算術和集合操做 Average, Concat, Count, Max, Min, Reduce, Sum
轉換操做 To
鏈接操做 Connect, Publish, RefCount, Replay
反壓操做,用於增長特殊的流程控制策略的操做符
下面咱們來看第一個操做符:Create
Observable.create(new Observable.OnSubscribe<Integer>() { public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { public void onNext(Integer item) { System.out.println("Next: " + item); }
public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); }
public void onCompleted() { System.out.println("Sequence complete."); } });
咱們一塊兒來看源碼
首先是Observable的create方法
public final static <T> Observable<T> create(OnSubscribe<T> f) { return new Observable<T>(hook.onCreate(f));}
這裏沒什麼,就是返回建立一個Observable對象,可是要注意裏面的參數OnSubscribe
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> { // cover for generics insanity}
public interface Action1<T> extends Action { void call(T t);}
這個參數是一個接口,它的父類裏有個抽象待實現的方法call,並且call方法被傳了Subscriber進去
咱們來看Subscriber這個類,原來是個接口,並且它的父類Observer有三個很重要的方法
public interface Observer<T> { void onCompleted(); void onError(Throwable e); void onNext(T t);}
第一個create方法算是完成了,咱們能夠拆分來看
Observable<Integer> integerObservable = Observable.create(new Observable.OnSubscribe<Integer>() { public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 0; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } }});
第二個方法subscribe,它的參數也是Subscriber,即intergerObservable.subscribe(Subscriber)
因此咱們就看出來了,Observable這個被觀察者先是經過call增長一系列的監聽,而後經過subscribe訂閱監聽。這樣,當call裏的內容開始執行後,觸發監聽回調
下面我要放大招了,我把源碼簡化了一下
public interface MyOnSubscribe { void call(MySubscriber subscriber);}
public interface MySubscriber { void onNext();
void onCompleted();
void onError();}
public class MyObservable {
MyOnSubscribe onSubscribe;
public MyObservable(MyOnSubscribe onSubscribe) { this.onSubscribe = onSubscribe; }
public final static MyObservable create(MyOnSubscribe onSubscribe) { return new MyObservable(onSubscribe); }
public final void subscribe(MySubscriber subscriber) { onSubscribe.call(subscriber); }}
測試代碼
public void hello() { MyObservable.create(new MyOnSubscribe() { public void call(MySubscriber subscriber) { try { for (int i = 0; i < 5; i++) { subscriber.onNext(); } subscriber.onCompleted(); } catch (Exception e) { subscriber.onError(); } } }).subscribe(new MySubscriber() { public void onNext() { System.out.println(1); }
public void onCompleted() { System.out.println("onCompleted"); }
public void onError() { System.out.println("onError"); } }); }
獲得的結果是同樣的。因此說,代碼萬變不離其中,只要靈活運用接口,接口就是用來監聽的
第二個操做符from
Integer[] items = {0, 1, 2, 3, 4, 5};Observable.from(items).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});
先看Observable的from方法
public final static <T> Observable<T> from(T[] array) { return from(Arrays.asList(array));}
其實就是把數組轉成list,可是再點from進去就很重要
public final static <T> Observable<T> from(Iterable<? extends T> iterable) { return create(new OnSubscribeFromIterable<T>(iterable));}public OnSubscribeFromIterable(Iterable<? extends T> iterable) { if (iterable == null) { throw new NullPointerException("iterable must not be null"); } this.is = iterable;}
OnSubscribeFromIterable是繼承自OnSubscribe的,因此後面調的call方法,其實是調的OnSubscribeFromIterable裏的call方法,咱們來看一下
public void call(final Subscriber<? super T> o) { final Iterator<? extends T> it = is.iterator(); if (!it.hasNext() && !o.isUnsubscribed()) o.onCompleted(); else o.setProducer(new IterableProducer<T>(o, it));}
真相大白了,在這裏作了迭代。還有一個操做符just,其實底層裏面調的就是from,只不過還限制了參數個數,並且參數類型必須相同,感受用處不大
第三個操做符repeat
Observable.just(1, 2).repeat(4).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});
repeat點進去是OnSubcribRedo.repeat,緊追着count這個參數,會看到一個RedoFinite類
public static final class RedoFinite implements Func1<Observable<? extends Notification<?>>, Observable<?>> { private final long count;
public RedoFinite(long count) { this.count = count; }
public Observable<?> call(Observable<? extends Notification<?>> ts) { return ts.map(new Func1<Notification<?>, Notification<?>>() {
int num=0; public Notification<?> call(Notification<?> terminalNotification) { if(count == 0) { return terminalNotification; } num++; if(num <= count) { return Notification.createOnNext(num); } else { return terminalNotification; } } }).dematerialize(); }}
這裏就看到了,有個num++和num<=count判斷,就知道是怎麼重複的了
第4個操做符Map和flapMap
這兩個變換操做符可謂很是重要,常常用到,我寫了4個例子,請仔細區別,就能夠知道它們到底作了什麼
Student student1 = new Student("stark", new Course[]{new Course("Chinese"), new Course("English")});Student student2 = new Student("adam", new Course[]{new Course("Math"), new Course("Physical")});
Student[] students = new Student[]{student1, student2};Observable.from(students).subscribe(new Action1<Student>() { @Override public void call(Student student) { System.out.println(student.getName()); }});
System.out.println("-------------");
Observable.from(students).map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); }}).subscribe(new Action1<String>() { @Override public void call(String name) { System.out.println(name); }});
System.out.println("-------------");
Observable.from(students).map(new Func1<Student, Course[]>() { @Override public Course[] call(Student student) { return student.getCourses(); }}).subscribe(new Action1<Course[]>() { @Override public void call(Course[] courses) { System.out.println(courses[0].getName()); System.out.println(courses[1].getName()); }});
System.out.println("-------------");
Observable.from(students).flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); }}).subscribe(new Action1<Course>() { @Override public void call(Course course) { System.out.println(course.getName()); }});
輸出:starkadam-------------starkadam-------------ChineseEnglishMathPhysical-------------ChineseEnglishMathPhysical
若是你仔細看代碼,就會發現map就是一對一的轉換,flatMap是一對多的轉換,轉換的先後類型在方法Func1中已經標的很清楚。例子:Func1(Student,String)就表明傳參是Student,返回類型是String,具體的實如今call裏面student.getName()
map和flatMap能夠看做是將咱們常常用到的嵌套循環for(i){for(j)...}...給解耦了,看起來更清楚一些,中間能夠插入更多的操做
源碼裏面的實現就是迭代,沒什麼好說
第5個操做符filter:
Observable.just(1,2,3,4,5).filter(new Func1<Integer, Boolean>() { public Boolean call(Integer integer) { return integer<4; }}).subscribe(new Action1<Integer>() { public void call(Integer integer) { System.out.println(integer); }});
先過濾再循環輸出
第6個組合操做符and/then/when
implementation 'io.reactivex:rxjava-joins:0.22.0'
Observable<String> just1 = Observable.just("A", "B");Observable<Integer> just2 = Observable.just(1, 2, 3);Pattern2<String, Integer> pattern = JoinObservable.from(just1).and(just2);Plan0<String> plan = pattern.then(new Func2<String, Integer, String>() { public String call(String s, Integer integer) { return s + integer; }});JoinObservable.when(plan).toObservable().subscribe(new Action1<String>() { public void call(String s) { System.out.println(s); }});
輸出:A1B2
第7個組合操做符merge:
Observable<Integer> odds = Observable.just(1, 3, 5);Observable<Integer> evens = Observable.just(2, 4, 6);Observable.merge(odds,evens).subscribe(new Action1<Integer>() { @Override public void call(Integer integer) { System.out.println(integer); }});
輸出:135246
第8個操做符doOnNext:
Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() { public void call(Integer integer) { if (integer > 1) { throw new RuntimeException("item exceeds maximum value"); } }}).subscribe(new Subscriber<Integer>() { public void onCompleted() { System.out.println("onCompleted"); }
public void onError(Throwable e) { System.out.println("onError"); }
public void onNext(Integer integer) { System.out.println("next:" + integer); }});
輸出:next:1onError
第9個操做符SubscribeOn(Scheduler):即申明在哪一個調度器工做
第10個:android例子:
Observable.from(new String[]{"one", "two", "three", "four", "five"}) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<String>() { public void call(String s) { System.out.println(s); } });
大體瞭解了rxjava的使用和基本原理以後,在後續的使用中遇到不懂的再看文檔https://mcxiaoke.gitbooks.io/rxdocs/content/,還有必定要看源碼,而後本身親自嘗試,才能加深理解
歡迎關注個人微信公衆號:安卓圈
本文分享自微信公衆號 - 安卓圈(gh_df75572d44e4)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。