RxJava自帶線程池監控和管理的探索之旅

背景

RxJava很方便的切換主子線程、指定任務運行的線程,在這個便利以後還隱藏着不少問題。好比IO scheduler是一個無上限線程池,若是短期併發量過大,在手機端可能出現OOM或者pthread_create錯誤。另外,在實際業務中咱們須要對執行的業務進行優先級區分,以便優先級高的任務先執行,想實現這個需求必然須要對RxJava默認的scheduler進行改造。本文將從RxJava IO scheduler分析、介紹線程池相關知識、如何對IO scheduler進行改造等方向進行介紹,而且對應用舊代碼作到無侵入式的替換。java

線程池相關知識回顧

1. 線程以及線程池的含義

在介紹主體內容以前,咱們先回顧下線程池的相關知識,這樣能更好的理解本文章內容。從字面意思上來講,線程池確定是一個裝着線程的"池",小則是魚塘,裝的少,可是家裏沒礦只能承包這麼大的魚塘;固然若是是大佬,說不定這一片海都是他的。線程池確定不是簡單的承載容納線程的池子,既然做爲相似倉庫的屬性,必然有管理之意。git

線程是做爲一個任務的執行承載者,接收來自程序的諸多執行請求,其中子線程是合理利用cpu性能避免阻塞主線程的存在。好東西都是不可貪多,線程若是不加以管理,確定會被程序各處的代碼隨意建立,這樣會浪費或者影響cpu某個時刻的性能,甚至致使當前進程出現異常。github

阻塞隊列,這是線程池待執行任務的容器,負責管理要執行的任務。阻塞二字說明它的入隊和出隊是可控的,所謂阻塞,在某些狀況下會掛起線程(即阻塞),一旦條件知足,被掛起的線程又會自動被喚醒。按照隊列的數據結構進行出入某一個任務時會阻塞,這樣在多線程環境下才更安全的生產任務和消費任務。緩存

比較通俗的一個例子有倉庫提取貨物,若是倉庫裏只有有限的幾個小車運輸貨物,此時有不少運輸員來提貨,確定要遇到爭奪小車、等待小車。小車相似於cpu的核數,也能夠理解爲線程池容許建立的總數。總而言之,當前倉庫(線程池)只有有限的幾輛小車同時工做,每一個運輸員(程序代碼塊)想要獲取貨物(執行代碼)就必需要爭奪小車資源。當有空閒小車時,會按照必定規則分配給某個經銷商,多是隊列的簡單前後入隊等待順序,也多是優先級(畢竟氪金無敵VIP)。固然有多是大佬,家裏礦多,說小車不夠,我去買,這樣會造成一個無限制上限線程池模型。不過這樣作有一種風險就是倉庫體積沒法隨意擴容(整個程序所承載的機器性能有限),買了太多小車放不下,而後整個倉庫就癱瘓了,這時候可能OOM或pthread_create錯誤。小車每次使用完後,都會繼續被分配到下一個任務,固然若是經銷商的事情都處理完了,可能就都閒置了,有可能晚上沒活,倉庫就把小車都封存起來,整理回收到固定地方(超時閒置後回收非核心線程),有可能留下幾輛預先說好的小車以便晚上有緊急貨物時處理(核心線程常駐)。 以下圖所示,大體概念以下。綠色的取貨車多是由於取貨車不夠,臨時採購或者借調的,相似於線程池臨時開啓新線程。紅色的取貨車區域是核心線程,有限的。安全

通常核心線程數根據CPU數量來肯定,線程池數大於CPU數量,看似是併發執行任務,實際上是操做系統幫咱們在按照必定時間片進行調度來執行任務,達到一種同時執行的效果,因此大量線程同時執行對CPU負載性能要求,會讓機器達到50%甚至100%高負荷運做,此時整機機器發生出錯的機率增大。因此同時執行不少任務致使頻繁切換線程自己也是一種額外的開銷 ,不建議如此操做,尤爲是部分任務是低優先級且不重要、可延遲的。bash

