Tuesday, December 20, 2011

How to create an observable with RX

IObservable<T> is defined in the System namespace and represents a provider of push-based notifications. It looks like this:
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
An observable has to implement this interface. Then you can call its Subscribe method to hand it an observer that will receive its notifications. The method returns an IDisposable; calling Dispose on it ends the subscription.
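To make the role of the returned IDisposable concrete, here is a minimal sketch that uses only types from the System namespace. The names NumberSource, Publish, Subscription and PrintingObserver are hypothetical, chosen for illustration, and the sketch is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable that pushes values to whoever is currently subscribed.
public sealed class NumberSource : IObservable<int>
{
    private readonly List<IObserver<int>> observers = new List<IObserver<int>>();

    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (observer == null) throw new ArgumentNullException("observer");
        observers.Add(observer);
        // Disposing the returned object removes the observer again.
        return new Subscription(observers, observer);
    }

    // Pushes one value to every current observer. The list is copied so that
    // an observer may unsubscribe from inside OnNext.
    public void Publish(int value)
    {
        foreach (var observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Subscription : IDisposable
    {
        private readonly List<IObserver<int>> observers;
        private readonly IObserver<int> observer;

        public Subscription(List<IObserver<int>> observers, IObserver<int> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Hypothetical observer that writes every notification to the console.
public sealed class PrintingObserver : IObserver<int>
{
    public void OnNext(int value) { Console.WriteLine(value); }
    public void OnError(Exception error) { Console.WriteLine(error.Message); }
    public void OnCompleted() { Console.WriteLine("Completed"); }
}

public static class Program
{
    public static void Main()
    {
        var source = new NumberSource();
        IDisposable subscription = source.Subscribe(new PrintingObserver());

        source.Publish(1);      // the observer receives 1
        subscription.Dispose(); // unsubscribe
        source.Publish(2);      // nobody is subscribed any more
    }
}
```

Only the first value reaches the observer, because the subscription is disposed before the second one is published.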
You can create a new observable in many different ways. I'm going to show you the basic factory methods of the static Observable class from the System.Reactive.Linq namespace. This class also contains a lot of LINQ query operators implemented as extension methods, such as Where, Take, etc.
All examples in this article use the following observer instance (Dump is LINQPad's extension method for printing a value; outside LINQPad you can use Console.WriteLine instead):
    var observer = Observer.Create<int>(
                        value => value.Dump(),
                        error => error.Message.Dump(),
                        () => "Completed".Dump()
                        );
The easiest way to create an observable that notifies us about a single value is the Return method. Create the source and subscribe to it:
    var source = Observable.Return(1);
    
    source.Subscribe(observer);
The result is:
================
1
Completed
================  
If you would like to simulate an error condition, you can use the Throw method. In this case only the observer's OnError is called; OnCompleted is not:
    var source = Observable.Throw<int>(new Exception("Error has occurred."));
    
    source.Subscribe(observer);
The result is:
================
Error has occurred.
================
An empty observable can be created with the Empty method. In this case only the OnCompleted method of the observer is called:
    var source = Observable.Empty<int>();
    
    source.Subscribe(observer);
The result is:
================
Completed
================
OK, so now it's time to create your own first IObservable<T> implementation:
internal class DefaultObservable<T> : IObservable<T>
{
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }
        
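        // Push a single value, then signal the end of the sequence.
        observer.OnNext(default(T));
        observer.OnCompleted();
        
        // Nothing to clean up, so return the no-op disposable
        // from System.Reactive.Disposables.
        return Disposable.Empty;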
    }
}
Now you can create an instance of the DefaultObservable class, which notifies our observer with the default value of type T:
    var source = new DefaultObservable<int>();
    IDisposable d = source.Subscribe(observer);
After you subscribe to it you will receive the following output (the default value of int is 0):
================
0
Completed
================
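For a simple case like this you may not need a dedicated class at all. As a sketch, assuming the System.Reactive package is referenced, the same behaviour can be expressed with Observable.Create, which builds an observable from a subscribe delegate. The names CreateExample and CreateDefault are hypothetical, and Console.WriteLine stands in for Dump so the code also runs outside LINQPad.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class CreateExample
{
    // Builds an observable that emits default(T) once and then completes,
    // mirroring DefaultObservable<T> without a dedicated class.
    public static IObservable<T> CreateDefault<T>()
    {
        return Observable.Create<T>(observer =>
        {
            observer.OnNext(default(T));
            observer.OnCompleted();
            return Disposable.Empty; // nothing to clean up
        });
    }

    public static void Main()
    {
        var source = CreateDefault<int>();

        // Subscribe with three delegates instead of an observer object.
        source.Subscribe(
            value => Console.WriteLine(value),
            error => Console.WriteLine(error.Message),
            () => Console.WriteLine("Completed"));
    }
}
```

For int this prints 0 followed by Completed. A hand-written class is still worth having when the observable carries state of its own or needs real clean-up logic in the returned IDisposable.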
In the next topic we will focus on creating observables that produce multiple notifications.