RxJava淺析

RxJavaReactiveX推出的一個開源庫,它是Reactive Extensions的Java VM實現,能夠很方便的在Java中實現響應式編程。解決了Java中繁瑣的異步切換、Callback hell等問題,使邏輯變得更加簡潔。java

一、操做符

RxJava提供了豐富&功能強大的操做符,能夠說這些操做符就是RxJava的基礎及核心,因此學習RxJava都是從這些操做符開始。但因爲RxJava的操做符種類繁多且網絡上已經出現了不少優秀的講解RxJava操做符的文章,因此本文僅列舉一些操做符講解。android

1.一、interval

Observable.interval(3000, TimeUnit.MILLISECONDS)//每隔3s發一個事件
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.i("tag", "start");
                }

                @Override
                public void onNext(Long aLong) {
                    Log.i("tag", "onNext:" + aLong);
                }

                @Override
                public void onError(Throwable e) {
                    Log.i("tag", "error:" + e.getMessage());
                }

                @Override
                public void onComplete() {
                    Log.i("tag", "onComplete");
                }
            });
複製代碼

interval操做符主要就是實現輪詢操做,經過該操做符來實現輪詢效果會比HandlerTimernewScheduledThreadPool更簡潔,更優雅。但從原理上來看,interval其實就是對newScheduledThreadPool的封裝。固然,咱們也能夠本身對HandlerTimer來進行封裝。git

1.二、concatMap操做符

Observable.create(new ObservableOnSubscribe<File>() {
        @Override
        public void subscribe(ObservableEmitter<File> emitter) throws Exception {
            File file = new File(path + File.separator + "blacklist");
            emitter.onNext(file);
        }
    }).concatMap(new Function<File, ObservableSource<File>>() {
        @Override
        public ObservableSource<File> apply(File file) throws Exception {
            if (!file.isDirectory()) {
                return Observable.empty();
            }
            return Observable.fromArray(file.listFiles());
        }
    }).subscribe(new Consumer<File>() {
        @Override
        public void accept(File file) throws Exception {
            LogUtils.i("getPackageNames", "刪除文件夾中已存在的文件");
            file.delete();
        }
    });
複製代碼

concatMap操做符主要是進行事件的拆分及合併。在上面示例中就實現了對文件夾的遍歷及得到文件夾下的每一個File對象。github

1.三、map及filter操做符

Observable.fromIterable(data)
            .map(new Function<PackageNameData, File>() {//類型轉換
                @Override
                public File apply(PackageNameData pkg) throws Exception {
                    LogUtils.i("getPackageNames", "pkg:" + pkg.toString());
                    String path = FileUtil.getWeikePath() + File.separator + "blacklist";
                    File file = new File(path);
                    if (file.exists() && file.isFile()) {
                        file.delete();
                    }
                    boolean b = file.mkdirs();
                    if (b) {
                        LogUtils.i("getPackageNames", "建立文件夾" + file + "成功");
                    } else {
                        LogUtils.i("getPackageNames", "建立文件夾" + file + "失敗");
                    }
                    path = path + File.separator + pkg.appPackageName.trim();
                    return new File(path);
                }
            })
            .filter(new Predicate<File>() {//篩選
                @Override
                public boolean test(File file) throws Exception {
                    return !file.exists();
                }
            })
            .subscribe(new Consumer<File>() {
                @Override
                public void accept(File file) throws Exception {
                    LogUtils.i("getPackageNames", "建立新的文件");
                    try {
                        boolean b = file.createNewFile();
                        if (!b) {
                            FileUtil.writeTxt(file.getAbsolutePath(), "");
                        }
                    } catch (IOException e) {
                        LogUtils.i("getPackageNames", "建立文件失敗:" + e.getMessage());
                        FileUtil.writeTxt(file.getAbsolutePath(), "");
                    }
                }
            });
複製代碼

