Be the first user to complete this post
|
Add to List |
Understanding the Rx multicast operator
By default, observables are unicast. i.e. each subscriber causes the observable to be invoked independently. In contrast, a multicasted observable is one that emits the same data to all the subscribers.
This is achieved by calling the multicast(subject)
function on the observable instance and passing it a subject. For e.g.
var multicast = Rx.Observable
.interval(500)
.take(4)
.multicast(new Rx.Subject());
Think of the subject as a middleman in a multicast observable. It merely relays data from the underlying observable and shares it among all the subscribers.A multicasted observable also differs in terms of when it can begin execution. You control it by calling the
connect()
function on the observable instance.
A multicast observable is also known as a connectable observable. aka. Data is emitted only after you invoke the connect()
function on the multicasted instance. This is because the subject only subscribes to the underlying observable when connect() is invoked.
var multicast = Rx.Observable
.interval(500)
.take(4)
.multicast(new Rx.Subject());
// Nothing happens on subscription
multicast.subscribe(d => console.log('Data + ' + d));
// This triggers the underlying subscription
// and data begins to arrive on the subscriber
multicast.connect();
Multicast with late subscribers
The multicast operation is special because the underlying observable executes just once, and not on a per subscription basis. However this has an important implication. If the underlying observer completes, the middleman subject also completes. So any subscribers that get added in the future will not receive any data.In general, once a subject completes, future subscribers to a subject will not receive more data.(There are other kinds of subjects that offer a slight variation, but we will cover that in another article.) Lets see an example to demonstrate the completion behaviour of a multicasted observable. Make sure to read the inline comments.
// Create a regular observable
var observable = Rx.Observable.interval(500).take(4);
// Create a subject that we use as a way to mulitcast
var sub = new Rx.Subject();
// Multicast it
var source = observable.multicast(sub);
// Subscriber 1: Add a subscriber before the observer starts emitting
source.subscribe(x => console.log('A : ' + x));
// This is when the subject actually connects to the source
// and data can begin to emit
source.connect();
// Subscriber 2: Add a subscriber while the observer is still emitting,
setTimeout(() => {
source.subscribe(x => console.log('B : ' + x));
}, 1200);
// Subscriber 3: Add a subscriber long after the observable is done emitting
setTimeout(() => {
source.subscribe(x => console.log('C : ' + x));
}, 2400);
// Output
/*
A : 0
A : 1
A : 2
B : 2
A : 3
B : 3
*/
As you notice in the output, subscriber A received all the data points. Subscriber B, since it joins sometime during halftime, just receives the last 2 data points. After that, the source completes, therby completing the middleman subject as well. Eventually when subscriber C joins really late, everything is already completed so it never receives any data.
Sometimes I like to think of a multicasted observable as a live football match. You can only enjoy the game with everyone as long as you join the viewing party while it is being broadcasted live. Once it is over, there is no point joining because, well, it is over.I will cover another variation of the multicasted observable in another article. But I hope you find this article useful.
Also Read:
- The difference between Rxjs combineLatest and withLatestFrom
- Getting started with Rxjs and streams
- Difference between Rxjs Subject and Observable
- Rxjs Observable publish refcount vs share