友好 RxJava2.x 源碼解析(二)線程切換

系列文章:java

本文 csdn 地址:友好 RxJava2.x 源碼解析(二)線程切換git

本文基於 RxJava 2.1.3github

前言

本文基於讀者會使用 RxJava 2.x 而講解,基本原理不涉及,示例只純粹爲示例而示例。

示例代碼

示例源碼:
Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    Log.e("TAG", "subscribe(): 所在線程爲 " + Thread.currentThread().getName());
                    emitter.onNext("1");
                    emitter.onComplete();
                }
            })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "onSubscribe(): 所在線程爲 " + Thread.currentThread().getName());
            }

            @Override
            public void onNext(String s) {
                Log.e("TAG", "onNext(): 所在線程爲 " + Thread.currentThread().getName());
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
                Log.e("TAG", "onComplete(): 所在線程爲 " + Thread.currentThread().getName());
            }
        });
複製代碼

輸出結果:ide

E/TAG: onSubscribe(): 所在線程爲 main
E/TAG: subscribe(): 所在線程爲 RxCachedThreadScheduler-1
E/TAG: onNext(): 所在線程爲 main
E/TAG: onComplete(): 所在線程爲 main
複製代碼

源碼解析

咱們能夠發現,除了 Observable 的 subscribe(ObservableEmitter) 方法執行在 io 線程,Observer 的方法都是執行在 main 線程的,接下來就請各位讀者跟着筆者來分析了。函數

Observer#onSubscribe(Dispose)

看到標題部分讀者就疑惑了,明明是說線程切換,跟 Observer#onSubscribe() 方法有什麼關係呢?前方的 log 中展現 Observer#onSubscribe() 方法在主線程執行的,可是這個主線程是由 .observeOn(AndroidSchedulers.mainThread()) 所致使的嗎?爲了解決這個疑惑,咱們能夠在外面套一個子線程,而後去執行該邏輯,代碼以下:源碼分析

new Thread() {
    @Override
    public void run() {
	    Log.e("TAG", "run: 所在線程爲 " + Thread.currentThread().getName());
        // 添加示例代碼
    }
}.start();
複製代碼

打印結果:post

run: 所在線程爲 Thread-554
onSubscribe(): 所在線程爲 Thread-554
subscribe(): 所在線程爲 RxCachedThreadScheduler-1
onNext(): 所在線程爲 main
onComplete(): 所在線程爲 main
複製代碼

因此實際上 Observer#onSubscribe() 的執行線程是當前線程,它並不受 subscribe(Scheduler)observeOn(Scheduler) 所影響(由於筆者這段代碼寫在了 Android 主線程當中,因此當前線程是主線程)。本文不在此擴展緣由,具體源碼追溯和查看前一篇文章,簡而言之—— subscribe(Observer) -> subscribeActual(Observer) -> Observer#onSubscribe(),咱們能夠看到 subscribe(Observer) 的執行線程是當前線程,而在上面所述的數據流中也不存在數據切換的過程,因此 onSubscribe() 執行的線程也是當前線程。學習

Observable#observeOn(Scheduler)

此小節針對 Observable#observeOn(Scheduler) 講解,因此將示例代碼更改以下:this

new Thread() {
    @Override
    public void run() {
        Log.e("TAG", "run: 當前默認執行環境爲 " + Thread.currentThread().getName());
        Observable
            .create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("1");
                }
            })
            // 僅保留 observeOn(Scheduler)
            .observeOn(Schedulers.io())
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(String s) {
                    Log.e("TAG", "onNext(): 所在線程爲 " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
    }
} .start();
複製代碼

輸出結果:spa

E/TAG: run: 當前默認執行線程爲 Thread-610
E/TAG: onNext(): 所在線程爲 RxCachedThreadScheduler-1
複製代碼

不做用上游 Observable

一樣的,直接先進入 Observable#observeOn(Scheduler) 源碼查看一下,發現其最終會調用 Observable#的observeOn(Scheduler, boolean, int) 方法,該方法將會返回一個 Observable 對象。那麼老問題來了,是哪一個 Observable 對象調用的 observeOn() 方法,又返回了一個怎樣的 Observable 對象?

第一個問題很簡單,是 Observable.create(ObservableOnSubscribe) 對象返回的一個 Observable,並且這個 Observable 是一個 ObservableCreate 對象(這裏不理解的能夠查看第一篇文章)。可是 Observable#observeOn(Scheduler, boolean, int) 是沒有被任何子類重寫的,這意味着它的子類都是調用它的該方法。

