本文的主題爲處理 Connectable Observable 的操作符。 這裏的 Observable 實質上是可觀察的數據流。
RxJava 操作符（九）Connectable Observable Operators
<!-- -->
// Cold observable demo: the second subscriber joins 1.5 s later and, per the sample
// output below, still starts again from 0.
Console.WriteLine(MethodBase.GetCurrentMethod().Name);

var source = Observable.Interval(TimeSpan.FromSeconds(1));
var lateJoinDelay = TimeSpan.FromSeconds(1.5);

using (source.Subscribe(value => Console.WriteLine("first subscription : {0}", value)))
{
    Thread.Sleep(lateJoinDelay);
    using (source.Subscribe(value => Console.WriteLine("second subscription : {0}", value)))
    {
        // Keep both subscriptions alive until a key is pressed.
        Console.ReadKey();
    }
}
/*
first subscription : 0
first subscription : 1
second subscription : 0
first subscription : 2
second subscription : 1
first subscription : 3
...
*/
// Cold source: five ticks, 200 ms apart. The second subscriber attaches 500 ms later
// and, per the sample output below, receives its own sequence starting at 0.
val source = Observable
    .interval(200, TimeUnit.MILLISECONDS)
    .take(5)

source.dump("First")
Thread.sleep(500)
source.dump("Second")
/*
First: onNext: 0
First: onNext: 1
First: onNext: 2
Second: onNext: 0
First: onNext: 3
Second: onNext: 1
First: onNext: 4
First: onComplete
Second: onNext: 2
Second: onNext: 3
Second: onNext: 4
Second: onComplete
*/
// Cold interval: the second subscriber is added via the `delay` helper (defined elsewhere;
// presumably runs the closure after the given number of seconds) and, per the sample
// output below, starts counting from 0 again.
let ticks = Observable<Int>.interval(1, scheduler: MainScheduler.instance)

_ = ticks.subscribe(onNext: {
    print("Subscription: 1, Event: \($0)")
})

delay(5) {
    _ = ticks.subscribe(onNext: {
        print("Subscription: 2, Event: \($0)")
    })
}
/*
Subscription: 1, Event: 0
Subscription: 1, Event: 1
Subscription: 1, Event: 2
Subscription: 1, Event: 3
Subscription: 1, Event: 4
Subscription: 1, Event: 5
Subscription: 2, Event: 0
Subscription: 1, Event: 6
Subscription: 2, Event: 1
Subscription: 1, Event: 7
Subscription: 2, Event: 2
Subscription: 1, Event: 8
...
*/
ReactiveX - Connect operator ReactiveX - Publish operator Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」 Reactive Extensions再入門 その37「ColdからHotへ!その他のPublish系メソッド」
Publish / PublishLast 將普通數據流發佈（轉變）爲可連接的數據流。 可連接的數據流必須經 Connect 連接後才開始發送數據。 Connect 連接可連接的數據流，將可連接的數據流變爲熱數據流並開始發送數據。 經過 Publish 發佈、Connect 連接後的熱數據流，在訂閱之後可以觀察到訂閱之後熱數據流發送的數據。 經過 PublishLast 發佈、Connect 連接後的熱數據流，在訂閱之後只能觀察到訂閱之後熱數據流最後發送的數據。 Publish / PublishLast 的返回值代表數據流對象，銷燬該對象將銷燬源數據流。 Connect 的返回值代表連接對象，銷燬該對象將斷開連接，被斷開連接的熱數據流將變回可連接的數據流。
Multicast 的語義如下
.Publish() = .Multicast(new Subject<T>)
.PublishLast() = .Multicast(new AsyncSubject<T>)
.Replay() = .Multicast(new ReplaySubject<T>)
// Publish the interval and connect immediately, before anyone subscribes.
// The second subscriber joins 2 s later and, per the sample output below,
// only sees values from that point on (it starts at 1, not 0).
var published = Observable.Interval(TimeSpan.FromSeconds(1)).Publish();
published.Connect();

