Monday, December 19, 2011

How to create an observer with Rx

The IObserver<T> interface, which acts as a receiver for push-based notifications, is defined in the System namespace like this:

public interface IObserver<in T>
{
    void OnNext(T value);
    void OnError(Exception error);
    void OnCompleted();
}

OnNext - is used for notifying the observer about a new value.
OnError - is a notification that an error occurred.
OnCompleted - notifies the observer that the provider has finished sending notifications.

You can create a new observer with the help of the static Observer class, defined in the System.Reactive namespace, which provides several overloaded Create methods:

Create<T>(Action<T>, Action<Exception>, Action) - creates an observer from OnNext, OnError and OnCompleted actions:

    var observer = Observer.Create<int>(
                        value => value.Dump(),
                        error => error.Message.Dump(),
                        () => "Completed".Dump()
                        );

Or you can leave out handlers and rely on the defaults: a missing OnError handler re-throws the exception and a missing OnCompleted handler does nothing:

    var observer1 = Observer.Create<int>(
                        value => value.Dump(),
                        error => error.Message.Dump()
                        );    
    var observer2 = Observer.Create<int>(
                        value => value.Dump()
                        );    
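To see the default OnError behaviour for yourself, here is a minimal sketch of a console program. It assumes the System.Reactive NuGet package is referenced, and it uses Console.WriteLine instead of LINQPad's Dump so it can run outside LINQPad. The exception type and the "boom" message are just placeholders I picked for the illustration.

```csharp
using System;
using System.Reactive; // assumption: the System.Reactive NuGet package is referenced

internal static class Program
{
    private static void Main()
    {
        // Only the OnNext handler is supplied, so the default
        // OnError (re-throw) and OnCompleted (no-op) handlers apply.
        IObserver<int> observer = Observer.Create<int>(value => Console.WriteLine(value));

        observer.OnNext(1);

        try
        {
            // With no OnError handler, the exception comes back to the caller.
            observer.OnError(new InvalidOperationException("boom"));
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine("Re-thrown: " + ex.Message);
        }
    }
}
```

If you would rather handle the error inside the observer, pass an OnError action as in observer1 above.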

You can also write your own class which implements the IObserver interface (in your implementation you should also check for null values in the constructor):

internal class Observer<T> : IObserver<T>
{
    private Action<T> onNext;
    private Action<Exception> onError;
    private Action onCompleted;
    
    public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }
    
    public void OnNext(T value)
    {
        this.onNext(value);
    }
    
    public void OnError(Exception error)
    {
        this.onError(error);
    }
    
    public void OnCompleted()
    {
        this.onCompleted();
    }
}
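The null checks mentioned above could look something like this. This is only a sketch: CheckedObserver is a hypothetical name I chose to keep it apart from the class above, and the throw expressions assume C# 7 or later. It uses Console.WriteLine instead of Dump so it runs as a plain console program.

```csharp
using System;

// Hypothetical variant of the Observer<T> class above that validates its arguments.
internal sealed class CheckedObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;

    public CheckedObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        // Fail at construction time rather than with a NullReferenceException
        // when the first notification arrives.
        this.onNext = onNext ?? throw new ArgumentNullException(nameof(onNext));
        this.onError = onError ?? throw new ArgumentNullException(nameof(onError));
        this.onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted));
    }

    public void OnNext(T value) => onNext(value);

    public void OnError(Exception error) => onError(error);

    public void OnCompleted() => onCompleted();
}

internal static class Program
{
    private static void Main()
    {
        try
        {
            // Passing null for onNext is rejected by the constructor.
            new CheckedObserver<int>(null, error => { }, () => { });
        }
        catch (ArgumentNullException ex)
        {
            Console.WriteLine(ex.ParamName); // prints "onNext"
        }
    }
}
```

Checking in the constructor means a missing handler is reported where the observer is created, not somewhere later when a value is pushed to it.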

Then you can create an instance of the Observer class, notify it about three new values (1, 2 and then 3), and finish the notifications by calling OnCompleted:

void Main()
{
    var observer = new Observer<int>(
                value => value.Dump(),
                error => error.Message.Dump(),
                () => "Completed".Dump()
                );
                
    observer.OnNext(1);
    observer.OnNext(2);
    observer.OnNext(3);
    observer.OnCompleted();
}

The results are:
================
1
2
3
Completed
================
In the next topic I will show you how you can create basic observables which will notify our observer.
