媽媽不再用擔憂你不會使用線程池了(ThreadUtils)

爲何要用線程池

使用線程池管理線程有以下優勢:java

  1. 下降資源消耗:經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  2. 提升響應速度:當任務到達時,任務能夠不須要等到線程建立就能當即執行。
  3. 提升線程的可管理性:線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。

線程池介紹

ThreadPoolExecutor

Java 爲咱們提供了 ThreadPoolExecutor 來建立一個線程池,其完整構造函數以下所示:git

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) 複製代碼
  • int corePoolSize(核心線程數):線程池新建線程的時候,若是當前線程總數小於corePoolSize,則新建的是核心線程,若是超過corePoolSize,則新建的是非核心線程;核心線程默認狀況下會一直存活在線程池中,即便這個核心線程啥也不幹(閒置狀態);若是設置了 allowCoreThreadTimeOut 爲 true,那麼核心線程若是不幹活(閒置狀態)的話,超過必定時間(時長下面參數決定),就會被銷燬掉。github

  • int maximumPoolSize(線程池能容納的最大線程數量):線程總數 = 核心線程數 + 非核心線程數。緩存

  • long keepAliveTime(非核心線程空閒存活時長):非核心線程空閒時長超過該時長將會被回收,主要應用在緩存線程池中,當設置了 allowCoreThreadTimeOut 爲 true 時,對核心線程一樣起做用。bash

  • TimeUnit unit(keepAliveTime 的單位):它是一個枚舉類型,經常使用的如:TimeUnit.SECONDS(秒)、TimeUnit.MILLISECONDS(毫秒)。併發

  • BlockingQueue workQueue(任務隊列):當全部的核心線程都在幹活時,新添加的任務會被添加到這個隊列中等待處理,若是隊列滿了,則新建非核心線程執行任務,經常使用的 workQueue 類型:異步

    1. SynchronousQueue:這個隊列接收到任務的時候,會直接提交給線程處理,而不保留它,若是全部線程都在工做怎麼辦?那就新建一個線程來處理這個任務!因此爲了保證不出現 線程數達到了 maximumPoolSize 而不能新建線程 的錯誤,使用這個類型隊列的時候,maximumPoolSize 通常指定成 Integer.MAX_VALUE,即無限大。async

    2. LinkedBlockingQueue:這個隊列接收到任務的時候,若是當前線程數小於核心線程數,則新建線程(核心線程)處理任務;若是當前線程數等於核心線程數,則進入隊列等待。因爲這個隊列沒有最大值限制,即全部超過核心線程數的任務都將被添加到隊列中,這也就致使了 maximumPoolSize 的設定失效,由於總線程數永遠不會超過 corePoolSize。ide

    3. ArrayBlockingQueue:能夠限定隊列的長度,接收到任務的時候,若是沒有達到 corePoolSize 的值,則新建線程(核心線程)執行任務,若是達到了,則入隊等候,若是隊列已滿,則新建線程(非核心線程)執行任務,又若是總線程數到了 maximumPoolSize,而且隊列也滿了,則發生錯誤。函數

    4. DelayQueue:隊列內元素必須實現 Delayed 接口,這就意味着你傳進去的任務必須先實現 Delayed 接口。這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,纔會執行任務。

  • ThreadFactory threadFactory(線程工廠):用來建立線程池中的線程,一般用默認的便可。

  • RejectedExecutionHandler handler(拒絕策略):在線程池已經關閉的狀況下和任務太多致使最大線程數和任務隊列已經飽和,沒法再接收新的任務,在上面兩種狀況下,只要知足其中一種時,在使用 execute() 來提交新的任務時將會拒絕,線程池提供瞭如下 4 種策略:

    1. AbortPolicy:默認策略,在拒絕任務時,會拋出RejectedExecutionException。

    2. CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前的被丟棄的任務。

    3. DiscardOldestPolicy:該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。

    4. DiscardPolicy:該策略默默的丟棄沒法處理的任務,不予任何處理。

線程池執行策略

