RxJava是一種爲異步編程而實現的庫,異步是其重要特點,合理地利用異步編程可以提升系統的處理速度。可是異步也會帶來線程的安全問題,並且異步並不等於併發,與異步概念相對應的是同步。java
在默認狀況下,RxJava只在當前線程中運行,它是單線程的。此時Observable用於發射數據流,Observer用於接收和響應數據流,各類操做符(Operators)用於加工數據流,它們都在同一個線程中運行,實現出來的是一個同步的函數響應式。然而,函數響應式的實際應用是大部分操做都在後臺處理,前臺響應的一個過程。因此須要對剛纔的流程作一下修改,改爲Observable生成發射數據流,Operators加工數據流在後臺線程中進行,Observer在前臺線程中接受並響應數據。 此時會涉及使用多線程來操做RxJava,可使用RxJava的調度器(Scheduler)來實現。數據庫
Scheduler是RxJava對線程控制其的一個抽象,RxJava內置了多個Scheduler的實現,它們基本知足絕大多數使用場景,以下表:編程
Scheduler | 做用 |
---|---|
single | 使用定長爲1的線程池(new Sheduled Thread Pool(1)),重複利用這個線程 |
newThread | 每次都啓用新線程,並在新線程中執行操做 |
computation | 使用的固定的線程池(Fixed Scheduler Pool),大小爲CPU核數,適用於CPU密集型計算 |
io | 適用I/O操做(讀寫文件,讀寫數據庫,網絡信息交互等)所使用的Scheduler。行爲模式和newThread() 差很少,區別在於:io() 的內部實現是用一個無數量上限的線程池,可以重用空閒的線程,於是多數狀況下,io()比newThread()更有效率 |
tranpoline | 直接在當前線程運行,若是當前線程有其餘任務正在執行,則會先暫停其餘任務 |
Schedulers.from | 將java.util.concurrent.Executor轉換成一個調度器實例,便可以自定義一個Executor來做爲調度器 |
RxJava的被觀察者們在使用操做符時能夠利用線程調度器——Scheduler來切換線程,例如:安全
public void SchedulersTest() {
Observable.just("aaa", "bbb")
.observeOn(Schedulers.newThread())
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
System.out.println("Map對應的線程:" + Thread.currentThread().getName() + "\t" + Thread.currentThread().getId());
return s.toUpperCase();
}
})
.subscribeOn(Schedulers.single())
.observeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("subscribe對應的線程:" + Thread.currentThread().getName() + "\t" + Thread.currentThread().getId());
System.out.println(s);
}
});
}
複製代碼
下圖不一樣的箭頭顏色表示不一樣的線程:bash
其中,藍色表示主線程、橙色表示newThread、粉色表示I/O線程。網絡
Schedulers是一個靜態工廠類,經過分析Schedulers的源碼能夠看出他有多個不一樣類型的Scheduler。下面是Schedulers的各個工廠方法:多線程
computation()用於CPU密集型的計算任務,但並不適合I/O操做。併發
/**
* Creates and returns a {@link Scheduler} intended for computational work.
*
* @return a {@link Scheduler} meant for computation-bound work
*/
@NonNull
public static Scheduler computation() {
return RxJavaPlugins.onComputationScheduler(COMPUTATION);
}
複製代碼
io()用於I/O密集型任務,支持異步阻塞I/O操做,這個調度器的線程池會根據須要增加。對於普通的計算任務,請使用Schedulers.computation()
app
/**
* Creates and returns a {@link Scheduler} intended for IO-bound work.
*
* @return a {@link Scheduler} meant for IO-bound work
*/
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
複製代碼
在RxJava2中與在RxJava1中的做用不一樣。在RxJava2中表示當即執行,若是當前線程有任務在執行,則會將其暫停,等插入進來的新任務執行完成以後,再接着執行原先未完成的任務。在RxJava1中,表示在當前線程中等待其餘任務完成以後,再執行新的任務。異步
/**
* Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
* current work completes.
*
* @return a {@link Scheduler} that queues work on the current thread
*/
@NonNull
public static Scheduler trampoline() {
return TRAMPOLINE;
}
複製代碼
newThread()爲每一個任務建立一個新線程
/**
* Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work.
*
* @return a {@link Scheduler} that creates new threads
*/
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
複製代碼
single()擁有一個線程單例,全部的任務都在這一個線程中執行。當此線程中有任務執行時,它的任務將會按照先進先出的順序依次執行。
/**
* Returns the common, single-thread backed Scheduler instance.
*
* @return a {@link Scheduler} that shares a single backing thread.
* @since 2.0
*/
@NonNull
public static Scheduler single() {
return RxJavaPlugins.onSingleScheduler(SINGLE);
}
複製代碼
除此以外,還支持自定義的Executor來做爲調度器。
/**
* Converts an {@link Executor} into a new Scheduler instance.
*
* @param executor
* the executor to wrap
* @return the new Scheduler wrapping the Executor
*/
@NonNull
public static Scheduler from(@NonNull Executor executor) {
return new ExecutorScheduler(executor);
}
複製代碼
以下圖,Scheduler是RxJava的線程任務調度器,Worker是線程任務的具體執行者。從Scheduler源碼能夠看出,Scheduler在schedulerDirect()
、schedulerPeriodicallyDirect()
方法中建立了Worker,而後會分別調用worker的schedule()
、schedulePeriodically()
來執行任務。
圖片來源於:RxJava 線程模型分析
/**
* Schedules the execution of the given task with the given delay amount.
*
* <p>
* This method is safe to be called from multiple threads but there are no
* ordering guarantees between tasks.
*
* @param run the task to schedule
* @param delay the delay amount, non-positive values indicate non-delayed scheduling
* @param unit the unit of measure of the delay amount
* @return the Disposable that let us one cancel this particular delayed task.
* @since 2.0
*/
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
複製代碼
/**
* Schedules a periodic execution of the given task with the given initial delay and period.
*
* <p>
* This method is safe to be called from multiple threads but there are no
* ordering guarantees between tasks.
*
* <p>
* The periodic execution is at a fixed rate, that is, the first execution will be after the initial
* delay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
*
* @param run the task to schedule
* @param initialDelay the initial delay amount, non-positive values indicate non-delayed scheduling
* @param period the period at which the task should be re-executed
* @param unit the unit of measure of the delay amount
* @return the Disposable that let us one cancel this particular delayed task.
* @since 2.0
*/
@NonNull
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
return periodicTask;
}
複製代碼
Worker也是一個抽象類,從圖2中能夠看到,每種Scheduler會對應一種具體的Worker。
...
/**
* Schedules a cancelable action to be executed periodically. This default implementation schedules
* recursively and waits for actions to complete (instead of potentially executing long-running actions
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
* <p>
* Note to implementors: non-positive {@code initialTime} and {@code period} should be regarded as
* non-delayed scheduling of the first and any subsequent executions.
*
* @param run
* the Runnable to execute periodically
* @param initialDelay
* time to wait before executing the action for the first time; non-positive values indicate
* an non-delayed schedule
* @param period
* the time interval to wait each time in between executing the action; non-positive values
* indicate no delay between repeated schedules
* @param unit
* the time unit of {@code period}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();
final SequentialDisposable sd = new SequentialDisposable(first);
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
final long periodInNanoseconds = unit.toNanos(period);
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
periodInNanoseconds), initialDelay, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
first.replace(d);
return sd;
}
...
複製代碼