filter操做符主要是作篩選操做,若是返回false,則不會繼續向下發送事件。因此若是想要在返回false的狀況下也要繼續發送事件的話,則不能使用該操做符。算法

map操做符主要是對類型的轉換,如上面示例中就是將PackageNameData類型轉換成一個File類型並向下傳遞。編程

關於RxJava操做符的更多內容能夠去閱讀Carson_HoRxJava系列文章、扔物線的給 Android 開發者的 RxJava 詳解等文章。數組

二、線程調度及同步機制

在Java中,通常討論線程都會想到Thread類,但在RxJava中,咱們會發現,RxJava中的線程是能夠作定時、輪詢等操做。這究竟是怎麼實現的尼?或許會想到定時器類——Timer,但其實不是Timer,是經過一個可定時、輪詢執行操做的線程池——newScheduledThreadPool來實現的。在RxJava中,因爲該線程池有且僅有一個線程,所以能夠將該線程池理解爲一種特殊線程,一種僅在RxJava中使用的特殊線程。在後面內容中會將這種特殊的線程簡稱爲線程。 緩存

從圖中能夠看出,RxJava中線程都是在SchedulerPoolFactory類的create方法中建立的。網絡

public static ScheduledExecutorService create(ThreadFactory factory) {
        //建立線程爲1的一個線程池,它至關於RxJava中的特殊線程
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        tryPutIntoPool(PURGE_ENABLED, exec);
        return exec;
    }
複製代碼

2.一、Schedulers.single()

RxJava中可使用Schedulers.single()來建立一個線程,該方法有且只會建立一個新的線程,相似於線程池中的newSingleThreadExecutor。因此該線程只會在當前任務執行完畢後才執行下一個任務——至關於串行執行。下面來看一下源碼裏的實現。多線程

public final class SingleScheduler extends Scheduler {
    final ThreadFactory threadFactory;
    final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();
    /** The name of the system property for setting the thread priority for this Scheduler. */
    //至關於一個key,能夠經過設置KEY_SINGLE_PRIORITY對應的值來設置線程優先級
    private static final String KEY_SINGLE_PRIORITY = "rx2.single-priority";
    //能夠經過該參數來判斷執行的線程名稱
    private static final String THREAD_NAME_PREFIX = "RxSingleScheduler";
    ...
    public SingleScheduler() {
        this(SINGLE_THREAD_FACTORY);
    }
    public SingleScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        //建立一個線程並使用原子變量類AtomicReference來管理該線程
        //使用lazySet並不會讓值當即對全部線程可見,而set則是當即對全部線程可見的
        executor.lazySet(createExecutor(threadFactory));
    }
    //建立一個線程,SchedulerPoolFactory.create(threadFactory)該方法在上面前面已經講述
    static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
        return SchedulerPoolFactory.create(threadFactory);
    }
    //線程中止執行,關於如何中止能夠去查看線程池的中止執行
    @Override
    public void shutdown() {...}
    ...
}

複製代碼

能夠看出,在SingleScheduler的構造方法中就經過createExecutor建立了一個線程,而SingleScheduler這個類僅會建立一次。因此當使用Schedulers.single()時僅會建立一個線程。

2.二、Schedulers.newThread()

RxJava中可使用Schedulers.newThread()來建立一個新線程,該線程不會被重用,線程數量會隨着調用次數的增長而增長。

public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }
    //建立一個新的線程
    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}
複製代碼

上面的NewThreadWorker是一個很是重要類,後面的Schedulers.computation()Schedulers.io()都是根據此類來建立線程的。

2.三、Schedulers.computation()

Schedulers.computation()主要用來作一些計算密集型操做,會根據當前設備的CPU數量來建立一組線程。而後給不一樣任務分配不一樣的線程。下面來看源碼的實現。

