RxJava介紹3:源碼解析

源碼解析

create Observable

Observable
    .fromArray(1, 2, 3, 4)
 	.subscribe (object:Observer<Int>{
         override fun onSubscribe(d: Disposable) { }
         override fun onNext(t: Int) { }
         override fun onError(e: Throwable) { }
         override fun onComplete() { }
    })
複製代碼

首先看Observable.fromArray(1, 2, 3, 4),進入Observable類java

class Observable{
    public static <T> Observable<T> fromArray(T... items) {
        //判空
        ObjectHelper.requireNonNull(items, "items is null");
        //顯而易見的,作一些優化,0個元素和1個元素處理邏輯要簡單不少。
        if (items.length == 0) {
            return empty();
        } else
        if (items.length == 1) {
            return just(items[0]);
        }
        //1.RxJavaPlugins.onAssembly() 
        //Calls the associated hook function,調用關聯的Hook函數。
        //哦,能夠在外部提早關聯一些log啊什麼的,監控什麼的。那不影響主流程。
        //後面的分析就都省略這個RxJavaPlugins了。
        //2.建立了ObservableFromArray,因此這個是接下來的核心了。
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
    
    @Override
    public final void subscribe(Observer<? super T> observer) {
       subscribeActual(observer);
    }
}
複製代碼

ObservableFromArray代碼,樸實無華!
ObservableFromArray繼承Observable。子類的subscribeActual()被Observable.subscribe()調用。實際大量的邏輯是發生在這裏。分析視角也常在此處。git

刪減了fusionMode相關代碼,此處咱們邏輯走不到,後面章節分析。github

class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) { this.array = array; }
   
    @Override
    public void subscribeActual(Observer<? super T> s) {
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        s.onSubscribe(d);
        d.run();
    }

    static final class FromArrayDisposable<T> implements Disposable {
        final Observer<? super T> actual;
        final T[] array;
        volatile boolean disposed;

        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }
        void run() {
            T[] a = array;
            int n = a.length;
            for (int i = 0; i < n && !isDisposed(); i++) {
                T value = a[i];
                if (value == null) {
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                actual.onNext(value);
            }
             if (!isDisposed()) actual.onComplete();
        }
        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
    
}
複製代碼

關於Disposable,RxJava爲咱們提供了一個開關,能夠用來取消訂閱。這像個追蹤器通常在代碼中傳來傳去,源碼讀着讀着就認不清這個了。仍是先看正向功能。 安全

map

Observable.fromArray(1, 2, 3, 4)
    .map { it * 5 }
 	.subscribe { println(it) }
複製代碼

從這裏開始,代碼彷佛變得複雜。由於RxJava有多個操做符,爲了複用邏輯。作了不少抽象和封裝。
當ObservableMap.subscribeActual()時,source.subscribe(MapObserver(yourObserver)),MapObserver是上游的Observer。同時咱們把mapper fuction,it * 5,傳給了MapObserver。
調用順序:
-->FromArrayDisposable.run()
-->MapObserver.onNext(value)
-->yourObserver.onNext(mapper.apply(value))markdown

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> {
    protected final ObservableSource<T> source;
}

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            U v = mapper.apply(t);
            actual.onNext(v);
        }
    }
}
複製代碼

BasicFuseableObserver去掉fusionmode的部分,仍是簡單的。MapObserver(即BasicFuseableObserver),包含上游Disposable和下游Observer。多線程

/** * Base class for a fuseable intermediate observer. * @param <T> the upstream value type * @param <R> the downstream value type */
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {

    /** The downstream subscriber. */
    protected final Observer<? super R> actual;

    /** The upstream subscription. */
    protected Disposable s;

    public BasicFuseableObserver(Observer<? super R> actual) {
        this.actual = actual;
    }

    @Override
    public final void onSubscribe(Disposable s) {
            this.s = s;
            actual.onSubscribe(this);     
        }
    }
    @Override
    public void onError(Throwable t) { actual.onError(t); }

    @Override
    public void onComplete() { actual.onComplete();}

    @Override
    public void dispose() {s.dispose();}

    @Override
    public boolean isDisposed() {return s.isDisposed(); }
}
複製代碼

