如今RxJava已是2.0以上版本了,看到不少資料上還在講1.0的版本,所以作個簡單的筆記,供你們參考,不足之處請兄弟們留言我添加完善java
一、概述編程
RxJava(Reactive Extensions Java),RxJava基於觀察者模式,是一種響應式編程模型,目的是提供統一的接口幫助開發者便捷的處理異步數據流。app
RxJava本質上是一個異步庫,使用簡潔的邏輯處理繁瑣複雜任務。異步
二、觀察者模式ide
觀察者模式涉及到2主角函數
1)Observable :被觀察者spa
2)Observer:觀察者線程
被觀察者中有一個觀察者的列表,當被觀察者發生變化的時候回根據這張表一一通知觀察者。code
三、1.0和2.0的區別 orm
3.一、背壓
1.0背壓集中在Obserable中處理,致使有點混亂,。
2.0版本把對背壓的處理提出來了,出現了兩種觀察者模式:
Observable(被觀察者)/Observer(觀察者) 不支持背壓(Backpressure)
Flowable(被觀察者)/Subscriber(觀察者) 支持背壓(Backpressure)
Flowable是新增的
3.二、操做符修改
1.0版本
Action這類接口命名Action0、Action1…(數字表明可接受的參數),如今作出了改動
1.x | 2.x |
Action0 | Action |
Action1 | Consumer |
Action2 | BiConsumer |
... | |
ActionN |
Funx修改
1.x | 2.x |
Func | Function<T, R> A functional interface that takes a value and returns another value |
Func2 | BiFunction<T1, T2, R> |
Func3 ~ Func9 | Function3 ~ Function9
Function3<T1, T2, T3, R> Function9<T1, T2, T3, T4, T5, T6, T7, T8, T9, R> |
FuncN | Function<T, R> |
First()
1.x | 2.x |
first | firstElement |
first(Func1) | filter(predicate).first() |
firstOrDefault(T) | first(T) |
firstOrDefault(Func1, T) | first(T) |
3.三、線程調度 Schedulers
2.0去掉了Schedulers.immediate()、Schedulers.test()
3.4 、建立被觀察者的變化
1.0版本 Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("hello"); subscriber.onNext("hello"); subscriber.onNext("hello"); subscriber.onCompleted(); } }); 2.0版本 變化點1:create參數Observable.OnSubscribe改成ObservableOnSubscribe 變化點2:回到函數call(Subscriber)改成 subscribe(ObservableEmitter) 變化點3:可發射三種事件:emitter.onNext(T value)、onComplete、onError(Throwable e) Observable<Integer> observable=Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { e.onNext("hello!"); e.onNext("world!"); e.onNext("are you ok!"); e.onError(new Throwable("error")); e.onComplete(); } });
3.五、建立觀察者的變化
1.0 中建立觀察者Observer有兩種方式 方式1:採用 Observer 接口 Observer<String> observer = new Observer<String>() { @Override public void onNext(String s) { } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } }; 方式2:採用Subscriber接口建立, 其對 Observer接口進行了擴展,增長了onStart()、unSubscribe() Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { } @Override public void onCompleted() { } @Override public void onError(Throwable e) { } }; 2.0 建立觀察者Observer 變化點1:增長了回調方法onSubscribe(),其最早被調用,可用來作一些初始化的工做 變化點2:onCompleted()改爲 onComplete() Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { } @Override public void onError(Throwable e) { } // 注意 @Override public void onComplete() { } }
3.六、接口都增長了異常拋出
public interface Action { /** * Runs the action and optionally throws a checked exception. * @throws Exception if the implementation wishes to throw a checked exception */ void run() throws Exception; } public interface BiConsumer<T1, T2> { /** * Performs an operation on the given values. * @param t1 the first value * @param t2 the second value * @throws Exception on error */ void accept(T1 t1, T2 t2) throws Exception; } public interface Function<T, R> { /** * Apply some calculation to the input value and return some other value. * @param t the input value * @return the output value * @throws Exception on error */ R apply(T t) throws Exception; } /** * A functional interface (callback) that accepts a single value. * @param <T> the value type */ public interface Consumer<T> { /** * Consume the given value. * @param t the value * @throws Exception on error */ void accept(T t) throws Exception; }
3.7 from 變化
from -> fromArray
1.0版本 private void doFrom() { String[] items = {"item1", "item2", "item3"}; Observable.from(items) .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); } 2.0版本 private void doFrom() { String[] items = {"item1", "item2", "item3"}; Observable.fromArray(items).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } }); }