ReactiveX Study Notes (8): Error Handling and To Operators

Error Handling Operators

Operators to Convert Observables

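This post covers the operators that handle errors on an Observable and the operators that convert an Observable into another type. An Observable here is essentially an observable stream of data.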

RxJava Operators (5): Error Handling

Common code

  • RxNET
public static void Dump<T>(this IObservable<T> source, string name)
{
    source.Subscribe(
    i => Console.WriteLine("{0}-->{1}", name, i),
    ex => Console.WriteLine("{0} failed-->{1}", name, ex.Message),
    () => Console.WriteLine("{0} completed", name));
}
  • RxJava
fun <T> Observable<T>.dump() =
    this.subscribe({ println("onNext: $it") },
        { e -> println("onError: ${e.javaClass.name}: ${e.message}") },
        { println("onComplete") })
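Several RxJava snippets later in these notes call dump() on the result of toList(), toMap() and similar operators, which in RxJava 2 return a Single rather than an Observable, and their output starts with "onSuccess". The notes do not show that overload, so the following is only a guess at what it might look like, assuming RxJava 2.x (package io.reactivex) is on the classpath. The main function is there purely for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.disposables.Disposable

// Assumed counterpart of the Observable dump() above, for Single sources.
// A Single has only two outcomes: one value (onSuccess) or an error.
fun <T> Single<T>.dump(): Disposable =
    this.subscribe({ println("onSuccess: $it") },
        { e -> println("onError: ${e.javaClass.name}: ${e.message}") })

fun main() {
    // toList() turns an Observable<Int> into a Single<List<Int>>.
    Observable.range(10, 5).toList().dump()
    // prints: onSuccess: [10, 11, 12, 13, 14]
}
```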

Catch / OnErrorResumeNext

ReactiveX - Catch operator
Reactive Extensions再入門 その11「Catchメソッド」
Reactive Extensions再入門 その34「ダメなら次の人!を実現するOnErrorResumeNextメソッド」

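When the source stream fails with an exception, Catch captures the exception and recovers from it, much like a catch statement. Without an exception type, Catch handles every exception; with one, it handles only exceptions of that type. OnErrorResumeNext behaves like the VB statement of the same name (On Error Resume Next): a stream moves on to the next specified stream whether it fails or completes normally.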

Catch OnErrorResumeNext

  • RxNET
var source = new Subject<int>();
var result = source.Catch(Observable.Empty<int>());
result.Dump("Catch");
source.OnNext(1);
source.OnNext(2);
source.OnError(new Exception("Fail!"));
/*
Catch-->1
Catch-->2
Catch completed
*/
var source = new Subject<int>();
var result = source.Catch<int, TimeoutException>(tx => Observable.Return(-1));
result.Dump("Catch");
source.OnNext(1);
source.OnNext(2);
source.OnError(new TimeoutException());
/*
Catch-->1
Catch-->2
Catch-->-1
Catch completed
*/
var source = new Subject<int>();
var result = source.Catch<int, TimeoutException>(tx => Observable.Return(-1));
result.Dump("Catch");
source.OnNext(1);
source.OnNext(2);
source.OnError(new ArgumentException("Fail!"));
/*
Catch-->1
Catch-->2
Catch failed-->Fail!
*/
Observable
	.Throw<string>(new Exception())
	.OnErrorResumeNext(Observable.Return("OK"))
	.Subscribe(
		s => Console.WriteLine("OnNext: {0}", s),
		ex => Console.WriteLine("OnError: {0}", ex),
		() => Console.WriteLine("OnCompleted"));
new[] { "NG", "Error", "Abort", "OK" }
	.Select((s, i) => new { index = i, value = s })
	.Select(s => s.value != "OK" ?
		Observable.Throw<string>(new Exception(s.ToString())) :
		Observable.Return(s.ToString()))
	.OnErrorResumeNext()
	.Subscribe(
		s => Console.WriteLine("OnNext: {0}", s),
		ex => Console.WriteLine("OnError: {0}", ex),
		() => Console.WriteLine("OnCompleted"));
new[] { "NG", "OK", "Abort", "Error" }
	.Select((s, i) => new { index = i, value = s })
	.Select(s => s.value != "OK" ?
		Observable.Throw<string>(new Exception(s.ToString())) :
		Observable.Return(s.ToString()))
	.OnErrorResumeNext()
	.Subscribe(
		s => Console.WriteLine("OnNext: {0}", s),
		ex => Console.WriteLine("OnError: {0}", ex),
		() => Console.WriteLine("OnCompleted"));
new[] { "NG", "Exception", "Abort", "Error" }
	.Select((s, i) => new { index = i, value = s })
	.Select(s => s.value != "OK" ?
		Observable.Throw<string>(new Exception(s.ToString())) :
		Observable.Return(s.ToString()))
	.OnErrorResumeNext()
	.Subscribe(
		s => Console.WriteLine("OnNext: {0}", s),
		ex => Console.WriteLine("OnError: {0}", ex),
		() => Console.WriteLine("OnCompleted"));
/*
OnNext: OK
OnCompleted
OnNext: { index = 3, value = OK }
OnCompleted
OnNext: { index = 1, value = OK }
OnCompleted
OnCompleted
*/
  • RxJava

onErrorReturn OnErrorResumeNext onExceptionResumeNext

val values = Observable.create<String> { o ->
    o.onNext("Rx")
    o.onNext("is")
    o.onError(Exception("adjective unknown"))
}
values
    .onErrorReturn { e -> "Error: " + e.message }
    .dump()
/*
onNext: Rx
onNext: is
onNext: Error: adjective unknown
onComplete
*/
val values = Observable.create<Int> { o ->
    o.onNext(1)
    o.onNext(2)
    o.onError(Exception("Oops"))
}
values
    .onErrorResumeNext(Observable.just(Integer.MAX_VALUE))
    .dump()
/*
onNext: 1
onNext: 2
onNext: 2147483647
onComplete
*/
val values = Observable.create<Int> { o ->
    o.onNext(1)
    o.onNext(2)
    o.onError(Exception("Oops"))
}
values
    .onErrorResumeNext( Function { e -> Observable.error(UnsupportedOperationException(e)) } )
    .dump()
/*
onNext: 1
onNext: 2
onError: java.lang.UnsupportedOperationException: java.lang.Exception: Oops
*/
val values = Observable.create<String> { o ->
    o.onNext("Rx")
    o.onNext("is")
// o.onError(Throwable()) // this won't be caught
    o.onError(Exception()) // this will be caught
}
values
    .onExceptionResumeNext(Observable.just("hard"))
    .dump()
/*
onNext: Rx
onNext: is
onNext: hard
onComplete
*/
val values = Observable.create<String> { o ->
    o.onNext("Rx")
    o.onNext("is")
    o.onError(object : Throwable() {
    }) // this won't be caught
}
values
    .onExceptionResumeNext(Observable.just("hard"))
    .dump()
/*
onNext: Rx
onNext: is
onError: <class name of the anonymous Throwable>: null
*/
  • RxSwift
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
    .catchErrorJustReturn("😊")
    .subscribe { print($0) }
    .disposed(by: disposeBag)
sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)
/*
next(😬)
next(😨)
next(😡)
next(🔴)
next(😊)
completed
*/
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()
sequenceThatFails
    .catchError {
        print("Error:", $0)
        return recoverySequence
    }
    .subscribe { print($0) }
    .disposed(by: disposeBag)
sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(TestError.test)
recoverySequence.onNext("😊")
/*
next(😬)
next(😨)
next(😡)
next(🔴)
Error: test
next(😊)
*/
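The Rx.NET examples above use a typed Catch that recovers only from TimeoutException. RxJava 2 does not seem to offer an overload that filters by exception type, but the same effect can be approximated by inspecting the error inside onErrorResumeNext. The sketch below assumes RxJava 2.x (package io.reactivex); failingWith, catchTimeout and show are hypothetical helper names introduced only for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableSource
import io.reactivex.functions.Function
import java.util.concurrent.TimeoutException

// Hypothetical helper: emits 1 and 2, then fails with the given error.
fun failingWith(error: Throwable): Observable<Int> =
    Observable.create<Int> { o ->
        o.onNext(1)
        o.onNext(2)
        o.onError(error)
    }

// Hypothetical helper: recover only from TimeoutException,
// pass every other error through unchanged.
fun Observable<Int>.catchTimeout(fallback: Int): Observable<Int> =
    this.onErrorResumeNext(Function<Throwable, ObservableSource<Int>> { e ->
        if (e is TimeoutException) Observable.just(fallback) else Observable.error<Int>(e)
    })

// Hypothetical helper: prints every notification of the stream.
fun show(source: Observable<Int>) {
    source.subscribe({ println("onNext: $it") },
        { e -> println("onError: ${e.javaClass.name}: ${e.message}") },
        { println("onComplete") })
}

fun main() {
    // Matching type: 1, 2, then the fallback -1, then onComplete.
    show(failingWith(TimeoutException()).catchTimeout(-1))
    // Non-matching type: 1, 2, then
    // onError: java.lang.IllegalArgumentException: Fail!
    show(failingWith(IllegalArgumentException("Fail!")).catchTimeout(-1))
}
```

The explicit Function<Throwable, ObservableSource<Int>> type is a deliberate choice: a bare lambda would be ambiguous between the two onErrorResumeNext overloads, which is presumably why the notes also wrap their lambda in Function { ... }.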

Retry

ReactiveX - Retry operator
Reactive Extensions再入門 その35「駄目ならやり直す!を実現するRetryメソッド」

When the source stream fails with an exception, Retry resubscribes and tries again. A retry count can be given; without one, Retry keeps retrying indefinitely.

Retry

  • RxNET
var retryCount = 0;
Observable
    .Create<string>(o =>
    {
        Console.WriteLine("Create method called: {0}", retryCount);
        if (retryCount == 3)
        {
            o.OnNext(retryCount.ToString());
            o.OnCompleted();
            return Disposable.Empty;
        }
        retryCount++;
        o.OnError(new InvalidOperationException(retryCount.ToString()));
        return Disposable.Empty;
    })
    .Retry()
    .Subscribe(
        s => Console.WriteLine("OnNext: {0}", s),
        ex => Console.WriteLine("OnError: {0}", ex),
        () => Console.WriteLine("OnCompleted"));
/*
Create method called: 0
Create method called: 1
Create method called: 2
Create method called: 3
OnNext: 3
OnCompleted
*/
var retryCount = 0;
Observable
.Create<string>(o =>
{
    Console.WriteLine("Create method called: {0}", retryCount);
    if (retryCount == 3)
    {
        o.OnNext(retryCount.ToString());
        o.OnCompleted();
        return Disposable.Empty;
    }
    retryCount++;
    o.OnError(new InvalidOperationException(retryCount.ToString()));
    return Disposable.Empty;
})
.Retry(2)
.Subscribe(
    s => Console.WriteLine("OnNext: {0}", s),
    ex => Console.WriteLine("OnError: {0}", ex),
    () => Console.WriteLine("OnCompleted"));
/*
Create method called: 0
Create method called: 1
OnError: System.InvalidOperationException: 2
*/
  • RxJava

Retry retryWhen

val random = Random()
val values = Observable.create<Int> { o ->
    o.onNext(random.nextInt() % 20)
    o.onNext(random.nextInt() % 20)
    o.onError(Exception())
}
values
    .retry(1)
    .dump()
/*
onNext: 8
onNext: -9
onNext: -6
onNext: -10
onError: java.lang.Exception: null
*/
val source = Observable.create<Int> { o ->
    o.onNext(1)
    o.onNext(2)
    o.onError(Exception("Failed"))
}
source.retryWhen { o ->
    o.take(2).delay(100, TimeUnit.MILLISECONDS)
}
    .timeInterval()
    .dump()
/*
onNext: Timed[time=2, unit=MILLISECONDS, value=1]
onNext: Timed[time=0, unit=MILLISECONDS, value=2]
onNext: Timed[time=105, unit=MILLISECONDS, value=1]
onNext: Timed[time=0, unit=MILLISECONDS, value=2]
onNext: Timed[time=102, unit=MILLISECONDS, value=1]
onNext: Timed[time=1, unit=MILLISECONDS, value=2]
*/
  • RxSwift
let disposeBag = DisposeBag()
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")
    if count == 1 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }
    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()
    return Disposables.create()
}
sequenceThatErrors
    .retry()
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
/*
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
🐶
🐱
🐭
*/
let disposeBag = DisposeBag()
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
    observer.onNext("🍎")
    observer.onNext("🍐")
    observer.onNext("🍊")
    if count < 5 {
        observer.onError(TestError.test)
        print("Error encountered")
        count += 1
    }
    observer.onNext("🐶")
    observer.onNext("🐱")
    observer.onNext("🐭")
    observer.onCompleted()
    return Disposables.create()
}
sequenceThatErrors
    .retry(3)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
/*
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
🍎
🍐
🍊
Error encountered
Unhandled error happened: test
 subscription called from:
*/
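The count argument does not mean the same thing in every library. In the outputs above, Rx.NET's Retry(2) and RxSwift's retry(3) stop after two and three subscriptions in total, while RxJava's retry(1) runs the source twice, so in RxJava the number counts resubscriptions after the first attempt. The sketch below makes that visible with a source that always fails. It assumes RxJava 2.x (package io.reactivex), and countSubscriptions is a hypothetical name used only here.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: subscribes to a source that always fails and
// returns how many times the source was subscribed to before giving up.
fun countSubscriptions(retries: Long): Int {
    var attempts = 0
    val alwaysFails = Observable.create<String> { o ->
        attempts++
        o.onError(IllegalStateException("attempt $attempts failed"))
    }
    // Everything runs synchronously on the calling thread,
    // so attempts is final once subscribe() returns.
    alwaysFails.retry(retries)
        .subscribe({}, { e -> println("onError: ${e.message}") })
    return attempts
}

fun main() {
    // prints "onError: attempt 3 failed", then 3:
    // one initial subscription plus two retries.
    println(countSubscriptions(2))
}
```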

To / Wait

ReactiveX - To operator
Reactive Extensions再入門 その15「To*****系メソッド」

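ToArray / ToList / ToDictionary (ToMap) / ToLookup (ToMultiMap) collect every item of the source stream into a single collection, emit that collection, and complete. Wait blocks until the source stream finishes: it returns the last item when the source completes normally and throws the exception when the source fails.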

To ToArray ToList ToMap ToMultiMap

  • RxNET
var s = new Subject<int>();
s.ToArray().Subscribe(array =>
{
    Console.WriteLine("start array dump");
    foreach (var i in array)
    {
        Console.WriteLine("  array value : {0}", i);
    }
});
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(2)");
s.OnNext(2);
Console.WriteLine("OnNext(3)");
s.OnNext(3);
Console.WriteLine("OnCompleted()");
s.OnCompleted();
/*
OnNext(1)
OnNext(2)
OnNext(3)
OnCompleted()
start array dump
  array value : 1
  array value : 2
  array value : 3
*/
var s = new Subject<Tuple<string, int>>();
s.ToDictionary(t => t.Item1).Subscribe(dict =>
{
    Console.WriteLine("one : {0}", dict["one"]);
    Console.WriteLine("two : {0}", dict["two"]);
    Console.WriteLine("three : {0}", dict["three"]);
});
Console.WriteLine("OnNext(one)");
s.OnNext(Tuple.Create("one", 1));
Console.WriteLine("OnNext(two)");
s.OnNext(Tuple.Create("two", 2));
Console.WriteLine("OnNext(three)");
s.OnNext(Tuple.Create("three", 3));
Console.WriteLine("OnCompleted()");
s.OnCompleted();
/*
OnNext(one)
OnNext(two)
OnNext(three)
OnCompleted()
one : (one, 1)
two : (two, 2)
three : (three, 3)
*/
var s = new Subject<int>();
s.ToList().Subscribe(list =>
{
    foreach (var i in list)
    {
        Console.WriteLine("value : {0}", i);
    }
});
Console.WriteLine("OnNext(1)");
s.OnNext(1);
Console.WriteLine("OnNext(2)");
s.OnNext(2);
Console.WriteLine("OnNext(3)");
s.OnNext(3);
Console.WriteLine("OnCompleted()");
s.OnCompleted();
/*
OnNext(1)
OnNext(2)
OnNext(3)
OnCompleted()
value : 1
value : 2
value : 3
*/
var s = new Subject<Tuple<string, string>>();
s.ToLookup(t => t.Item1).Subscribe(l =>
{
    foreach (var g in l)
    {
        Console.WriteLine("Key : {0}", g.Key);
        foreach (var i in g)
        {
            Console.WriteLine("  item : {0}", i);
        }
    }
});
Console.WriteLine("OnNext(group A)");
s.OnNext(Tuple.Create("group A", "taro"));
s.OnNext(Tuple.Create("group A", "jiro"));
Console.WriteLine("OnNext(group B)");
s.OnNext(Tuple.Create("group B", "foo"));
s.OnNext(Tuple.Create("group B", "hoge"));
s.OnNext(Tuple.Create("group B", "bar"));
Console.WriteLine("OnCompleted()");
s.OnCompleted();
/*
OnNext(group A)
OnNext(group B)
OnCompleted()
Key : group A
  item : (group A, taro)
  item : (group A, jiro)
Key : group B
  item : (group B, foo)
  item : (group B, hoge)
  item : (group B, bar)
*/
  • ToEnumerable converts an IObservable stream into an IEnumerable sequence. Each item emitted by the source stream becomes an element of the sequence. Intro to Rx - ToEnumerable
var period = TimeSpan.FromMilliseconds(200);
var source = Observable.Timer(TimeSpan.Zero, period)
.Take(5);
var result = source.ToEnumerable();
foreach (var value in result)
{
    Console.WriteLine(value);
}
Console.WriteLine("done");
/*
0
1
2
3
4
done
*/
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
var result = source.Wait(); //Will arrive in 5 seconds. 
Console.WriteLine(result);
/*
4
*/
var source = Observable.Throw<long>(new Exception("Fail!"));
try
{
    source.Wait();
}
catch (Exception e)
{
    Console.WriteLine(e.Message);
}
/*
Fail!
*/
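  • ToEvent converts the stream into an object that exposes an OnNext event; the event fires each time the stream emits an item. Intro to Rx - ToEvent
  • The OnNext event of the object created by ToEvent is not a standard .NET event: it has no sender parameter.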
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
var result = source.ToEvent();
result.OnNext += val => Console.WriteLine(val);
/*
0
1
2
3
...
*/
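  • ToEventPattern converts the stream into an object that exposes an OnNext event; the event fires each time the stream emits an item. Intro to Rx - ToEventPattern
  • To use ToEventPattern, the items in the stream must first be converted to the EventPattern type.
  • The OnNext event of the object created by ToEventPattern is a standard .NET event: it has a sender parameter.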
public class MyEventArgs : EventArgs 
{ 
    private readonly long _value; 
    public MyEventArgs(long value) 
    { 
        _value = value; 
    } 
    public long Value 
    { 
        get { return _value; } 
    }
} 
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(i => new EventPattern<MyEventArgs>(null, new MyEventArgs(i)));
var result = source.ToEventPattern();
result.OnNext += (e, val) => Console.WriteLine(val.Value);
/*
0
1
2
3
...
*/
  • RxJava
val values = Observable.range(10, 5)
values
    .reduce(
        ArrayList<Int>()
    ) { acc, value ->
        acc.add(value)
        acc
    }
    .dump()
/*
onSuccess: [10, 11, 12, 13, 14]
*/
val values = Observable.range(10, 5)
values
    .toList()
    .dump()
/*
onSuccess: [10, 11, 12, 13, 14]
*/
val values = Observable.range(10, 5)
values
    .toSortedList { i1, i2 -> i2 - i1 }
    .dump()
/*
onSuccess: [14, 13, 12, 11, 10]
*/
private data class Person(val name: String, val age: Int)
val values = Observable.just(
    Person("Will", 25),
    Person("Nick", 40),
    Person("Saul", 35)
)
values
    .toMap { person -> person.name }
    .dump()
/*
onSuccess: {Saul=Person(name=Saul, age=35), Nick=Person(name=Nick, age=40), Will=Person(name=Will, age=25)}
*/
val values = Observable.just(
    Person("Will", 25),
    Person("Nick", 40),
    Person("Saul", 35)
)
values
    .toMap<String, Int>(
        { person -> person.name },
        { person -> person.age })
    .dump()
/*
onSuccess: {Saul=35, Nick=40, Will=25}
*/
val values = Observable.just(
    Person("Will", 25),
    Person("Nick", 40),
    Person("Saul", 35)
)
values
    .toMap<String, Int>(
        { person -> person.name },
        { person -> person.age },
        { HashMap() })
    .dump()
/*
onSuccess: {Saul=35, Nick=40, Will=25}
*/
val values = Observable.just(
    Person("Will", 35),
    Person("Nick", 40),
    Person("Saul", 35)
)
values
    .toMultimap<Int, String>(
        { person -> person.age },
        { person -> person.name })
    .dump()
/*
onSuccess: {35=[Will, Saul], 40=[Nick]}
*/
val values = Observable.just(
    Person("Will", 35),
    Person("Nick", 40),
    Person("Saul", 35)
)
values
    .toMultimap(
        { person -> person.age },
        { person -> person.name },
        { HashMap<Int, Collection<String>>() },
        { key -> ArrayList() })
    .dump()
/*
onSuccess: {35=[Will, Saul], 40=[Nick]}
*/
  • RxSwift
let disposeBag = DisposeBag()
Observable.range(start: 1, count: 10)
    .toArray()
    .subscribe { print($0) }
    .disposed(by: disposeBag)
/*
next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
completed
*/
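The RxJava part above covers only the collection operators. For the blocking conversions shown in the Rx.NET part (Wait and ToEnumerable), RxJava 2 has blockingLast() and blockingIterable(), which appear to play a similar role. The following is a minimal sketch assuming RxJava 2.x (package io.reactivex); it mirrors the Rx.NET snippets rather than reproducing anything from the original notes.

```kotlin
import io.reactivex.Observable
import java.util.concurrent.TimeUnit

fun main() {
    // Roughly the Wait example: block until the stream completes
    // and return the last item (takes about one second here).
    val last = Observable.interval(200, TimeUnit.MILLISECONDS)
        .take(5)
        .blockingLast()
    println(last) // 4

    // Roughly the ToEnumerable example: consume the items with a plain loop.
    for (value in Observable.range(0, 5).blockingIterable()) {
        println(value) // 0, 1, 2, 3, 4 on separate lines
    }
    println("done")

    // An error in the source surfaces as a thrown exception.
    // A RuntimeException subclass is used so that it is rethrown as-is.
    try {
        Observable.error<Long>(IllegalStateException("Fail!")).blockingLast()
    } catch (e: IllegalStateException) {
        println(e.message) // Fail!
    }
}
```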