RxJava
是ReactiveX推出的一個開源庫,它是Reactive Extensions
的Java VM實現,能夠很方便的在Java中實現響應式編程。解決了Java中繁瑣的異步切換、Callback hell等問題,使邏輯變得更加簡潔。java
RxJava
提供了豐富&功能強大的操做符,能夠說這些操做符就是RxJava
的基礎及核心,因此學習RxJava
都是從這些操做符開始。但因爲RxJava
的操做符種類繁多且網絡上已經出現了不少優秀的講解RxJava
操做符的文章,因此本文僅列舉一些操做符講解。android
Observable.interval(3000, TimeUnit.MILLISECONDS)//每隔3s發一個事件
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
Log.i("tag", "start");
}
@Override
public void onNext(Long aLong) {
Log.i("tag", "onNext:" + aLong);
}
@Override
public void onError(Throwable e) {
Log.i("tag", "error:" + e.getMessage());
}
@Override
public void onComplete() {
Log.i("tag", "onComplete");
}
});
複製代碼
interval
操做符主要就是實現輪詢操做,經過該操做符來實現輪詢效果會比Handler
、Timer
及newScheduledThreadPool
更簡潔,更優雅。但從原理上來看,interval
其實就是對newScheduledThreadPool
的封裝。固然,咱們也能夠本身對Handler
、Timer
來進行封裝。git
Observable.create(new ObservableOnSubscribe<File>() {
@Override
public void subscribe(ObservableEmitter<File> emitter) throws Exception {
File file = new File(path + File.separator + "blacklist");
emitter.onNext(file);
}
}).concatMap(new Function<File, ObservableSource<File>>() {
@Override
public ObservableSource<File> apply(File file) throws Exception {
if (!file.isDirectory()) {
return Observable.empty();
}
return Observable.fromArray(file.listFiles());
}
}).subscribe(new Consumer<File>() {
@Override
public void accept(File file) throws Exception {
LogUtils.i("getPackageNames", "刪除文件夾中已存在的文件");
file.delete();
}
});
複製代碼
concatMap
操做符主要是進行事件的拆分及合併。在上面示例中就實現了對文件夾的遍歷及得到文件夾下的每一個File
對象。github
Observable.fromIterable(data)
.map(new Function<PackageNameData, File>() {//類型轉換
@Override
public File apply(PackageNameData pkg) throws Exception {
LogUtils.i("getPackageNames", "pkg:" + pkg.toString());
String path = FileUtil.getWeikePath() + File.separator + "blacklist";
File file = new File(path);
if (file.exists() && file.isFile()) {
file.delete();
}
boolean b = file.mkdirs();
if (b) {
LogUtils.i("getPackageNames", "建立文件夾" + file + "成功");
} else {
LogUtils.i("getPackageNames", "建立文件夾" + file + "失敗");
}
path = path + File.separator + pkg.appPackageName.trim();
return new File(path);
}
})
.filter(new Predicate<File>() {//篩選
@Override
public boolean test(File file) throws Exception {
return !file.exists();
}
})
.subscribe(new Consumer<File>() {
@Override
public void accept(File file) throws Exception {
LogUtils.i("getPackageNames", "建立新的文件");
try {
boolean b = file.createNewFile();
if (!b) {
FileUtil.writeTxt(file.getAbsolutePath(), "");
}
} catch (IOException e) {
LogUtils.i("getPackageNames", "建立文件失敗:" + e.getMessage());
FileUtil.writeTxt(file.getAbsolutePath(), "");
}
}
});
複製代碼
filter
操做符主要是作篩選操做,若是返回false,則不會繼續向下發送事件。因此若是想要在返回false的狀況下也要繼續發送事件的話,則不能使用該操做符。算法
map
操做符主要是對類型的轉換,如上面示例中就是將PackageNameData
類型轉換成一個File
類型並向下傳遞。編程
關於RxJava
操做符的更多內容能夠去閱讀Carson_Ho的RxJava
系列文章、扔物線的給 Android 開發者的 RxJava 詳解等文章。數組
在Java中,通常討論線程都會想到Thread
類,但在RxJava中,咱們會發現,RxJava
中的線程是能夠作定時、輪詢等操做。這究竟是怎麼實現的尼?或許會想到定時器類——Timer
,但其實不是Timer
,是經過一個可定時、輪詢執行操做的線程池——newScheduledThreadPool
來實現的。在RxJava
中,因爲該線程池有且僅有一個線程,所以能夠將該線程池理解爲一種特殊線程,一種僅在RxJava
中使用的特殊線程。在後面內容中會將這種特殊的線程簡稱爲線程。 緩存
從圖中能夠看出,RxJava中線程都是在SchedulerPoolFactory
類的create
方法中建立的。網絡
public static ScheduledExecutorService create(ThreadFactory factory) {
//建立線程爲1的一個線程池,它至關於RxJava中的特殊線程
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
tryPutIntoPool(PURGE_ENABLED, exec);
return exec;
}
複製代碼
在RxJava
中可使用Schedulers.single()
來建立一個線程,該方法有且只會建立一個新的線程,相似於線程池中的newSingleThreadExecutor
。因此該線程只會在當前任務執行完畢後才執行下一個任務——至關於串行執行。下面來看一下源碼裏的實現。多線程
public final class SingleScheduler extends Scheduler {
final ThreadFactory threadFactory;
final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();
/** The name of the system property for setting the thread priority for this Scheduler. */
//至關於一個key,能夠經過設置KEY_SINGLE_PRIORITY對應的值來設置線程優先級
private static final String KEY_SINGLE_PRIORITY = "rx2.single-priority";
//能夠經過該參數來判斷執行的線程名稱
private static final String THREAD_NAME_PREFIX = "RxSingleScheduler";
...
public SingleScheduler() {
this(SINGLE_THREAD_FACTORY);
}
public SingleScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
//建立一個線程並使用原子變量類AtomicReference來管理該線程
//使用lazySet並不會讓值當即對全部線程可見,而set則是當即對全部線程可見的
executor.lazySet(createExecutor(threadFactory));
}
//建立一個線程,SchedulerPoolFactory.create(threadFactory)該方法在上面前面已經講述
static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {
return SchedulerPoolFactory.create(threadFactory);
}
//線程中止執行,關於如何中止能夠去查看線程池的中止執行
@Override
public void shutdown() {...}
...
}
複製代碼
能夠看出,在SingleScheduler
的構造方法中就經過createExecutor
建立了一個線程,而SingleScheduler
這個類僅會建立一次。因此當使用Schedulers.single()
時僅會建立一個線程。
在RxJava
中可使用Schedulers.newThread()
來建立一個新線程,該線程不會被重用,線程數量會隨着調用次數的增長而增長。
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
//建立一個新的線程
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
複製代碼
上面的NewThreadWorker
是一個很是重要類,後面的Schedulers.computation()
及Schedulers.io()
都是根據此類來建立線程的。
Schedulers.computation()
主要用來作一些計算密集型操做,會根據當前設備的CPU數量來建立一組線程。而後給不一樣任務分配不一樣的線程。下面來看源碼的實現。
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
...
static {
//最大線程數量,根據CPU數量計算出的
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));
...
}
static int cap(int cpuCount, int paramThreads) {
return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
}
//我認爲這裏實現了一個簡單的線程池
static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
final int cores;
final PoolWorker[] eventLoops;
long n;
FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
// initialize event loops
//線程的數量
this.cores = maxThreads;
//建立一個數組,保存對應的線程
this.eventLoops = new PoolWorker[maxThreads];
//建立一組線程
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(threadFactory);
}
}
//根據索引來給不一樣任務分配不一樣的線程。
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) {
return SHUTDOWN_WORKER;
}
// simple round robin, improvements to come
return eventLoops[(int)(n++ % c)];
}
public void shutdown() {
for (PoolWorker w : eventLoops) {
w.dispose();
}
}
@Override
public void createWorkers(int number, WorkerCallback callback) {...}
}
public ComputationScheduler() {
this(THREAD_FACTORY);
}
//建立線程
public ComputationScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
start();
}
...
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
//給任務分配一個線程
PoolWorker w = pool.get().getEventLoop();
return w.scheduleDirect(run, delay, unit);
}
@NonNull
@Override
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
//給任務分配一個線程
PoolWorker w = pool.get().getEventLoop();
return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
}
//建立一組要使用的線程
@Override
public void start() {
FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
//中止線程執行
@Override
public void shutdown() {...}
...
//在NewThreadWorker中建立了線程,這裏之因此不直接使用NewThreadWorker是由於這裏傳遞的threadFactory能夠根據名稱來區分線程
static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
}
}
}
複製代碼
原理仍是比較簡單的,用一個數組來保存一組線程,而後根據索引將任務分配給每一個線程,因爲每一個線程其實是一個線程池,而這個線程池會把多餘的任務放在隊列中等待執行,因此每一個線程後面任務的執行須要等待前面的任務執行完畢。
Schedulers.io()
能夠說是RxJava
裏實現最複雜的,它不只會建立線程,也會清除線程。在IoScheduler
中實現了一個緩存池,當線程執行完畢後會將線程放入緩存池中。下面來看一下源碼實現。
public final class IoScheduler extends Scheduler {
...
//線程的存活時間
public static final long KEEP_ALIVE_TIME_DEFAULT = 60;
private static final long KEEP_ALIVE_TIME;
//線程的存活時間單位
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
...
//緩存池
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
//建立一個線程,該線程默認會每60s執行一次,來清除已到期的線程
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
//設置定時任務
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
@Override
public void run() {
//執行清除時間到期的線程操做
evictExpiredWorkers();
}
//每個任務都從隊列中獲取線程,若是隊列中有線程的話
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
//若是緩存池不爲空
while (!expiringWorkerQueue.isEmpty()) {
//從緩衝池中得到線程
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
//緩存池爲空,須要建立一個新的線程
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
//將執行完畢的線程放入緩存隊列中
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
//刷新線程的到期時間
threadWorker.setExpirationTime(now() + keepAliveTime);
//將執行完畢的線程放入緩存池中
expiringWorkerQueue.offer(threadWorker);
}
//默認每60s執行一次,主要是清除隊列中的已過時線程
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
//若是線程threadWorker已到期就將其從緩存中移除
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
...
}
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
...
//建立一個新的線程
static final class ThreadWorker extends NewThreadWorker {
//到期時間,若是該線程到期後就會被清除
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
//獲取線程的到期時間
public long getExpirationTime() {
return expirationTime;
}
//刷新到期時間
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
}
複製代碼
CachedWorkerPool
是一個很是重要的類,它內部有一個隊列及線程。該隊列主要是緩存已經使用完畢的線程,而CachedWorkerPool
中的線程evictor
主要就是作清除操做,默認是每60s就遍歷一遍隊列,若是線程過時就從隊列中將該線程移除。這裏的隊列沒有數量限制,因此理論上能夠建立無限多的線程。
Schedulers.trampoline()
用的比較少,官方對於它的解釋是:
在當前線程上執行,但不會當即執行。任務會被放入隊列並在當前任務完成後執行。注意:是在當前線程執行,也就意味着不會進行線程切換。
經過查看源碼能夠發現,當Schedulers.trampoline()
沒有延遲任務時,Schedulers.trampoline()
使用與沒有使用都沒區別。但執行延時任務時,就會將當前任務添加進隊列中,等待時間到了再執行。
public final class TrampolineScheduler extends Scheduler {
private static final TrampolineScheduler INSTANCE = new TrampolineScheduler();
public static TrampolineScheduler instance() {
return INSTANCE;
}
@NonNull
@Override
public Worker createWorker() {
return new TrampolineWorker();
}
/* package accessible for unit tests */TrampolineScheduler() {
}
//當不是延時任務時,直接執行該任務
@NonNull
@Override
public Disposable scheduleDirect(@NonNull Runnable run) {
RxJavaPlugins.onSchedule(run).run();
return EmptyDisposable.INSTANCE;
}
...
//執行延時任務,就會將該任務添加進優先級隊列PriorityBlockingQueue中
static final class TrampolineWorker extends Scheduler.Worker implements Disposable {
final PriorityBlockingQueue<TimedRunnable> queue = new PriorityBlockingQueue<TimedRunnable>();
private final AtomicInteger wip = new AtomicInteger();
final AtomicInteger counter = new AtomicInteger();
volatile boolean disposed;
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action) {
//將任務壓入隊列中
return enqueue(action, now(TimeUnit.MILLISECONDS));
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
long execTime = now(TimeUnit.MILLISECONDS) + unit.toMillis(delayTime);
//將任務壓入隊列中
return enqueue(new SleepingRunnable(action, this, execTime), execTime);
}
//將任務添加進隊列中等待執行
Disposable enqueue(Runnable action, long execTime) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());
queue.add(timedRunnable);
if (wip.getAndIncrement() == 0) {
int missed = 1;
for (;;) {
for (;;) {
if (disposed) {
queue.clear();
return EmptyDisposable.INSTANCE;
}
//獲取一個要執行的任務
final TimedRunnable polled = queue.poll();
if (polled == null) {
break;
}
if (!polled.disposed) {
//執行任務
polled.run.run();
}
}
//重置wip的值
missed = wip.addAndGet(-missed);
if (missed == 0) {
break;
}
}
return EmptyDisposable.INSTANCE;
} else {
// queue wasn't empty, a parent is already processing so we just add to the end of the queue
return Disposables.fromRunnable(new AppendToQueueTask(timedRunnable));
}
}
...
}
...
}
複製代碼
AndroidSchedulers.mainThread()
是RxAndroid
中的的API。因爲在android中須要在主線程更新UI,因此須要該API來切換回主線程。在Android中想要切換回主線程,就只有經過Handler
來實現,而AndroidSchedulers.mainThread()
也不例外。很是簡單,就是經過Handler
向主線程發送消息。
final class HandlerScheduler extends Scheduler {
//傳遞進來的Handler已是主線程的Handler了,只要經過該Handler發送消息便可
private final Handler handler;
private final boolean async;
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
...
//發送消息,切換回主線程
handler.sendMessageDelayed(message, unit.toMillis(delay));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
@SuppressLint("NewApi") // Async will only be true when the API is available to call.
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
...
//發送消息,切換回主線程
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
...
}
...
}
複製代碼
在Android及一些開源庫(如OKHttp
、Glide
等)中,多線程之間的數據同步問題通常都是採用synchronized
來實現,由於它是使用最簡單也最深刻人心的一種實現方式,也是性能比較高的一種實現方法。但它倒是一種悲觀鎖——不論是否有線來程競爭都會加鎖,這就致使了在線程競爭比較低的狀況下,它的性能不如樂觀鎖——一種經過CAS來實現的鎖機制。而RxJava
中大量使用的原子變量類Atomicxxxxxx
就是一種樂觀鎖,也是CAS
的一種實現。
CAS
全稱爲Compare And Swap
,即比較並替換。它包含了3個操做數——須要讀寫的內存位置V、進行比較的值A和擬寫入的新值B。當且僅當V的值等於A時,CAS
纔會經過原子方式用新值B來更新V的值,不然不會執行任何操做。關於更多CAS
能夠參考筆者的Java之CAS無鎖算法這篇文章。
在RxJava
中都會使用裝飾模式將Observer
包裹成與操做符對應的類xxxxxxObserver
,如FlatMap
、merge
等操做符對應的類——MergeObserver
、subscribeOn
操做符對應的類——SubscribeOnObserver
、observeOn
對應的類——ObserveOnObserver
等。而MergeObserver
、SubscribeOnObserver
及ObserveOnObserver
都分別繼承自AtomicInteger
、AtomicReference
及AtomicInteger
。也就是經過原子變量類來實現了線程之間的數據同步。
在Flowable
中也是如此,只不過由xxxxxxObserver
變爲了xxxxxSubscriber
而已。
生產者——消費者模式其實就是一種線程間協做的思想。在學習多線程時,實現的買票與賣票案例,就是該模型的實現。或許在開發中不多主動使用到該模型,但基本上都會被動使用該模型。如音視頻的下載與解碼、網絡圖片的下載與展現、RxJava事件的發送與接收等。到這裏,咱們會疑惑,該模型與RxJava有什麼關聯?是何種聯繫尼?其實RxJava的異步訂閱就是該模型的一種實現,也所以會在上游發送事件的速度超出下游處理事件的速度時,拋MissingBackpressureException
異常。
Backpressure
既是你們所說的背壓,可是我認爲這個翻譯是有一點問題的,沒有一目瞭然的表達Backpressure
,筆者認爲扔物線在如何形象的描述反應式編程中的背壓(Backpressure)機制?中的回答就很好的闡述了Backpressure
。 產生的緣由——主要是在異步場景下,上游發送事件的速度超過了下游處理事件的速度,使buffer溢出,從而拋出MissingBackpressureException
異常,這裏重點在於buffer的溢出(RxJava 2.x中的默認buffer大小爲128)。在1.x的版本中,解決該問題的方案不是很完全,但在2.x的版本中則分出一個新類Flowable
來處理這個問題。它與Observable
處理事件的流程恰好相反,Observable
的事件是由被觀察者主動發送的,觀察者沒法控制速度,只能被動接受,而Flowable
則是由觀察者主動獲取事件,從而解決了MissingBackpressureException
異常。下面來看一個示例。
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) throws Exception {
for (int i = 0; i < 200; i++) {
emitter.onNext("str" + i);
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
Log.w("Flowable", "onSubscribe");
}
@Override
public void onNext(String s) {
Log.w("Flowable", "s:" + s);
}
@Override
public void onError(Throwable t) {
Log.w("Flowable", "error:" + t.toString());
}
@Override
public void onComplete() {
Log.w("Flowable", "onComplete");
}
});
複製代碼
Flowable
的create
方法的第二個參數是設置Backpressure
的模式,它有以下幾種模式:
BackpressureStrategy.MISSING
:上游不作任何事件緩存及丟棄,所有交給下游處理,若是有溢出的話,上游無論,交給下游處理。BackpressureStrategy.ERROR
:當下遊沒法及時處理事件從而致使緩存隊列已滿時,會給出MissingBackpressureException
異常提示,默認是該策略。BackpressureStrategy.BUFFER
:緩存隊列無限大,因此不會拋出MissingBackpressureException
異常。直到下游處理完畢全部事件爲止,也意味着內存會隨着事件的增多而增大。BackpressureStrategy.DROP
:若是下游沒法及時處理事件從而當緩存隊列已滿時,會刪除最近的事件。BackpressureStrategy.LATEST
:若是下游沒法及時處理事件從而當緩存隊列已滿時,會保留最新的事件,其餘的事件會被覆蓋。因此運行上面代碼就會給出MissingBackpressureException
異常提示,須要咱們經過request
方法來獲取及消費事件及設置Backpressure
策略來解決該問題。在使用其餘操做符的時候,沒法主動設置Backpressure
策略,則會在緩存池滿了之後給出MissingBackpressureException
異常提示。
toFlowable
是Observable
中的一個方法,經過該方法能夠主動來設置Backpressure
策略,從而低成本的解決在Observable
中拋出的MissingBackpressureException
異常。
Observable.interval(1000,TimeUnit.MILLISECONDS)
//設置`Backpressure`策略
.toFlowable(BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(Long aLong) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
複製代碼
相信閱讀到這裏,就已經對RxJava及源碼都有了必定的瞭解。可是你們有沒有想過如下幾個問題,也是我在學習RxJava時一直思考的幾個問題。
首先來看問題一,RxJava
的應用能夠說很是普遍,好比輪詢、網絡出錯重連、網絡請求嵌套回調、聯合判斷、從緩存中獲取數據等,但上面的一些場景也能夠不用RxJava來實現,這也就致使了在使用時不會第一時間想到RxJava
。因此筆者認爲若是想要熟練的使用RxJava,則須要在思想上進行一次轉變,由於RxJava是響應式編程的一種實現,它不會像OkHttp
、Glide
、Dbflow
等開源庫只會應用在某一領域。
關於學習RxJava的意義,我認爲最好就是可以熟練使用並在可使用RxJava
的時候可以第一時間想到RxJava
,固然因爲RxJava
學習門檻較高且須要思惟的轉變,因此在不能熟練使用時,就須要咱們可以看懂別人寫的RxJava
代碼了。固然RxJava
的異步切換、Callback hell問題的解決也是很好的學習RxJava
的理由。
那麼你們怎麼看RxJava
尼???
【參考資料】
關於RxJava最友好的文章——背壓(Backpressure)