第二個問題來了,返回了一個怎樣的 Observable 對象呢?實際上這裏的分析流程和第一篇文章中所闡述的流程是如出一轍的,咱們戳進 Observable#observeOn(Scheduler, boolean, int) 源碼,發現它最終會返回一個 new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize) 對象,這裏咱們只關注前兩個對象,第一個參數 this 是指上游的 Observable 對象,也就是咱們第一個問題中所涉及到的 Observable 對象,第二個參數 scheduler 毋庸置疑就是咱們所傳入的 Scheduler 對象了,在此也就是咱們的 AndroidSchedulers.mainThread()

經過第一篇的學習,咱們應該會輕車熟路地打開 ObservableObserveOn 類並查看它的核心 subscribeActual() 方法以及構造函數——

final Scheduler scheduler;
final boolean delayError;
final int bufferSize;

public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 若是傳入的 scheduler 是 Scheduler.trampoline() 的狀況
    // 該線程的意義是傳入當前線程,也就是不作任何線程切換操做
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}
複製代碼

直接進入第二個 case,首先先略去第19行代碼,看到第20行代碼,source(上游 Observable) 和 Observable#subscribe() 操做都沒有任何變化,惟一改變的地方就是將 Observer 進行了封裝,因此咱們能夠所以得出結論, Observable#observeOn(Scheduler) 並不會對上游線程執行環境有任何影響。(若是看到這裏不可以理解的話,後文中會有通俗易懂的僞代碼輔助理解)

做用下游 Observer

通過上文友好 RxJava2.x 源碼解析(一)基本訂閱流程一文的分析咱們知道 ObservableEmitter 的 onNext(T) 方法會觸發「下游」 Observer 的 onNext(T) 方法,而此時的「下游」 Observer 對象是通過 Observable#observeOn(Scheduler) 封裝的 ObserveOnObserver 對象,因此咱們不妨打開 ObserveOnObserver 的 onNext(T) 方法——

@Override
public void onNext(T t) {
    // 刪除無關源碼
    queue.offer(t);
    schedule();
}
複製代碼

能夠看到 onNext(T) 方法作了兩件事——一是將當前方法傳入的對象添加進隊列;另外一是執行 schedule() 方法,打開 schedule() 方法源碼——

void schedule() {
    // 刪除無關源碼
    worker.schedule(this);
}
複製代碼

因此將會執行 worker.schedule(Runnable),可向下繼續追溯到 schedule(Runnable, long, TimeUnit ) ,該方法是一個抽象方法,因此咱們能夠想到,調度器們就是經過實現該方法來建立各色各樣的線程的。因此咱們繼續追溯到 IoScheduler 的 schedule(Runnable, long, TimeUnit) 中,源碼以下:

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        // 刪除無關源碼
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
複製代碼

繼續追溯下去——

@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Future<?> f;
    if (delayTime <= 0) {
        f = executor.submit((Callable<Object>)sr);
    } else {
        f = executor.schedule((Callable<Object>)sr, delayTime, unit);
    }
    sr.setFuture(f);

    return sr;
}
複製代碼

executor 是一個 ScheduledExecutorService 對象,而 ScheduledExecutorService 的父接口是咱們所熟悉的 ExecutorService 接口,因此很清晰 ScheduledExecutorService 具備建立和調度線程的能力,而其具體的實如今此就不討論了。

最後,咱們不妨將上述所提到的幾段源代碼總體抽象結合一下:

@Override
public void onNext(T t) {
    // 刪除無關源碼
    if (delayTime <= 0) {
        f = executor.submit((Callable<Object>)this);
    } else {
        f = executor.schedule((Callable<Object>)this, delayTime, unit);
    }
}
複製代碼

總結一下:onNext(T) 方法會觸發 Scheduler 對象的 schedule(Runnable, long, TimeUnit) ,該方法是一個抽象方法,由子類實現,因此纔有了多元多樣的 Schedulers.io()/Schedulers.computation()/Schedulers.trampoline() 等調度器,具體調度器的內部會使用相關的線程來 submit() 或者 schedule() 任務。解決完調度器的問題,那麼接下來就是看看 Runnable#run() 裏面的邏輯是什麼樣的,回到 ObserveOnObserver 中——

@Override
public void run() {
    drainNormal();
}
複製代碼

drainNormal() 源碼以下:

void drainNormal() {
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;

    for (;;) {
        T v;

        try {
            v = q.poll();
        } catch (Throwable ex) {
        }
        boolean empty = v == null;

        if (empty) {
            break;
        }

        a.onNext(v);
    }
}
複製代碼

