Showing posts with label observable. Show all posts
Showing posts with label observable. Show all posts

Saturday, December 24, 2011

Creating a Subject in Reactive Extensions and the difference between cold and hot observables.

System.Reactive.Subjects namespace contains implementations for the ISubject interface. Subject simply put is both an observable and an observer:
public interface ISubject<in TSource, out TResult> 
: IObserver<TSource>, IObservable<TResult>
{ }

public interface ISubject<T> 
: ISubject<T, T>
{ }
You can create an instance of it with the static Subject.Create method and notify the injected observer with two new values:
var observable = Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Take(5);

var observer = Observer.Create<long>(
    x => Console.WriteLine("Value published to subject #1: {0}", x),
    () => Console.WriteLine("Sequence Completed."));

ISubject<long, long> subject = Subject.Create(observer, observable);

subject.OnNext(1);
subject.OnNext(2);
================
Value published to subject #1: 1
Value published to subject #1: 2
================
Then you can subscribe to the source observable through the subject. It uses the Interval extension method which produces a new value for each period defined as its argument. In our case it will produce a new value every second. Then we use the Take method to limit the infinite sequence to 5 values. Now you can subscribe to the subject which will internally subscribe to the injected observable. But if you wait for two seconds in the current thread with the Sleep method and then subscribe once again to the subject you will get an independent subscription from the previous one. The Interval method will start producing values from the beginning. So you have just subscribed to a cold observable.
var d1 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed.")
    );
        
Thread.Sleep(2000);

var d2 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed.")
);
================

Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 0
Value published to observer #1: 3
Value published to observer #2: 1
Value published to observer #1: 4
Sequence 1 Completed.
Value published to observer #2: 2
Value published to observer #2: 3
Value published to observer #2: 4
Sequence 2 Completed.
================
To create a hot observable behavior you have to create a new instance of the Subject class and subscribe with it to the observable:
ISubject<long> subject = new Subject<long>();

observable.Subscribe(subject);

var d1 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence Completed.")
    );
        
Thread.Sleep(3000);

var d2 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence Completed.")
);
In this case the new observer has subscribed to the already publishing observable:
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 2
Value published to observer #1: 3
Value published to observer #2: 3
Value published to observer #1: 4
Value published to observer #2: 4
Sequence 1 Completed.
Sequence 2 Completed.
================
You can actually promote a cold observable to a hot observable by calling the Publish extension method. It will return you an IConnectableObservable on which you have to call the Connect method before you subscribe to it. In that case the subject implementation is internally hidden from you.
IConnectableObservable<long> observable = 
                Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Take(5)
                .Publish();

observable.Connect();

var d1 = observable.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence Completed.")
    );
        
Thread.Sleep(3000);

var d2 = observable.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence Completed.")
);
The results are the same as before:
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 2
Value published to observer #1: 3
Value published to observer #2: 3
Value published to observer #1: 4
Value published to observer #2: 4
Sequence 1 Completed.
Sequence 2 Completed.
================

Wednesday, December 21, 2011

Preparing for unit testing with testable observables

In the following example we are going to explore the basic functionality provided to help us with unit testing of RX observables. First we have to create an instance of the class TestScheduler which implements ISchudeler interface. This will enable us to schedule some work for the future. Then we call the CreateObserver method on it. It will return us an ITestableObserver. There is a property called Messages which is an addition to the classical IObservable interface. It will contain the actions sent to the previously created MockObserver after calling the Start method on the testScheduler. Next we will schedule two notifications. There are three possibilities in the NotificationKind enumeration (OnNext, OnError and OnCompleted). You can create these with Notification.CreateOnNext, Notification.CreateOnError and Notification.CreateOnCompleted static methods. These actions are scheduled to run after 100 and 200 virtual ticks. Schedule contains a callback which will be executed when the virtual time will be advanced to this point. In this case we are calling the notification's Accept method which invokes the mock observer's method corresponding to the notification. In our case it's OnNext for the our first scheduled item and for the second one it's OnCompleted. After we have subscribed to the testable observable we can start the scheduler. This will actually run our scheduled items an send them to the mock observer. This observer will record the received notifications to it's Messages collection.
    var testScheduler = new TestScheduler();
    
    var testableObserver = testScheduler.CreateObserver<int>();
    
    testScheduler.ScheduleAbsolute(Notification.CreateOnNext<int>(2), 100L, (scheduler, state) => {
        state.Accept(testableObserver);
        return Disposable.Empty;
    });
    
    testScheduler.ScheduleAbsolute(Notification.CreateOnCompleted<int>(), 200L, (scheduler, state) => {
        state.Accept(testableObserver);
        return Disposable.Empty;
    });
    
    testScheduler.Start();
    
    foreach (var message in testableObserver.Messages)
    {
        Console.WriteLine("Value {0} at {1}", message.Value, message.Time);
    }
