- 原文地址:Multithreading with RxJava
- 原文做者:Pierce Zaifman
- 譯文出自:掘金翻譯計劃
- 譯者:PhxNirvana
- 校對者:yazhi1992、stormrabbit
大多數狀況下,我寫的 Android 代碼都是能夠流暢運行的。直到上幾周編寫一個須要讀取和分析大型文件的 app 以前,我從未關心過 app 運行速度的問題。javascript
儘管我指望用戶明白文件越大,耗時越長的道理,有時候他們仍會放棄個人應用。他們可能認爲應用卡住了,也多是由於他們就不想等那麼久。因此若是我能把時間縮短至少一半的話,必定會大有裨益的。java
由於我全部後臺任務都用 RxJava 重寫了,因此繼續用 RxJava 來解決這個問題也是天然而然的。尤爲是我還有一些以下所示的代碼:android
List<String> dataList;
//這裏是數據列表
List<DataModel> result = new ArrayList<>();
for (String data : dataList) {
result.add(DataParser.createData(data));
}複製代碼
因此我只是想把循環的每一個操做放到一個後臺線程中。以下所示:git
List<String> dataList;
//這裏是數據列表
List<Observable<DataModel>> tasks = new ArrayList<>();
for (String data : dataList) {
tasks.add(Observable.just(data).subscribeOn(Schedulers.io()).map(s -> {
// 返回一個 DataModel 對象
return DataParser.createData(s);
}));
}
List<DataModel> result = new ArrayList<>();
// 等待運行結束並收集結果
for (DataModel dataModel : Observable.merge(tasks).toBlocking().toIterable()) {
result.add(dataModel);
}複製代碼
的確起做用了,時間減小了近一半。但也致使大量垃圾回收(GC),這使得加載時的 UI 又卡又慢。爲了搞清楚問題的緣由,我加了一句 log 打印以下信息 Thread.currentThread().getName()
。 這樣我就搞清楚了,我在處理每一段數據時都新建了線程。正如結果所示,建立上千個線程並非什麼好主意。github
我已經完成了加速數據處理的目標,但運行起來並不那麼流暢。我想知道若是不觸發這麼多 GC 的話還能不能跑得再快點。因此我本身寫了一個線程池並指定了最大線程數來供 RxJava 調用,省的每次處理數據都要建立新線程:多線程
List<String> dataList;
//這裏是數據列表
List<Observable<DataModel>> tasks = new ArrayList<>();
// 取得可以使用的最大線程數
int threadCount = Runtime.getRuntime().availableProcessors();
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(threadCount);
Scheduler scheduler = Schedulers.from(threadPoolExecutor);
for (String data : dataList) {
tasks.add(Observable.just(data).subscribeOn(scheduler).map(s -> {
// 返回一個 DataModel 對象
return DataParser.createData(s);
}));
}
List<DataModel> result = new ArrayList<>();
// 等待運行結束並收集結果
for (DataModel dataModel : Observable.merge(tasks).toBlocking().toIterable()) {
result.add(dataModel);
}複製代碼
對於單個數據都很大的數據集來講,這樣減小了約 10% 的數據處理時間。然而,對於單個數據都很小的數據集就減小了約 30% 的時間。同時也減小了 GC 的調用次數,但 GC 仍是太頻繁。併發
我有一個新想法——若是性能的瓶頸是頻繁的切換和調用線程呢?爲了克服這個問題,我能夠將數據集根據線程的數目平均分紅總數量相等的子集合,每一個子合集丟給一個線程處理。這樣雖然是併發運行,可是每一個線程被調用的次數將被下降到最小。我嘗試使用 這裏 的解決方法來實現個人想法:app
List<String> dataList;
//這裏是數據列表
// 取得可以使用的最大線程數
int threadCount = Runtime.getRuntime().availableProcessors();
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(threadCount);
Scheduler scheduler = Schedulers.from(threadPoolExecutor);
AtomicInteger groupIndex = new AtomicInteger();
// 以線程數量爲依據分組數據,將每組數據放到它們本身的線程中
Iterable<List<DataModel>> resultGroups =
Observable.from(dataList).groupBy(k -> groupIndex.getAndIncrement() % threadCount)
.flatMap(group -> group.observeOn(scheduler).toList().map(sublist -> {
List<DataModel> dataModels = new ArrayList<>();
for (String data : sublist) {
dataModels.add(DataParser.createData(data));
}
return dataModels;
})).toBlocking().toIterable();
List<DataModel> result = new ArrayList<>();
// 等待運行結束並收集結果
for (List<DataModel> dataModels : resultGroups) {
result.addAll(dataModels);
}複製代碼
上文中我提到用兩類數據集進行測試,一類的數據自己是大文件,可是數據集裏包含的數據個數不多;另外一類數據集裏的每個數據並非很大,可是包含數據的總量不少。當我再次測試時,第一組數據幾乎沒差異,而第二組改變至關大。以前幾乎要 20秒,如今只需 5秒。函數
第二類數據集運行時間改進了如此大的緣由,是由於每一個線程再也不處理一個數據(而是處理一個從整體數據集裏拆分下來的小數據集)。以前每個數據,都須要調用一個線程來處理。如今我減小了調用線程的次數,從而提高了性能。工具
上面的代碼要執行併發還有一些地方須要修改,因此我整理了代碼並放到工具類中,使其更具備通用性。
/** * 將數據集拆分紅子集並指派給規定數量的線程,並傳入回調來進行具體業務邏輯處理。 * <b>T</b> 是要被處理的數據類型,<b>U</b> 是返回的數據類型 */
public static <T, U> Iterable<U> parseDataInParallel(List<T> data, Func1<List<T>, U> worker) {
int threadCount = Runtime.getRuntime().availableProcessors();
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(threadCount);
Scheduler scheduler = Schedulers.from(threadPoolExecutor);
AtomicInteger groupIndex = new AtomicInteger();
return Observable.from(data).groupBy(k -> groupIndex.getAndIncrement() % threadCount)
.flatMap(group -> group.observeOn(scheduler).toList().map(worker)).toBlocking().toIterable();
}
//***EXAMPLE USAGE***
Iterable<List<DataModel>> resultGroups = Util.parseDataInParallel(dataList,
(sublist) -> {
List<DataModel> dataModels = new ArrayList<>();
for (String data : sublist) {
dataModels.add(DataParser.createData(data));
}
return dataModels;
});
List<DataModel> results = new ArrayList<>();
for (List<DataModel> dataModels : resultGroups) {
results.addAll(dataModels);
}複製代碼
這裏 T
是被處理的數據類型,樣例中是DataModel
。傳入待處理的 List<T>
並指望結果是 U
。在個人樣例中 U
是 List<DataModel>
,但它能夠是任何東西,並不必定是一個 list。傳入的回調函數負責數據子列表具體的業務處理並返回結果。
事實上影響運行速度的因素有許多。好比線程管理方式,線程數,設備等。大多數因素我沒法控制,但總有一些是我沒有考慮到的。
若是每一個數據大小不相等會怎麼樣?舉個例子,若是有 4 個線程,每一個被指派給第 4 線程的數據大小是被指派給其餘線程的十倍會怎麼樣?這時第四個線程的耗時就是其餘線程的大約 10 倍。這種狀況下使用多線程就不會減小多少時間。個人第二次嘗試基本解決了這個問題,由於線程只在須要時才初始化。但這個方法太慢了。
我也試過改變數據分組方式。做爲隨意分配的取代,我能夠跟蹤每一組數據的總量,而後將數據分配給最少的那組。這樣每一個線程的工做量就接近平均了。倒黴的是,測試以後發現這樣作增長的時間遠大於它節省的時間。
數據被分配的大小越平均,處理速度就越快。但大多數狀況下,隨機分配看起來更快些。理想狀況下是每一個線程一有空就分配任務,同時執行分配所消耗的資源也少,這是最高效的。但我找不到一個足夠高效的能夠減小分配瓶頸的方法。
因此若是你想用多線程,這是個人建議。若是你有什麼好想法,請務必告訴我。獲得一個最優解(若是有的話)老是很難的。以及,能用多線程並不意味着必須用多線程。。