public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
    ...
    static {
        //最大線程數量,根據CPU數量計算出的
        MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
        ...
    }

    static int cap(int cpuCount, int paramThreads) {
        return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
    }
    //我認爲這裏實現了一個簡單的線程池
    static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
        final int cores;

        final PoolWorker[] eventLoops;
        long n;

        FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
            // initialize event loops
            //線程的數量
            this.cores = maxThreads;
            //建立一個數組,保存對應的線程
            this.eventLoops = new PoolWorker[maxThreads];
            //建立一組線程
            for (int i = 0; i < maxThreads; i++) {
                this.eventLoops[i] = new PoolWorker(threadFactory);
            }
        }
        //根據索引來給不一樣任務分配不一樣的線程。
        public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            // simple round robin, improvements to come
            return eventLoops[(int)(n++ % c)];
        }

        public void shutdown() {
            for (PoolWorker w : eventLoops) {
                w.dispose();
            }
        }

        @Override
        public void createWorkers(int number, WorkerCallback callback) {...}
    }

    public ComputationScheduler() {
        this(THREAD_FACTORY);
    }

    //建立線程
    public ComputationScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
        start();
    }

    ...
    @NonNull
    @Override
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
        //給任務分配一個線程
        PoolWorker w = pool.get().getEventLoop();
        return w.scheduleDirect(run, delay, unit);
    }

    @NonNull
    @Override
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
        //給任務分配一個線程
        PoolWorker w = pool.get().getEventLoop();
        return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
    }
    //建立一組要使用的線程
    @Override
    public void start() {
        FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    //中止線程執行
    @Override
    public void shutdown() {...}
    ...
    //在NewThreadWorker中建立了線程,這裏之因此不直接使用NewThreadWorker是由於這裏傳遞的threadFactory能夠根據名稱來區分線程
    static final class PoolWorker extends NewThreadWorker {
        PoolWorker(ThreadFactory threadFactory) {
            super(threadFactory);
        }
    }    
}
複製代碼

原理仍是比較簡單的,用一個數組來保存一組線程,而後根據索引將任務分配給每一個線程,因爲每一個線程其實是一個線程池,而這個線程池會把多餘的任務放在隊列中等待執行,因此每一個線程後面任務的執行須要等待前面的任務執行完畢。

2.四、Schedulers.io()

Schedulers.io()能夠說是RxJava裏實現最複雜的,它不只會建立線程,也會清除線程。在IoScheduler中實現了一個緩存池,當線程執行完畢後會將線程放入緩存池中。下面來看一下源碼實現。

public final class IoScheduler extends Scheduler {
    ...
    //線程的存活時間
    public static final long KEEP_ALIVE_TIME_DEFAULT = 60;
    private static final long KEEP_ALIVE_TIME;
    //線程的存活時間單位
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
    ...
    //緩存池
    static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;

        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeDisposable();
            this.threadFactory = threadFactory;

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                //建立一個線程,該線程默認會每60s執行一次,來清除已到期的線程
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                //設置定時任務
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }

        @Override
        public void run() {
            //執行清除時間到期的線程操做
            evictExpiredWorkers();
        }
        //每個任務都從隊列中獲取線程,若是隊列中有線程的話
        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            //若是緩存池不爲空
            while (!expiringWorkerQueue.isEmpty()) {
                //從緩衝池中得到線程
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            //緩存池爲空,須要建立一個新的線程
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
        //將執行完畢的線程放入緩存隊列中
        void release(ThreadWorker threadWorker) {
            // Refresh expire time before putting worker back in pool
            //刷新線程的到期時間
            threadWorker.setExpirationTime(now() + keepAliveTime);
            //將執行完畢的線程放入緩存池中
            expiringWorkerQueue.offer(threadWorker);
        }
        //默認每60s執行一次,主要是清除隊列中的已過時線程
        void evictExpiredWorkers() {
            if (!expiringWorkerQueue.isEmpty()) {
                long currentTimestamp = now();

                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        //若是線程threadWorker已到期就將其從緩存中移除
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            allWorkers.remove(threadWorker);
                        }
                    } else {
                        // Queue is ordered with the worker that will expire first in the beginning, so when we
                        // find a non-expired worker we can stop evicting.
                        break;
                    }
                }
            }
        }
        ...
    }

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    ...
    //建立一個新的線程
    static final class ThreadWorker extends NewThreadWorker {
        //到期時間,若是該線程到期後就會被清除
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }
        //獲取線程的到期時間
        public long getExpirationTime() {
            return expirationTime;
        }
        //刷新到期時間
        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
}
複製代碼

