RxJava很方便的切換主子線程、指定任務運行的線程,在這個便利以後還隱藏着不少問題。好比IO scheduler是一個無上限線程池,若是短期併發量過大,在手機端可能出現OOM或者pthread_create錯誤。另外,在實際業務中咱們須要對執行的業務進行優先級區分,以便優先級高的任務先執行,想實現這個需求必然須要對RxJava默認的scheduler進行改造。本文將從RxJava IO scheduler分析、介紹線程池相關知識、如何對IO scheduler進行改造等方向進行介紹,而且對應用舊代碼作到無侵入式的替換。java
在介紹主體內容以前,咱們先回顧下線程池的相關知識,這樣能更好的理解本文章內容。從字面意思上來講,線程池確定是一個裝着線程的"池",小則是魚塘,裝的少,可是家裏沒礦只能承包這麼大的魚塘;固然若是是大佬,說不定這一片海都是他的。線程池確定不是簡單的承載容納線程的池子,既然做爲相似倉庫的屬性,必然有管理之意。git
線程是做爲一個任務的執行承載者,接收來自程序的諸多執行請求,其中子線程是合理利用cpu性能避免阻塞主線程的存在。好東西都是不可貪多,線程若是不加以管理,確定會被程序各處的代碼隨意建立,這樣會浪費或者影響cpu某個時刻的性能,甚至致使當前進程出現異常。github
阻塞隊列,這是線程池待執行任務的容器,負責管理要執行的任務。阻塞二字說明它的入隊和出隊是可控的,所謂阻塞,在某些狀況下會掛起線程(即阻塞),一旦條件知足,被掛起的線程又會自動被喚醒。按照隊列的數據結構進行出入某一個任務時會阻塞,這樣在多線程環境下才更安全的生產任務和消費任務。緩存
比較通俗的一個例子有倉庫提取貨物,若是倉庫裏只有有限的幾個小車運輸貨物,此時有不少運輸員來提貨,確定要遇到爭奪小車、等待小車。小車相似於cpu的核數,也能夠理解爲線程池容許建立的總數。總而言之,當前倉庫(線程池)只有有限的幾輛小車同時工做,每一個運輸員(程序代碼塊)想要獲取貨物(執行代碼)就必需要爭奪小車資源。當有空閒小車時,會按照必定規則分配給某個經銷商,多是隊列的簡單前後入隊等待順序,也多是優先級(畢竟氪金無敵VIP)。固然有多是大佬,家裏礦多,說小車不夠,我去買,這樣會造成一個無限制上限線程池模型。不過這樣作有一種風險就是倉庫體積沒法隨意擴容(整個程序所承載的機器性能有限),買了太多小車放不下,而後整個倉庫就癱瘓了,這時候可能OOM或pthread_create錯誤。小車每次使用完後,都會繼續被分配到下一個任務,固然若是經銷商的事情都處理完了,可能就都閒置了,有可能晚上沒活,倉庫就把小車都封存起來,整理回收到固定地方(超時閒置後回收非核心線程),有可能留下幾輛預先說好的小車以便晚上有緊急貨物時處理(核心線程常駐)。 以下圖所示,大體概念以下。綠色的取貨車多是由於取貨車不夠,臨時採購或者借調的,相似於線程池臨時開啓新線程。紅色的取貨車區域是核心線程,有限的。安全
通常核心線程數根據CPU數量來肯定,線程池數大於CPU數量,看似是併發執行任務,實際上是操做系統幫咱們在按照必定時間片進行調度來執行任務,達到一種同時執行的效果,因此大量線程同時執行對CPU負載性能要求,會讓機器達到50%甚至100%高負荷運做,此時整機機器發生出錯的機率增大。因此同時執行不少任務致使頻繁切換線程自己也是一種額外的開銷 ,不建議如此操做,尤爲是部分任務是低優先級且不重要、可延遲的。bash
單個線程大概1MB左右開銷,在Java內部開發版的JDK中,加入fiber這種新的任務調度模型,開銷只有200KB左右,經過100W級任務調度測試,據介紹性能比thread優異不少,release版預測還會提升,特性相似於協程。不過咱們Android不太可能使用付費JDK。數據結構
經過上面的描述咱們簡單的瞭解線程池與線程之間的關係,線程和調用者之間的聯繫,而且線程池運做時是有本身的規則設定,調用者和每一個被管理的線程必須遵照規則。多線程
java經過Executors提供了四種經常使用線程池,這四種本質上也是封裝了自定義線程池的基本參數,簡化了建立流程,提供特定的功能。併發
先介紹下自定義線程池的基本參數和含義,這樣更好理解下面的java封裝好的線程池。ide
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
複製代碼
一個任務被提交時,它的大體流程都是類似的。下面是流程圖(下圖取自文章https://juejin.im/post/5b052dd7f265da0ba567e7f1)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}
複製代碼
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複製代碼
通常都是繼承ThreadPoolExecutor類(不繼承也能夠,可是繼承是爲了作更多的方法執行監控),而後根據須要設置下面7種參數。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
比較重要的就是阻塞隊列workQueue和拒絕策略handler的選擇,這個會在後續RxJava的IO scheduler監控方案裏再次介紹。java提供的默認隊列的種類有無限大小和有限,帶優先級、帶延時等,根據須要能夠選擇不一樣類型的隊列。ThreadFactory 是構造新的thread的工廠,這裏自定義一個能夠進行新建線程的監控。拒絕策略handler有提供四種默認的策略,也能夠本身實現接口RejectedExecutionHandler本身作特殊策略,好比移交任務到另一個執行者,或者判斷下這個任務的重要性,而後再拋棄。
//ThreadPoolExecutor.CallerRunsPolicy:在調用者所在線程執行該任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
//ThreadPoolExecutor.AbortPolicy:放棄執行任務,拋出RejectedExecutionException異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
//ThreadPoolExecutor.DiscardPolicy:放棄執行任務,不拋出異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
//ThreadPoolExecutor.DiscardOldestPolicy:poll出隊一個最先任務,而後嘗試執行它
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
複製代碼
RxJavaSchedulersHook類 裏會生成IO Scheduler,默認調用CachedThreadScheduler。 裏面的CachedWorkerPool維護了一個線程管理的隊列expiringWorkerQueue, 默認是每隔60s就去經過evictor清除已通過期的線程,線程池沒有上限。所以若是短期內有大量任務要執行,會致使不停地建立新線程,因此存在出現pthread_create、OOM、耗費大量系統資源形成卡頓等問題。
public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {
private static final long KEEP_ALIVE_TIME;
private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;
static final ThreadWorker SHUTDOWN_THREADWORKER;
static final CachedWorkerPool NONE;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;
static {
SHUTDOWN_THREADWORKER = new ThreadWorker(RxThreadFactory.NONE);
SHUTDOWN_THREADWORKER.unsubscribe();
NONE = new CachedWorkerPool(null, 0, null);
NONE.shutdown();
KEEP_ALIVE_TIME = Integer.getInteger("rx.io-scheduler.keepalive", 60);
}
static final class CachedWorkerPool {
private final ThreadFactory threadFactory;
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
private final CompositeSubscription allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
this.threadFactory = threadFactory;
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeSubscription();
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread thread = threadFactory.newThread(r);
thread.setName(thread.getName() + " (Evictor)");
return thread;
}
});
NewThreadWorker.tryEnableCancelPolicy(evictor);
task = evictor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
evictExpiredWorkers();
}
}, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
);
}
evictorService = evictor;
evictorTask = task;
}
ThreadWorker get() {
if (allWorkers.isUnsubscribed()) {
return SHUTDOWN_THREADWORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
//每60s執行一次清除隊列中的已過期線程
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
long now() {
return System.nanoTime();
}
void shutdown() {
try {
if (evictorTask != null) {
evictorTask.cancel(true);
}
if (evictorService != null) {
evictorService.shutdownNow();
}
} finally {
allWorkers.unsubscribe();
}
}
}
public CachedThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update =
new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
@Override
public void shutdown() {
for (;;) {
CachedWorkerPool curr = pool.get();
if (curr == NONE) {
return;
}
if (pool.compareAndSet(curr, NONE)) {
curr.shutdown();
return;
}
}
}
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.once = new AtomicBoolean();
this.threadWorker = pool.get();
}
@Override
public void unsubscribe() {
if (once.compareAndSet(false, true)) {
// unsubscribe should be idempotent, so only do this once
// Release the worker _after_ the previous action (if any) has completed
threadWorker.schedule(this);
}
innerSubscription.unsubscribe();
}
@Override
public void call() {
pool.release(threadWorker);
}
@Override
public boolean isUnsubscribed() {
return innerSubscription.isUnsubscribed();
}
@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed return Subscriptions.unsubscribed(); } ScheduledAction s = threadWorker.scheduleActual(new Action0() { @Override public void call() { if (isUnsubscribed()) { return; } action.call(); } }, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; } } static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } public long getExpirationTime() { return expirationTime; } public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } } } 複製代碼
在NewThreadWorker裏最終設置executor的初始化構造,這裏能夠看到是一個定時週期任務線程池,核心線程爲1.
/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
executor = exec;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue(), threadFactory);
}
複製代碼
本方案提供一些自定義的基礎類,也能夠選擇實現接口,自由替換爲本身想要的策略。
主要是利用RxJava自己RxJavaPlugins作替換線程池的操做。而後自定義一些scheduler所須要的類,完成相關邏輯。
RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHookImpl());
複製代碼
管理邏輯類:IOTaskPriorityType、IOMonitorManager、AppSchedulers IOMonitorManager:負責調用基礎監控類,而且替換默認RxJava的IO scheduler,做爲主要的邏輯管理類,負責初始化整個監控模塊,以及配置基本參數。
AppSchedulers做爲替換Schedulers的存在,用於應用層顯式地使用自定義IO Scheduler,區別於原來的RxJava的Sheduler.IO。
IOTaskPriorityType:定義不一樣的任務類型,用於區分管理。
大體結構以下圖所示。
改造後的任務執行流程也作了相應的改變,有別於上面提到的原生流程。對何時建立線程作了調整,優先在限制線程池大小範圍內建立新線程執行任務,而不是之前的無限建立或者先阻塞隊列等待執行。爲了方便對比,把上面的圖片也拷貝下來。
關於custominterface包:自定義IO scheduler或者scheduler所使用的線程池時,須要關注這個包下面的接口和抽象類 customScheduler包:已經自定義好的scheduler相關以及提供的基礎線程池,能夠參考這裏的實現,去自定義應用本身的線程池管理的scheduler
大部分時候你只須要關心IOMonitorManager這個入口管理類,其它只在須要自定義或者策略改動時才修改。
/*
* description 自定義IO線程監控管理類,配置監視器的基本參數,開啓各類debug方法等
* 監控要求:
* 一、自定義的scheduler須要繼承AbstractScheduler
* 二、參考或者直接使用ExecutorSchedulerWorker建立任務
* 三、若是是要替換原始RxJava的IO線程池,須要額外實現IThreadPool,建立本身的線程池類
* 四、IThreadPool的實現類裏,線程池的構造使用MonitorThreadPoolExecutor類,便於監控
* 五、編寫新的scheduler參考自定義的IOScheduler類
* 六、若是須要自定義SchedulerWork,須要實現Runnable, IBaseWork 接口,繼承Scheduler.Worker
* 七、默認提供IOThreadPool和LimitCoreThreadPool兩種基礎線程池,還有自定義的IOScheduler(用來替換本來RxJava的IOScheduler)
* 八、IOTaskPriorityType優先級類型,RxJava observable在subscribeOn時能夠選擇傳入
* 九、AbstractRejectedExecutionHandler能夠作一些拒絕任務的策略動做
*
* @see ExecutorSchedulerWorker
* 使用方式:
* 一、全部public方法提供配置參數
* 二、在基礎參數配置完後調用此方法startMonitor
* modify by
*/
//必須在應用第一次使用observable以前設置,這裏會替換rxjava的默認IO scheduler。
// 若是隻調用setReplaceIOScheduler方法,則替換時用基礎庫裏自帶的自定義IO scheduler
//LimitCoreThreadPool不是基礎庫默認的IO scheduler實現,通常都是替換線程池實現,不直接修改自定義的IO scheduler
IOMonitorManager.getInstance().setReplaceIOScheduler(true)
.setIOThreadPool(LimitCoreThreadPool.getInstance()
.build(2, BuildConfig.DEBUG ? 35 : 35, 15, 1000, false));
List<String> targetList = new ArrayList<>(2);
targetList.add("com.xtc");
//配置基本的監視器參數,下面參數能夠在IOMonitorManager的startMonitor後修改
IOMonitorManager.getInstance().setCostTimeLimit(0)
//超出這個活躍線程數就輸出到日誌
.setThreadActiveCountLimit(30)
//打印當前被監控的線程池信息
.setLogThreadPoolInfo(BuildConfig.DEBUG)
//打印其它非RxJava的IO線程信息
.setLogOtherThreadInfo(true)
.setPackageName(getPackageName())
// .setTargetNameList(list)
//監控器輪詢時間,每隔這麼久打印一些線程信息
.setMonitorIntervalTime(10)
//是否輸出更多日誌信息,看方法註釋
.setLogMoreInfo(BuildConfig.DEBUG)
//監控器是否啓用
.setMonitorEnable(BuildConfig.DEBUG)
//通常在調試時纔開啓,監控子線程重複切換線程
.setLogRepeatChangeThread(false)
//打印全部任務執行狀況,適合打樁分析當前時間段有哪些任務觸發
.setLogAllTaskRunningInfo(true)
//是否過濾堆棧
.setFilterStack(!BuildConfig.DEBUG);
複製代碼