當一個任務要被添加進線程池時,有如下四種執行策略:

  1. 線程數量未達到 corePoolSize,則新建一個線程(核心線程)執行任務。
  2. 線程數量達到了 corePoolsSize,則將任務移入隊列等待。
  3. 隊列已滿,新建非核心線程執行任務。
  4. 隊列已滿,總線程數又達到了 maximumPoolSize,就會由 RejectedExecutionHandler 拋出異常。

其流程圖以下所示:

常見的四類線程池

常見的四類線程池分別有 FixedThreadPool、SingleThreadExecutor、ScheduledThreadPool 和 CachedThreadPool,它們其實都是經過 ThreadPoolExecutor 建立的,其參數以下表所示:

參數 FixedThreadPool SingleThreadExecutor ScheduledThreadPool CachedThreadPool
corePoolSize nThreads 1 corePoolSize 0
maximumPoolSize nThreads 1 Integer.MAX_VALUE Integer.MAX_VALUE
keepAliveTime 0 0 10 60
unit MILLISECONDS MILLISECONDS MILLISECONDS SECONDS
workQueue LinkedBlockingQueue LinkedBlockingQueue DelayedWorkQueue SynchronousQueue
threadFactory defaultThreadFactory defaultThreadFactory defaultThreadFactory defaultThreadFactory
handler defaultHandler defaultHandler defaultHandler defaultHandler
適用場景 已知併發壓力的狀況下,對線程數作限制 須要保證順序執行的場景,而且只有一個線程在執行 須要多個後臺線程執行週期任務的場景 處理執行時間比較短的任務

若是你不想本身寫一個線程池,那麼你能夠從上面看看有沒有符合你要求的(通常都夠用了),若是有,那麼很好你直接用就好了,若是沒有,那你就老老實實本身去寫一個吧。

合理地配置線程池

須要針對具體狀況而具體處理,不一樣的任務類別應採用不一樣規模的線程池,任務類別可劃分爲 CPU 密集型任務、IO 密集型任務和混合型任務。

  • CPU 密集型任務:線程池中線程個數應儘可能少,推薦配置爲 (CPU 核心數 + 1);

  • IO 密集型任務:因爲 IO 操做速度遠低於 CPU 速度,那麼在運行這類任務時,CPU 絕大多數時間處於空閒狀態,那麼線程池能夠配置儘可能多些的線程,以提升 CPU 利用率,推薦配置爲 (2 * CPU 核心數 + 1);

  • 混合型任務:能夠拆分爲 CPU 密集型任務和 IO 密集型任務,當這兩類任務執行時間相差無幾時,經過拆分再執行的吞吐率高於串行執行的吞吐率,但若這兩類任務執行時間有數據級的差距,那麼沒有拆分的意義。

線程池工具類封裝及使用

爲了提高開發效率及更好地使用和管理線程池,我已經爲大家封裝好了線程工具類----ThreadUtils,依賴 AndroidUtilCode 1.16.1 版本便可使用,其 API 以下所示:

isMainThread            : 判斷當前是否主線程
getFixedPool            : 獲取固定線程池
getSinglePool           : 獲取單線程池
getCachedPool           : 獲取緩衝線程池
getIoPool               : 獲取 IO 線程池
getCpuPool              : 獲取 CPU 線程池
executeByFixed          : 在固定線程池執行任務
executeByFixedWithDelay : 在固定線程池延時執行任務
executeByFixedAtFixRate : 在固定線程池按固定頻率執行任務
executeBySingle         : 在單線程池執行任務
executeBySingleWithDelay: 在單線程池延時執行任務
executeBySingleAtFixRate: 在單線程池按固定頻率執行任務
executeByCached         : 在緩衝線程池執行任務
executeByCachedWithDelay: 在緩衝線程池延時執行任務
executeByCachedAtFixRate: 在緩衝線程池按固定頻率執行任務
executeByIo             : 在 IO 線程池執行任務
executeByIoWithDelay    : 在 IO 線程池延時執行任務
executeByIoAtFixRate    : 在 IO 線程池按固定頻率執行任務
executeByCpu            : 在 CPU 線程池執行任務
executeByCpuWithDelay   : 在 CPU 線程池延時執行任務
executeByCpuAtFixRate   : 在 CPU 線程池按固定頻率執行任務
executeByCustom         : 在自定義線程池執行任務
executeByCustomWithDelay: 在自定義線程池延時執行任務
executeByCustomAtFixRate: 在自定義線程池按固定頻率執行任務
cancel                  : 取消任務的執行
複製代碼

