對rxJava不瞭解的同窗能夠先看html
RxJava 和 RxAndroid 一 (基礎)
RxJava 和 RxAndroid 二(操做符的使用)
RxJava 和 RxAndroid 三(生命週期控制和內存優化)java
RxJava 和 RxAndroid 四(RxBinding的使用)android
本文將有幾個例子說明,rxjava線程調度的正確使用姿式。編程
例1app
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
Observable
.create(
new
Observable.OnSubscribe<String>() {
@Override
public
void
call(Subscriber<?
super
String> subscriber) {
Logger.v(
"rx_call"
, Thread.currentThread().getName() );
subscriber.onNext(
"dd"
);
subscriber.onCompleted();
}
})
.map(
new
Func1<String, String >() {
@Override
public
String call(String s) {
Logger.v(
"rx_map"
, Thread.currentThread().getName() );
return
s +
"88"
;
}
})
.subscribe(
new
Action1<String>() {
@Override
public
void
call(String s) {
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName() );
}
}) ;
|
結果ide
/rx_call: main -- 主線程
/rx_map: main -- 主線程
/rx_subscribe: main -- 主線程post
例2優化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
new
Thread(
new
Runnable() {
@Override
public
void
run() {
Logger.v(
"rx_newThread"
, Thread.currentThread().getName() );
rx();
}
}).start();
void
rx(){
Observable
.create(
new
Observable.OnSubscribe<String>() {
@Override
public
void
call(Subscriber<?
super
String> subscriber) {
Logger.v(
"rx_call"
, Thread.currentThread().getName() );
subscriber.onNext(
"dd"
);
subscriber.onCompleted();
}
})
.map(
new
Func1<String, String >() {
@Override
public
String call(String s) {
Logger.v(
"rx_map"
, Thread.currentThread().getName() );
return
s +
"88"
;
}
})
.subscribe(
new
Action1<String>() {
@Override
public
void
call(String s) {
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName() );
}
}) ;
}
|
結果url
/rx_newThread: Thread-564 -- 子線程
/rx_call: Thread-564 -- 子線程
/rx_map: Thread-564 -- 子線程
/rx_subscribe: Thread-564 -- 子線程spa
例3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
Observable
.create(
new
Observable.OnSubscribe<String>() {
@Override
public
void
call(Subscriber<?
super
String> subscriber) {
Logger.v(
"rx_call"
, Thread.currentThread().getName() );
subscriber.onNext(
"dd"
);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map(
new
Func1<String, String >() {
@Override
public
String call(String s) {
Logger.v(
"rx_map"
, Thread.currentThread().getName() );
return
s +
"88"
;
}
})
.subscribe(
new
Action1<String>() {
@Override
public
void
call(String s) {
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName() );
}
}) ;
|
結果
/rx_call: RxCachedThreadScheduler-1 --io線程
/rx_map: main --主線程
/rx_subscribe: main --主線程
例4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
Observable
.create(
new
Observable.OnSubscribe<String>() {
@Override
public
void
call(Subscriber<?
super
String> subscriber) {
Logger.v(
"rx_call"
, Thread.currentThread().getName() );
subscriber.onNext(
"dd"
);
subscriber.onCompleted();
}
})
.map(
new
Func1<String, String >() {
@Override
public
String call(String s) {
Logger.v(
"rx_map"
, Thread.currentThread().getName() );
return
s +
"88"
;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
new
Action1<String>() {
@Override
public
void
call(String s) {
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName() );
}
}) ;
|
結果
/rx_call: RxCachedThreadScheduler-1 --io線程
/rx_map: RxCachedThreadScheduler-1 --io線程
/rx_subscribe: main --主線程
map() , flapMap() , scan() , filter() 等 -- 事件加工
subscribe() -- 事件消費
事件加工:默認跟事件產生的線程保持一致, 能夠由 observeOn() 自定義線程
事件消費:默認運行在當前線程,能夠有observeOn() 自定義
例5 屢次切換線程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
Observable
.create(
new
Observable.OnSubscribe<String>() {
@Override
public
void
call(Subscriber<?
super
String> subscriber) {
Logger.v(
"rx_call"
, Thread.currentThread().getName() );
subscriber.onNext(
"dd"
);
subscriber.onCompleted();
}
})
.observeOn( Schedulers.newThread() )
//新線程
.map(
new
Func1<String, String >() {
@Override
public
String call(String s) {
Logger.v(
"rx_map"
, Thread.currentThread().getName() );
return
s +
"88"
;
}
})
.observeOn( Schedulers.io() )
//io線程
.filter(
new
Func1<String, Boolean>() {
@Override
public
Boolean call(String s) {
Logger.v(
"rx_filter"
, Thread.currentThread().getName() );
return
s !=
null
;
}
})
.subscribeOn(Schedulers.io())
//定義事件產生線程:io線程
.observeOn(AndroidSchedulers.mainThread())
//事件消費線程:主線程
.subscribe(
new
Action1<String>() {
@Override
public
void
call(String s) {
Logger.v(
"rx_subscribe"
, Thread.currentThread().getName() );
}
}) ;
|
結果
/rx_call: RxCachedThreadScheduler-1 -- io 線程
/rx_map: RxNewThreadScheduler-1 -- new出來的線程
/rx_filter: RxCachedThreadScheduler-2 -- io線程
/rx_subscribe: main -- 主線程
例6:只規定了事件產生的線程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
Observable
.create(
new
Observable.OnSubscribe<String>() {
@Override
public
void
call(Subscriber<?
super
String> subscriber) {
Log.v(
"rx--create "
, Thread.currentThread().getName() ) ;
subscriber.onNext(
"dd"
) ;
}
})
.subscribeOn(Schedulers.io())
.subscribe(
new
Action1<String>() {
@Override
public
void
call(String s) {
Log.v(
"rx--subscribe "
, Thread.currentThread().getName() ) ;
}
}) ;
|
結果
/rx--create: RxCachedThreadScheduler-4 // io 線程
/rx--subscribe: RxCachedThreadScheduler-4 // io 線程
例:7:只規定事件消費線程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
Observable
.create(
new
Observable.OnSubscribe<String>() {
@Override
public
void
call(Subscriber<?
super
String> subscriber) {
Log.v(
"rx--create "
, Thread.currentThread().getName() ) ;
subscriber.onNext(
"dd"
) ;
}
})
.observeOn( Schedulers.newThread() )
.subscribe(
new
Action1<String>() {
@Override
public
void
call(String s) {
Log.v(
"rx--subscribe "
, Thread.currentThread().getName() ) ;
}
}) ;
|
結果
/rx--create: main -- 主線程
/rx--subscribe: RxNewThreadScheduler-1 -- new 出來的子線程
從例6能夠看出,若是隻規定了事件產生的線程,那麼事件消費線程將跟隨事件產生線程。
從例7能夠看出,若是隻規定了事件消費的線程,那麼事件產生的線程和 當前線程保持一致。
例8:線程調度封裝
在Android 經常有這樣的場景,後臺處理處理數據,前臺展現數據。
通常的用法:
1
2
3
4
5
6
7
8
9
|
Observable
.just(
"123"
)
.subscribeOn( Schedulers.io())
.observeOn( AndroidSchedulers.mainThread() )
.subscribe(
new
Action1() {
@Override
public
void
call(Object o) {
}
}) ;
|
可是項目中這種場景有不少,因此咱們就想能不能把這種場景的調度方式封裝起來,方便調用。
簡單的封裝
1
2
3
4
|
public
Observable apply( Observable observable ){
return
observable.subscribeOn( Schedulers.io() )
.observeOn( AndroidSchedulers.mainThread() ) ;
}
|
使用
1
2
3
4
5
6
7
|
apply( Observable.just(
"123"
) )
.subscribe(
new
Action1() {
@Override
public
void
call(Object o) {
}
}) ;
|
弊端:雖然上面的這種封裝能夠作到線程調度的目的,可是它破壞了鏈式編程的結構,是編程風格變得不優雅。
改進:Transformers 的使用(就是轉化器的意思,把一種類型的Observable轉換成另外一種類型的Observable )
改進後的封裝
1
2
3
4
5
6
|
Observable.Transformer schedulersTransformer =
new
Observable.Transformer() {
@Override
public
Object call(Object observable) {
return
((Observable) observable).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
};
|
使用
1
2
3
4
5
6
7
8
|
Observable
.just(
"123"
)
.compose( schedulersTransformer )
.subscribe(
new
Action1() {
@Override
public
void
call(Object o) {
}
}) ;
|
弊端:雖然保持了鏈式編程結構的完整,可是每次調用 .compose( schedulersTransformer ) 都是 new 了一個對象的。因此咱們須要再次封裝,儘可能保證單例的模式。
改進後的封裝
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package
lib.app.com.myapplication;
import
rx.Observable;
import
rx.android.schedulers.AndroidSchedulers;
import
rx.schedulers.Schedulers;
/**
* Created by ${zyj} on 2016/7/1.
*/
public
class
RxUtil {
private
final
static
Observable.Transformer schedulersTransformer =
new
Observable.Transformer() {
@Override
public
Object call(Object observable) {
return
((Observable) observable).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
};
public
static
<T> Observable.Transformer<T, T> applySchedulers() {
return
(Observable.Transformer<T, T>) schedulersTransformer;
}
}
|
使用
1
2
3
4
5
6
7
8
|
Observable
.just(
"123"
)
.compose( RxUtil.<String>applySchedulers() )
.subscribe(
new
Action1() {
@Override
public
void
call(Object o) {
}
}) ;
|