能夠看到實際上最後一行執行了 Observer#onNext(T) 方法,也就是意味着「ObserveOnObserver 中觸發下一層 Observer 的 onNext(T) 操做」在指定線程執行,也就達到了切換線程的目的了。

來個複雜的例子——

示例圖

通過友好 RxJava2.x 源碼解析(一)基本訂閱流程一文咱們知道,Observer 的傳遞是由下往上的,從源頭開始,咱們自定義的 Observer 向上傳遞的時候到達第六個 Observable 的時候被線程封裝了一層,咱們不妨使用僞代碼演示一下——

public class Observer {
    Observer oldObserver;

    public Observer(Observer observer) {
        oldObserver = observer;
    }

    public void onNext(T t) {
        // 一些其餘操做
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onNext(t);
            }
        } .start();
    }

    public void onError(Throwable e) {
        // 一些其餘操做
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onError(e);
            }
        } .start();
    }

    public void onComplete() {
        // 一些其餘操做
        new Thread("Android mainThread") {
            @Override
            public void run() {
                oldObserver.onComplete();
            }
        } .start();
    }
}
複製代碼

Observer 繼續向上被傳遞,Observable#map() 中並未對 Observer 進行線程切換;再向上走,到達第四個 observeOn(Scheduler) 的時候,被 computation 線程嵌套了一層——

public class Observer {
    Observer oldObserver;

    public Observer(Observer observer) {
        oldObserver = observer;
    }

    public void onNext(T t) {
        // 一些其餘操做
        new Thread("computation") {
            @Override
            public void run() {
                oldObserver.onNext(t);
            }
        } .start();
    }

    public void onError(Throwable e) {
        // 一些其餘操做
        new Thread("computation") {
            @Override
            public void run() {
                oldObserver.onError(e);
            }
        } .start();
    }

    public void onComplete() {
        // 一些其餘操做
        new Thread("computation") {
            @Override
            public void run() {
                oldObserver.onComplete();
            }
        } .start();
    }
}
複製代碼

固然,繼續向上直到頂端 Observable——

public class Observer {
    Observer oldObserver;

    public Observer(Observer observer) {
        oldObserver = observer;
    }

    public void onNext(T t) {
        // 一些其餘操做
        new Thread("io") {
            @Override
            public void run() {
                oldObserver.onNext(t);
            }
        } .start();
    }

    public void onError(Throwable e) {
        // 一些其餘操做
        new Thread("io") {
            @Override
            public void run() {
                oldObserver.onError(e);
            }
        } .start();
    }

    public void onComplete() {
        // 一些其餘操做
        new Thread("io") {
            @Override
            public void run() {
                oldObserver.onComplete();
            }
        } .start();
    }
}
複製代碼

甚至更精簡的操做以下:

new Thread("Scheduler io") {
    @Override
    public void run() {
        // flatMap() 操做
        flatMap();
        System.out.println("flatMap 操做符執行線程:" + Thread.currentThread().getName());
        System.out.println("第二個 observeOn() 執行線程:" + Thread.currentThread().getName());
        // 第二個 observeOn() 操做
        new Thread("Scheduler computation") {
            @Override
            public void run() {
                // map() 操做
                map();
                System.out.println("map 操做符執行線程:" + Thread.currentThread().getName());
                System.out.println("第三個 observeOn() 執行線程:" + Thread.currentThread().getName());
                // 第三個 observeOn() 操做
                new Thread("Android mainThread") {
                    @Override
                    public void run() {
                        // Observer#onNext(T)/onComplete()/onError() 執行線程
                        System.out.println("Observer#onNext(T)/onComplete()/onError() 執行線程:" +
                                           Thread.currentThread().getName());
                    }
                } .start();
            }
        } .start();
    }
} .start();
複製代碼

輸出結果:

flatMap 操做符執行線程:Scheduler io
第二個 observeOn() 執行線程:Scheduler io
map 操做符執行線程:Scheduler computation
第三個 observeOn() 執行線程:Scheduler computation
Observer#onNext(T)/onComplete()/onError() 執行線程:Android mainThread
複製代碼

由此便將 Observable#observeOn(Scheduler) 是如何將下游 Observer 置於指定線程執行的流程分析完了。簡而言之 Observable#observeOn(Scheduler) 的實現原理在於將目標 Observer 的 onNext(T)/onError(Throwable)/onComplete() 置於指定線程中運行