若是你使用 RxJava 很 6,並且項目中已經使用了 RxJava,那麼你能夠繼續使用 RxJava 來作線程切換的操做;若是你並不會 RxJava 或者是在開發 SDK,那麼這個工具類再適合你不過了,它能夠爲你統一管理線程池的使用,不至於讓你的項目中出現過多的線程池。

ThreadUtils 使用極爲方便,看 API 便可明白相關意思,FixedPool、SinglePool、CachedPool 分別對應了上面介紹的 FixedThreadPool、SingleThreadExecutor、CachedThreadPool 這三種,IoPool 是建立 (CPU_COUNT * 2 + 1) 個核心線程數,CpuPool 是創建 (CPU_COUNT + 1) 個核心線程數;而全部的 execute 都是線程池外圍裹了一層 ScheduledThreadPool,這裏和 RxJava 線程池的實現有所類似,能夠更方便地提供延時任務和固定頻率執行的任務,固然也能夠更方便地取消任務的執行,下面讓咱們來簡單地來介紹其使用,以從 assets 中拷貝 APK 到 SD 卡爲例,其代碼以下所示:

public static void releaseInstallApk(final OnReleasedListener listener) {
    if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
        ThreadUtils.executeByIo(new ThreadUtils.SimpleTask<Void>() {
            @Override
            public Void doInBackground() throws Throwable {
                ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
                return null;
            }

            @Override
            public void onSuccess(Void result) {
                if (listener != null) {
                    listener.onReleased();
                }
            }
        });
    } else {
        if (listener != null) {
            listener.onReleased();
        }
        LogUtils.d("test apk existed.");
    }
}
複製代碼

看起來還不是很優雅是吧,你能夠把相關的 Task 都抽出來放到合適的包下,這樣每一個 Task 的職責一看便知,如上例子能夠改裝成以下所示:

public class ReleaseInstallApkTask extends ThreadUtils.SimpleTask<Void> {

    private OnReleasedListener mListener;

    public ReleaseInstallApkTask(final OnReleasedListener listener) {
        mListener = listener;
    }

    @Override
    public Void doInBackground() throws Throwable {
        ResourceUtils.copyFileFromAssets("test_install", Config.TEST_APK_PATH);
        return null;
    }

    @Override
    public void onSuccess(Void result) {
        if (mListener != null) {
            mListener.onReleased();
        }
    }

    public void execute() {
        ThreadUtils.executeByIo(this);
    }
}

public static void releaseInstallApk(final OnReleasedListener listener) {
    if (!FileUtils.isFileExists(Config.TEST_APK_PATH)) {
        new ReleaseInstallApkTask(listener).execute();
    } else {
        if (listener != null) {
            listener.onReleased();
        }
        LogUtils.d("test apk existed.");
    }
}
複製代碼

是否是瞬間清爽了不少,若是執行成功的回調中涉及了 View 相關的操做,那麼你須要在 destroy 中取消 task 的執行哦,不然會內存泄漏哦,繼續以上面的例子爲例,代碼以下所示:

public class XXActivity extends Activity {
    ···
    
    @Override
    protected void onDestroy() {
        // ThreadUtils.cancel(releaseInstallApkTask);// 或者下面的取消均可以
        releaseInstallApkTask.cancel();
        super.onDestroy();
    }
}
複製代碼

以上是以 SimpleTask 爲例,Task 的話會多兩個回調,onCancel() 和 onFail(Throwable t),它們和 onSuccess(T result) 都是互斥的,最終回調只會走它們其中之一,而且在 Android 端是發送到主線程中執行,若是是 Java 端的話那就仍是會在相應的線程池中執行,這點也方便了我作單元測試。

線程池工具類單元測試

若是遇到了異步的單測,你會發現單測很快就跑完呢,並無等待咱們線程跑完再結束,咱們能夠用 CountDownLatch 來等待線程的結束,或者化異步爲同步的作法,這裏咱們使用 CountDownLatch 來實現,我進行了簡單的封裝,測試 Fixed 的代碼以下所示:

