RxJava系列2:Schedulers

 

學習小目標java

一、瞭解Schedulers類,以及經過其建立的各類Scheduler的用途react

二、結合上一篇RxJava系列1:網絡請求中的線程切換理解該類建立的各種型線程做用android

 

上一篇 RxJava系列1:網絡請求中的線程切換數據庫

講到了線程的切換,其中使用到了observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())bash

這篇文章主要是學習Schedulers.io()和AndroidSchedulers.mainThread()等一些列相關參數,首先咱們須要瞭解Schedulers類網絡

 

一、Schedulers類app

用於返回標準Scheduler實例的靜態工廠方法異步

該類經過靜態內部類的方式返回不一樣的Scheduler,先看源碼,而後對各實例進行解釋async

 

Schedulers源碼:ide

public final class Schedulers {
    static final Scheduler SINGLE;

    static final Scheduler COMPUTATION;

    static final Scheduler IO;

    static final Scheduler TRAMPOLINE;

    static final Scheduler NEW_THREAD;

    static final class SingleHolder {
        static final Scheduler DEFAULT = new SingleScheduler();
    }

    static final class ComputationHolder {
        static final Scheduler DEFAULT = new ComputationScheduler();
    }

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

    static final class NewThreadHolder {
        static final Scheduler DEFAULT = NewThreadScheduler.instance();
    }

    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return SingleHolder.DEFAULT;
            }
        });

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return ComputationHolder.DEFAULT;
            }
        });

        IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        });

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return NewThreadHolder.DEFAULT;
            }
        });
    }

    /** Utility class. */
    private Schedulers() {
        throw new IllegalStateException("No instances!");
    }

    /**
     * Creates and returns a {@link Scheduler} intended for computational work.
     * <p>
     * This can be used for event-loops, processing callbacks and other computational work.
     * <p>
     * Do not perform IO-bound work on this scheduler. Use {@link #io()} instead.
     * <p>
     * Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
     *
     * @return a {@link Scheduler} meant for computation-bound work
     */
    public static Scheduler computation() {
        return RxJavaPlugins.onComputationScheduler(COMPUTATION);
    }

    /**
     * Creates and returns a {@link Scheduler} intended for IO-bound work.
     * <p>
     * The implementation is backed by an {@link Executor} thread-pool that will grow as needed.
     * <p>
     * This can be used for asynchronously performing blocking IO.
     * <p>
     * Do not perform computational work on this scheduler. Use {@link #computation()} instead.
     * <p>
     * Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
     *
     * @return a {@link Scheduler} meant for IO-bound work
     */
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }

    /**
     * Creates and returns a {@link Scheduler} that queues work on the current thread to be executed after the
     * current work completes.
     *
     * @return a {@link Scheduler} that queues work on the current thread
     */
    public static Scheduler trampoline() {
        return TRAMPOLINE;
    }

    /**
     * Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work.
     * <p>
     * Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
     *
     * @return a {@link Scheduler} that creates new threads
     */
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }

    /**
     * Returns the common, single-thread backed Scheduler instance.
     * <p>
     * Uses:
     * <ul>
     * <li>main event loop</li>
     * <li>support Schedulers.from(Executor) and from(ExecutorService) with delayed scheduling</li>
     * <li>support benchmarks that pipeline data from the main thread to some other thread and
     * avoid core-bashing of computation's round-robin nature</li>
     * </ul>
     * @return a {@link Scheduler} that shares a single backing thread.
     * @since 2.0
     */
    public static Scheduler single() {
        return RxJavaPlugins.onSingleScheduler(SINGLE);
    }

    /**
     * Converts an {@link Executor} into a new Scheduler instance.
     *
     * @param executor
     *          the executor to wrap
     * @return the new Scheduler wrapping the Executor
     */
    public static Scheduler from(Executor executor) {
        return new ExecutorScheduler(executor);
    }

    /**
     * Shuts down those standard Schedulers which support the SchedulerLifecycle interface.
     * <p>The operation is idempotent and thread-safe.
     */
    public static void shutdown() {
        computation().shutdown();
        io().shutdown();
        newThread().shutdown();
        single().shutdown();
        trampoline().shutdown();
        SchedulerPoolFactory.shutdown();
    }

    /**
     * Starts those standard Schedulers which support the SchedulerLifecycle interface.
     * <p>The operation is idempotent and thread-safe.
     */
    public static void start() {
        computation().start();
        io().start();
        newThread().start();
        single().start();
        trampoline().start();
        SchedulerPoolFactory.start();
    }
}

 

public static Scheduler computation()

 

建立並返回用於計算工做的Scheduler,能夠用於事件循環,處理回調和其餘計算工做,不能在此調度程序上執行IO綁定工做

 

 

public static Scheduler io() 

建立並返回用於IO綁定工做的Scheduler,該實現由Executor線程池支持,該線程池將根據須要增加,這可用於異步執行阻塞IO, 不要在此調度程序上執行計算工做。

在類線程上可執行io操做,如網絡訪問,數據庫操做

 

public static Scheduler trampoline()

 

建立並返回一個Scheduler,該Scheduler將在當前工做完成以後在當前線程按照列隊執行。

 

public static Scheduler newThread()

 

建立並返回Scheduler,該Scheduler會爲每一個工做單元建立一個新的線程,也就是說每一個任務都會在一個新的線程中執行

 

public static Scheduler single()

建立並返回Scheduler,該Scheduler是一個單獨的後臺線程實例,將被全部的任務共用

 

public static Scheduler from(Executor executor) 

將傳入的Executor轉換爲新的Scheduler實例

 

AndroidSchedulers.mainThread()來至另一個包,須要gradle中導入

implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

 

AndroidSchedulers源碼以下:

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}

 

 public static Scheduler mainThread()

會返回一個在Android主線程上執行操做的Scheduler;

public static Scheduler from(Looper looper)

返回一個在傳入的Looper對應的線程上執行的Scheduler

 

二、各種型線程的做用

根據源碼的解釋咱們能夠天然而然的知道,

subscribeOn(Schedulers.io())的意思是在IO線程上註冊對被觀察者的監聽,也就是讓咱們的網絡操做在io線程上執行,並被觀察

observeOn(AndroidSchedulers.mainThread())的意思是在主線程監聽數據變化,並能夠執行UI相關操做。

就目前我也只是用過以上兩種,其餘的使用後續再作補充

相關文章
相關標籤/搜索