這裏特別要注意的一點是——【線程操做符切換的是其餘的流,自身這條流是不會受到影響的。】看過知乎前一段時間的 rx 分享視頻的小夥伴應該有注意到楊凡前輩的 PPT 中有這麼一圖:

這裏寫圖片描述
想要提出兩點—— observeOn(Schedulers.io()) 所對應的 Observable 應該是受到了 subscribeOn(AndroidSchedulers.mainThread()) 影響,因此它建立的這條流應該執行於主線程;而 subscribeOn(AndroidSchedulers.mainThread()) 所對應的 Observable 則受到了 subscribeOn(Schedulers.computation) 影響,因此它建立的這條流應該執行於 computation 線程。

Observable#subscribeOn(Scheduler)

切換 subscribe 線程

示例代碼:
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
            Log.e("TAG", "被觀察者所在的線程 " + Thread.currentThread().getName());
        }
    })
    .subscribeOn(Schedulers.io())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("TAG", "onSubscribe: " + Thread.currentThread().getName());
        }

        @Override
        public void onNext(String s) {
            Log.e("TAG", "觀察者所在線程爲 " + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
複製代碼

輸出結果:

E/TAG: onSubscribe: main
E/TAG: 觀察者所在線程爲 RxCachedThreadScheduler-1
E/TAG: 被觀察者所在的線程 RxCachedThreadScheduler-1
複製代碼

一樣地,戳進 Observable#subscirbeOn(Scheduler) 源碼,點進 ObservableSubscribeOn 查看 subscribeActual(Observer) 的具體實現,相信這對於各位讀者來講已經輕車熟路了——

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    Disposeable disposable = scheduler.scheduleDirect(new SubscribeTask(parent));
    parent.setDisposable(disposable);
}
複製代碼

第一行老套路,對下游 Observer 進行了一層封裝;第二行由於它不涉及線程切換因此此處也不作擴展;第三行就是咱們的關鍵了 Scheduler#scheduleDirect(Runnable) 方法能夠追溯到 Scheduler#schedule(Runnable, long, TimeUnit),這部分在前面已經闡述過了,就不作擴展了。SubscribeTask 是一個 Runnable,它的 run() 核心方法——

@Override
public void run() {
	source.subscribe(parent);
}
複製代碼

至此謎團解開了,Observable#subscribeOn(Scheduler)Observable#subscribe(Observer) 的執行過程移到了指定線程(在上述中也就是 io 線程),同時 Observable 和 Observer 中並未作新的線程切換處理,因此它們的訂閱、發射等操做就執行在了 io 線程。

第一次有效原理

示例代碼:
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
            Log.e("TAG", "被觀察者所在的線程 " + Thread.currentThread().getName());
        }
    })
    .subscribeOn(Schedulers.io())
    .subscribeOn(Schedulers.computation())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("TAG", "onSubscribe: " + Thread.currentThread().getName());
        }

        @Override
        public void onNext(String s) {
            Log.e("TAG", "觀察者所在線程爲 " + Thread.currentThread().getName());
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
複製代碼

打印結果:

onSubscribe: main
觀察者所在線程爲 RxCachedThreadScheduler-1
被觀察者所在的線程 RxCachedThreadScheduler-1
複製代碼

咱們知道,只有第一個 Observable#subscribeOn(Scheduler) 操做纔有用,然後續的 Observable#subscribeOn(Scheduler) 並不會影響整個流程中 Observerable 。一樣的,來張圖——

示例圖

前面咱們分析到,Observable#subscribeOn(Scheduler) 其實是將 Observable#subscribe(Observer) 的操做放在了指定線程,而經過友好 RxJava2.x 源碼解析(一)基本訂閱流程一文咱們知道了 subscribe 的過程是由下往上的。因此首先是第三個 Observable 調用 Observable#subscribe(Observer) 啓動訂閱,在其內部會激活第二個 Observable 的 Observable#subscribe(Observer) 方法,可是此時該方法外部被套入了一個 Schedulers.computation() 線程,因而這個訂閱的過程就被運行在了該線程中。一樣的,咱們不妨用僞代碼演示一下——

public class Observable {
    // 第「二」個 Observable
    Observable source;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer Observer) {
        new Thread("computation") {
            @Override
            public void run() {
                // 第「二」個 Observable 訂閱
                source.subscribe(observer);
            }
        }
    }
}
複製代碼

再往上走,第二個 Observable 訂閱內部會激活第一個 Observable 的 Observable#subscribe(Observer) 方法,一樣的,該方法被套在了 Schedulers.io() 線程中,以下——

