本文的主題爲對 Observable 進行錯誤處理的操做符以及轉換 Observable 類型的操做符。 這裏的 Observable 實質上是可觀察的數據流。html
RxJava操做符(五)Error Handlingjava
public static void Dump<T>(this IObservable<T> source, string name) { source.Subscribe( i => Console.WriteLine("{0}-->{1}", name, i), ex => Console.WriteLine("{0} failed-->{1}", name, ex.Message), () => Console.WriteLine("{0} completed", name)); }
fun <T> Observable<T>.dump() = this.subscribe({ println("onNext: $it") }, { e -> println("onError: ${e.javaClass.name}: ${e.message}") }, { println("onComplete") })
ReactiveX - Catch operator Reactive Extensions再入門 その11「Catchメソッド」 Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」react
當源數據流出錯拋出異常時,Catch 能捕捉該異常並從中恢復。至關於 catch 語句。 不指定異常類型時,Catch 能捕捉全部類型的異常。 指定異常類型時,Catch 只能捕捉指定類型的異常。 OnErrorResumeNext 與 VB 的同名語句的做用相同。使用該操做符的話,一個數據流不管出錯仍是正常結束都會轉到後一個指定數據流。swift
var source = new Subject<int>(); var result = source.Catch(Observable.Empty<int>()); result.Dump("Catch"); source.OnNext(1); source.OnNext(2); source.OnError(new Exception("Fail!")); /* Catch-->1 Catch-->2 Catch completed */
var source = new Subject<int>(); var result = source.Catch<int, TimeoutException>(tx => Observable.Return(-1)); result.Dump("Catch"); source.OnNext(1); source.OnNext(2); source.OnError(new TimeoutException()); /* Catch-->1 Catch-->2 Catch-->-1 Catch completed */
var source = new Subject<int>(); var result = source.Catch<int, TimeoutException>(tx => Observable.Return(-1)); result.Dump("Catch"); source.OnNext(1); source.OnNext(2); source.OnError(new ArgumentException("Fail!")); /* Catch-->1 Catch-->2 Catch failed-->Fail! */
Observable .Throw<string>(new Exception()) .OnErrorResumeNext(Observable.Return("OK")) .Subscribe( s => Console.WriteLine("OnNext: {0}", s), ex => Console.WriteLine("OnError: {0}", ex), () => Console.WriteLine("OnCompleted")); new[] { "NG", "Error", "Abort", "OK" } .Select((s, i) => new { index = i, value = s }) .Select(s => s.value != "OK" ? Observable.Throw<string>(new Exception(s.ToString())) : Observable.Return(s.ToString())) .OnErrorResumeNext() .Subscribe( s => Console.WriteLine("OnNext: {0}", s), ex => Console.WriteLine("OnError: {0}", ex), () => Console.WriteLine("OnCompleted")); new[] { "NG", "OK", "Abort", "Error" } .Select((s, i) => new { index = i, value = s }) .Select(s => s.value != "OK" ? Observable.Throw<string>(new Exception(s.ToString())) : Observable.Return(s.ToString())) .OnErrorResumeNext() .Subscribe( s => Console.WriteLine("OnNext: {0}", s), ex => Console.WriteLine("OnError: {0}", ex), () => Console.WriteLine("OnCompleted")); new[] { "NG", "Exception", "Abort", "Error" } .Select((s, i) => new { index = i, value = s }) .Select(s => s.value != "OK" ? Observable.Throw<string>(new Exception(s.ToString())) : Observable.Return(s.ToString())) .OnErrorResumeNext() .Subscribe( s => Console.WriteLine("OnNext: {0}", s), ex => Console.WriteLine("OnError: {0}", ex), () => Console.WriteLine("OnCompleted")); /* OnNext: OK OnCompleted OnNext: { index = 3, value = OK } OnCompleted OnNext: { index = 1, value = OK } OnCompleted OnCompleted */
val values = Observable.create<String> { o -> o.onNext("Rx") o.onNext("is") o.onError(Exception("adjective unknown")) } values .onErrorReturn { e -> "Error: " + e.message } .dump() /* onNext: Rx onNext: is onNext: Error: adjective unknown onComplete */
val values = Observable.create<Int> { o -> o.onNext(1) o.onNext(2) o.onError(Exception("Oops")) } values .onErrorResumeNext(Observable.just(Integer.MAX_VALUE)) .dump() /* onNext: 1 onNext: 2 onNext: 2147483647 onComplete */
val values = Observable.create<Int> { o -> o.onNext(1) o.onNext(2) o.onError(Exception("Oops")) } values .onErrorResumeNext( Function { e -> Observable.error(UnsupportedOperationException(e)) } ) .dump() /* onNext: 1 onNext: 2 onError: java.lang.UnsupportedOperationException: java.lang.Exception: Oops: java.lang.Exception: Oops */
val values = Observable.create<String> { o -> o.onNext("Rx") o.onNext("is") // o.onError(Throwable()) // this won't be caught o.onError(Exception()) // this will be caught } values .onExceptionResumeNext(Observable.just("hard")) .dump() /* onNext: Rx onNext: is onNext: hard onComplete */
val values = Observable.create<String> { o -> o.onNext("Rx") o.onNext("is") o.onError(object : Throwable() { }) // this won't be caught } values .onExceptionResumeNext(Observable.just("hard")) .dump() /* onNext: Rx onNext: is onNext: hard onComplete */
let disposeBag = DisposeBag() let sequenceThatFails = PublishSubject<String>() sequenceThatFails .catchErrorJustReturn("😊") .subscribe { print($0) } .disposed(by: disposeBag) sequenceThatFails.onNext("😬") sequenceThatFails.onNext("😨") sequenceThatFails.onNext("😡") sequenceThatFails.onNext("🔴") sequenceThatFails.onError(TestError.test) /* next(😬) next(😨) next(😡) next(🔴) next(😊) completed */
let disposeBag = DisposeBag() let sequenceThatFails = PublishSubject<String>() let recoverySequence = PublishSubject<String>() sequenceThatFails .catchError { print("Error:", $0) return recoverySequence } .subscribe { print($0) } .disposed(by: disposeBag) sequenceThatFails.onNext("😬") sequenceThatFails.onNext("😨") sequenceThatFails.onNext("😡") sequenceThatFails.onNext("🔴") sequenceThatFails.onError(TestError.test) recoverySequence.onNext("😊") /* next(😬) next(😨) next(😡) next(🔴) Error: test next(😊) */
ReactiveX - Retry operator Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」app
當源數據流出錯拋出異常時,Retry 會從新嘗試。 能夠指定從新嘗試的次數。不指定次數時,Retry 會進行無限次從新嘗試。dom
var retryCount = 0; Observable .Create<string>(o => { Console.WriteLine("Create method called: {0}", retryCount); if (retryCount == 3) { o.OnNext(retryCount.ToString()); o.OnCompleted(); return Disposable.Empty; } retryCount++; o.OnError(new InvalidOperationException(retryCount.ToString())); return Disposable.Empty; }) .Retry() .Subscribe( s => Console.WriteLine("OnNext: {0}", s), ex => Console.WriteLine("OnError: {0}", ex), () => Console.WriteLine("OnCompleted")); /* Create method called: 0 Create method called: 1 Create method called: 2 Create method called: 3 OnNext: 3 OnCompleted */
var retryCount = 0; Observable .Create<string>(o => { Console.WriteLine("Create method called: {0}", retryCount); if (retryCount == 3) { o.OnNext(retryCount.ToString()); o.OnCompleted(); return Disposable.Empty; } retryCount++; o.OnError(new InvalidOperationException(retryCount.ToString())); return Disposable.Empty; }) .Retry(2) .Subscribe( s => Console.WriteLine("OnNext: {0}", s), ex => Console.WriteLine("OnError: {0}", ex), () => Console.WriteLine("OnCompleted")); /* Create method called: 0 Create method called: 1 OnError: System.InvalidOperationException: 2 */
val random = Random() val values = Observable.create<Int> { o -> o.onNext(random.nextInt() % 20) o.onNext(random.nextInt() % 20) o.onError(Exception()) } values .retry(1) .dump() /* onNext: 8 onNext: -9 onNext: -6 onNext: -10 onError: java.lang.Exception: null */
val source = Observable.create<Int> { o -> o.onNext(1) o.onNext(2) o.onError(Exception("Failed")) } source.retryWhen { o -> o.take(2).delay(100, TimeUnit.MILLISECONDS) } .timeInterval() .dump() /* onNext: Timed[time=2, unit=MILLISECONDS, value=1] onNext: Timed[time=0, unit=MILLISECONDS, value=2] onNext: Timed[time=105, unit=MILLISECONDS, value=1] onNext: Timed[time=0, unit=MILLISECONDS, value=2] onNext: Timed[time=102, unit=MILLISECONDS, value=1] onNext: Timed[time=1, unit=MILLISECONDS, value=2] */
let disposeBag = DisposeBag() var count = 1 let sequenceThatErrors = Observable<String>.create { observer in observer.onNext("🍎") observer.onNext("🍐") observer.onNext("🍊") if count == 1 { observer.onError(TestError.test) print("Error encountered") count += 1 } observer.onNext("🐶") observer.onNext("🐱") observer.onNext("🐭") observer.onCompleted() return Disposables.create() } sequenceThatErrors .retry() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) /* 🍎 🍐 🍊 Error encountered 🍎 🍐 🍊 🐶 🐱 🐭 */
let disposeBag = DisposeBag() var count = 1 let sequenceThatErrors = Observable<String>.create { observer in observer.onNext("🍎") observer.onNext("🍐") observer.onNext("🍊") if count < 5 { observer.onError(TestError.test) print("Error encountered") count += 1 } observer.onNext("🐶") observer.onNext("🐱") observer.onNext("🐭") observer.onCompleted() return Disposables.create() } sequenceThatErrors .retry(3) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) /* 🍎 🍐 🍊 Error encountered 🍎 🍐 🍊 Error encountered 🍎 🍐 🍊 Error encountered Unhandled error happened: test subscription called from: */
ReactiveX - To operator Reactive Extensions再入門 その15「To*****系メソッド」ui
ToArray / ToList / ToDictionary(ToMap) / ToLookup(ToMultiMap) 能將源數據流中的全部數據打包裝進一個集合,發送這個集合並結束。 Wait 會等待源數據流發送完畢,源數據流正常結束時返回最後一個數據,源數據流出錯時報錯拋出異常。this
var s = new Subject<int>(); s.ToArray().Subscribe(array => { Console.WriteLine("start array dump"); foreach (var i in array) { Console.WriteLine(" array value : {0}", i); } }); Console.WriteLine("OnNext(1)"); s.OnNext(1); Console.WriteLine("OnNext(2)"); s.OnNext(2); Console.WriteLine("OnNext(3)"); s.OnNext(3); Console.WriteLine("OnCompleted()"); s.OnCompleted(); /* OnNext(1) OnNext(2) OnNext(3) OnCompleted() start array dump array value : 1 array value : 2 array value : 3 */
var s = new Subject<Tuple<string, int>>(); s.ToDictionary(t => t.Item1).Subscribe(dict => { Console.WriteLine("one : {0}", dict["one"]); Console.WriteLine("two : {0}", dict["two"]); Console.WriteLine("three : {0}", dict["three"]); }); Console.WriteLine("OnNext(one)"); s.OnNext(Tuple.Create("one", 1)); Console.WriteLine("OnNext(two)"); s.OnNext(Tuple.Create("two", 2)); Console.WriteLine("OnNext(three)"); s.OnNext(Tuple.Create("three", 3)); Console.WriteLine("OnCompleted()"); s.OnCompleted(); /* OnNext(one) OnNext(two) OnNext(three) OnCompleted() one : (one, 1) two : (two, 2) three : (three, 3) */
var s = new Subject<int>(); s.ToList().Subscribe(list => { foreach (var i in list) { Console.WriteLine("value : {0}", i); } }); Console.WriteLine("OnNext(1)"); s.OnNext(1); Console.WriteLine("OnNext(2)"); s.OnNext(2); Console.WriteLine("OnNext(3)"); s.OnNext(3); Console.WriteLine("OnCompleted()"); s.OnCompleted(); /* OnNext(1) OnNext(2) OnNext(3) OnCompleted() value : 1 value : 2 value : 3 */
var s = new Subject<Tuple<string, string>>(); s.ToLookup(t => t.Item1).Subscribe(l => { foreach (var g in l) { Console.WriteLine("Key : {0}", g.Key); foreach (var i in g) { Console.WriteLine(" item : {0}", i); } } }); Console.WriteLine("OnNext(group A)"); s.OnNext(Tuple.Create("group A", "taro")); s.OnNext(Tuple.Create("group A", "jiro")); Console.WriteLine("OnNext(group B)"); s.OnNext(Tuple.Create("group B", "foo")); s.OnNext(Tuple.Create("group B", "hoge")); s.OnNext(Tuple.Create("group B", "bar")); Console.WriteLine("OnCompleted()"); s.OnCompleted(); /* OnNext(group A) OnNext(group B) OnCompleted() Key : group A item : (group A, taro) item : (group A, jiro) Key : group B item : (group B, foo) item : (group B, hoge) item : (group B, bar) */
var period = TimeSpan.FromMilliseconds(200); var source = Observable.Timer(TimeSpan.Zero, period) .Take(5); var result = source.ToEnumerable(); foreach (var value in result) { Console.WriteLine(value); } Console.WriteLine("done"); /* 0 1 2 3 4 done */
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5); var result = source.Wait(); //Will arrive in 5 seconds. Console.WriteLine(result); /* 4 */
var source = Observable.Throw<long>(new Exception("Fail!")); try { source.Wait(); } catch (Exception e) { Console.WriteLine(e.Message); } /* Fail! */
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5); var result = source.ToEvent(); result.OnNext += val => Console.WriteLine(val); /* 0 1 2 3 ... */
public class MyEventArgs : EventArgs { private readonly long _value; public MyEventArgs(long value) { _value = value; } public long Value { get { return _value; } } } var source = Observable.Interval(TimeSpan.FromSeconds(1)) .Select(i => new EventPattern<MyEventArgs>(null, new MyEventArgs(i))); var result = source.ToEventPattern(); result.OnNext += (e, val) => Console.WriteLine(val.Value); /* 0 1 2 3 ... */
val values = Observable.range(10, 5) values .reduce( ArrayList<Int>() ) { acc, value -> acc.add(value) acc } .dump() /* onSuccess: [10, 11, 12, 13, 14] */
val values = Observable.range(10, 5) values .toList() .dump() /* onSuccess: [10, 11, 12, 13, 14] */
val values = Observable.range(10, 5) values .toSortedList { i1, i2 -> i2 - i1 } .dump() /* onSuccess: [14, 13, 12, 11, 10] */
private data class Person(val name: String, val age: Int) val values = Observable.just( Person("Will", 25), Person("Nick", 40), Person("Saul", 35) ) values .toMap { person -> person.name } .dump() /* onSuccess: {Saul=Person(name=Saul, age=35), Nick=Person(name=Nick, age=40), Will=Person(name=Will, age=25)} */
val values = Observable.just( Person("Will", 25), Person("Nick", 40), Person("Saul", 35) ) values .toMap<String, Int>( { person -> person.name }, { person -> person.age }) .dump() /* onSuccess: {Saul=35, Nick=40, Will=25} */
val values = Observable.just( Person("Will", 25), Person("Nick", 40), Person("Saul", 35) ) values .toMap<String, Int>( { person -> person.name }, { person -> person.age }, { HashMap() }) .dump() /* onSuccess: {Saul=35, Nick=40, Will=25} */
val values = Observable.just( Person("Will", 35), Person("Nick", 40), Person("Saul", 35) ) values .toMultimap<Int, String>( { person -> person.age }, { person -> person.name }) .dump() /* onSuccess: {35=[Will, Saul], 40=[Nick]} */
val values = Observable.just( Person("Will", 35), Person("Nick", 40), Person("Saul", 35) ) values .toMultimap( { person -> person.age }, { person -> person.name }, { HashMap<Int, Collection<String>>() }, { key -> ArrayList() }) .dump() /* onSuccess: {35=[Will, Saul], 40=[Nick]} */
let disposeBag = DisposeBag() Observable.range(start: 1, count: 10) .toArray() .subscribe { print($0) } .disposed(by: disposeBag) /* next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]) completed */