能夠看到map自身的MapObserver訂閱上游Observable。
截屏2021-05-11 下午4.48.42.png app

subscribeOn

回憶上面章節中提到的代碼ide

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}");
        it.onNext(1) }
	.subscribeOn(Schedulers.newThread())
    .subscribe { println("in next :${Thread.currentThread()} $it") }

//運行結果
in main:Thread[main,5,main]
in create:Thread[RxNewThreadScheduler-1,5,main]
in next  :Thread[RxNewThreadScheduler-1,5,main] 5
複製代碼

Schedulers.newThread()建立NewThreadScheduler。scheduler內容不是此部分重點。NewThreadScheduler.scheduleDirect(Runnable)最終調用ExecutorService.submit(Runnable)。把runnable扔到線程池中執行。
這裏的runnable是SubscribeTask。在新線程中執行source.subscribe(SubscribeOnObserver)。若是上游沒有相關線程切換的操做。那麼整個執行過程從main線程切換到新線程。
整個鏈式過程等價於ExecutorService.submit{source.subscribe(SubscribeOnObserver)}.
再看SubscribeOnObserver,包含了yourObserver。函數

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;
        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        @Override
        public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); }
        @Override
        public void onNext(T t) {actual.onNext(t);}
        @Override
        public void onError(Throwable t) {actual.onError(t);}
        @Override
        public void onComplete() {actual.onComplete();}
    }
    
    final class SubscribeTask implements Runnable {
    	private final SubscribeOnObserver<T> parent;
    	@Override
    	public void run() {source.subscribe(parent);}
	}
}
複製代碼

�subscibeOn能夠這個理解
截屏2021-05-11 下午8.14.03.png oop

observeOn

println("in main:${Thread.currentThread()}")
Observable.create<Int> {
        println("in create:${Thread.currentThread()}");
        it.onNext(1) }
	.observeOn(Schedulers.newThread())
    .subscribe { println("in next :${Thread.currentThread()} $it") }
//運行結果
in main:Thread[main,5,main]
in create:Thread[main,5,main]
in next  :Thread[RxNewThreadScheduler-1,5,main] 5
複製代碼

咱們來看ObservableObserveOn代碼, 對照源碼,這裏面的代碼我作了大量的刪減。 但不考慮外部取消或者內部異常,確實是在這麼執行的。 刪掉了fusion的代碼,ObservableCreate還用不到。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    //實際運行的線程相關
    final Scheduler scheduler;
    
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        Scheduler.Worker w = scheduler.createWorker();
        //subscribe()仍然在源線程執行哦,而subscribeOn把整個source.subscribe扔進新線程。
        //把Scheduler和yourObserver傳給了新的ObserveOnObserver。
		source.subscribe(new ObserveOnObserver<T>(observer, w));
    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        SimpleQueue<T> queue;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker) {
            this.actual = actual;
            this.worker = worker;
        }
        @Override
        public void onSubscribe(Disposable s) {
            //SpscLinkedArrayQueue,我只知道是線程安全且無synchronized代碼的隊列
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
			actual.onSubscribe(this);
        }

        //在Scheduler線程中執行run(),即drainNormal()。咱們發現上下游不在同一個線程了。
        //生產者消費者問題來了,這裏咱們假定生產速度 > 消費速度。
        @Override
        public void onNext(T t) {
            //放入線程共享的隊列,生產行爲
            queue.offer(t);
            schedule();
        }
        void schedule() {
            //首次調用時,get值爲0,同時值+1,值理解爲須要處理的次數
            //若是get值若是爲0,觸發消費行爲,不然不觸發。
            if (getAndIncrement() == 0) worker.schedule(this);
        }
        @Override
        public void run() { drainNormal(); } 

       //drain,咱們要把管道中的水給排幹。
        void drainNormal() {
            //可以進入drain,至少有一個數據。
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;
            //初次看到這for-for,也是懵的。實際這是一種處理生產者消費者問題的代碼模型。
            for (;;) {
                for (;;) {
                    //1.先看內層循環,q即SpscLinkedArrayQueue,是不阻塞的。
                    //排空一個個數據,沒有數據了就跳出。
                    T v = q.poll();
					if (v == null)  break;
                   	a.onNext(v);//yourObserver.onNext
                }
                //2.還記得上面的假定生產速度 > 消費速度。
                //在消費過程當中,隊列又加入了一些數據,而getAndIncrement()!=0,沒法進入消費線程。
                //missed記錄了當前內層循環須要處理的請求次數。
                //更新missed值,當前請求次數-上輪處理的請求次數。
                missed = addAndGet(-missed);
                if (missed == 0) break;
                //3.值得注意的是,這個地方請求次數與數據數量是相等的。但這個模型中,並不強制要求。
            }
        }
       ...
    }
}
複製代碼