public class ThreadUtilsTest {

    @Test
    public void executeByFixed() throws Exception {
        asyncTest(10, new TestRunnable<String>() {
            @Override
            public void run(final int index, CountDownLatch latch) {
                final TestTask<String> task = new TestTask<String>(latch) {
                    @Override
                    public String doInBackground() throws Throwable {
                        Thread.sleep(500 + index * 10);
                        if (index < 4) {
                            return Thread.currentThread() + " :" + index;
                        } else if (index < 7) {
                            cancel();
                            return null;
                        } else {
                            throw new NullPointerException(String.valueOf(index));
                        }
                    }

                    @Override
                    void onTestSuccess(String result) {
                        System.out.println(result);
                    }
                };
                ThreadUtils.executeByFixed(3, task);
            }
        });
    }

    @Test
    public void executeByFixedWithDelay() throws Exception {
        asyncTest(10, new TestRunnable<String>() {
            @Override
            public void run(final int index, CountDownLatch latch) {
                final TestTask<String> task = new TestTask<String>(latch) {
                    @Override
                    public String doInBackground() throws Throwable {
                        Thread.sleep(500);
                        if (index < 4) {
                            return Thread.currentThread() + " :" + index;
                        } else if (index < 7) {
                            cancel();
                            return null;
                        } else {
                            throw new NullPointerException(String.valueOf(index));
                        }
                    }

                    @Override
                    void onTestSuccess(String result) {
                        System.out.println(result);
                    }
                };
                ThreadUtils.executeByFixedWithDelay(3, task, 500 + index * 10, TimeUnit.MILLISECONDS);
            }
        });
    }

    @Test
    public void executeByFixedAtFixRate() throws Exception {
        asyncTest(10, new TestRunnable<String>() {
            @Override
            public void run(final int index, CountDownLatch latch) {
                final TestScheduledTask<String> task = new TestScheduledTask<String>(latch, 3) {
                    @Override
                    public String doInBackground() throws Throwable {
                        Thread.sleep(500 + index * 10);
                        if (index < 4) {
                            return Thread.currentThread() + " :" + index;
                        } else if (index < 7) {
                            cancel();
                            return null;
                        } else {
                            throw new NullPointerException(String.valueOf(index));
                        }
                    }

                    @Override
                    void onTestSuccess(String result) {
                        System.out.println(result);
                    }
                };
                ThreadUtils.executeByFixedAtFixRate(3, task, 3000 + index * 10, TimeUnit.MILLISECONDS);
            }
        });
    }

    abstract static class TestScheduledTask<T> extends ThreadUtils.Task<T> {

        private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger();
        private int mTimes;
        CountDownLatch mLatch;

        TestScheduledTask(final CountDownLatch latch, final int times) {
            mLatch = latch;
            mTimes = times;
        }

        abstract void onTestSuccess(T result);

        @Override
        public void onSuccess(T result) {
            onTestSuccess(result);
            if (ATOMIC_INTEGER.addAndGet(1) % mTimes == 0) {
                mLatch.countDown();
            }
        }

        @Override
        public void onCancel() {
            System.out.println(Thread.currentThread() + " onCancel: ");
            mLatch.countDown();
        }

        @Override
        public void onFail(Throwable t) {
            System.out.println(Thread.currentThread() + " onFail: " + t);
            mLatch.countDown();
        }
    }

    abstract static class TestTask<T> extends ThreadUtils.Task<T> {
        CountDownLatch mLatch;

        TestTask(final CountDownLatch latch) {
            mLatch = latch;
        }

        abstract void onTestSuccess(T result);

        @Override
        public void onSuccess(T result) {
            onTestSuccess(result);
            mLatch.countDown();
        }

        @Override
        public void onCancel() {
            System.out.println(Thread.currentThread() + " onCancel: ");
            mLatch.countDown();
        }

        @Override
        public void onFail(Throwable t) {
            System.out.println(Thread.currentThread() + " onFail: " + t);
            mLatch.countDown();
        }
    }

