Showing posts with label observer. Show all posts

Saturday, December 24, 2011

Creating a Subject in Reactive Extensions and the difference between cold and hot observables.

The System.Reactive.Subjects namespace contains implementations of the ISubject interface. Simply put, a subject is both an observable and an observer:
public interface ISubject<in TSource, out TResult> 
: IObserver<TSource>, IObservable<TResult>
{ }

public interface ISubject<T> 
: ISubject<T, T>
{ }
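You can create an instance of it with the static Subject.Create method, which wraps an observer and an observable. Calling OnNext on the subject forwards the value to the injected observer, so the following code notifies it with two new values: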
var observable = Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Take(5);

var observer = Observer.Create<long>(
    x => Console.WriteLine("Value published to subject #1: {0}", x),
    () => Console.WriteLine("Sequence Completed."));

ISubject<long, long> subject = Subject.Create(observer, observable);

subject.OnNext(1);
subject.OnNext(2);
================
Value published to subject #1: 1
Value published to subject #1: 2
================
Then you can subscribe to the source observable through the subject; subscribing to the subject internally subscribes to the injected observable. That observable is built with the Interval method, which produces a new value each time the period passed as its argument elapses, in our case every second. The Take extension method then limits the otherwise infinite sequence to 5 values. If you block the current thread for two seconds with the Sleep method and then subscribe to the subject once again, you get a subscription that is independent of the previous one: Interval starts producing values from the beginning. In other words, you have just subscribed to a cold observable.
var d1 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed.")
    );
        
Thread.Sleep(2000);

var d2 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed.")
);
================

Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 0
Value published to observer #1: 3
Value published to observer #2: 1
Value published to observer #1: 4
Sequence 1 Completed.
Value published to observer #2: 2
Value published to observer #2: 3
Value published to observer #2: 4
Sequence 2 Completed.
================
To get hot observable behavior, create a new instance of the Subject class and subscribe it to the observable:
ISubject<long> subject = new Subject<long>();

observable.Subscribe(subject);

var d1 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed.")
);

Thread.Sleep(3000);

var d2 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed.")
);
In this case the second observer has subscribed to an observable that is already publishing, so it only sees the values produced from that point on:
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 2
Value published to observer #1: 3
Value published to observer #2: 3
Value published to observer #1: 4
Value published to observer #2: 4
Sequence 1 Completed.
Sequence 2 Completed.
================
You can promote a cold observable to a hot observable by calling the Publish extension method. It returns an IConnectableObservable, which subscribes to the source only when you call its Connect method. In the example below Connect is called before any observer subscribes, so the sequence is already running when the observers arrive. In this case the subject implementation is hidden from you internally.
IConnectableObservable<long> observable = 
                Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Take(5)
                .Publish();

observable.Connect();

var d1 = observable.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed.")
);

Thread.Sleep(3000);

var d2 = observable.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed.")
);
The results are the same as before:
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 2
Value published to observer #1: 3
Value published to observer #2: 3
Value published to observer #1: 4
Value published to observer #2: 4
Sequence 1 Completed.
Sequence 2 Completed.
================
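Connect does not have to be called first. A minimal sketch, assuming the System.Reactive package is referenced, in which both observers subscribe before Connect is called and therefore share a single subscription to the source from the very first value. The class name ConnectDemo and the use of Observable.Range instead of Interval are choices made for this illustration only:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

internal static class ConnectDemo
{
    private static void Main()
    {
        IConnectableObservable<int> published = Observable.Range(1, 3).Publish();

        // Nothing is produced yet: Publish only prepares the shared subscription.
        published.Subscribe(v => Console.WriteLine("Value published to observer #1: {0}", v));
        published.Subscribe(v => Console.WriteLine("Value published to observer #2: {0}", v));

        // Connect subscribes to the source once; every value goes to both observers.
        IDisposable connection = published.Connect();

        // Disposing the connection disconnects the shared subscription from the source.
        connection.Dispose();
    }
}
```

Because Connect returns an IDisposable, you can stop the shared subscription independently of the individual observer subscriptions.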

Tuesday, December 20, 2011

How to create an observable with RX

IObservable<T> is defined in the System namespace and represents a provider of push-based notifications. It looks like this:
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
An observable has to implement this interface. You can then call its Subscribe method to inject an observer that will receive notifications from it. The method returns an IDisposable; calling its Dispose method cancels the subscription.
You can create a new observable in a lot of different ways. I'm going to show you the basic methods of the Observable static class from the System.Reactive.Linq namespace for creating observables. This class also contains a lot of LINQ query operators implemented as extension methods, such as Where and Take.
All examples in this article will use the following observer instance:
    var observer = Observer.Create<int>(
                        value => value.Dump(),
                        error => error.Message.Dump(),
                        () => "Completed".Dump()
                        );
The easiest way to create an observable which notifies us about a single value is the Return method. Create the observable and subscribe to it:
    var source = Observable.Return(1);
    
    source.Subscribe(observer);
The results are:
================
1
Completed
================  
If you would like to simulate an error condition you can use the Throw method. In this case only OnError is called; OnCompleted is not:
    var source = Observable.Throw<int>(new Exception("Error has occurred."));
    
    source.Subscribe(observer);
The result is:
================
Error has occurred.
================
An empty observable can be created with the Empty method. In this case only the OnCompleted method of the observer is called:
    var source = Observable.Empty<int>();
    
    source.Subscribe(observer);
The result is:
================
Completed
================
OK, so now it's time to create your own first IObservable implementation:
internal class DefaultObservable<T> : IObservable<T>
{
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }
        
        observer.OnNext(default(T));
        observer.OnCompleted();
        
        return Disposable.Empty;
    }
}
Now you can create an instance of the DefaultObservable class which notifies our observer with the default value of type T.
    var source = new DefaultObservable<int>();
    IDisposable d = source.Subscribe(observer);
After you subscribe to it you will receive the following output:
================
0
Completed
================
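DefaultObservable can return Disposable.Empty because it finishes inside Subscribe and has nothing left to cancel. For a source that keeps publishing, the returned IDisposable is what removes the observer again. Here is a minimal sketch of that idea using only the System namespace; the names ManualObservable, Emit and CollectingObserver are hypothetical, and the code is not thread-safe:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that tracks its observers so subscriptions can be cancelled.
internal sealed class ManualObservable<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Pushes a value to every observer that is still subscribed.
    public void Emit(T value)
    {
        // Iterate over a copy so an observer may unsubscribe while being notified.
        foreach (var observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        // Removing an observer that is already gone is a no-op, so Dispose may be called twice.
        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Hypothetical observer that collects the values it receives.
internal sealed class CollectingObserver<T> : IObserver<T>
{
    public readonly List<T> Values = new List<T>();

    public void OnNext(T value) { Values.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

internal static class Program
{
    private static void Main()
    {
        var source = new ManualObservable<int>();
        var observer = new CollectingObserver<int>();

        IDisposable subscription = source.Subscribe(observer);
        source.Emit(1);          // delivered
        subscription.Dispose();  // unsubscribe
        source.Emit(2);          // not delivered, nobody is subscribed

        Console.WriteLine(string.Join(", ", observer.Values)); // prints "1"
    }
}
```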
In the next topic we will focus on creating observables which produce multiple notifications.

Monday, December 19, 2011

How to create an observer with RX

The IObserver interface, which is used as a receiver for push-based notifications, is defined in the System namespace like this:

public interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception error);
    void OnCompleted();
}

OnNext - is used for notifying the observer about a new value.
OnError - is a notification that an error occurred.
OnCompleted - notifies our observer that the provider has finished with sending notifications.
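The three methods are normally called in a fixed order: any number of OnNext calls, followed by at most one OnError or OnCompleted. A minimal sketch of a producer that respects this order, using only the System namespace; RecordingObserver, ContractDemo, Push and Failing are hypothetical names introduced for this illustration:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observer that records every notification it receives as text.
internal sealed class RecordingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

internal static class ContractDemo
{
    // Pushes the values to the observer, then sends exactly one terminal
    // notification: OnError if the loop threw, otherwise OnCompleted.
    public static void Push(IEnumerable<int> values, IObserver<int> observer)
    {
        try
        {
            foreach (var value in values)
            {
                observer.OnNext(value);
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            return;
        }

        observer.OnCompleted();
    }

    // Yields 1 and 2, then fails.
    public static IEnumerable<int> Failing()
    {
        yield return 1;
        yield return 2;
        throw new InvalidOperationException("source failed");
    }

    private static void Main()
    {
        var ok = new RecordingObserver();
        Push(new[] { 1, 2, 3 }, ok);
        Console.WriteLine(string.Join(", ", ok.Log));
        // prints "OnNext(1), OnNext(2), OnNext(3), OnCompleted"

        var failed = new RecordingObserver();
        Push(Failing(), failed);
        Console.WriteLine(string.Join(", ", failed.Log));
        // prints "OnNext(1), OnNext(2), OnError(source failed)"
    }
}
```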

You can create a new observer with the help of the static Observer class defined in the System.Reactive namespace, which provides several overloaded Create methods:

Create<T>(Action<T>, Action<Exception>, Action) - creates an observer from OnNext, OnError and OnCompleted actions:

    var observer = Observer.Create<int>(
                        value => value.Dump(),
                        error => error.Message.Dump(),
                        () => "Completed".Dump()
                        );

Or you can omit handlers and rely on the defaults, where OnError re-throws the exception and OnCompleted does nothing:

    var observer1 = Observer.Create<int>(
                        value => value.Dump(),
                        error => error.Message.Dump()
                        );    
    var observer2 = Observer.Create<int>(
                        value => value.Dump()
                        );    
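To see the default OnError handler in action, here is a minimal sketch, assuming the System.Reactive package is referenced. The class name DefaultHandlersDemo is hypothetical, and Console.WriteLine is used instead of Dump so the code also runs outside LINQPad:

```csharp
using System;
using System.Reactive; // Observer.Create (System.Reactive package)

internal static class DefaultHandlersDemo
{
    private static void Main()
    {
        // Only OnNext is supplied; OnError and OnCompleted fall back to the defaults.
        IObserver<int> observer = Observer.Create<int>(
            value => Console.WriteLine("Value: {0}", value));

        observer.OnNext(1);

        try
        {
            observer.OnError(new InvalidOperationException("boom"));
        }
        catch (InvalidOperationException ex)
        {
            // The default OnError handler re-throws, so the exception reaches the caller.
            Console.WriteLine("Re-thrown: {0}", ex.Message);
        }
    }
}
```

If an unhandled exception at the call site is not what you want, pass an explicit OnError action as in observer1.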

You can also write your own class which implements the IObserver interface (in your implementation you should also check for null values in the constructor):

internal class Observer<T> : IObserver<T>
{
    private Action<T> onNext;
    private Action<Exception> onError;
    private Action onCompleted;
    
    public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }
    
    public void OnNext(T value)
    {
        this.onNext(value);
    }
    
    public void OnError(Exception error)
    {
        this.onError(error);
    }
    
    public void OnCompleted()
    {
        this.onCompleted();
    }
}
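The null checks mentioned above could look like the following sketch. CheckedObserver is a hypothetical name, and the throw expressions require C# 7 or later; with an older compiler you would write plain if statements instead:

```csharp
using System;

// Hypothetical variant of the class above with argument validation.
internal sealed class CheckedObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public CheckedObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        // Fail at construction time rather than with a
        // NullReferenceException on the first notification.
        this.onNext = onNext ?? throw new ArgumentNullException(nameof(onNext));
        this.onError = onError ?? throw new ArgumentNullException(nameof(onError));
        this.onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted));
    }

    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { onError(error); }
    public void OnCompleted() { onCompleted(); }
}

internal static class Program
{
    private static void Main()
    {
        try
        {
            new CheckedObserver<int>(null, e => { }, () => { });
        }
        catch (ArgumentNullException ex)
        {
            Console.WriteLine(ex.ParamName); // prints "onNext"
        }
    }
}
```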

Then you can create an instance of the Observer class, notify it about three new values (1, 2 and then 3) and finish the notifications by calling OnCompleted:

void Main()
{
    var observer = new Observer<int>(
                value => value.Dump(),
                error => error.Message.Dump(),
                () => "Completed".Dump()
                );
                
    observer.OnNext(1);
    observer.OnNext(2);
    observer.OnNext(3);
    observer.OnCompleted();
}

The results are:
================
1
2
3
Completed
================
In the next topic I will show you how you can create basic observables which will notify our observer.