using (published.Subscribe(value => Console.WriteLine("first subscription : {0}", value)))
{
    Thread.Sleep(TimeSpan.FromSeconds(2));
    using (published.Subscribe(value => Console.WriteLine("second subscription : {0}", value)))
    {
        Console.ReadKey();
    }
}
/*
first subscription : 0
first subscription : 1
second subscription : 1
first subscription : 2
second subscription : 2
...
*/
// Publish the interval but connect only after BOTH subscribers are attached.
// Per the sample output below, both subscribers then observe the sequence from 0.
var period = TimeSpan.FromSeconds(1);
// Build the interval from `period` (it was declared but unused, with the same
// TimeSpan duplicated inline), matching the other Publish examples.
var observable = Observable.Interval(period).Publish();

using (observable.Subscribe(i => Console.WriteLine("first subscription : {0}", i)))
{
    Thread.Sleep(TimeSpan.FromSeconds(2));
    using (observable.Subscribe(i => Console.WriteLine("second subscription : {0}", i)))
    {
        // Values only start flowing once the connectable observable is connected.
        observable.Connect();
        Console.ReadKey();
    }
}
/*
first subscription : 0
second subscription : 0
first subscription : 1
second subscription : 1
first subscription : 2
second subscription : 2
...
*/
// Interactive demo: Enter connects the published interval, any key then disposes the
// connection, Esc leaves the loop. Per the sample output below, each new connection
// restarts the sequence at 0 for the already-attached subscriber.
var period = TimeSpan.FromSeconds(1);
var published = Observable.Interval(period).Publish();
published.Subscribe(value => Console.WriteLine("subscription : {0}", value));

var running = true;
while (running)
{
    Console.WriteLine("Press enter to connect, esc to exit.");
    switch (Console.ReadKey(true).Key)
    {
        case ConsoleKey.Enter:
            var connection = published.Connect(); //--Connects here--
            Console.WriteLine("Press any key to dispose of connection.");
            Console.ReadKey();
            connection.Dispose(); //--Disconnects here--
            break;

        case ConsoleKey.Escape:
            running = false;
            break;
    }
}
/*
Press enter to connect, esc to exit.
Press any key to dispose of connection.
subscription : 0
subscription : 1
subscription : 2
subscription : 3
Press enter to connect, esc to exit.
Press any key to dispose of connection.
subscription : 0
subscription : 1
subscription : 2
Press enter to connect, esc to exit.
...
*/
// Connected-but-unobserved demo: the Do side effect shows the published source running
// whether or not anyone is subscribed (see "Publishing n" lines in the sample output).
var period = TimeSpan.FromSeconds(1);
var published = Observable.Interval(period)
    .Do(tick => Console.WriteLine("Publishing {0}", tick)) // Side effect to show it is running
    .Publish();

// The value returned by Connect is the connection handle, disposed at the very end.
var connection = published.Connect();

Console.WriteLine("Press any key to subscribe");
Console.ReadKey();
var subscriber = published.Subscribe(value => Console.WriteLine("subscription : {0}", value));

Console.WriteLine("Press any key to unsubscribe.");
Console.ReadKey();
subscriber.Dispose();

Console.WriteLine("Press any key to exit.");
Console.ReadKey();
connection.Dispose();
/*
Press any key to subscribe
Publishing 0
Publishing 1
Publishing 2
Press any key to unsubscribe.
Publishing 3
subscription : 3
Publishing 4
subscription : 4
Publishing 5
subscription : 5
Press any key to exit.
Publishing 6
Publishing 7
...
*/
// PublishLast demo: the source emits five values, but per the sample output below the
// subscriber only receives the final one (4).
var period = TimeSpan.FromSeconds(1);
var lastOnly = Observable.Interval(period)
    .Take(5)
    .Do(tick => Console.WriteLine("Publishing {0}", tick)) // side effect to show it is running
    .PublishLast();
lastOnly.Connect();

Console.WriteLine("Press any key to subscribe");
Console.ReadKey();
var subscriber = lastOnly.Subscribe(value => Console.WriteLine("subscription : {0}", value));

Console.WriteLine("Press any key to unsubscribe.");
Console.ReadKey();
subscriber.Dispose();

