Observable
.fromArray(1, 2, 3, 4)
.subscribe (object:Observer<Int>{
override fun onSubscribe(d: Disposable) { }
override fun onNext(t: Int) { }
override fun onError(e: Throwable) { }
override fun onComplete() { }
})
複製代碼
首先看Observable.fromArray(1, 2, 3, 4)
,進入Observable類java
class Observable{
public static <T> Observable<T> fromArray(T... items) {
//判空
ObjectHelper.requireNonNull(items, "items is null");
//顯而易見的,作一些優化,0個元素和1個元素處理邏輯要簡單不少。
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
//1.RxJavaPlugins.onAssembly()
//Calls the associated hook function,調用關聯的Hook函數。
//哦,能夠在外部提早關聯一些log啊什麼的,監控什麼的。那不影響主流程。
//後面的分析就都省略這個RxJavaPlugins了。
//2.建立了ObservableFromArray,因此這個是接下來的核心了。
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
@Override
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
}
複製代碼
看ObservableFromArray
代碼,樸實無華!
ObservableFromArray繼承Observable。子類的subscribeActual()被Observable.subscribe()調用。實際大量的邏輯是發生在這裏。分析視角也常在此處。git
刪減了fusionMode相關代碼,此處咱們邏輯走不到,後面章節分析。github
class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) { this.array = array; }
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
d.run();
}
static final class FromArrayDisposable<T> implements Disposable {
final Observer<? super T> actual;
final T[] array;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) actual.onComplete();
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
}
}
複製代碼
關於Disposable,RxJava爲咱們提供了一個開關,能夠用來取消訂閱。這像個追蹤器通常在代碼中傳來傳去,源碼讀着讀着就認不清這個了。仍是先看正向功能。 安全
Observable.fromArray(1, 2, 3, 4)
.map { it * 5 }
.subscribe { println(it) }
複製代碼
從這裏開始,代碼彷佛變得複雜。由於RxJava有多個操做符,爲了複用邏輯。作了不少抽象和封裝。
當ObservableMap.subscribeActual()時,source.subscribe(MapObserver(yourObserver)),MapObserver是上游的Observer。同時咱們把mapper fuction,it * 5
,傳給了MapObserver。
調用順序:
-->FromArrayDisposable.run()
-->MapObserver.onNext(value)
-->yourObserver.onNext(mapper.apply(value))markdown
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {
protected final ObservableSource<T> source;
}
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
U v = mapper.apply(t);
actual.onNext(v);
}
}
}
複製代碼
BasicFuseableObserver去掉fusionmode的部分,仍是簡單的。MapObserver(即BasicFuseableObserver),包含上游Disposable和下游Observer。多線程
/** * Base class for a fuseable intermediate observer. * @param <T> the upstream value type * @param <R> the downstream value type */
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
/** The downstream subscriber. */
protected final Observer<? super R> actual;
/** The upstream subscription. */
protected Disposable s;
public BasicFuseableObserver(Observer<? super R> actual) {
this.actual = actual;
}
@Override
public final void onSubscribe(Disposable s) {
this.s = s;
actual.onSubscribe(this);
}
}
@Override
public void onError(Throwable t) { actual.onError(t); }
@Override
public void onComplete() { actual.onComplete();}
@Override
public void dispose() {s.dispose();}
@Override
public boolean isDisposed() {return s.isDisposed(); }
}
複製代碼
能夠看到map自身的MapObserver訂閱上游Observable。
app
回憶上面章節中提到的代碼ide
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.subscribeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//運行結果
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 5
複製代碼
Schedulers.newThread()建立NewThreadScheduler。scheduler內容不是此部分重點。NewThreadScheduler.scheduleDirect(Runnable)最終調用ExecutorService.submit(Runnable)。把runnable扔到線程池中執行。
這裏的runnable是SubscribeTask。在新線程中執行source.subscribe(SubscribeOnObserver)
。若是上游沒有相關線程切換的操做。那麼整個執行過程從main線程切換到新線程。
整個鏈式過程等價於ExecutorService.submit{source.subscribe(SubscribeOnObserver)}
.
再看SubscribeOnObserver,包含了yourObserver。函數
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); }
@Override
public void onNext(T t) {actual.onNext(t);}
@Override
public void onError(Throwable t) {actual.onError(t);}
@Override
public void onComplete() {actual.onComplete();}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
@Override
public void run() {source.subscribe(parent);}
}
}
複製代碼
println("in main:${Thread.currentThread()}")
Observable.create<Int> {
println("in create:${Thread.currentThread()}");
it.onNext(1) }
.observeOn(Schedulers.newThread())
.subscribe { println("in next :${Thread.currentThread()} $it") }
//運行結果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next :Thread[RxNewThreadScheduler-1,5,main] 5
複製代碼
咱們來看ObservableObserveOn
代碼, 對照源碼,這裏面的代碼我作了大量的刪減。 但不考慮外部取消或者內部異常,確實是在這麼執行的。 刪掉了fusion的代碼,ObservableCreate還用不到。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
//實際運行的線程相關
final Scheduler scheduler;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
Scheduler.Worker w = scheduler.createWorker();
//subscribe()仍然在源線程執行哦,而subscribeOn把整個source.subscribe扔進新線程。
//把Scheduler和yourObserver傳給了新的ObserveOnObserver。
source.subscribe(new ObserveOnObserver<T>(observer, w));
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
final Observer<? super T> actual;
final Scheduler.Worker worker;
SimpleQueue<T> queue;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker) {
this.actual = actual;
this.worker = worker;
}
@Override
public void onSubscribe(Disposable s) {
//SpscLinkedArrayQueue,我只知道是線程安全且無synchronized代碼的隊列
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
//在Scheduler線程中執行run(),即drainNormal()。咱們發現上下游不在同一個線程了。
//生產者消費者問題來了,這裏咱們假定生產速度 > 消費速度。
@Override
public void onNext(T t) {
//放入線程共享的隊列,生產行爲
queue.offer(t);
schedule();
}
void schedule() {
//首次調用時,get值爲0,同時值+1,值理解爲須要處理的次數
//若是get值若是爲0,觸發消費行爲,不然不觸發。
if (getAndIncrement() == 0) worker.schedule(this);
}
@Override
public void run() { drainNormal(); }
//drain,咱們要把管道中的水給排幹。
void drainNormal() {
//可以進入drain,至少有一個數據。
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
//初次看到這for-for,也是懵的。實際這是一種處理生產者消費者問題的代碼模型。
for (;;) {
for (;;) {
//1.先看內層循環,q即SpscLinkedArrayQueue,是不阻塞的。
//排空一個個數據,沒有數據了就跳出。
T v = q.poll();
if (v == null) break;
a.onNext(v);//yourObserver.onNext
}
//2.還記得上面的假定生產速度 > 消費速度。
//在消費過程當中,隊列又加入了一些數據,而getAndIncrement()!=0,沒法進入消費線程。
//missed記錄了當前內層循環須要處理的請求次數。
//更新missed值,當前請求次數-上輪處理的請求次數。
missed = addAndGet(-missed);
if (missed == 0) break;
//3.值得注意的是,這個地方請求次數與數據數量是相等的。但這個模型中,並不強制要求。
}
}
...
}
}
複製代碼
關於observeOn,大膽點講最終就只有yourObserver運行在新線程中。
上述代碼中第一次見識到了drain代碼。顯然, 能夠加鎖來解決。但這裏介紹一種wip技巧(Working-In-Progress)使用CAS(Compare And Set)來解決。RxJava2中,一般使用這種方式作多線程處理。
從上面來看,咱們大概知道Scheduler就是把代碼場景扔進另外的線程運行。Scheduler經過java線程池管理線程。
下面是一段僞代碼
public abstract class Scheduler {
public abstract Worker createWorker();
public Disposable scheduleDirect(@NonNull Runnable run) {
createWorker().schedule(run);
}
public abstract static class Worker implements Disposable {
ExecutorService executor;
public Disposable schedule(@NonNull Runnable run) {
Task task = new Task(run);
executor.submit(task)
}
}
//在RxJava中,Task也許叫ScheduledRunnable,也許叫ScheduledDirectTask
public static class Task implements Callable<Void>,Disposable{
private Runnable run;
@Override
public Void call() {run.run();return null;}
@Override
public void dispose() {...}
@Override
public boolean isDisposed() {...}
}
}
複製代碼
實際的調用是這樣的.
scheduler.worker.schedule(Runnable{須要扔進新線程的代碼})
executor.submit(Task(runnable))
複製代碼
在RxJava中,Task也許叫ScheduledRunnable或者ScheduledDirectTask等等。
Task包裹了咱們的runnable,同時提供了對runnable的控制。
在ObserveOnObserver中,每次next執行都會產生一個task。而在ObservableSubcribeOn中 source.subcribe,就只有一個task,由於是一次性的啊。
每一個Worker中的task是串行執行的。
Schedulers提供了適用不一樣場景的調度器:
默認,不指定線程
在一個新線程執行,實際真的只有一個線程。每一個worker都有個獨立的線程池。
適用CPU計算操做線程,限定了最大線程數量,數量爲JVM處理器個數。預先生成固定數量PoolWorker。會出現不一樣場景共用PoolWorker的狀況。
適用io操做線程,線程數量並無上限。有一個CachedWorkerPool,那實際的worker是能夠複用的。
當前線程執行,不會當即執行,等前一個任務完成。當前任務入隊執行。
相較於Schedulers.newThread(),single建立的全部worker公用線程池。
在Android主線程執行,內部經過Handler實現,也只能經過Handler。關於RxAndroid。