單個線程大概1MB左右開銷,在Java內部開發版的JDK中,加入fiber這種新的任務調度模型,開銷只有200KB左右,經過100W級任務調度測試,據介紹性能比thread優異不少,release版預測還會提升,特性相似於協程。不過咱們Android不太可能使用付費JDK。數據結構

線程池模型.png

經過上面的描述咱們簡單的瞭解線程池與線程之間的關係,線程和調用者之間的聯繫,而且線程池運做時是有本身的規則設定,調用者和每一個被管理的線程必須遵照規則。多線程

2. 線程池的種類

java經過Executors提供了四種經常使用線程池,這四種本質上也是封裝了自定義線程池的基本參數,簡化了建立流程,提供特定的功能。併發

先介紹下自定義線程池的基本參數和含義,這樣更好理解下面的java封裝好的線程池。ide

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
複製代碼

一個任務被提交時,它的大體流程都是類似的。下面是流程圖(下圖取自文章https://juejin.im/post/5b052dd7f265da0ba567e7f1)

原生線程池提交流程

  1. CachedThreadPool 緩存線程池 這是一種線程數無上限的線程池,能夠在有空閒線程時複用,無空閒時新建線程。默認狀況下,60s回收空閒線程,而且阻塞隊列爲SynchronousQueue(這是一種無容量阻塞隊列,當拿到任務入隊時就判斷是否有線程能夠執行,若是有就馬上出隊執行,不然就阻塞等待)
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
複製代碼
  1. FixedThreadPool 固定大小線程池 線程池的線程總數是有上限的,當初始化線程池時能夠設置它的容量,若是待執行任務超出總數就須要在隊列中等待了。
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
複製代碼
  1. SchedulerThreadPool 定時週期任務線程池 它不少參數都和固定大小線程池同樣,除了阻塞隊列選用了DelayQueue,這是一種按照延時長短排序的隊列。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}
複製代碼
  1. SingleThreadExecutor 單線程池 只有一個線程,因此隊列裏的任務按照隊列的出隊規則逐個執行,隊列採用的是一個鏈表結構的隊列。
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
複製代碼

3.自定義線程池相關類介紹

通常都是繼承ThreadPoolExecutor類(不繼承也能夠,可是繼承是爲了作更多的方法執行監控),而後根據須要設置下面7種參數。

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
複製代碼

比較重要的就是阻塞隊列workQueue和拒絕策略handler的選擇,這個會在後續RxJava的IO scheduler監控方案裏再次介紹。java提供的默認隊列的種類有無限大小和有限,帶優先級、帶延時等,根據須要能夠選擇不一樣類型的隊列。ThreadFactory 是構造新的thread的工廠,這裏自定義一個能夠進行新建線程的監控。拒絕策略handler有提供四種默認的策略,也能夠本身實現接口RejectedExecutionHandler本身作特殊策略,好比移交任務到另一個執行者,或者判斷下這個任務的重要性,而後再拋棄。

//ThreadPoolExecutor.CallerRunsPolicy:在調用者所在線程執行該任務

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
//ThreadPoolExecutor.AbortPolicy:放棄執行任務,拋出RejectedExecutionException異常。

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }

//ThreadPoolExecutor.DiscardPolicy:放棄執行任務,不拋出異常。

        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }

//ThreadPoolExecutor.DiscardOldestPolicy:poll出隊一個最先任務,而後嘗試執行它

 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }

複製代碼

IO scheduler的弊端

RxJavaSchedulersHook類 裏會生成IO Scheduler,默認調用CachedThreadScheduler。 裏面的CachedWorkerPool維護了一個線程管理的隊列expiringWorkerQueue, 默認是每隔60s就去經過evictor清除已通過期的線程,線程池沒有上限。所以若是短期內有大量任務要執行,會致使不停地建立新線程,因此存在出現pthread_create、OOM、耗費大量系統資源形成卡頓等問題。

