接觸RxJava2
已經好久了,也看了網上的不少文章,發現基本都是在對RxJava
的基本思想介紹以後,再去對各個操做符進行分析,可是看了以後感受過了不久就忘了。java
偶然的機會看到了開源項目 RxJava-Android-Samples,這裏一共介紹了十六種RxJava2
的使用場景,它從實際的應用場景出發介紹RxJava2
的使用,特別適合對於RxJava2
已經有初步瞭解的開發者進一步地去學習如何將其應用到實際開發當中。react
所以,我打算跟着這個項目的思路編寫一系列實戰的介紹並完成示例代碼編寫,並對該實例中用到的知識進行介紹,作到學以至用。下面,就開始第一個例子的學習,源碼的倉庫爲:RxSample。android
當咱們須要進行一些耗時操做,例以下載、訪問數據庫等,爲了避免阻塞主線程,每每會將其放在後臺進行處理,同時在處理的過程當中、處理完成後通知主線程更新UI
,這裏就涉及到了後臺線程和主線程之間的切換。首先回憶一下,在之前咱們通常會用如下兩種方式來實現這一效果:git
run()
方法中執行耗時的操做,並經過一個和主線程Looper
關聯的Handler
發送消息給主線程更新進度顯示、處理結果。AsyncTask
,在其doInBackground
方法中執行耗時的操做,調用publishProgress
方法通知主線程,而後在onProgressUpdate
中更新進度顯示,在onPostExecute
中顯示最終結果。那麼,讓咱們看一些在RxJava
中如何完成這一需求。github
咱們的界面上有一個按鈕mTvDownload
,點擊以後會發起一個耗時的任務,這裏咱們用Thread.sleep
來模擬耗時的操做,每隔500ms
咱們會將當前的進度通知主線程,在mTvDownloadResult
中顯示當前處理的進度。數據庫
public class BackgroundActivity extends AppCompatActivity {
private TextView mTvDownload;
private TextView mTvDownloadResult;
private CompositeDisposable mCompositeDisposable = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_background);
mTvDownload = (TextView) findViewById(R.id.tv_download);
mTvDownloadResult = (TextView) findViewById(R.id.tv_download_result);
mTvDownload.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
startDownload();
}
});
}
private void startDownload() {
final Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 100; i++) {
if (i % 20 == 0) {
try {
Thread.sleep(500); //模擬下載的操做。
} catch (InterruptedException exception) {
if (!e.isDisposed()) {
e.onError(exception);
}
}
e.onNext(i);
}
}
e.onComplete();
}
});
DisposableObserver<Integer> disposableObserver = new DisposableObserver<Integer>() {
@Override
public void onNext(Integer value) {
Log.d("BackgroundActivity", "onNext=" + value);
mTvDownloadResult.setText("Current Progress=" + value);
}
@Override
public void onError(Throwable e) {
Log.d("BackgroundActivity", "onError=" + e);
mTvDownloadResult.setText("Download Error");
}
@Override
public void onComplete() {
Log.d("BackgroundActivity", "onComplete");
mTvDownloadResult.setText("Download onComplete");
}
};
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
mCompositeDisposable.add(disposableObserver);
}
@Override
protected void onDestroy() {
super.onDestroy();
mCompositeDisposable.clear();
}
}
複製代碼
實際的運行結果以下: bash
在上面的例子中,涉及到了兩種類型的操做:網絡
subscribe(ObservableEmitter<Integer> e)
中的代碼。UI
更新的操做,對應於DisposableObserver
的全部回調,具體的是在onNext
中進行進度的更新;在onComplete
和onError
中展現最終的處理結果。那麼,這兩種類型操做所運行的線程是在哪裏指定的呢,關鍵是下面這句:多線程
observable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
複製代碼
subscribeOn(Schedulers.io())
:指定observable
的subscribe
方法運行在後臺線程。observeOn(AndroidSchedulers.mainThread())
:指定observer
的回調方法運行在主線程。這兩個函數剛開始的時候頗有可能弄混,我是這麼記的,subscribeOn
以s
開頭,能夠理解爲「上游」開頭的諧音,也就是上游執行的線程。框架
關於這兩個函數,還有一點說明:屢次調用subscribeOn
,會以第一次的爲準;而屢次調用observeOn
則會以最後一次的爲準,不過通常咱們都不會這麼幹,就不舉例子了。
subscribeOn/observeOn
都要求傳入一個Schedulers
的子類,它就表明了運行線程類型,下面咱們來看一下都有哪些選擇:
Schedulers.computation()
:用於計算任務,默認線程數等於處理器的數量。Schedulers.from(Executor executor)
:使用Executor
做爲調度器,關於Executor
框架能夠參考這篇文章:多線程知識梳理(5) - 線程池四部曲之 Executor 框架。Schedulers.immediate( )
:在當前線程執行任務Schedulers.io( )
:用於IO
密集型任務,例如訪問網絡、數據庫操做等,也是咱們最常使用的。Schedulers.newThread( )
:爲每個任務建立一個新的線程。Schedulers.trampoline( )
:當其它排隊的任務完成後,在當前線程排隊開始執行。Schedulers.single()
:全部任務共用一個後臺線程。以上是在io.reactivex.schedulers
包中,提供的Schedulers
,而若是咱們導入了下面的依賴,那麼在io.reactivex.android.schedulers
下,還有額外的兩個Schedulers
可選:
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
複製代碼
AndroidSchedulers.mainThread()
:運行在應用程序的主線程。AndroidSchedulers.from(Looper looper)
:運行在該looper
對應的線程當中。若是Activity
要被銷燬時,咱們的後臺任務沒有執行完,那麼就會致使Activity
不能正常回收,而對於每個Observer
,都會有一個Disposable
對象用於管理,而RxJava
提供了一個CompositeDisposable
類用於管理這些Disposable
,咱們只須要將其將入到該集合當中,在Activity
的onDestroy
方法中,調用它的clear
方法,就能避免內存泄漏的發生。
這個系列的第一篇文章,咱們介紹瞭如何使用subscribeOn/observeOn
來實現後臺執行耗時任務,並通知主線程更新進度。