本文的主題爲轉換 Observable 的操做符。 這裏的 Observable 實質上是可觀察的數據流。html
RxJava操做符(二)Transforming Observablesjava
public static void Dump<T>(this IObservable<T> source, string name) { source.Subscribe( i => Console.WriteLine("{0}-->{1}", name, i), ex => Console.WriteLine("{0} failed-->{1}", name, ex.Message), () => Console.WriteLine("{0} completed", name)); }
fun <T> Observable<T>.dump() = this.subscribe({ println("onNext: $it") }, { println("onError: $it: ${it.message}") }, { println("onComplete") }) fun <T> Observable<T>.dump(name: String) = this.subscribe({ println("$name: onNext: $it") }, { println("$name: onError: $it: ${it.message}") }, { println("$name: onComplete") })
ReactiveX - Buffer operator Reactive Extensions再入門 その25「値をまとめるBufferメソッド」react
Buffer 轉換數據流:間隔性地將數據流中鄰近的數據打包,造成數據包的數據流。 這裏的數據包即靜態的數組。swift
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10); source.Buffer(3, 1) .Subscribe( buffer => { Console.WriteLine("--Buffered values"); foreach (var value in buffer) Console.WriteLine(value); }, () => Console.WriteLine("Completed")); /* --Buffered values 0 1 2 --Buffered values 1 2 3 .. --Buffered values 7 8 9 --Buffered values 8 9 --Buffered values 9 Completed */
Observable.range(0, 10) .buffer(4) .dump() /* onNext: [0, 1, 2, 3] onNext: [4, 5, 6, 7] onNext: [8, 9] onComplete */
Observable.interval(100, TimeUnit.MILLISECONDS).take(10) .buffer(250, TimeUnit.MILLISECONDS) .dump() /* onNext: [0, 1] onNext: [2, 3, 4] onNext: [5, 6] onNext: [7, 8, 9] onNext: [] onComplete */
Observable.interval(100, TimeUnit.MILLISECONDS) .take(10) .buffer(250, TimeUnit.MILLISECONDS, 2) .dump() /* onNext: [0, 1] onNext: [] onNext: [2, 3] onNext: [] onNext: [4, 5] onNext: [6] onNext: [7, 8] onNext: [9] onNext: [] onComplete */
Observable.interval(100, TimeUnit.MILLISECONDS).take(10) .buffer(Observable.interval(250, TimeUnit.MILLISECONDS)) .dump() /* onNext: [0, 1] onNext: [2, 3] onNext: [4, 5, 6] onNext: [7, 8, 9] onNext: [] onComplete */
Observable.range(0, 10) .buffer(4, 3) .dump() /* onNext: [0, 1, 2, 3] onNext: [3, 4, 5, 6] onNext: [6, 7, 8, 9] onNext: [9] onComplete */
Observable.interval(100, TimeUnit.MILLISECONDS).take(10) .buffer(350, 200, TimeUnit.MILLISECONDS) .dump() /* onNext: [0, 1, 2] onNext: [2, 3, 4] onNext: [3, 4, 5, 6] onNext: [6, 7, 8] onNext: [7, 8, 9] onNext: [] onComplete */
Observable.interval(100, TimeUnit.MILLISECONDS).take(10) .buffer<Long, Long>( Observable.interval(250, TimeUnit.MILLISECONDS), Function { i -> Observable.timer(200, TimeUnit.MILLISECONDS) } ) .dump() /* onNext: [2, 3] onNext: [5, 6] onNext: [7, 8] onNext: [] onComplete */
ReactiveX - FlatMap operator Reactive Extensions再入門 その40「IObservableの合成はじめました」數組
FlatMap / SelectMany 轉換數據流:將源數據流的每一項都轉換成數據流,從而造成數據流的數據流,最後再平坦化將兩維數據流合併成一個數據流。函數
Observable.Range(1, 3) .SelectMany(i => Observable.Range(1, i)) .Dump("SelectMany"); /* SelectMany-->1 SelectMany-->1 SelectMany-->2 SelectMany-->1 SelectMany-->2 SelectMany-->3 SelectMany completed */
val values = Observable.just(2) values .flatMap { i -> Observable.range(0, i) } .dump() /* onNext: 0 onNext: 1 onComplete */
val values = Observable.range(1, 3) values .flatMap { i -> Observable.range(0, i) } .dump() /* onNext: 0 onNext: 0 onNext: 1 onNext: 0 onNext: 1 onNext: 2 onComplete */
val values = Observable.just(1) values .flatMap { i -> Observable.just( Character.valueOf((i + 64).toChar()) ) } .dump() /* onNext: A onComplete */
val values = Observable.range(0, 30) values .flatMap<Char> { i -> if (i in 1..26) Observable.just(Character.valueOf((i + 64).toChar())) else Observable.empty() } .dump() /* onNext: A onNext: B ... onNext: Y onNext: Z onComplete */
Observable.just(100, 150) .flatMap { i -> Observable.interval(i.toLong(), TimeUnit.MILLISECONDS) .map { v -> i } } .take(10) .dump() /* onNext: 100 onNext: 150 onNext: 100 onNext: 150 onNext: 100 onNext: 100 onNext: 150 onNext: 100 onNext: 100 onNext: 150 onComplete */
Observable.just(100, 150) .concatMap { i -> Observable.interval(i.toLong(), TimeUnit.MILLISECONDS) .map { v -> i } .take(3) } .dump() /* onNext: 100 onNext: 100 onNext: 100 onNext: 150 onNext: 150 onNext: 150 onComplete */
Observable.interval(100, TimeUnit.MILLISECONDS) .switchMap { i -> Observable.interval(30, TimeUnit.MILLISECONDS) .map { l -> i } } .take(9) .dump() /* onNext: 0 onNext: 0 onNext: 0 onNext: 1 onNext: 1 onNext: 1 onNext: 2 onNext: 2 onNext: 2 onComplete */
concatMap / flatMap / switchMapui
flatMapIterablethis
Observable.range(1, 3) .flatMapIterable { i -> 1..i } .dump() /* onNext: 1 onNext: 1 onNext: 2 onNext: 1 onNext: 2 onNext: 3 onComplete */
Observable.range(1, 3) .flatMapIterable<Int, Int>( { i -> 1..i }, { ori, rv -> ori * rv }) .dump() /* onNext: 1 onNext: 2 onNext: 4 onNext: 3 onNext: 6 onNext: 9 onComplete */
Observable.range(1, 3) .flatMapIterable<Int, Int>( { i -> generateSequence(1) { (it + 1).takeIf { it <= i } }.asIterable() }, { ori, rv -> ori * rv }) .dump() /* onNext: 1 onNext: 2 onNext: 4 onNext: 3 onNext: 6 onNext: 9 onComplete */
let disposeBag = DisposeBag() struct Player { var score: Variable<Int> } let 👦🏻 = Player(score: Variable(80)) let 👧🏼 = Player(score: Variable(90)) let player = Variable(👦🏻) player.asObservable() .flatMap { $0.score.asObservable() } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) 👦🏻.score.value = 85 player.value = 👧🏼 👦🏻.score.value = 95 👧🏼.score.value = 100 /* 80 85 90 95 100 */
let disposeBag = DisposeBag() struct Player { var score: Variable<Int> } let 👦🏻 = Player(score: Variable(80)) let 👧🏼 = Player(score: Variable(90)) let player = Variable(👦🏻) player.asObservable() .flatMapLatest { $0.score.asObservable() } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) 👦🏻.score.value = 85 player.value = 👧🏼 👦🏻.score.value = 95 👧🏼.score.value = 100 /* 80 85 90 100 */
ReactiveX - GroupBy operator Reactive Extensions再入門 その20「GroupByメソッドでグルーピングしてみよう」 Reactive Extensions再入門 その21「GroupByUntilメソッド」spa
GroupBy 轉換數據流:將數據流分組,結果是被分組的數據流的數據流。 所謂被分組的數據流是指帶分組鍵(Key)的數據流。3d
var source = Observable.Interval(TimeSpan.FromSeconds(0.1)).Take(10); var group = source.GroupBy(i => i % 3); group.Subscribe( grp => grp.Min().Subscribe( minValue => Console.WriteLine("{0} min value = {1}", grp.Key, minValue)), () => Console.WriteLine("Completed")); Console.ReadLine(); /* 0 min value = 0 1 min value = 1 2 min value = 2 Completed */
var source = Observable.Interval(TimeSpan.FromSeconds(0.1)).Take(10); var group = source.GroupBy(i => i % 3); group.SelectMany( grp => grp.Max() .Select(value => new { grp.Key, value })) .Dump("group"); Console.ReadLine(); /* group-->{ Key = 0, value = 9 } group-->{ Key = 1, value = 7 } group-->{ Key = 2, value = 8 } group completed */
val values = Observable.just( "first", "second", "third", "forth", "fifth", "sixth" ) values.groupBy { word -> word[0] } .flatMap<Any> { group -> group.lastOrError().toObservable().map { v -> "${group.key}: $v" } } .dump() /* onNext: s: sixth onNext: t: third onNext: f: fifth onComplete */
ReactiveX - Map operator Reactive Extensions再入門 その7「LINQスタイルの拡張メソッド」
Map / Select 轉換數據流:針對源數據流的每一項都調用指定函數後將源數據流映射爲目標數據流。 Cast 轉換數據流:將源數據流的每一項都轉變爲指定類型,不能轉型時出錯。 OfType 過濾數據流:只發送源數據流中指定類型的數據。
var source = Observable.Range(0, 3); source.Select(i => i + 3).Dump("+3"); source.Select(i => (char)(i + 64)).Dump("char"); source.Select(i => new { Number = i, Character = (char)(i + 64) }).Dump("anon"); var query = from i in source select new { Number = i, Character = (char)(i + 64) }; query.Dump("anon"); /* +3-->3 +3-->4 +3-->5 +3 completed char-->@ char-->A char-->B char completed anon-->{ Number = 0, Character = @ } anon-->{ Number = 1, Character = A } anon-->{ Number = 2, Character = B } anon completed anon-->{ Number = 0, Character = @ } anon-->{ Number = 1, Character = A } anon-->{ Number = 2, Character = B } anon completed */
var objects = new Subject<object>(); objects.Cast<int>().Dump("cast"); objects.OnNext(1); objects.OnNext(2); objects.OnNext(3); objects.OnCompleted(); /* cast-->1 cast-->2 cast-->3 cast completed */ var objects = new Subject<object>(); objects.Cast<int>().Dump("cast"); objects.OnNext(1); objects.OnNext(2); objects.OnNext("3");//Fail /* cast-->1 cast-->2 cast failed-->Specified cast is not valid. */
var objects = new Subject<object>(); objects.OfType<int>().Dump("OfType"); objects.OnNext(1); objects.OnNext(2); objects.OnNext("3");//Ignored objects.OnNext(4); objects.OnCompleted(); /* OfType-->1 OfType-->2 OfType-->4 OfType completed */
val values = Observable.range(0, 4) values .map { i -> i + 3 } .dump() /* onNext: 3 onNext: 4 onNext: 5 onNext: 6 onComplete */
val values = Observable.just("0", "1", "2", "3") .map { Integer.parseInt(it) } values.dump() /* onNext: 0 onNext: 1 onNext: 2 onNext: 3 onComplete */
val values = Observable.just<Int>(0, 1, 2, 3) values .cast(java.lang.Integer::class.java) .dump() /* onNext: 0 onNext: 1 onNext: 2 onNext: 3 onComplete */
val values = Observable.just(0, 1, 2, "3") values .cast(java.lang.Integer::class.java) .dump() /* onNext: 0 onNext: 1 onNext: 2 onError: java.lang.ClassCastException: Cannot cast java.lang.String to java.lang.Integer: Cannot cast java.lang.String to java.lang.Integer */
val values = Observable.just(0, 1, "2", 3) values .ofType(java.lang.Integer::class.java) .dump() /* onNext: 0 onNext: 1 onNext: 3 onComplete */
let disposeBag = DisposeBag() Observable.of(1, 2, 3) .map { $0 * $0 } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) /* 1 4 9 */
ReactiveX - Scan operator Reactive Extensions再入門 その17「集計するメソッド」
Scan 轉換數據流:
Console.WriteLine(MethodBase.GetCurrentMethod().Name); var numbers = new Subject<int>(); var scan = numbers.Scan(0, (acc, current) => acc + current); numbers.Dump("numbers"); scan.Dump("scan"); numbers.OnNext(1); numbers.OnNext(2); numbers.OnNext(3); numbers.OnCompleted(); /* Scan numbers-->1 scan-->1 numbers-->2 scan-->3 numbers-->3 scan-->6 numbers completed scan completed */
val values = Observable.range(0, 5) values .scan { i1, i2 -> i1 + i2 } .dump() /* onNext: 0 onNext: 1 onNext: 3 onNext: 6 onNext: 10 onComplete */
val values = ReplaySubject.create<Int>() values .dump("Values") values .scan { i1, i2 -> if (i1 < i2) i1 else i2 } .distinctUntilChanged() .dump("Min") values.onNext(2) values.onNext(3) values.onNext(1) values.onNext(4) values.onComplete() /* Values: onNext: 2 Min: onNext: 2 Values: onNext: 3 Values: onNext: 1 Min: onNext: 1 Values: onNext: 4 Values: onComplete Min: onComplete */
let disposeBag = DisposeBag() Observable.of(10, 100, 1000) .scan(1) { aggregateValue, newValue in aggregateValue + newValue } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) /* 11 111 1111 */
ReactiveX - Window operator Reactive Extensions再入門 その26「値をまとめるWindowメソッド」
Window 轉換數據流:間隔性地將數據流分段,造成數據流的數據流,即兩維數據流。
var windowIdx = 0; var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10); source.Window(3) .Subscribe(window => { var id = windowIdx++; Console.WriteLine("--Starting new window"); var windowName = "Window" + id; window.Subscribe( value => Console.WriteLine("{0} : {1}", windowName, value), ex => Console.WriteLine("{0} : {1}", windowName, ex), () => Console.WriteLine("{0} Completed", windowName)); }, () => Console.WriteLine("Completed")); /* --Starting new window Window0 : 0 Window0 : 1 Window0 : 2 Window0 Completed --Starting new window Window1 : 3 Window1 : 4 Window1 : 5 Window1 Completed --Starting new window Window2 : 6 Window2 : 7 Window2 : 8 Window2 Completed --Starting new window Window3 : 9 Window3 Completed Completed */
Observable .merge( Observable.range(0, 5) .window(3, 1)) .dump() /* onNext: 0 onNext: 1 onNext: 1 onNext: 2 onNext: 2 onNext: 2 onNext: 3 onNext: 3 onNext: 3 onNext: 4 onNext: 4 onNext: 4 onComplete */
Observable.range(0, 5) .window(3, 1) .flatMap { o -> o.toList().toObservable() } .dump() /* onNext: [0, 1, 2] onNext: [1, 2, 3] onNext: [2, 3, 4] onNext: [3, 4] onNext: [4] onComplete */
Observable.interval(100, TimeUnit.MILLISECONDS) .take(5) .window(250, 100, TimeUnit.MILLISECONDS) .flatMap { o -> o.toList().toObservable() } .dump() /* onNext: [0, 1] onNext: [1, 2] onNext: [2, 3] onNext: [3, 4] onNext: [4] onComplete */
Observable.interval(100, TimeUnit.MILLISECONDS) .take(5) .window<Long, Long>( Observable.interval(100, TimeUnit.MILLISECONDS), Function { _ -> Observable.timer(250, TimeUnit.MILLISECONDS) }) .flatMap { o -> o.toList().toObservable() } .dump() /* onNext: [1, 2] onNext: [2, 3] onNext: [3, 4] onNext: [4] onNext: [] onComplete */