public final class CachedThreadScheduler extends Scheduler implements SchedulerLifecycle {
    private static final long KEEP_ALIVE_TIME;
    private static final TimeUnit KEEP_ALIVE_UNIT = TimeUnit.SECONDS;

    static final ThreadWorker SHUTDOWN_THREADWORKER;

    static final CachedWorkerPool NONE;

    final ThreadFactory threadFactory;

    final AtomicReference<CachedWorkerPool> pool;

    static {
        SHUTDOWN_THREADWORKER = new ThreadWorker(RxThreadFactory.NONE);
        SHUTDOWN_THREADWORKER.unsubscribe();

        NONE = new CachedWorkerPool(null, 0, null);
        NONE.shutdown();

        KEEP_ALIVE_TIME = Integer.getInteger("rx.io-scheduler.keepalive", 60);
    }

    static final class CachedWorkerPool {
        private final ThreadFactory threadFactory;
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        private final CompositeSubscription allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;

        CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
            this.threadFactory = threadFactory;
            this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
            this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
            this.allWorkers = new CompositeSubscription();

            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
                    @Override public Thread newThread(Runnable r) {
                        Thread thread = threadFactory.newThread(r);
                        thread.setName(thread.getName() + " (Evictor)");
                        return thread;
                    }
                });
                NewThreadWorker.tryEnableCancelPolicy(evictor);
                task = evictor.scheduleWithFixedDelay(
                        new Runnable() {
                            @Override
                            public void run() {
                                evictExpiredWorkers();
                            }
                        }, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
                );
            }
            evictorService = evictor;
            evictorTask = task;
        }

        ThreadWorker get() {
            if (allWorkers.isUnsubscribed()) {
                return SHUTDOWN_THREADWORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }

        void release(ThreadWorker threadWorker) {
            // Refresh expire time before putting worker back in pool
            threadWorker.setExpirationTime(now() + keepAliveTime);

            expiringWorkerQueue.offer(threadWorker);
        }
        //每60s執行一次清除隊列中的已過期線程
        void evictExpiredWorkers() {
            if (!expiringWorkerQueue.isEmpty()) {
                long currentTimestamp = now();

                for (ThreadWorker threadWorker : expiringWorkerQueue) {
                    if (threadWorker.getExpirationTime() <= currentTimestamp) {
                        if (expiringWorkerQueue.remove(threadWorker)) {
                            allWorkers.remove(threadWorker);
                        }
                    } else {
                        // Queue is ordered with the worker that will expire first in the beginning, so when we
                        // find a non-expired worker we can stop evicting.
                        break;
                    }
                }
            }
        }

        long now() {
            return System.nanoTime();
        }

        void shutdown() {
            try {
                if (evictorTask != null) {
                    evictorTask.cancel(true);
                }
                if (evictorService != null) {
                    evictorService.shutdownNow();
                }
            } finally {
                allWorkers.unsubscribe();
            }
        }
    }

    public CachedThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }

    @Override
    public void start() {
        CachedWorkerPool update =
            new CachedWorkerPool(threadFactory, KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }
    @Override
    public void shutdown() {
        for (;;) {
            CachedWorkerPool curr = pool.get();
            if (curr == NONE) {
                return;
            }
            if (pool.compareAndSet(curr, NONE)) {
                curr.shutdown();
                return;
            }
        }
    }

    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

    static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
        private final CompositeSubscription innerSubscription = new CompositeSubscription();
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;
        final AtomicBoolean once;

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.once = new AtomicBoolean();
            this.threadWorker = pool.get();
        }

        @Override
        public void unsubscribe() {
            if (once.compareAndSet(false, true)) {
                // unsubscribe should be idempotent, so only do this once

                // Release the worker _after_ the previous action (if any) has completed
                threadWorker.schedule(this);
            }
            innerSubscription.unsubscribe();
        }

        @Override
        public void call() {
            pool.release(threadWorker);
        }

        @Override
        public boolean isUnsubscribed() {
            return innerSubscription.isUnsubscribed();
        }

        @Override
        public Subscription schedule(Action0 action) {
            return schedule(action, 0, null);
        }

        @Override
        public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
            if (innerSubscription.isUnsubscribed()) {
                // don't schedule, we are unsubscribed return Subscriptions.unsubscribed(); } ScheduledAction s = threadWorker.scheduleActual(new Action0() { @Override public void call() { if (isUnsubscribed()) { return; } action.call(); } }, delayTime, unit); innerSubscription.add(s); s.addParent(innerSubscription); return s; } } static final class ThreadWorker extends NewThreadWorker { private long expirationTime; ThreadWorker(ThreadFactory threadFactory) { super(threadFactory); this.expirationTime = 0L; } public long getExpirationTime() { return expirationTime; } public void setExpirationTime(long expirationTime) { this.expirationTime = expirationTime; } } } 複製代碼

在NewThreadWorker裏最終設置executor的初始化構造,這裏能夠看到是一個定時週期任務線程池,核心線程爲1.

/* package */
    public NewThreadWorker(ThreadFactory threadFactory) {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
        // Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
        boolean cancelSupported = tryEnableCancelPolicy(exec);
        if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
            registerExecutor((ScheduledThreadPoolExecutor)exec);
        }
        executor = exec;
    }
 public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
        Action0 decoratedAction = RxJavaHooks.onScheduledAction(action);
        ScheduledAction run = new ScheduledAction(decoratedAction);
        Future<?> f;
        if (delayTime <= 0) {
            f = executor.submit(run);
        } else {
            f = executor.schedule(run, delayTime, unit);
        }
        run.add(f);

        return run;
    }


    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