關於observeOn,大膽點講最終就只有yourObserver運行在新線程中。
截屏2021-05-11 下午8.56.43.png
上述代碼中第一次見識到了drain代碼。顯然, 能夠加鎖來解決。但這裏介紹一種wip技巧(Working-In-Progress)使用CAS(Compare And Set)來解決。RxJava2中,一般使用這種方式作多線程處理。

Scheduler

從上面來看,咱們大概知道Scheduler就是把代碼場景扔進另外的線程運行。Scheduler經過java線程池管理線程。

核心代碼

下面是一段僞代碼

public abstract class Scheduler {
    
    public abstract Worker createWorker();

    public Disposable scheduleDirect(@NonNull Runnable run) {
        createWorker().schedule(run);
    }
    public abstract static class Worker implements Disposable {
        ExecutorService  executor;
        public Disposable schedule(@NonNull Runnable run) {
            Task task = new Task(run);
            executor.submit(task)
        }
    }
    //在RxJava中,Task也許叫ScheduledRunnable,也許叫ScheduledDirectTask
    public static class Task implements Callable<Void>,Disposable{
        private Runnable run;
    	@Override
    	public Void call() {run.run();return null;}
        @Override
        public void dispose() {...}
        @Override
        public boolean isDisposed() {...}
    }
}
複製代碼

實際的調用是這樣的.

scheduler.worker.schedule(Runnable{須要扔進新線程的代碼})
executor.submit(Task(runnable))
複製代碼

關於Task

在RxJava中,Task也許叫ScheduledRunnable或者ScheduledDirectTask等等。
Task包裹了咱們的runnable,同時提供了對runnable的控制。
在ObserveOnObserver中,每次next執行都會產生一個task。而在ObservableSubcribeOn中 source.subcribe,就只有一個task,由於是一次性的啊。
每一個Worker中的task是串行執行的。

關於Scheduler

Schedulers提供了適用不一樣場景的調度器:

  • Schedulers.immediate()

默認,不指定線程

  • Schedulers.newThread():

在一個新線程執行,實際真的只有一個線程。每一個worker都有個獨立的線程池。

  • Schedulers.computation():

適用CPU計算操做線程,限定了最大線程數量,數量爲JVM處理器個數。預先生成固定數量PoolWorker。會出現不一樣場景共用PoolWorker的狀況。

  • Schedulers.io():

適用io操做線程,線程數量並無上限。有一個CachedWorkerPool,那實際的worker是能夠複用的。

  • Schedulers.trampoline():

當前線程執行,不會當即執行,等前一個任務完成。當前任務入隊執行。

  • Schedulers.single():

相較於Schedulers.newThread(),single建立的全部worker公用線程池。

  • AndroidSchedulers.mainThread()

在Android主線程執行,內部經過Handler實現,也只能經過Handler。關於RxAndroid。​

相關文章
相關標籤/搜索