Saturday, December 24, 2011

Creating a Subject in Reactive Extensions and the difference between cold and hot observables.

The System.Reactive.Subjects namespace contains implementations of the ISubject interface. Simply put, a subject is both an observable and an observer:
public interface ISubject<in TSource, out TResult> 
: IObserver<TSource>, IObservable<TResult>
{ }

public interface ISubject<T> 
: ISubject<T, T>
{ }
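To make the "both an observer and an observable" idea concrete, here is a minimal sketch of a hand-rolled subject that uses only the IObserver<T> and IObservable<T> interfaces from the base class library. MiniSubject, Recorder and MiniSubjectDemo are hypothetical names made up for this illustration; this is not how System.Reactive implements Subject<T> (there is no thread safety here, for example).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified subject for illustration only. It is NOT the
// System.Reactive implementation: no thread safety and no error policy.
public sealed class MiniSubject<T> : IObserver<T>, IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // Observable side: register an observer and hand back an unsubscribe handle.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Subscription(() => _observers.Remove(observer));
    }

    // Observer side: forward every notification to the current observers.
    public void OnNext(T value)
    {
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    public void OnError(Exception error)
    {
        foreach (var o in _observers.ToArray()) o.OnError(error);
        _observers.Clear();
    }

    public void OnCompleted()
    {
        foreach (var o in _observers.ToArray()) o.OnCompleted();
        _observers.Clear();
    }

    private sealed class Subscription : IDisposable
    {
        private readonly Action _onDispose;
        public Subscription(Action onDispose) { _onDispose = onDispose; }
        public void Dispose() { _onDispose(); }
    }
}

// Hypothetical helper observer that records what it receives.
public sealed class Recorder<T> : IObserver<T>
{
    private readonly string _name;
    private readonly List<string> _log;
    public Recorder(string name, List<string> log) { _name = name; _log = log; }
    public void OnNext(T value) { _log.Add(_name + ": " + value); }
    public void OnError(Exception error) { _log.Add(_name + ": error"); }
    public void OnCompleted() { _log.Add(_name + ": completed"); }
}

public static class MiniSubjectDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new MiniSubject<int>();

        subject.OnNext(1);                                   // nobody listening yet
        var first = subject.Subscribe(new Recorder<int>("A", log));
        subject.OnNext(2);                                   // reaches A
        subject.Subscribe(new Recorder<int>("B", log));
        first.Dispose();                                     // A unsubscribes
        subject.OnNext(3);                                   // reaches B only
        subject.OnCompleted();
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Values pushed in through the observer side (OnNext) come out on the observable side (Subscribe), and only to observers that are subscribed at that moment.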
You can create a subject with the static Subject.Create method, which takes an observer and an observable. Calling OnNext on the resulting subject forwards the values to the injected observer, here with two new values:
var observable = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(5);

var observer = Observer.Create<long>(
    x => Console.WriteLine("Value published to subject #1: {0}", x),
    () => Console.WriteLine("Sequence Completed."));

// Explicit type arguments pick the overload that returns ISubject<long, long>.
ISubject<long, long> subject = Subject.Create<long, long>(observer, observable);

subject.OnNext(1);
subject.OnNext(2);
================
Value published to subject #1: 1
Value published to subject #1: 2
================
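A subject built with Subject.Create simply glues its two halves together: OnNext calls go to the injected observer, and Subscribe calls go to the injected observable. The sketch below is a deterministic illustration of that, assuming the System.Reactive package is referenced; CreateDemo and the Observable.Return(42) source are stand-ins chosen for brevity, not part of the original example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class CreateDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Observer half: receives whatever is pushed into the subject with OnNext.
        IObserver<int> observer =
            Observer.Create<int>(x => log.Add("observer got " + x));

        // Observable half: a single-value source, chosen only to keep the demo short.
        IObservable<int> observable = Observable.Return(42);

        ISubject<int, int> subject = Subject.Create<int, int>(observer, observable);

        subject.OnNext(1);                                       // goes to the observer
        subject.Subscribe(x => log.Add("subscriber got " + x));  // fed by the observable
        subject.OnNext(2);                                       // still only the observer

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Note that the subscriber never sees the values 1 and 2: with Subject.Create the two halves are not connected to each other unless you wire them up yourself.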
You can also subscribe to the source observable through the subject. The source uses the Interval extension method, which produces a new value each time the period passed as its argument elapses, in our case every second. The Take method then limits the otherwise infinite sequence to 5 values. Subscribing to the subject internally subscribes to the injected observable. If you block the current thread for two seconds with Thread.Sleep and then subscribe to the subject a second time, you get a subscription that is independent of the first one: Interval starts producing values from the beginning again. In other words, you have just subscribed to a cold observable.
var d1 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed."));

Thread.Sleep(2000);

var d2 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed."));
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 0
Value published to observer #1: 3
Value published to observer #2: 1
Value published to observer #1: 4
Sequence 1 Completed.
Value published to observer #2: 2
Value published to observer #2: 3
Value published to observer #2: 4
Sequence 2 Completed.
================
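The defining property of a cold observable is that the producing code runs once per subscription. A timing-free way to see this is a sketch with Observable.Create, assuming the System.Reactive package is referenced; ColdDemo and the two-value producer are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ColdDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var runs = 0;

        // The delegate passed to Create is the "producer". For a cold
        // observable it is invoked again for every new subscriber.
        IObservable<int> cold = Observable.Create<int>(observer =>
        {
            runs++;
            log.Add("producer started, run " + runs);
            observer.OnNext(0);
            observer.OnNext(1);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        cold.Subscribe(v => log.Add("observer #1: " + v));
        cold.Subscribe(v => log.Add("observer #2: " + v));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Both observers receive the full sequence starting at 0, and the log shows the producer starting twice, which is the same behavior as the two independent Interval subscriptions above.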
To get hot observable behavior, create a new instance of the Subject class and subscribe it to the observable:
ISubject<long> subject = new Subject<long>();

observable.Subscribe(subject);

var d1 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed."));

Thread.Sleep(3000);

var d2 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed."));
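In this case the second observer subscribes to an observable that is already publishing, so it only receives the values produced after it subscribed. (The three-second sleep ends at about the same moment the third value is produced, so whether observer #2 still catches the value 2 can vary between runs.)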
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 2
Value published to observer #1: 3
Value published to observer #2: 3
Value published to observer #1: 4
Value published to observer #2: 4
Sequence 1 Completed.
Sequence 2 Completed.
================
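The same hot behavior can be shown without timers by pushing values into a Subject<T> by hand. This is a minimal sketch, assuming the System.Reactive package is referenced; HotDemo is a hypothetical name, and the manual OnNext calls stand in for the Interval source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class HotDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        subject.Subscribe(
            v => log.Add("observer #1: " + v),
            () => log.Add("observer #1 completed"));

        subject.OnNext(0);
        subject.OnNext(1);

        // A late subscriber joins the sequence in progress and misses 0 and 1.
        subject.Subscribe(
            v => log.Add("observer #2: " + v),
            () => log.Add("observer #2 completed"));

        subject.OnNext(2);
        subject.OnCompleted();

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Observer #2 only sees the value 2 and the completion, because a plain Subject<T> does not replay anything that was published before the subscription.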
You can promote a cold observable to a hot one by calling the Publish extension method. It returns an IConnectableObservable; calling its Connect method subscribes to the underlying source and starts publishing. Observers can subscribe before or after Connect. Here we connect first, so each observer only sees the values produced after it subscribed. In this case the subject implementation is hidden from you internally.
IConnectableObservable<long> observable = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Take(5)
    .Publish();

observable.Connect();

var d1 = observable.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed."));

Thread.Sleep(3000);

var d2 = observable.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed."));
The results are the same as before:
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 2
Value published to observer #1: 3
Value published to observer #2: 3
Value published to observer #1: 4
Value published to observer #2: 4
Sequence 1 Completed.
Sequence 2 Completed.
================
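Connect does not have to come first. A common alternative is to attach all observers and only then call Connect, so that nobody misses a value. The sketch below illustrates that ordering, assuming the System.Reactive package is referenced; PublishDemo is a hypothetical name, and Observable.Range replaces Interval so that the example does not depend on timing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Nothing is produced yet: Publish only wraps the source.
        IConnectableObservable<int> published = Observable.Range(0, 3).Publish();

        published.Subscribe(
            v => log.Add("observer #1: " + v),
            () => log.Add("observer #1 completed"));
        published.Subscribe(
            v => log.Add("observer #2: " + v),
            () => log.Add("observer #2 completed"));

        log.Add("connecting");

        // Connect makes the single, shared subscription to the source.
        IDisposable connection = published.Connect();
        connection.Dispose();

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

Because both observers are attached before the source starts, they share one run of the sequence and each receives 0, 1 and 2. The IDisposable returned by Connect can be used to disconnect from the source early.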
