ConnectableObservable has the connect() method to conveniently dictate the start of the shared execution of the source Observable. However, we need a mechanism to dictate the stop of the shared execution, otherwise a leak happens. This lesson will teach you how to do that, and it's all about Subscriptions.app
var connectableObservable = Rx.Observable.interval(1000) .do(x => console.log('source ' + x)) .multicast(new Rx.Subject()); var observerA = { next: function (x) { console.log('A next ' + x); }, error: function (err) { console.log('A error ' + err); }, complete: function () { console.log('A done'); }, }; var sub = connectableObservable.connect(); // start var subA = connectableObservable.subscribe(observerA); var observerB = { next: function (x) { console.log('B next ' + x); }, error: function (err) { console.log('B error ' + err); }, complete: function () { console.log('B done'); }, }; var subB; setTimeout(function () { subB = connectableObservable.subscribe(observerB); }, 2000); setTimeout(function () { sub.unsubscribe(); // stop console.log('unsubscribed shared execution'); }, 5000);
Just remember that with connect
we are manually controlling the start of the shared execution, and then we keep a subscription in order to manually control the stop of the shared execution. All of this is in order to avoid leaks.less