RxJava 和 RxAndroid 五(線程調度)

對rxJava不瞭解的同窗能夠先看html

RxJava 和 RxAndroid 一 (基礎)
RxJava 和 RxAndroid 二(操做符的使用)
RxJava 和 RxAndroid 三(生命週期控制和內存優化)java

RxJava 和 RxAndroid 四(RxBinding的使用)android

 

本文將有幾個例子說明,rxjava線程調度的正確使用姿式。編程

例1app

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable
                .create( new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );
 
                        subscriber.onNext( "dd" );
                        subscriber.onCompleted();
                    }
                })
                .map( new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88" ;
                    }
                })
                .subscribe( new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果ide

/rx_call: main           -- 主線程
/rx_map: main        --  主線程
/rx_subscribe: main   -- 主線程post

例2優化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
   new Thread( new Runnable() {
            @Override
            public void run() {
                Logger.v( "rx_newThread" , Thread.currentThread().getName()  );
                rx();
            }
        }).start();
 
void rx(){
        Observable
                .create( new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );
 
                        subscriber.onNext( "dd" );
                        subscriber.onCompleted();
                    }
                })
                .map( new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88" ;
                    }
                })
                .subscribe( new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;
 
    }

 

      結果url

/rx_newThread: Thread-564   -- 子線程
/rx_call: Thread-564              -- 子線程
/rx_map: Thread-564            -- 子線程 
/rx_subscribe: Thread-564    -- 子線程spa

 

  • 經過例1和例2,說明,Rxjava默認運行在當前線程中。若是當前線程是子線程,則rxjava運行在子線程;一樣,當前線程是主線程,則rxjava運行在主線程

 

例3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable
                .create( new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );
 
                        subscriber.onNext( "dd" );
                        subscriber.onCompleted();
                    }
                })
 
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
 
                .map( new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88" ;
                    }
                })
                .subscribe( new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ;

  結果

/rx_call: RxCachedThreadScheduler-1    --io線程
/rx_map: main                                     --主線程
/rx_subscribe: main                              --主線程

 

例4

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable
                .create( new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Logger.v( "rx_call" , Thread.currentThread().getName()  );
 
                        subscriber.onNext( "dd" );
                        subscriber.onCompleted();
                    }
                })
                .map( new Func1<String, String >() {
                    @Override
                    public String call(String s) {
                        Logger.v( "rx_map" , Thread.currentThread().getName()  );
                        return s + "88" ;
                    }
                })
 
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
 
                .subscribe( new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                    }
                }) ; 

      結果

/rx_call: RxCachedThreadScheduler-1     --io線程
/rx_map: RxCachedThreadScheduler-1   --io線程
/rx_subscribe: main                              --主線程

   

  • 經過例三、例4 能夠看出  .subscribeOn(Schedulers.io())  和 .observeOn(AndroidSchedulers.mainThread()) 寫的位置不同,形成的結果也不同。從例4中能夠看出 map() 操做符默認運行在事件產生的線程之中。事件消費只是在 subscribe() 裏面。
  • 對於 create() , just() , from()   等                 --- 事件產生   

               map() , flapMap() , scan() , filter()  等    --  事件加工

              subscribe()                                          --  事件消費

  •   事件產生:默認運行在當前線程,能夠由 subscribeOn()  自定義線程

         事件加工:默認跟事件產生的線程保持一致, 能夠由 observeOn() 自定義線程

       事件消費:默認運行在當前線程,能夠有observeOn() 自定義

 

例5  屢次切換線程

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
Observable
                 .create( new Observable.OnSubscribe<String>() {
                     @Override
                     public void call(Subscriber<? super String> subscriber) {
                         Logger.v( "rx_call" , Thread.currentThread().getName()  );
 
                         subscriber.onNext( "dd" );
                         subscriber.onCompleted();
                     }
                 })
 
                 .observeOn( Schedulers.newThread() )    //新線程
 
                 .map( new Func1<String, String >() {
                     @Override
                     public String call(String s) {
                         Logger.v( "rx_map" , Thread.currentThread().getName()  );
                         return s + "88" ;
                     }
                 })
 
                 .observeOn( Schedulers.io() )      //io線程
 
                 .filter( new Func1<String, Boolean>() {
                     @Override
                     public Boolean call(String s) {
                         Logger.v( "rx_filter" , Thread.currentThread().getName()  );
                         return s != null ;
                     }
                 })
 
                 .subscribeOn(Schedulers.io())     //定義事件產生線程:io線程
                 .observeOn(AndroidSchedulers.mainThread())     //事件消費線程:主線程
 
                 .subscribe( new Action1<String>() {
                     @Override
                     public void call(String s) {
                         Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                     }
                 }) ;

  結果

