Wednesday, December 21, 2011

How to use the Notification class in Reactive Extensions

In this topic I will show you how you can use the main three Notification types in RX. They are OnNext, OnError and OnCompleted and you will mainly use them for testing purposes.
public enum NotificationKind
{
    OnNext,
    OnError,
    OnCompleted
}
Let's first create a notification of NotificationKind.OnNext and explore it's properties and methods:
// Create a NotificationKind.OnNext with new Exception
Notification<int> notification = Notification.CreateOnNext(1);

// explore instance properties
Console.WriteLine("Value: {0}" , notification.Value);
Console.WriteLine("HasValue: {0}", notification.HasValue);
Console.WriteLine("Kind: {0}", notification.Kind);
// user friendly ToString
Console.WriteLine(notification.ToString());

var observer = Observer.Create<int>(Console.WriteLine);

// calls the observer's OnNext method 
// with the Value as input parameter
// observer.OnNext(notification.Value);
notification.Accept(observer);

// do the same thing manually
var source = notification.ToObservable();
source.Subscribe(observer.AsObserver());
The code is describe with comments and the output is as follows:
================
Value: 1
HasValue: True
Kind: OnNext
OnNext(1)
1
1
================
A NotificationKind.OnError notification can be created as follows:
// Create a NotificationKind.OnErrorwith value 1
Notification<int> notification = Notification
    .CreateOnError<int>(new Exception("error"));

// explore instance properties
Console.WriteLine("Exception: {0}" , notification.Exception);
Console.WriteLine("HasValue: {0}", notification.HasValue);
Console.WriteLine("Kind: {0}", notification.Kind);
// user friendly ToString
Console.WriteLine(notification.ToString());

var observer = Observer.Create<int>(
        Console.WriteLine,
        error => Console.WriteLine(error.Message)
        );

// calls the observer's OnError method 
// with the Exception as input parameter
// observer.OnError(notification.Exception);
notification.Accept(observer.AsObserver());
================
Exception: System.Exception: error
HasValue: False
Kind: OnError
OnError(System.Exception)
error
================
A NotificationKind.OnCompleted notification can be created as follows:
// Create a NotificationKind.OnCompleted
Notification<int> notification = Notification
    .CreateOnCompleted<int>();

// explore instance properties
Console.WriteLine("HasValue: {0}", notification.HasValue);
Console.WriteLine("Kind: {0}", notification.Kind);
// user friendly ToString
Console.WriteLine(notification.ToString());

var observer = Observer.Create<int>(
        Console.WriteLine,
        () => Console.WriteLine("Completed")
        );

// calls the observer's OnCompleted method 
// observer.OnCompleted();
notification.Accept(observer.AsObserver());
================
HasValue: False
Kind: OnCompleted
OnCompleted()
Completed
================
You can convert an observable to a collection of notifications with the Materialize and ToEnumerable methods. In this case we Concat to observables to produce one sequence and produce an error after the Range ends.
var notifications = Observable.Range(1, 2)
.Concat(Observable.Throw<int>(new InvalidOperationException()))
.Materialize()
.ToEnumerable();

foreach (var notification in notifications)
{
    notification.ToString().Dump();
}
================
OnNext(1)
OnNext(2)
OnError(System.InvalidOperationException)
================
You can achieve the same result with creating an observer from a notification callback. Observer will convert every notification to an appropriate NotificationKind and pass it to the provided callback method.
var notifications = Observable.Range(1, 2)
.Concat(Observable.Throw<int>(new InvalidOperationException()));

Action<Notification<int>> action = notification => {
    notification.ToString().Dump();
};

var observer = action.ToObserver();

notifications.Subscribe(observer);
================
OnNext(1)
OnNext(2)
OnError(System.InvalidOperationException)
================
And finally you can convert back an observer to an Action delegate with the ToNotifier and call it as a method:
var notifier = Observer.Create<int>(
                Console.WriteLine,
                () => "Completed".Dump())
                       .ToNotifier();
                       
notifier(Notification.CreateOnNext(2));    
notifier(Notification.CreateOnCompleted<int>());
================
2
Completed
================

No comments:

Post a Comment