1、Rxjava的產生背景
1、進行耗時任務
傳統解決辦法:
傳統手動開啓子線程,聽過接口回調的方式獲取結果
傳統解決辦法的缺陷:
隨着項目的深刻、擴展。代碼量的增大會產生回調之中套回調的,耦合度高度增長的不利場景。對代碼維護和擴展是很嚴重的問題。
RxJava本質上是一個異步操做庫
優勢:
使用簡單的邏輯,處理複雜 ,困難的異步操做事件庫;在必定程度上替代handler、AsyncTask等等
2、傳統的觀察者模式
使用場景
一、一個方面的操做依賴於另外一個方面的狀態變化
二、若是在更改一個對象的時候,須要同時連帶改變其餘的對象(不肯定有多少對象須要改變)
三、當一個對象必須通知其餘的對象,可是又但願這個對象和其餘被通知的對象是鬆散耦合度的關係
在App開發過程當中,有一個對象的狀態數據須要時常關注,不少個頁面的Ui都跟這個對象又有綁定關係。當這個對象發生改變的時候就須要通知全部跟他有關係的Ui都進行相應的改變。這種狀況下就是一種觀察者模式的使用場景。
簡單來講:
A對象 對B對象的數據高度敏感,當B對象變化的一瞬間,A對象要作出反應。這時候A對象就是觀察者,B對象就是被觀察者
觀察者模式說白了就是衆多的觀察者對被觀察者的數據高度敏感變化的自身的一種反應。其反應的是一種 多對一的 關係。
組成:
(一)<interface>Observerable 被觀察者接口
a、registerObserver() :將觀察者註冊到被觀察者當中,是一個訂閱方法
b、removeObserver():將觀察者從被觀察者中移除,取消訂閱
c、notifyObservers():當被觀察者狀態改變的時候,該方法就會被調用 *****
ps:內部會調用觀察者的 update() 函數,來通知觀察者作出相應的數據改變,依次循環遍歷整個觀察者數量並獲取到觀察者並調用update()方法 進行相應的更新操做
(二)<class> ConcreteObserverable 被觀察者具體的實現
實現了被觀察者接口中的abc方法,而且定義了一個List<Observer>observers 用來保存註冊好的觀察者對象的
ps: 因爲集合的範型參數 是接口 類型 因此不能是具體的Observer 實現類,只能是Observer的接口
接口的定義和設計 要爲了之後的拓展而考慮
解析:這樣作的緣由
讓一個被觀察者可能會有多個實現類的觀察者都有可能實現了Observerable 這個接口 。
這樣就能把觀察者和被觀察者 經過List 這個集合 進行解耦
(三)<interface> Observer 觀察者接口
Update() :接口
與被觀察者的notifyObservers()關聯進行相應的數據變化
(四)<class > ConcreteObserver 具體的觀察者
實現 update() 或是其餘方法
3、Rxjava觀察者模式和基本用法
Rxjava 四要素
一、被觀察者
二、觀察者
三、訂閱
四、事件
事件
響應式編程 是基於異步數據流概念的編程模式。響應式編程的一個核心概念 事件
步驟
一、建立被觀察者:create
rxjava中:決定何時觸發事件,以及決定觸發怎樣的事件
//第一步、建立被觀察者:Create
//Observable 也會轉換成subscriber進行響應的處理
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
//按代碼當中的順序進行響應式調用
//這裏的Subscriber就是觀察者,在被觀察者的回調中調用了觀察者的方法實際上就是一種事件的傳遞
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onCompleted();
}
});
//第二種建立 被觀察者對象的方法:
//經過just 方法 來建立被觀察者對象
Observable observableJust = Observable.just("1","2」); //最多有10個String參數
//第三種建立,經過from方法,把參數做爲字符數組,而後添加到參數裏
String[] parameters = {「1」,」2"};
Observable observableFrom = Observable.from(parameters);
//第二步、建立觀察者:observer
//第二步、建立觀察者Observer,決定事件觸發會有怎樣的行爲
Observer<Object> observer = new Observer<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
//實際就是傳統觀察者模式中的update()方法
@Override
public void onNext(Object o) {
}
};
//第三步,訂閱,經過被觀察者.subscribe(觀察者)
public void doRxjava() {
//第三部,訂閱
observable.subscribe(observer);
}
ps:注意 是被觀察者 訂閱 觀察者 !爲了經過流式api 進行不一樣的操做符操做、線程控制都能經過鏈式調用來完善。
4、Rxjava建立Observable & observer
一、Observable(被觀察者)
二、OnSubscribe 對象 :被觀察者用來 通知觀察者的notifyObservers() 函數
三、Subscriber 觀察者
四、subscribe() ;經過該方法完成觀察者與被觀察之間的訂閱
A、建立被觀察者、Observable:
同時建立Observable 內部的一個onSubscribe對象做爲參數傳遞到create()方法當中
Observable observable =
Observable.create(new Observable.OnSubscribe<String>()
//內部傳入一個OnSubscribe參數,最終會賦值給Observable的成員變量 onSubscribe
public static <T> Observable<T> create(OnSubscribe<T> f) {
ps:hook 能夠被看做是一抽象的代理類,該代理類默認狀況下不會對onSubscribe作任何處理。
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
//create()構造了一個新的被觀察者Observable對象,同時將參數賦值給Observable的成員變量 onSubscribe,生成被觀察者對象
return new Observable<T>(hook.onCreate(f));
}
B、建立觀察者、Subscriber or Observer :實現了Observer接口,和Subscription接口;
Subscription接口的實現:
void unsubscribe();//進行接觸綁定——再也不有訂閱事件了,訂閱事件列表爲空了調用發方法
boolean isUnsubscribed();//判斷是否接觸綁定,判斷是否已經取消了訂閱事件
public abstract class Subscriber<T> implements Observer<T>, Subscription
private final SubscriptionList subscriptions;//訂閱事件的集合 ,在這個集合List當中保存了全部這個觀察者的訂閱事件,當取消訂閱的時候,該List會有事件被刪除
C、訂閱關係:調用observable 內部的subscribe() 完成訂閱
public final Subscription subscribe(final Observer<? super T> observer) {
//傳入一個observer ,調用subscribe,最終轉型成subscribe
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber,
Observable<T> observable) {
…
subscriber.onStart();//空方法實現,須要的時候本身調用並實現
if (!(subscriber instanceof SafeSubscriber)) { //將subscriber 包裝成SafeSubscriber
ps:在SafeSubscriber()中執行了onCompled()、和onError()方法 ,就不會再執行onNext()方法
subscriber = new SafeSubscriber<T>(subscriber);
}
…
//調用完call方法,意味着完成了一次訂閱
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
ps:完成訂閱後默認會觸發 Observable.Onsubscribe<String>中的call 函數
5、Rxjava 的操做符
變換: 就是將事件序列中的對象 或整個序列進行加工再處理,轉換成不一樣的事件或是事件序列
map操做符:就是用來把一個事件轉換爲另外一個事件
/**
* 經過被觀察者Observabled 調用just方法建立被觀察者並傳入圖片的路徑地址,調用map操做符,對原來的觀察者進行數據流的變化操做。將String類型的圖片路徑轉換成bitmap,來完成map操做符的調用,map操做符會建立一個新的Observable 對象而後再鏈式調用subscribe完成訂閱關係
*/
private void map() {
Observable.just("map/image/map.png")
//經過map的鏈式調用,將String轉換成bitmap對象
.map(new Func1<String, Bitmap>() {
//Func1 是Rxjava中的接口,用於包裝含有參數的方法,
// func1中第一個參數的類型就表明Observable發射當前的類型;第二個參數是String類型將要轉換的類型
@Override
public Bitmap call(String filepath) {
return getBitmap(filepath);
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
//...
}
});
}
map()函數接受一個Func1類型的參數,而後把這個Func1應用到每個由Observable發射的值上,將發射的值轉換爲咱們指望的值。
將參數重String類型參數 轉換成Bitmap 並返回
6、Rxjava 的map操做符的原理
lift()方法是Rxjava 全部操做符的核心方法
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
ps:OperatorMap 是實現了Operator操做符的一個接口
public final class OperatorMap<T, R> implements Operator<R, T>
在該類中核心方法call方法內,接受外部傳遞給它的subscriber觀察者
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
...
@Override
public void onNext(T t) {
try {
//經過調用onNext()完成觀察數據流的轉化
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
…
}
而transformer 是定義在func1 這個接口下的,經過Func1類中 call()方法的調用完成T —> 轉換成 R的操做
final Func1<? super T, ? extends R> transformer;
//transformer的做用就是將範型<T , > 轉換成 範型 < ,R>
public interface Func1<T, R> extends Function {
R call(T t);
}
//基本全部操做符內部都會用到lift ()函數的內部相關原理 ;
lift : 本質上是針對事件序列的處理和再發送
lift():
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//一、首先內部生成了一個新的Observable 被觀察者對象並返回
ps:new Observable——代理主要負責接收原始的Observable 發出的事件,當建立好了 new Observable 會將其發送給下面
的 Subscriber<? super T> st 讓其進行處理
returnnew Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
//在新對象的OnSubscribe對象當中,經過call方法調用 拿到以前的生成的Observable,生成一個新的Subscriber對象
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
//將新建的Subscriber做爲參數傳遞到call 方法中,在這給call方法中 完成了訂閱工做
onSubscribe.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
});
}
7、Rxjava 的flatmap操做符
flatmap 和map有一個共同點 : 均是用來進行事件轉化的
map 是將 String 類型 ——轉化——>bitmap ,一一對應,映射成一個新的事件
flatmap 是將 String 類型——轉化——>Observable————將全部的事件轉化成一個Observable而後由這個Observable進行統一的事件分發
/**
* flatMap -輸入URi地址返回Uri 列表
*/
private Subscription processNetAddress() {
return Observable.just(
)
//一、將傳入的String類型 事件對象,轉換成Observable()類型對象
.flatMap(new Func1<String, Observable<String>>() {
//二、不會直接發送這個Observable ,而是將這個Observable激活讓他本身開始發送事件
@Override
public Observable<String> call(String s) {
return null;
}
})
//三、每個建立出來的Observable發送的事件,都被匯入同一個Observable
//接收到上面一連串的字符串完成輸出
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
}
8、Rxjava 的線程控制
在默認不指定線程的狀況下,Rxjava 遵循的是線程不變的原則。
也就算說在哪一個線程調用的subscribe ()訂閱方法,就會在哪一個線程生產事件;因此在哪一個線程生產了事件,就在哪一個線程消費事件
Schedulers——線程控制符
Rxjava 經過該類進行線程的調度
Schedulers.immediate()
默認狀況下,在當前線程運行,不切換任何線程默認操做
Schedulers.newThread()
老是啓用新線程,在新線程中執行相應的操做
Schedulers -io()
執行IO 流操做,讀寫文件之類的
ps:區別於newThread(),io()內部實現是一個無數量的線程池,能夠更好的利用線程效率,優於newThread()
Schedulers-computation()
cpu密集計算使用的Schedulers
AndroidSchedulers.mainThread()
將指定的操做放在Android 的主線程中執行
線程控制
一、subscribeOn()
指定subscribe 訂閱觀察者時候所發生的線程,也就是Observerable內部的OnSubscribe()被激活時候所處的線程,通知各個觀察者開始執行相應的操做
二、observeOn()
指定subscribeOn()所運行在的線程,事件消費所在的線程
public void doThreadWithScheduler() {
Observable.just("x", "y", "z」)
//前面經過just()方法,所建立的3個事件內容的發出,會在io線程發出,後臺線程進行數據讀取
.subscribeOn(Schedulers.io())
//指定了subscribeOn訂閱的時候call方法裏函數將發生在主線程中,主線程顯示數據
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("thread" + s);
}
});
}
public void doWeatherCompute() {
//create 方法建立Observable對象,內部傳入OnSubscribe對象,該方法能夠被理解成爲notifyObservers()方法,
由它通知觀察者去進行相應的操做,也就是調用觀察者的 onNext()操做
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("");
// ...
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
}
});
經過線程Schedulers控制符+subscribeOn()和observeOn()完成主線程和子線程間的工做切換,替代複雜 ,易發生錯誤的newThreard() 和handler()
public void doWeatherCompute() {
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("");
// ...
}
})
//指定subscribe(),發生在io線程
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() { //訂閱上面的call()方法發生在io線程,能夠進行耗時操做
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
//ui 顯示工做的 邏輯 在onNext()
}
});
}
}
思考:subscribeOn 和 observeOn 各能調用幾回呢? 1/無限
答 :
observeOn()指定的是以後的工做所在的線程,所以有屢次切換線程的需求,只要在每個切換線程的位置調用一次observeOn()就能知足需求。因此說observeOn()支持屢次調用!
subscribeOn()位置能夠放在observeOn()先後均可以 。可是!! subscribeOn()只能調用一次。
————subscribeOn()源碼
public final Observable<T> subscribeOn(Scheduler scheduler) {//返回值是一個Observable對象,返回建立一個新的被觀察者,經過新的被觀察者進行下面的事件
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
ps:OperatorSubscribeOn
implements OnSubscribe<T> 實現了該接口
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
public void call(final Subscriber<? super T> subscriber) {
//用於線程控制 實現了Subscription接口 用於看是否取消訂閱的操做,取消後就不會接受觀察者發送的各種事件了
final Workerinner = scheduler.createWorker();
subscriber.add(inner);
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
//根據傳遞的subscriber 建立一個新的Subscriber
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
//根據新的subscriber 通知到目標subscriber並調用其onNext()方法
subscriber.onNext(t);
}
…
//內部對線程進行一些判斷
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
//在該方法裏進行線程的控制操做
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
source.unsafeSubscribe(s);//source其實是一個Obsrvable對象
…
}
同理,觀察者 Subscriber 也是現實了Subscription、和Observer接口的緣由是當咱們的觀察者Subscriber取消訂閱的時候,將持有事件列表中的全部Subscription 訂閱所有取消。也就不會再接受訂閱事件。
createWorker():
public Worker createWorker() {
return new NewThreadWorker(THREAD_FACTORY);//經過newThreadWorker()完成建立
}
NewThreadWorker():
public NewThreadWorker(ThreadFactory threadFactory) {
//經過線程池 建立並操做線程
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
executor = exec;
}
inner.schedule(): 是經過一個抽象類實現的
abstract Subscription schedule,在其具體的實現類之一NewThreadWoker中經過schedule()方法實現具體邏輯
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {//經過executor 的方法能夠判斷其仍是根據線程池完成操做
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
source.unsafeSubscribe():
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
// new Subscriber so onStart it
subscriber.onStart();
// allow the hook to intercept and/or decorate
//在這裏調用call()函數代表整個subscribeOn()操做已經完成
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
subscriber.onError(hook.onSubscribeError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
hook.onSubscribeError(r);
throw r;
}
return Subscriptions.unsubscribed();
}
}
SubscribeOn方法小結
一、會新生成一個Observable
二、在其內部,onSubscribe對象會在目標Subscriber訂閱時候使用傳入的Scheduler的worker做爲線程調度執行者
三、在對應的線程中通知原始Observable發送消息給這個過程當中臨時生成的Subscriber
四、這個Subscriber又會通知到目標Subscriber,從而完成咱們的subscribeOn的過程
————observeOn()源碼
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
//最終經過lift() 函數操做
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
class OperatorObserveOn<T> implements Operator<T, T>實現了操做符的接口
…
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
//完成線程切換實際的對象ObserveOnSubscriber
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0{
…
@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
return;
}
//將結果緩存到隊列當中
if (!queue.offer(on.next(t))) {
onError(new MissingBackpressureException());
return;
}
schedule(); //開啓真正的線程切換
}
protected void schedule() {
if (counter.getAndIncrement() == 0) {
recursiveScheduler.schedule(this);
}
}
}
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}
//線程調度方法,同上面的方法同樣 也是使用executor併發庫來進行線程調度
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
...
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
...
}
SubscribeOn ()和 ObserveOn()
subscribeOn ()是經過新建Observable的方式,使用OnSubscribe()類的方式去作到線程切換的。不斷的調用subscribeOn()實際上最終只會使用第一個subscribeOn()的方法。
observeOn()是經過opeartor 操做符的形式去完成線程切換的,因此它的做用域和其餘操做符同樣,是調用observeOn(0以後的鏈路
一、observeOn()指定的是它以後的操做所在的線程,經過observeOn()的屢次調用,程序實現了線程的屢次切換;
二、subscribeOn()的位置放在哪裏均可以,但它是隻能調用一次的,緣由就是subscribeOn是經過新建Observable的方法建立的