CachedWorkerPool是一個很是重要的類,它內部有一個隊列及線程。該隊列主要是緩存已經使用完畢的線程,而CachedWorkerPool中的線程evictor主要就是作清除操做,默認是每60s就遍歷一遍隊列,若是線程過時就從隊列中將該線程移除。這裏的隊列沒有數量限制,因此理論上能夠建立無限多的線程。

2.五、Schedulers.trampoline()

Schedulers.trampoline()用的比較少,官方對於它的解釋是:

在當前線程上執行,但不會當即執行。任務會被放入隊列並在當前任務完成後執行。注意:是在當前線程執行,也就意味着不會進行線程切換

經過查看源碼能夠發現,當Schedulers.trampoline()沒有延遲任務時,Schedulers.trampoline()使用與沒有使用都沒區別。但執行延時任務時,就會將當前任務添加進隊列中,等待時間到了再執行。

public final class TrampolineScheduler extends Scheduler {
    private static final TrampolineScheduler INSTANCE = new TrampolineScheduler();

    public static TrampolineScheduler instance() {
        return INSTANCE;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new TrampolineWorker();
    }

    /* package accessible for unit tests */TrampolineScheduler() {
    }
    //當不是延時任務時,直接執行該任務
    @NonNull
    @Override
    public Disposable scheduleDirect(@NonNull Runnable run) {
        RxJavaPlugins.onSchedule(run).run();
        return EmptyDisposable.INSTANCE;
    }
    ...
    //執行延時任務,就會將該任務添加進優先級隊列PriorityBlockingQueue中
    static final class TrampolineWorker extends Scheduler.Worker implements Disposable {
        final PriorityBlockingQueue<TimedRunnable> queue = new PriorityBlockingQueue<TimedRunnable>();

        private final AtomicInteger wip = new AtomicInteger();

        final AtomicInteger counter = new AtomicInteger();

        volatile boolean disposed;

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action) {
            //將任務壓入隊列中
            return enqueue(action, now(TimeUnit.MILLISECONDS));
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            long execTime = now(TimeUnit.MILLISECONDS) + unit.toMillis(delayTime);
            //將任務壓入隊列中
            return enqueue(new SleepingRunnable(action, this, execTime), execTime);
        }
        //將任務添加進隊列中等待執行
        Disposable enqueue(Runnable action, long execTime) {
            if (disposed) {
                return EmptyDisposable.INSTANCE;
            }
            final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
            queue.add(timedRunnable);

            if (wip.getAndIncrement() == 0) {
                int missed = 1;
                for (;;) {
                    for (;;) {
                        if (disposed) {
                            queue.clear();
                            return EmptyDisposable.INSTANCE;
                        }
                        //獲取一個要執行的任務
                        final TimedRunnable polled = queue.poll();
                        if (polled == null) {
                            break;
                        }
                        if (!polled.disposed) {
                            //執行任務
                            polled.run.run();
                        }
                    }
                    //重置wip的值
                    missed = wip.addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }

                return EmptyDisposable.INSTANCE;
            } else {
                // queue wasn't empty, a parent is already processing so we just add to the end of the queue
                return Disposables.fromRunnable(new AppendToQueueTask(timedRunnable));
            }
        }
        ...
    }
    ...
}

複製代碼

2.六、AndroidSchedulers.mainThread()

AndroidSchedulers.mainThread()RxAndroid中的的API。因爲在android中須要在主線程更新UI,因此須要該API來切換回主線程。在Android中想要切換回主線程,就只有經過Handler來實現,而AndroidSchedulers.mainThread()也不例外。很是簡單,就是經過Handler向主線程發送消息。

