給初學者的RxJava2.0教程(二)

Outlinejava

[TOC]數據庫

前言

上一節教程講解了最基本的RxJava2的使用, 在本節中, 咱們將學習RxJava強大的線程控制. api

正題

仍是以以前的例子, 兩根水管:網絡

RxJava

正常狀況下, 上游和下游是工做在同一個線程中的, 也就是說上游在哪一個線程發事件, 下游就在哪一個線程接收事件. 多線程

怎麼去理解呢, 以Android爲例, 一個Activity的全部動做默認都是在主線程中運行的, 好比咱們在onCreate中打出當前線程的名字:ide

@Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Log.d(TAG, Thread.currentThread().getName());
    }複製代碼

結果即是:學習

D/TAG: main複製代碼

回到RxJava中, 當咱們在主線程中去建立一個上游Observable來發送事件, 則這個上游默認就在主線程發送事件.ui

當咱們在主線程去建立一個下游Observer來接收事件, 則這個下游默認就在主線程中接收事件, 來看段代碼:spa

@Override                                                                                       
protected void onCreate(Bundle savedInstanceState) {                                            
    super.onCreate(savedInstanceState);                                                         
    setContentView(R.layout.activity_main);                                                     

    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
        @Override                                                                               
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
            Log.d(TAG, "emit 1");                                                               
            emitter.onNext(1);                                                                  
        }                                                                                       
    });                                                                                         

    Consumer<Integer> consumer = new Consumer<Integer>() {                                      
        @Override                                                                               
        public void accept(Integer integer) throws Exception {                                  
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
            Log.d(TAG, "onNext: " + integer);                                                   
        }                                                                                       
    };                                                                                          

    observable.subscribe(consumer);                                                             
}複製代碼

在主線程中分別建立上游和下游, 而後將他們鏈接在一塊兒, 同時分別打印出它們所在的線程, 運行結果爲:線程

D/TAG: Observable thread is : main
D/TAG: emit 1                     
D/TAG: Observer thread is :main   
D/TAG: onNext: 1複製代碼

這就驗證了剛纔所說, 上下游默認是在同一個線程工做.

這樣確定是知足不了咱們的需求的, 咱們更多想要的是這麼一種狀況, 在子線程中作耗時的操做, 而後回到主線程中來操做UI, 用圖片來描述就是下面這個圖片:

thread.png

在這個圖中, 咱們用黃色水管表示子線程, 深藍色水管表示主線程.

要達到這個目的, 咱們須要先改變上游發送事件的線程, 讓它去子線程中發送事件, 而後再改變下游的線程, 讓它去主線程接收事件. 經過RxJava內置的線程調度器能夠很輕鬆的作到這一點. 接下來看一段代碼:

@Override                                                                                       
protected void onCreate(Bundle savedInstanceState) {                                            
    super.onCreate(savedInstanceState);                                                         
    setContentView(R.layout.activity_main);                                                     

    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {   
        @Override                                                                               
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {            
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());           
            Log.d(TAG, "emit 1");                                                               
            emitter.onNext(1);                                                                  
        }                                                                                       
    });                                                                                         

    Consumer<Integer> consumer = new Consumer<Integer>() {                                      
        @Override                                                                               
        public void accept(Integer integer) throws Exception {                                  
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());              
            Log.d(TAG, "onNext: " + integer);                                                   
        }                                                                                       
    };                                                                                          

    observable.subscribeOn(Schedulers.newThread())                                              
            .observeOn(AndroidSchedulers.mainThread())                                          
            .subscribe(consumer);                                                               
}複製代碼

仍是剛纔的例子, 只不過咱們太添加了一點東西, 先來看看運行結果:

D/TAG: Observable thread is : RxNewThreadScheduler-2  
 D/TAG: emit 1                                         
 D/TAG: Observer thread is :main                       
 D/TAG: onNext: 1複製代碼

能夠看到, 上游發送事件的線程的確改變了, 是在一個叫 RxNewThreadScheduler-2的線程中發送的事件, 而下游仍然在主線程中接收事件, 這說明咱們的目的達成了, 接下來看看是如何作到的.

和上一段代碼相比,這段代碼只不過是增長了兩行代碼:

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())複製代碼

做爲一個初學者的入門教程, 並不會貼出一大堆源碼來分析, 所以只須要讓你們記住幾個要點, 已達到如何正確的去使用這個目的纔是咱們的目標.

簡單的來講, subscribeOn() 指定的是上游發送事件的線程, observeOn() 指定的是下游接收事件的線程.

屢次指定上游的線程只有第一次指定的有效, 也就是說屢次調用subscribeOn() 只有第一次的有效, 其他的會被忽略.

屢次指定下游的線程是能夠的, 也就是說每調用一次observeOn() , 下游的線程就會切換一次.

舉個例子:

observable.subscribeOn(Schedulers.newThread())     
         .subscribeOn(Schedulers.io())              
         .observeOn(AndroidSchedulers.mainThread()) 
         .observeOn(Schedulers.io())                
         .subscribe(consumer);複製代碼

這段代碼中指定了兩次上游發送事件的線程, 分別是newThread和IO線程, 下游也指定了兩次線程,分別是main和IO線程. 運行結果爲:

D/TAG: Observable thread is : RxNewThreadScheduler-3
D/TAG: emit 1                                       
D/TAG: Observer thread is :RxCachedThreadScheduler-1
D/TAG: onNext: 1複製代碼

能夠看到, 上游雖然指定了兩次線程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler 線程中, 而下游則跑到了RxCachedThreadScheduler 中, 這個CacheThread其實就是IO線程池中的一個.

爲了更清晰的看到下游的線程切換過程, 咱們加點log:

observable.subscribeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
                    }
                })
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "After observeOn(io), current thread is : " + Thread.currentThread().getName());
                    }
                })
                .subscribe(consumer);複製代碼

咱們在下游線程切換以後, 把當前的線程打印出來, 運行結果:

D/TAG: Observable thread is : RxNewThreadScheduler-1                                             
D/TAG: emit 1                                                                                    
D/TAG: After observeOn(mainThread), current thread is: main D/TAG: After observeOn(io), current thread is : RxCachedThreadScheduler-2 D/TAG: Observer thread is :RxCachedThreadScheduler-2 D/TAG: onNext: 1複製代碼

能夠看到, 每調用一次observeOn() 線程便會切換一次, 所以若是咱們有相似的需求時, 即可知道如何處理了.

在RxJava中, 已經內置了不少線程選項供咱們選擇, 例若有

  • Schedulers.io() 表明io操做的線程, 一般用於網絡,讀寫文件等io密集型的操做
  • Schedulers.computation() 表明CPU計算密集型的操做, 例如須要大量計算的操做
  • Schedulers.newThread() 表明一個常規的新線程
  • AndroidSchedulers.mainThread() 表明Android的主線程

這些內置的Scheduler已經足夠知足咱們開發的需求, 所以咱們應該使用內置的這些選項, 在RxJava內部使用的是線程池來維護這些線程, 全部效率也比較高.

實踐

對於咱們Android開發人員來講, 常常會將一些耗時的操做放在後臺, 好比網絡請求或者讀寫文件,操做數據庫等等,等到操做完成以後回到主線程去更新UI, 有了上面的這些基礎, 那麼如今咱們就能夠輕鬆的去作到這樣一些操做.

下面來舉幾個經常使用的場景.

網絡請求

Android中有名的網絡請求庫就那麼幾個, Retrofit可以從中脫穎而出很大緣由就是由於它支持RxJava的方式來調用, 下面簡單講解一下它的基本用法.

要使用Retrofit,先添加Gradle配置:

//retrofit
    compile 'com.squareup.retrofit2:retrofit:2.1.0'
    //Gson converter
    compile 'com.squareup.retrofit2:converter-gson:2.1.0'
    //RxJava2 Adapter
    compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'
    //okhttp
    compile 'com.squareup.okhttp3:okhttp:3.4.1'
    compile 'com.squareup.okhttp3:logging-interceptor:3.4.1'複製代碼

隨後定義Api接口:

public interface Api {
    @GET
    Observable<LoginResponse> login(@Body LoginRequest request);

    @GET
    Observable<RegisterResponse> register(@Body RegisterRequest request);
}複製代碼

接着建立一個Retrofit客戶端:

private static Retrofit create() {
            OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
            builder.readTimeout(10, TimeUnit.SECONDS);
            builder.connectTimeout(9, TimeUnit.SECONDS);

            if (BuildConfig.DEBUG) {
                HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
                interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
                builder.addInterceptor(interceptor);
            }

            return new Retrofit.Builder().baseUrl(ENDPOINT)
                    .client(builder.build())
                    .addConverterFactory(GsonConverterFactory.create())
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .build();
}複製代碼

發起請求就很簡單了:

Api api = retrofit.create(Api.class);
        api.login(request)
              .subscribeOn(Schedulers.io())               //在IO線程進行網絡請求
             .observeOn(AndroidSchedulers.mainThread())  //回到主線程去處理請求結果
            .subscribe(new Observer<LoginResponse>() {
                    @Override
                public void onSubscribe(Disposable d) {}

                @Override
                public void onNext(LoginResponse value) {}

                @Override
                public void onError(Throwable e) {
                    Toast.makeText(mContext, "登陸失敗", Toast.LENGTH_SHORT).show();
                }

                @Override
                public void onComplete() {
                    Toast.makeText(mContext, "登陸成功", Toast.LENGTH_SHORT).show();
                }
            });複製代碼

看似很完美, 但咱們忽略了一點, 若是在請求的過程當中Activity已經退出了, 這個時候若是回到主線程去更新UI, 那麼APP確定就崩潰了, 怎麼辦呢, 上一節咱們說到了Disposable , 說它是個開關, 調用它的dispose()方法時就會切斷水管, 使得下游收不到事件, 既然收不到事件, 那麼也就不會再去更新UI了. 所以咱們能夠在Activity中將這個Disposable 保存起來, 當Activity退出時, 切斷它便可.

那若是有多個Disposable 該怎麼辦呢, RxJava中已經內置了一個容器CompositeDisposable, 每當咱們獲得一個Disposable時就調用CompositeDisposable.add()將它添加到容器中, 在退出的時候, 調用CompositeDisposable.clear() 便可切斷全部的水管.

讀寫數據庫

上面說了網絡請求的例子, 接下來再看看讀寫數據庫, 讀寫數據庫也算一個耗時的操做, 所以咱們也最好放在IO線程裏去進行, 這個例子就比較簡單, 直接上代碼:

public Observable<List<Record>> readAllRecords() {
        return Observable.create(new ObservableOnSubscribe<List<Record>>() {
            @Override
            public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
                Cursor cursor = null;
                try {
                    cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
                    List<Record> result = new ArrayList<>();
                    while (cursor.moveToNext()) {
                        result.add(Db.Record.read(cursor));
                    }
                    emitter.onNext(result);
                    emitter.onComplete();
                } finally {
                    if (cursor != null) {
                        cursor.close();
                    }
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
    }複製代碼

好了本次的教程就到這裏吧, 後面的教程將會教你們如何使用RxJava中強大的操做符. 經過使用這些操做符能夠很輕鬆的作到各類吊炸天的效果. 敬請期待.

相關文章
相關標籤/搜索