public class Observable {
    // 第「一」個 Observable
    Observable source;
    Observer observer;

    public Observable(Observable source, Observer observer) {
        this.source = source;
        this.observer = observer;
    }

    public void subscribe(Observer Observer) {
        new Thread("io") {
            @Override
            public void run() {
                // 第「一」個 Observable 訂閱
                source.subscribe(observer);
            }
        }
    }
}
複製代碼

此時到達第一個 Observable 了以後就要開始發射事件了,此時的執行線程很明顯是 io 線程。還能夠換成 Thread 僞代碼來表示 ——

new Thread("computation") {
    @Override
    public void run() {
        // 第二個 Observable.subscribe(Observer) 的實質
        // 就是切換線程,效果相似以下
        new Thread("io") {
            @Override
            public void run() {
                // 第一個 Observable.subscribe(Observer) 的實質
                // 就是發射事件
                System.out.println("onNext(T)/onError(Throwable)/onComplete() 的執行線程是: " + Thread
                                   .currentThread().getName());
            }
        } .start();
    }
} .start();
複製代碼

輸出結果:

onNext(T)/onError(Throwable)/onComplete() 的執行線程是: io
複製代碼

Observable#observeOn(Scheduler) 和 Observable#subscribeOn(Scheduler)

若是針對前面的內容你已經懂了,那麼後續的內容能夠直接跳過啦,本文就結束了~若是你還沒懂,筆者再彙總一次。

通過友好 RxJava2.x 源碼解析(一)基本訂閱流程一文咱們知道,Observable#subscribe(Observer) 的順序是由下往上的,本遊會將 Observer 進行「封裝」,而後「激活上游Observable 訂閱這個 Observer」

咱們不妨抽象一個 Observer,以下:

public class Observer<T> {
    public void onNext(T t){}
    public void onCompelete(){}
    public void onError(Throwable t){}
}
複製代碼

對於 Observable#observeOn(Schedulers.computation()) 操做來講,它對 Observer 進行了怎樣的封裝呢?

public class NewObserver<T> {
    // 下游 Observer
    Observer downStreamObserver;
    public NewObserver(Observer observer) {
        downStreamObserver = observer;
    }

    public void onNext(T t) {
        new Thread("computation") {
            downStreamObserver.onNext(t);
        }
    }

    public void onError(Throwable e) {
        new Thread("computation") {
            downStreamObserver.onError(e);
        }
    }

    public void onComplete() {
        new Thread("computation") {
            downStreamObserver.onComplete();
        }
    }
}
複製代碼

Observable#observeOn(Scheduler) 內部,其對下游的 Observer 進行了相似如上的封裝,這就致使了其「下游」 Observer 在指定線程內執行。因此 Observable#observeOn(Scheduler) 是能夠屢次調用並有效的。

而對於 Observable#subscribe(Scheduler) 來講,它並未對下游 Observer 進行封裝,可是對於「激活上游 Observable 訂閱這個 Observer」這個操做它作了一點小小的手腳,也就是切換線程,咱們抽象以下——

public class ComputationObservable {
    public void subscribe(observer) {
        new Thread("computation") {
            // upstreamObservable 是上游 Observable,咱們不妨假設是下文中所提到的 IOObservable
            upstreamObservable.subscribe(observer);
        }
    }
}
複製代碼

而當它在往上遇到了一個新的 Observable#subscribe(Scheduler) 操做的時候——

public class IOObservable {
    public void subscribe(observer) {
        new Thread("io") {
            // upstreamObservable 是上游 Observable,咱們不妨下文中所提到的 TopObservable
            upstreamObservable.subscribe(observer);
        }
    }
}
複製代碼

咱們不妨假設此時已經到達了最頂端開始發射事件了——

public class TopObservable {
    public void subscribe(observer) {
        observer.onNext(t);
    }
}
複製代碼

此時的 Observer#onNext(t) 的執行環境固然就是由最後一個 subscribeOn(Scheduler) 操做符(此處的最後一個是指訂閱流程中的最後一個,它與實際寫代碼的順序相反,也就是咱們代碼中的第一個 subscribeOn(Scheduler) 操做符)所決定的了,在上述僞代碼中也就是 io 線程,僞代碼對應的源碼以下——

Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("1");
        }
    })
    .subscribeOn(Schedulers.io())
    .subscribeOn(Schedulers.computation())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String s) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });複製代碼
相關文章
相關標籤/搜索