Console.WriteLine("Press any key to exit.");
Console.ReadKey();
/*
Press any key to subscribe
Publishing 0
Publishing 1
Publishing 2
Press any key to unsubscribe.
Publishing 3
Publishing 4
subscription : 4
Press any key to exit.
*/
// Published and connected up front: the second subscriber joins 500 ms later and,
// per the sample output below, picks up at the current value (2) instead of 0.
val published = Observable
    .interval(200, TimeUnit.MILLISECONDS)
    .publish()
published.connect()

val first = published.dump("First")
Thread.sleep(500)
val second = published.dump("Second")

// Block until Enter is pressed, then release both subscriptions.
readLine()
first.dispose()
second.dispose()
/*
First: onNext: 0
First: onNext: 1
First: onNext: 2
Second: onNext: 2
First: onNext: 3
Second: onNext: 3
...
*/
// Disconnect / reconnect demo: disposing the connection stops the stream, and per the
// sample output below a fresh connect() restarts the sequence at 0.
val published = Observable
    .interval(200, TimeUnit.MILLISECONDS)
    .publish()

var connection = published.connect()
published.dump()
Thread.sleep(1000)

println("Closing connection")
connection.dispose()
Thread.sleep(1000)

println("Reconnecting")
connection = published.connect()
published.dump()

// Block until Enter is pressed, then drop the second connection.
readLine()
connection.dispose()
/*
onNext: 0
onNext: 1
onNext: 2
onNext: 3
Closing connection
onNext: 4
Reconnecting
onNext: 0
onNext: 1
onNext: 2
...
*/
// Disposing a single subscriber: per the sample output below, the first subscriber
// keeps receiving values after the second one has been disposed.
val published = Observable
    .interval(200, TimeUnit.MILLISECONDS)
    .publish()
published.connect()

val first = published.dump("First")
Thread.sleep(500)
val second = published.dump("Second")
Thread.sleep(500)

println("Disposing second")
second.dispose()

// Block until Enter is pressed, then release the remaining subscriber.
readLine()
first.dispose()
/*
First: onNext: 0
First: onNext: 1
First: onNext: 2
Second: onNext: 2
First: onNext: 3
Second: onNext: 3
Disposing second
First: onNext: 4
First: onNext: 5
First: onNext: 6
...
*/
// publish() with a delayed connect: subscriber 1 attaches first, the connection is made
// through `delay(2)`, and subscribers 2 and 3 join later. `delay` is a helper defined
// elsewhere (presumably runs its closure after the given number of seconds).
// Per the sample output below, late subscribers only see values emitted after they join.
let published = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .publish()

_ = published.subscribe(onNext: {
    print("Subscription 1:, Event: \($0)")
})

delay(2) {
    _ = published.connect()
}

delay(4) {
    _ = published.subscribe(onNext: {
        print("Subscription 2:, Event: \($0)")
    })
}

delay(6) {
    _ = published.subscribe(onNext: {
        print("Subscription 3:, Event: \($0)")
    })
}
/*
Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
...
*/
// multicast(_:) through an explicit PublishSubject: the subject has its own observer,
// and per the sample output below it prints each value before the multicast subscribers do.
// `delay` is a helper defined elsewhere (presumably runs its closure after N seconds).
let relay = PublishSubject<Int>()

_ = relay.subscribe(onNext: {
    print("Subject: \($0)")
})

let multicasted = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .multicast(relay)

_ = multicasted.subscribe(onNext: {
    print("\tSubscription 1:, Event: \($0)")
})

delay(2) {
    _ = multicasted.connect()
}

delay(4) {
    _ = multicasted.subscribe(onNext: {
        print("\tSubscription 2:, Event: \($0)")
    })
}

delay(6) {
    _ = multicasted.subscribe(onNext: {
        print("\tSubscription 3:, Event: \($0)")
    })
}
/*
Subject: 0
Subscription 1:, Event: 0
Subject: 1
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subject: 2
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subject: 3
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
...
*/
ReactiveX - RefCount operator Reactive Extensions再入門 その36「ColdからHotへ!Publishメソッドと参照カウンタ?RefCountメソッド」
RefCount 將可連接的數據流重新變回普通數據流。 RefCount 還給數據流加上引用計數語義：訂閱時引用計數器加一，取消訂閱時引用計數器減一，訂閱者都取消訂閱時數據流自動銷燬。
// RefCount demo: no explicit Connect call. Per the sample output below, publishing only
// starts once a subscriber attaches and stops after it unsubscribes.
var period = TimeSpan.FromSeconds(1);
var shared = Observable.Interval(period)
    .Do(tick => Console.WriteLine("Publishing {0}", tick)) // side effect to show it is running
    .Publish()
    .RefCount();