Finally we will send the recorded messages to the output:
================
Value OnNext(2) at 100
Value OnCompleted() at 200
================
We can achieve the same result with the use of CreateColdObservable method. It receives as an input a collection of notification records. Record's Value has to be a Notification which are in this case created with the help of static methods ReactiveTest.OnNext and ReactiveTest.OnCompleted (there also exists a ReactiveTest.OnError). In case of OnNext the first parameter is the relative schedule time and the value of the notification.
    var testScheduler = new TestScheduler();            
    
    var records = new Recorded<Notification<int>>[] {
        ReactiveTest.OnNext(100, 1),
        ReactiveTest.OnCompleted<int>(200)
    };
    
    var testableObserver = testScheduler.CreateObserver<int>();
    var testableObservable = testScheduler.CreateColdObservable(records);
    
    testableObservable.Subscribe(testableObserver);
    
    testScheduler.Start();
    
    foreach (var message in testableObserver.Messages)
    {
        Console.WriteLine("Value {0} at {1}", message.Value, message.Time);
    }
Yet another way how to achieve the same thing is to provide a create observable method and provide creation, subscription and disposal time to the Start method of the test scheduler. In this case the notifications are scheduled relatively to the subscription time. So now we don't have to create a mock observer manually and subscribe to the test observable, it will be managed by the scheduler.
    var testScheduler = new TestScheduler();            
    
    var records = new Recorded<Notification<int>>[] {
        ReactiveTest.OnNext(100, 1),
        ReactiveTest.OnCompleted<int>(200)
    };
    
    var testableObserver = testScheduler.Start(
        () => testScheduler.CreateColdObservable(records),
        0, 50, 300
    );
    
    foreach (var message in testableObserver.Messages)
    {
        Console.WriteLine("Value {0} at {1}", message.Value, message.Time);
    }
The results are:
================
Value OnNext(1) at 150
Value OnCompleted() at 250
================
That's it for now. Next we will explore subjects and the difference between cold and hot observables.

Tuesday, December 20, 2011

Creating basic observable collections with RX

In the previous article we created some observables which produced only single notification. This time we will focus on more collection like notifications. We will start with the Repeat method which repeats the first argument n-times based on the second argument:
    var source = Observable.Repeat(1, 3);
    
    IDisposable d = source.Subscribe(
                        value => value.Dump(),
                        error => error.Message.Dump(),
                        () => "Completed".Dump()
    );
The above mentioned code snippet uses an extension method defined in the System.ObservableExtensions static class. It provides you several overloaded versions of ObservableExtensions.Subscribe in case you don't want to create your own observer, but only provide the action methods.
If you are using Visual Studio you have to use the subscribe method with System.Console instead of the Dump method.
    IDisposable d = source.Subscribe(
                    Console.WriteLine,
                    error => Console.WriteLine(error),
                    () => Console.WriteLine("Completed")
    );
The output from LINQPad is as follows:
================
1
1
1
Completed
================
Now we are going to create a Range of notifications. The first argument is the starting value and the second is how many values we will receive together:
    var source = Observable.Range(1, 3);
    
    IDisposable d = source.Subscribe(observer);
================
1
2
3
Completed
================
The last example is the most generic one. The Generate extension method needs these five arguments: an initial state, a condition when to stop, an iterator how to get the next value and a result selector to shape the result:
    var source = Observable.Generate(
                    1, 
                    value => value <= 3,
                    value => value + 1,
                    value => value
                    );                    
    
    IDisposable d = source.Subscribe(observer);
The initial state is set to 1. In every "iteration" the state is incremented by one while the condition is met. The resulting shape of the values won't be changed in this case. This time we have created the exactly same functionality as the Repeat method provides:
================
1
2
3
Completed
================
Next time we will create a testable observable.

How to create an observable with RX

IObservable<T> is defined in the System namespace and defines a provider for push based notifications. It looks like this:
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
An observable has to implement this interface. Than you can call the Subscribe function on it via which you can inject an observer to obtain notifications from it. The function returns an IDisposable which defines the Dispose method.
You can create a new observable in a lot of different ways. I'm going to show you the basic methods of the Observable static class from System.Reactive.Linq namespace for creating observables. This class also contains a lot of LINQ query operators implemented as extension methods such as Where, Take, etc.
All examples in this article will use the following observer instance:
    var observer = Observer.Create<int>(
                        value => value.Dump(),
                        error => error.Message.Dump(),
                        () => "Completed".Dump()
                        );
The easiest way how to create an observable which notifies us about one value is the Next extension method and subscribe to it:
    var source = Observable.Return(1);
    
    source.Subscribe(observer);
The results are:
================
1
Completed
================  
If you would like to simulate an error condition you can use the Throw extension method. In this case the OnCompleted is not called:
    var source = Observable.Throw<int>(new Exception("Error has occurred."));
    
    source.Subscribe(observer);
The result is:
================
Error has occurred.
================
An empty observable can be created with the Empty extension method. In this case only the OnCompleted method of the observer is called:
    var source = Observable.Empty<int>();
    
    source.Subscribe(observer);
The result is:
================
Completed
================
Ok, so now it's time to create your first own IObservable implementation:
internal class DefaultObservable<T> : IObservable<T>
{
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }
        
        observer.OnNext(default(T));
        observer.OnCompleted();
        
        return Disposable.Empty;
    }
}
Now you can create an instance of the DefaultObservable class which notifies our observer with the default value of type T.
    var source = new DefaultObservable<int>();
    IDisposable d = source.Subscribe(observer);
After you subscribe to it you will receive the following output:
================
1
Completed
================
In the next topic we will focus on creating observables which are producing multiple notifications.