複製代碼

技術方案分析

本線程管理方案特性

  1. 幾乎0侵入代碼,只須要在Application初始化。若是要使用優先級,則須要對改變優先級的任務的代碼塊傳入參數,屬於正常侵入。
  2. 線程建立、執行的流程優化,更適合應用APP利用線程池執行任務
  3. 線程的執行代碼塊定位和耗時統計等基本信息收集,以及以此爲基礎的任務追蹤頻度,好比某個階段的線程開啓狀況和任務耗時,任務執行頻度統計。
  4. 避免pthread_create、Rxjava IO scheduler線程開啓過多致使的OOM等問題
  5. 任務優先級執行,可在性能不足時丟棄執行可拋棄類型的任務
  6. 提供當前應用總線程數和其餘線程名字的打印輸出
  7. 可監控子線程中再次切換線程的狀況,避免多餘線程開銷

本方案提供一些自定義的基礎類,也能夠選擇實現接口,自由替換爲本身想要的策略。

方案分析

主要是利用RxJava自己RxJavaPlugins作替換線程池的操做。而後自定義一些scheduler所須要的類,完成相關邏輯。

RxJavaPlugins.getInstance().registerSchedulersHook(new RxJavaSchedulersHookImpl());
複製代碼

管理邏輯類:IOTaskPriorityType、IOMonitorManager、AppSchedulers IOMonitorManager:負責調用基礎監控類,而且替換默認RxJava的IO scheduler,做爲主要的邏輯管理類,負責初始化整個監控模塊,以及配置基本參數。

AppSchedulers做爲替換Schedulers的存在,用於應用層顯式地使用自定義IO Scheduler,區別於原來的RxJava的Sheduler.IO。

IOTaskPriorityType:定義不一樣的任務類型,用於區分管理。

大體結構以下圖所示。

IO scheduler線程監控結構圖.png

改造後的任務執行流程也作了相應的改變,有別於上面提到的原生流程。對何時建立線程作了調整,優先在限制線程池大小範圍內建立新線程執行任務,而不是之前的無限建立或者先阻塞隊列等待執行。爲了方便對比,把上面的圖片也拷貝下來。