// shared.Connect(); Use RefCount instead now

Console.WriteLine("Press any key to subscribe");
Console.ReadKey();
var subscriber = shared.Subscribe(value => Console.WriteLine("subscription : {0}", value));

Console.WriteLine("Press any key to unsubscribe.");
Console.ReadKey();
subscriber.Dispose();

Console.WriteLine("Press any key to exit.");
Console.ReadKey();
/*
Press any key to subscribe
Press any key to unsubscribe.
Publishing 0
subscription : 0
Publishing 1
subscription : 1
Publishing 2
subscription : 2
Press any key to exit.
*/
// publish().refCount(): per the sample output below, the stream runs while at least one
// subscriber is attached, and a subscriber arriving after everyone left starts from 0 again.
val shared = Observable
    .interval(200, TimeUnit.MILLISECONDS)
    .publish()
    .refCount()

var first = shared.dump("First")
Thread.sleep(500)
val second = shared.dump("Second")
Thread.sleep(500)

println("Dispose Second")
second.dispose()
Thread.sleep(500)

println("Dispose First")
first.dispose()

println("First connection again")
Thread.sleep(500)
first = shared.dump("First")

// Block until Enter is pressed, then release the re-attached subscriber.
readLine()
first.dispose()
/*
First: onNext: 0
First: onNext: 1
First: onNext: 2
Second: onNext: 2
First: onNext: 3
Second: onNext: 3
Dispose Second
First: onNext: 4
First: onNext: 5
First: onNext: 6
Dispose First
First connection again
First: onNext: 0
First: onNext: 1
First: onNext: 2
...
*/
ReactiveX - Replay operator Reactive Extensions再入門 その38「ColdからHotへ!その他のPublish系メソッド2」
Replay 給熱數據流加上 Replay 語義，即緩存所有的數據，使後來的訂閱者也能觀察到訂閱之前所發送的數據。 Replay 可以指定緩衝區的大小。
// Replay on top of an already-running hot source: value 0 is emitted before Replay is
// attached and is lost; per the sample output below, every subscriber (even the third,
// added after the source finished) receives 1 and 2.
Console.WriteLine(MethodBase.GetCurrentMethod().Name);

var pause = TimeSpan.FromSeconds(1.5);

var hot = Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(3)
    .Publish();
hot.Connect();
Thread.Sleep(pause); // Run hot and ensure a value is lost.

var replayed = hot.Replay();
replayed.Connect();

replayed.Subscribe(value => Console.WriteLine("first subscription : {0}", value));
Thread.Sleep(pause);
replayed.Subscribe(value => Console.WriteLine("second subscription : {0}", value));
Console.ReadKey();
replayed.Subscribe(value => Console.WriteLine("third subscription : {0}", value));
Console.ReadKey();
/*
first subscription : 1
second subscription : 1
first subscription : 2
second subscription : 2
third subscription : 1
third subscription : 2
*/
// replay(): per the sample output below, the second subscriber (700 ms late) first
// receives the values it missed (0, 1, 2) and then continues live with the first one.
val replayed = Observable
    .interval(200, TimeUnit.MILLISECONDS)
    .replay()
replayed.connect()

println("Subscribe first")
val first = replayed.dump("First")
Thread.sleep(700)

println("Subscribe second")
val second = replayed.dump("Second")
Thread.sleep(500)