final class HandlerScheduler extends Scheduler {
    //傳遞進來的Handler已是主線程的Handler了,只要經過該Handler發送消息便可
    private final Handler handler;
    private final boolean async;

    HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    @Override
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        ...
        //發送消息,切換回主線程
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        private final boolean async;

        private volatile boolean disposed;

        HandlerWorker(Handler handler, boolean async) {
            this.handler = handler;
            this.async = async;
        }

        @Override
        @SuppressLint("NewApi") // Async will only be true when the API is available to call.
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            ...
            //發送消息,切換回主線程
            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        ...
    }
    ...
}
複製代碼

2.七、線程同步機制

在Android及一些開源庫(如OKHttpGlide等)中,多線程之間的數據同步問題通常都是採用synchronized來實現,由於它是使用最簡單也最深刻人心的一種實現方式,也是性能比較高的一種實現方法。但它倒是一種悲觀鎖——不論是否有線來程競爭都會加鎖,這就致使了在線程競爭比較低的狀況下,它的性能不如樂觀鎖——一種經過CAS來實現的鎖機制。而RxJava中大量使用的原子變量類Atomicxxxxxx就是一種樂觀鎖,也是CAS的一種實現。

CAS全稱爲Compare And Swap,即比較並替換。它包含了3個操做數——須要讀寫的內存位置V、進行比較的值A和擬寫入的新值B。當且僅當V的值等於A時,CAS纔會經過原子方式用新值B來更新V的值,不然不會執行任何操做。關於更多CAS能夠參考筆者的Java之CAS無鎖算法這篇文章。

RxJava中都會使用裝飾模式將Observer包裹成與操做符對應的類xxxxxxObserver,如FlatMapmerge等操做符對應的類——MergeObserversubscribeOn操做符對應的類——SubscribeOnObserverobserveOn對應的類——ObserveOnObserver等。而MergeObserverSubscribeOnObserverObserveOnObserver都分別繼承自AtomicIntegerAtomicReferenceAtomicInteger。也就是經過原子變量類來實現了線程之間的數據同步。

Flowable中也是如此,只不過由xxxxxxObserver變爲了xxxxxSubscriber而已。

三、生產者-消費者模型

生產者——消費者模式其實就是一種線程間協做的思想。在學習多線程時,實現的買票與賣票案例,就是該模型的實現。或許在開發中不多主動使用到該模型,但基本上都會被動使用該模型。如音視頻的下載與解碼、網絡圖片的下載與展現、RxJava事件的發送與接收等。到這裏,咱們會疑惑,該模型與RxJava有什麼關聯?是何種聯繫尼?其實RxJava的異步訂閱就是該模型的一種實現,也所以會在上游發送事件的速度超出下游處理事件的速度時,拋MissingBackpressureException異常。

3.一、Backpressure

Backpressure既是你們所說的背壓,可是我認爲這個翻譯是有一點問題的,沒有一目瞭然的表達Backpressure,筆者認爲扔物線在如何形象的描述反應式編程中的背壓(Backpressure)機制?中的回答就很好的闡述了Backpressure。 產生的緣由——主要是在異步場景下,上游發送事件的速度超過了下游處理事件的速度,使buffer溢出,從而拋出MissingBackpressureException異常,這裏重點在於buffer的溢出(RxJava 2.x中的默認buffer大小爲128)。在1.x的版本中,解決該問題的方案不是很完全,但在2.x的版本中則分出一個新類Flowable來處理這個問題。它與Observable處理事件的流程恰好相反,Observable的事件是由被觀察者主動發送的,觀察者沒法控制速度,只能被動接受,而Flowable則是由觀察者主動獲取事件,從而解決了MissingBackpressureException異常。下面來看一個示例。

