just(T t1, T t2, T t3 ....)
,just可以傳入多個一樣類型的參數,並將當前參數一個接着一個的發送。java
Observable.just("1","2","3")
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
1
2
3
repeat()
將當前的消息序列無限制循環發送。咱們可以傳入一個參數表示循環的次數數組
Observable.just("1","2","3")
.repeat(3)
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
123123123
延遲建立Observable
。markdown
再訂閱時建立Observable
對象。該方法利用call
方法的特性。ide
post
public static void main(String[] args) {
Observable.defer(new Func0<Observable<Integer>>() {
@Override
public Observable<Integer> call() {
return getInt();
}
});
}
public static Observable<Integer> getInt() {
System.out.println("getInt()");
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<?super Integer> subscriber) { System.out.print("ss"); subscriber.onNext(42); } }); }
此時getInt()
方法不會被調用,會在subscribe()
時調用。這個假設看過源代碼很是easy理解。或者看我以前的博客RxJava 源代碼走讀之Observable.create()和subscribe()大數據
從指定數字開始發射數字。spa
Observable.range(3,2)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer);
}
});
34
range(int start,int count)
第一個參數爲從哪一個數開始,第二個參數爲發送多少個。.net
過濾做用。依據回調的條件對序列進行篩選。線程
查詢0~49能被3整除的數。code
private static ArrayList<Integer> array = new ArrayList<>();
public static void main(String[] args) {
init();
Observable.from(array)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
//推斷條件,假設返回false則該發送內容將取消,true將繼續發送
return integer%3==0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});
}
public static void init(){
for (int i=0;i<50;i++){
array.add(i);
}
}
0 3 6 9 12 15 18 21 24 27 30 33 36 39 42 45 48
take()
獲取發射序列的前幾個。後面的取消發送。takeLast()
獲取發射序列的後幾個。其他的取消繼續向下發送獲取0~49的前三個數和最後三個數
Observable.from(array)
.take(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});
System.out.println();
Observable.from(array)
.takeLast(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});
0 1 2
47 48 49
將發送序列中反覆的值除去。即發送序列後面的值假設和前面有重疊,則後面的值不會被髮送。 該方法去重時需要記錄發送序列每一次發送的值。因此當有大數據時要注意發送的值。
Observable.from(array)
.take(3)
.repeat(3)
.distinct()
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});
0 1 2
該方法和distinct()
的差異爲,當前發射值與上一次發射值一樣時則取消當前發射,假設不一樣樣,則繼續發射。
即所謂的有改變時發射。
故名思意。就是獲取發射序列的第一個和最後一個。
同一時候。該方法可以依據條件進行選擇符合條件的第一個和最後一個。
獲取0~49中3的倍數的最後一個值
Observable.from(array)
.last(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer%3==0&&integer!=0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});
48
跳過發射序列的前幾個和最後幾個 。
該方法和take()
,takeLast()
類似。
跳過0~49發射序列中的前三個和後三個
Observable.from(array)
.skip(3)
.skipLast(3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
獲取發射序列指定位置的發射值。
當中當咱們指定位置大於發射序列時,會拋出異常。因此推薦使用帶有默認值的elementAtDefault()
。
對0~49的發射序列,獲取前三個元素的發射後獲取第五個位置的元素值。假設沒有,則設置默認值爲3.
Observable.from(array)
.take(3)
.elementAtOrDefault(5,3)
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.print(integer+" ");
}
});
3
輪詢。該操做符每隔指定時間發送一次事件。
該方法默認在conmputation
線程執行
Observable
.interval(3, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Long>() {
@Override
public void call(Long s) {
Log.i("info",s);
}
}
);
第一個參數:延時時間 第二個參數:單位
該操做符會從0開始。每隔1秒發送一次
略微複雜點的,對於列表。咱們要遍歷打印此列表,則代碼例如如下
Observable.interval(3,TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.just(array.get(aLong.intValue()));
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("info","onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i("info","error");
}
@Override
public void onNext(String s) {
Log.i("info","onNext--"+s);
}
});
04-13 15:44:28.634 15455-15455/mahao.alex.rxjava I/info: onNext--aa
04-13 15:44:31.634 15455-15455/mahao.alex.rxjava I/info: onNext--bb
04-13 15:44:34.634 15455-15455/mahao.alex.rxjava I/info: onNext--cc
04-13 15:44:37.634 15455-15455/mahao.alex.rxjava I/info: onNext--dd
04-13 15:44:40.644 15455-15455/mahao.alex.rxjava I/info: error
打印例如如下,而且是每隔三秒。
打印一次。。
延遲固定時間後發送元素。
與interval()
差異爲該操做符僅僅發送一次。
Observable.timer(3,TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.just(array.get(aLong.intValue()));
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("info","onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i("info","error");
}
@Override
public void onNext(String s) {
Log.i("info","onNext--"+s);
}
});
04-13 15:52:32.114 23036-23036/mahao.alex.rxjava I/info: onNext--aa
04-13 15:52:32.114 23036-23036/mahao.alex.rxjava I/info: onCompleted
注意:該操做符執行在conputation
線程中。
將發射序列每隔固定間隔獲取其近期值並向下發送。
這個分爲兩種狀況。
sample
的時間間隔sample
的時間間隔對於另一種狀況,就是每隔固定間隔發射就能夠。而第一種狀況存在的一種特殊狀況
如下咱們看一下樣例
有一個數組{「aa」,」bb」,」cc」,」dd」}每隔三秒發射,而sample
每隔兩秒篩選。
Observable.interval(3,TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<String>>() {
@Override
public Observable<String> call(Long aLong) {
return Observable.just(array.get(aLong.intValue()));
}
})
.sample(2,TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.i("info","onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i("info","error");
}
@Override
public void onNext(String s) {
Log.i("info","onNext--"+s);
}
});
04-13 16:13:45.404 11935-11935/mahao.alex.rxjava I/info: onNext--aa
04-13 16:13:49.404 11935-11935/mahao.alex.rxjava I/info: onNext--bb
04-13 16:13:51.404 11935-11935/mahao.alex.rxjava I/info: onNext--cc
04-13 16:13:55.404 11935-11935/mahao.alex.rxjava I/info: onNext--dd
04-13 16:13:56.414 11935-11935/mahao.alex.rxjava I/info: error
看一下他們的事件間隔。四次發射的時間間隔爲 4,2,4。最後error暫且不提。
爲何是這個時間間隔呢?
圖儘管醜,但仍是有必定道理的
再上一張好看的圖
指定最小的發射時間間隔,假設指定的當前時間間隔內沒有發送元素。則拋出異常,中止。
當發送的數據的時間間隔小於debounce
指定的時間間隔,則當前發送的數據將被過濾,假設在指定的時間間隔內仍沒有數據發送,則會發送最後一個。