線程池替換後的任務執行流程.png

原生線程池提交流程

使用說明

關於custominterface包:自定義IO scheduler或者scheduler所使用的線程池時,須要關注這個包下面的接口和抽象類 customScheduler包:已經自定義好的scheduler相關以及提供的基礎線程池,能夠參考這裏的實現,去自定義應用本身的線程池管理的scheduler

大部分時候你只須要關心IOMonitorManager這個入口管理類,其它只在須要自定義或者策略改動時才修改。

/*
 * description  自定義IO線程監控管理類,配置監視器的基本參數,開啓各類debug方法等
 * 監控要求:
 * 一、自定義的scheduler須要繼承AbstractScheduler
 * 二、參考或者直接使用ExecutorSchedulerWorker建立任務
 * 三、若是是要替換原始RxJava的IO線程池,須要額外實現IThreadPool,建立本身的線程池類
 * 四、IThreadPool的實現類裏,線程池的構造使用MonitorThreadPoolExecutor類,便於監控
 * 五、編寫新的scheduler參考自定義的IOScheduler類
 * 六、若是須要自定義SchedulerWork,須要實現Runnable, IBaseWork 接口,繼承Scheduler.Worker
 * 七、默認提供IOThreadPool和LimitCoreThreadPool兩種基礎線程池,還有自定義的IOScheduler(用來替換本來RxJava的IOScheduler)
 * 八、IOTaskPriorityType優先級類型,RxJava observable在subscribeOn時能夠選擇傳入
 * 九、AbstractRejectedExecutionHandler能夠作一些拒絕任務的策略動做
 *
 * @see ExecutorSchedulerWorker
 * 使用方式:
 * 一、全部public方法提供配置參數
 * 二、在基礎參數配置完後調用此方法startMonitor
 * modify by
 */


   //必須在應用第一次使用observable以前設置,這裏會替換rxjava的默認IO scheduler。
                // 若是隻調用setReplaceIOScheduler方法,則替換時用基礎庫裏自帶的自定義IO scheduler
                //LimitCoreThreadPool不是基礎庫默認的IO scheduler實現,通常都是替換線程池實現,不直接修改自定義的IO scheduler
                IOMonitorManager.getInstance().setReplaceIOScheduler(true)
                        .setIOThreadPool(LimitCoreThreadPool.getInstance()
                                .build(2, BuildConfig.DEBUG ? 35 : 35, 15, 1000, false));

                List<String> targetList = new ArrayList<>(2);
                targetList.add("com.xtc");
                //配置基本的監視器參數,下面參數能夠在IOMonitorManager的startMonitor後修改
                IOMonitorManager.getInstance().setCostTimeLimit(0)
                        //超出這個活躍線程數就輸出到日誌
                        .setThreadActiveCountLimit(30)
                        //打印當前被監控的線程池信息
                        .setLogThreadPoolInfo(BuildConfig.DEBUG)
                        //打印其它非RxJava的IO線程信息
                        .setLogOtherThreadInfo(true)
                        .setPackageName(getPackageName())
//                        .setTargetNameList(list)
                        //監控器輪詢時間,每隔這麼久打印一些線程信息
                        .setMonitorIntervalTime(10)
                        //是否輸出更多日誌信息,看方法註釋
                        .setLogMoreInfo(BuildConfig.DEBUG)
                        //監控器是否啓用
                        .setMonitorEnable(BuildConfig.DEBUG)
                        //通常在調試時纔開啓,監控子線程重複切換線程
                        .setLogRepeatChangeThread(false)
                        //打印全部任務執行狀況,適合打樁分析當前時間段有哪些任務觸發
                        .setLogAllTaskRunningInfo(true)
                        //是否過濾堆棧
                        .setFilterStack(!BuildConfig.DEBUG);

複製代碼

源碼

源碼地址

相關文章
相關標籤/搜索