/rx_call: RxCachedThreadScheduler-1           -- io 線程
/rx_map: RxNewThreadScheduler-1             -- new出來的線程
/rx_filter: RxCachedThreadScheduler-2        -- io線程
/rx_subscribe: main                                   -- 主線程

 

例6:只規定了事件產生的線程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable
          .create( new Observable.OnSubscribe<String>() {
              @Override
              public void call(Subscriber<? super String> subscriber) {
                  Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                  subscriber.onNext( "dd" ) ;
              }
          })
          .subscribeOn(Schedulers.io())
          .subscribe( new Action1<String>() {
              @Override
              public void call(String s) {
                  Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
              }
          }) ;

  結果

/rx--create: RxCachedThreadScheduler-4                      // io 線程
/rx--subscribe: RxCachedThreadScheduler-4                 // io 線程

     

例:7:只規定事件消費線程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observable
                .create( new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        Log.v( "rx--create " , Thread.currentThread().getName() ) ;
                        subscriber.onNext( "dd" ) ;
                    }
                })
                .observeOn( Schedulers.newThread() )
                .subscribe( new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;
                    }
                }) ;

  結果

/rx--create: main                                           -- 主線程
/rx--subscribe: RxNewThreadScheduler-1        --  new 出來的子線程 

      

    從例6能夠看出,若是隻規定了事件產生的線程,那麼事件消費線程將跟隨事件產生線程。

    從例7能夠看出,若是隻規定了事件消費的線程,那麼事件產生的線程和 當前線程保持一致。

 

例8:線程調度封裝

 在Android 經常有這樣的場景,後臺處理處理數據,前臺展現數據。

通常的用法:

1
2
3
4
5
6
7
8
9
Observable
              .just( "123" )
              .subscribeOn( Schedulers.io())
              .observeOn( AndroidSchedulers.mainThread() )
              .subscribe( new Action1() {
                  @Override
                  public void call(Object o) {
                  }
              }) ;

  可是項目中這種場景有不少,因此咱們就想能不能把這種場景的調度方式封裝起來,方便調用。

簡單的封裝

1
2
3
4
public Observable apply( Observable observable ){
    return observable.subscribeOn( Schedulers.io() )
             .observeOn( AndroidSchedulers.mainThread() ) ;
}

使用

1
2
3
4
5
6
7
apply( Observable.just( "123" ) )
               .subscribe( new Action1() {
                   @Override
                   public void call(Object o) {
 
                   }
               }) ;

弊端:雖然上面的這種封裝能夠作到線程調度的目的,可是它破壞了鏈式編程的結構,是編程風格變得不優雅。

改進:Transformers 的使用(就是轉化器的意思,把一種類型的Observable轉換成另外一種類型的Observable )

改進後的封裝

1
2
3
4
5
6
Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
     @Override public Object call(Object observable) {
         return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                 .observeOn(AndroidSchedulers.mainThread());
     }
};

  使用

1
2
3
4
5
6
7
8
Observable
           .just( "123" )
           .compose( schedulersTransformer )
           .subscribe( new Action1() {
               @Override
               public void call(Object o) {
               }
           }) ;

  弊端:雖然保持了鏈式編程結構的完整,可是每次調用 .compose( schedulersTransformer ) 都是 new 了一個對象的。因此咱們須要再次封裝,儘可能保證單例的模式。

改進後的封裝

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package lib.app.com.myapplication;
 
import rx.Observable;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;
 
/**
  * Created by ${zyj} on 2016/7/1.
  */
public class RxUtil {
 
     private final static Observable.Transformer schedulersTransformer = new  Observable.Transformer() {
         @Override public Object call(Object observable) {
             return ((Observable)  observable).subscribeOn(Schedulers.newThread())
                     .observeOn(AndroidSchedulers.mainThread());
         }
     };
 
    public static  <T> Observable.Transformer<T, T> applySchedulers() {
         return (Observable.Transformer<T, T>) schedulersTransformer;
     }
 
}

  使用

1
2
3
4
5
6
7
8
Observable
             .just( "123" )
             .compose( RxUtil.<String>applySchedulers() )
             .subscribe( new Action1() {
                 @Override
                 public void call(Object o) {
                 }
             }) ;
相關文章
相關標籤/搜索