Flowable.create(new FlowableOnSubscribe<String>() {
        @Override
        public void subscribe(FlowableEmitter<String> emitter) throws Exception {
            for (int i = 0; i < 200; i++) {
                emitter.onNext("str" + i);
            }
        }
    }, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.w("Flowable", "onSubscribe");
                }

                @Override
                public void onNext(String s) {
                    Log.w("Flowable", "s:" + s);
                }

                @Override
                public void onError(Throwable t) {
                    Log.w("Flowable", "error:" + t.toString());
                }

                @Override
                public void onComplete() {
                    Log.w("Flowable", "onComplete");
                }
            });

複製代碼

Flowablecreate方法的第二個參數是設置Backpressure的模式,它有以下幾種模式:

  • BackpressureStrategy.MISSING:上游不作任何事件緩存及丟棄,所有交給下游處理,若是有溢出的話,上游無論,交給下游處理。
  • BackpressureStrategy.ERROR:當下遊沒法及時處理事件從而致使緩存隊列已滿時,會給出MissingBackpressureException異常提示,默認是該策略。
  • BackpressureStrategy.BUFFER:緩存隊列無限大,因此不會拋出MissingBackpressureException異常。直到下游處理完畢全部事件爲止,也意味着內存會隨着事件的增多而增大。
  • BackpressureStrategy.DROP:若是下游沒法及時處理事件從而當緩存隊列已滿時,會刪除最近的事件。
  • BackpressureStrategy.LATEST:若是下游沒法及時處理事件從而當緩存隊列已滿時,會保留最新的事件,其餘的事件會被覆蓋。

因此運行上面代碼就會給出MissingBackpressureException異常提示,須要咱們經過request方法來獲取及消費事件及設置Backpressure策略來解決該問題。在使用其餘操做符的時候,沒法主動設置Backpressure策略,則會在緩存池滿了之後給出MissingBackpressureException異常提示。

3.二、toFlowable

toFlowableObservable中的一個方法,經過該方法能夠主動來設置Backpressure策略,從而低成本的解決在Observable中拋出的MissingBackpressureException異常。

Observable.interval(1000,TimeUnit.MILLISECONDS)
            //設置`Backpressure`策略
            .toFlowable(BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Long>() {
                @Override
                public void onSubscribe(Subscription s) {

                }

                @Override
                public void onNext(Long aLong) {

                }

                @Override
                public void onError(Throwable t) {

                }

                @Override
                public void onComplete() {

                }
            });
複製代碼

四、總結

相信閱讀到這裏,就已經對RxJava及源碼都有了必定的瞭解。可是你們有沒有想過如下幾個問題,也是我在學習RxJava時一直思考的幾個問題。

  • RxJava的應用場景在哪?
  • 學習RxJava的意義何在?

首先來看問題一,RxJava的應用能夠說很是普遍,好比輪詢、網絡出錯重連、網絡請求嵌套回調、聯合判斷、從緩存中獲取數據等,但上面的一些場景也能夠不用RxJava來實現,這也就致使了在使用時不會第一時間想到RxJava。因此筆者認爲若是想要熟練的使用RxJava,則須要在思想上進行一次轉變,由於RxJava是響應式編程的一種實現,它不會像OkHttpGlideDbflow等開源庫只會應用在某一領域。

關於學習RxJava的意義,我認爲最好就是可以熟練使用並在可使用RxJava的時候可以第一時間想到RxJava,固然因爲RxJava學習門檻較高且須要思惟的轉變,因此在不能熟練使用時,就須要咱們可以看懂別人寫的RxJava代碼了。固然RxJava的異步切換、Callback hell問題的解決也是很好的學習RxJava的理由。

那麼你們怎麼看RxJava尼???

【參考資料】

關於RxJava最友好的文章——背壓(Backpressure)

如何形象的描述反應式編程中的背壓(Backpressure)機制?

RxJava 沉思錄(四):總結

我爲何再也不推薦RxJava

關於 RxJava 背壓

相關文章
相關標籤/搜索