    <T> void asyncTest(int threadCount, TestRunnable<T> runnable) throws Exception {
        CountDownLatch latch = new CountDownLatch(threadCount);
        for (int i = 0; i < threadCount; i++) {
            runnable.run(i, latch);
        }
        latch.await();
    }

    interface TestRunnable<T> {
        void run(final int index, CountDownLatch latch);
    }
}
複製代碼

最後想說的話

感謝你們一塊兒陪伴 AndroidUtilCode 的成長,核心工具類幾乎都已囊括,也是聚集了我大量的心血,把開源作到了極致,但願你們能夠用的舒心,大大提高開發效率,早日贏取白富美,走上人生巔峯。

歡迎來個人 狗窩 坐坐哈

後文再添加一個我的對 OkHttp 的線程池的使用分析,算是送上個小福利。

OkHttp 中的線程池使用

查看 OkHttp 的源碼發現,不管是同步請求仍是異步請求,最終都是交給 Dispatcher 作處理,咱們看下該類和線程池有關的的主要代碼:

public final class Dispatcher {
  // 最大請求數
  private int maxRequests = 64;
  // 相同 host 最大請求數
  private int maxRequestsPerHost = 5;
  // 請求執行線程池,懶加載
  private @Nullable ExecutorService executorService;
  // 就緒狀態的異步請求隊列
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
  // 運行中的異步請求隊列,包括還沒完成的請求
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  public Dispatcher(ExecutorService executorService) {
      this.executorService = executorService;
  }

  public Dispatcher() {
  }

  public synchronized ExecutorService executorService() {
      if (executorService == null) {
          // 和 CachedThreadPool 很類似
          executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
      }
      return executorService;
  }

  synchronized void enqueue(AsyncCall call) {
    // 不超過最大請求數而且不超過 host 最大請求數
    if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
      // 添加到運行中的異步請求隊列
      runningAsyncCalls.add(call);
      // 添加到線程池中運行
      executorService().execute(call);
    } else {
      // 添加到就緒的異步請求隊列
      readyAsyncCalls.add(call);
    }
  }

  // 當該異步請求結束的時候,會調用此方法,用於將運行中的異步請求隊列中的該請求移除並調整請求隊列
  // 此時就緒隊列中的請求就能夠進入運行中的隊列
  void finished(AsyncCall call) {
      finished(runningAsyncCalls, call, true);
  }

  private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
      int runningCallsCount;
      Runnable idleCallback;
      synchronized (this) {
          if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
          if (promoteCalls) promoteCalls();
          runningCallsCount = runningCallsCount();
          idleCallback = this.idleCallback;
      }

      if (runningCallsCount == 0 && idleCallback != null) {
          idleCallback.run();
      }
  }

  // 根據 maxRequests 和 maxRequestsPerHost 來調整 runningAsyncCalls 和 readyAsyncCalls
  // 使運行中的異步請求不超過兩種最大值,而且若是隊列有空閒,將就緒狀態的請求歸類爲運行中。
  private void promoteCalls() {
    // 若是運行中的異步隊列不小於最大請求數,直接返回
    if (runningAsyncCalls.size() >= maxRequests) return;
    // 若是就緒隊列爲空,直接返回
    if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    // 遍歷就緒隊列並插入到運行隊列
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall call = i.next();

      if (runningCallsForHost(call) < maxRequestsPerHost) {
        i.remove();
        runningAsyncCalls.add(call);
        executorService().execute(call);
      }
      // 運行隊列中的數量到達最大請求數,直接返回
      if (runningAsyncCalls.size() >= maxRequests) return;
    }
  }
}
複製代碼

能夠發現 OkHttp 不是在線程池中維護線程的個數,線程是經過 Dispatcher 間接控制,線程池中的請求都是運行中的請求,這也就是說線程的重用不是線程池控制的,經過源碼咱們發現線程重用的地方是請求結束的地方 finished(AsyncCall call) ,而真正的控制是經過 promoteCalls 方法, 根據 maxRequestsmaxRequestsPerHost 來調整 runningAsyncCallsreadyAsyncCalls,使運行中的異步請求不超過兩種最大值,而且若是隊列有空閒,將就緒狀態的請求歸類爲運行中。

相關文章
相關標籤/搜索