// Block until Enter is pressed, then release both subscriptions.
readLine()
first.dispose()
second.dispose()
/*
Subscribe first
First: onNext: 0
First: onNext: 1
First: onNext: 2
Subscribe second
Second: onNext: 0
Second: onNext: 1
Second: onNext: 2
First: onNext: 3
Second: onNext: 3
First: onNext: 4
Second: onNext: 4
...
*/
// replay(2): buffer limited to two items. The subscriber attaches after 4.5 s and, per
// the sample output below, gets the two most recent values (2, 3) followed by the live 4.
val replayed = Observable
    .interval(1000, TimeUnit.MILLISECONDS)
    .take(5)
    .replay(2)
replayed.connect()

Thread.sleep(4500)
replayed.dump()
/*
onNext: 2
onNext: 3
onNext: 4
onComplete
*/
// replay with a 2000 ms time window instead of an item count. The subscriber attaches
// after 4.5 s and, per the sample output below, gets 2 and 3 replayed and then the live 4.
val replayed = Observable
    .interval(1000, TimeUnit.MILLISECONDS)
    .take(5)
    .replay(2000, TimeUnit.MILLISECONDS)
replayed.connect()

Thread.sleep(4500)
replayed.dump()
/*
onNext: 2
onNext: 3
onNext: 4
onComplete
*/
// cache(): no connect() call is involved. Per the sample output below, the first
// subscriber still starts at 0 despite the initial 500 ms wait, and the second
// subscriber (300 ms later) is replayed the values it missed before continuing live.
val cached = Observable
    .interval(100, TimeUnit.MILLISECONDS)
    .take(5)
    .cache()

Thread.sleep(500)
cached.dump("First")
Thread.sleep(300)
cached.dump("Second")
/*
First: onNext: 0
First: onNext: 1
First: onNext: 2
Second: onNext: 0
Second: onNext: 1
Second: onNext: 2
First: onNext: 3
Second: onNext: 3
First: onNext: 4
Second: onNext: 4
First: onComplete
Second: onComplete
*/
// cache() and disposal: per the sample output below, the upstream doOnNext keeps printing
// 1..4 after the only subscriber has been disposed, i.e. the cached source keeps running.
val cached = Observable
    .interval(100, TimeUnit.MILLISECONDS)
    .take(5)
    .doOnNext { println(it) }
    .cache()
    .doOnSubscribe { println("Subscribed") }
    .doOnDispose { println("Disposed") }

val handle = cached.subscribe()
Thread.sleep(150)
handle.dispose()
/*
Subscribed
0
Disposed
1
2
3
4
*/
RxSwift: share vs replay vs shareReplay
// replay(5) with a delayed connect: per the sample output below, subscriber 3 (added at
// 8 s) immediately receives the buffered events 0...4 and then continues live.
// `delay` is a helper defined elsewhere (presumably runs its closure after N seconds).
let replayed = Observable<Int>
    .interval(1, scheduler: MainScheduler.instance)
    .replay(5)

_ = replayed.subscribe(onNext: {
    print("Subscription 1:, Event: \($0)")
})

delay(2) {
    _ = replayed.connect()
}

delay(4) {
    _ = replayed.subscribe(onNext: {
        print("Subscription 2:, Event: \($0)")
    })
}

delay(8) {
    _ = replayed.subscribe(onNext: {
        print("Subscription 3:, Event: \($0)")
    })
}
/*
Subscription 1:, Event: 0 (after 3 secs)
Subscription 2:, Event: 0 (after 4 secs)
Subscription 1:, Event: 1 (after 4 secs)
Subscription 2:, Event: 1 (after 4 secs)
Subscription 1:, Event: 2 (after 5 secs)
Subscription 2:, Event: 2 (after 5 secs)
Subscription 1:, Event: 3 (after 6 secs)
Subscription 2:, Event: 3 (after 6 secs)
Subscription 1:, Event: 4 (after 7 secs)
Subscription 2:, Event: 4 (after 7 secs)
Subscription 3:, Event: 0 (after 8 secs)
Subscription 3:, Event: 1 (after 8 secs)
Subscription 3:, Event: 2 (after 8 secs)
Subscription 3:, Event: 3 (after 8 secs)
Subscription 3:, Event: 4 (after 8 secs)
Subscription 1:, Event: 5 (after 9 secs)
Subscription 2:, Event: 5 (after 9 secs)
Subscription 3:, Event: 5 (after 9 secs)
...
*/