Showing posts with label RX. Show all posts

Thursday, January 5, 2012

How to use SelectMany in Reactive Extensions (RX).

You can use the SelectMany extension method in several different ways. The first example uses SelectMany to flatten the result of the Window method from IObservable<IObservable<T>> to IObservable<T>:
var source = Observable.Repeat(1, 3)
            .Window(2)
            .SelectMany(c => c);

source.Subscribe(
    value => value.Dump(),
    error => Console.WriteLine(error.Message),
    () => Console.WriteLine("Completed")
);    
==========
1
1
1
Completed
==========
Or you can project each source value to its own observable sequence; SelectMany then merges the values of all these sequences into the result:
var source = Observable.Range(1, 3)
            .SelectMany(c => Observable.Repeat(c,3));
==========
1
1
2
1
2
3
2
3
3
Completed
==========
Instead of another observable you can use an enumerable and project the resulting values with a result selector:
var source = Observable.Range(1, 3)
            .SelectMany(c => Enumerable.Repeat(c,3), (x, y) => x*y);
==========
1
1
1
4
4
4
9
9
9
Completed
==========
Finally you could produce a new observable for each action - OnNext, OnError and OnCompleted:
var source = Observable.Range(1, 3)
            .SelectMany(value => Observable.Return(1), 
                        error => Observable.Return(-1),
                        () => Observable.Return(0));
==========
1
1
1
0
Completed
==========
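The overload with a result selector is also what a C# query expression with two from clauses compiles to. Here is a minimal sketch, assuming the System.Reactive package is referenced; the class name SelectManyQuerySyntax and the method Pairs are made up for this illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class SelectManyQuerySyntax
{
    public static IList<string> Pairs()
    {
        // Two "from" clauses translate to SelectMany(collectionSelector, resultSelector).
        IObservable<string> source =
            from x in Observable.Range(1, 3)
            from y in Enumerable.Range(1, x)   // one enumerable per source value
            select x + "-" + y;

        // ToList().Wait() blocks until the sequence completes and returns all values.
        return source.ToList().Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Pairs()));
        // 1-1, 2-1, 2-2, 3-1, 3-2, 3-3
    }
}
```

Because the inner sequences are enumerables, each one is drained completely before the next source value is processed, so the order is predictable.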

Tuesday, January 3, 2012

How to recursively call IScheduler.Schedule in Reactive Extensions.

The following example demonstrates how to call the scheduler's Schedule method recursively, so that the scheduled action reschedules itself. The result of this is a "lazy infinite loop":
public static class ObservableExt
{    

    public static IObservable<int> Natural()
    {
        return Natural(Scheduler.CurrentThread);
    }
    
    public static IObservable<int> Natural(IScheduler scheduler)
    {
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }
        return Observable.Create<int>(observer => {
            // Declared inside Create so that every subscription starts counting at 0.
            int i = 0;
            return scheduler.Schedule(action => {
                observer.OnNext(i++);
                action();
            });
        });
    }
}
Now we can call the Natural method and chain it with other extension methods from the RX library. This example uses Take to stop the production of the infinite number of values after 3 items.
IObservable<int> naturals = ObservableExt.Natural()
                                         .Take(3);

naturals.Subscribe(
    value => value.Dump(),
    error => Console.WriteLine(error.Message),
    () => Console.WriteLine("Completed")
);    
==========
0
1
2
Completed
==========
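For comparison, a similar sequence can be built from the Generate method that ships with the library. This is only a sketch of an alternative, not a replacement for the recursive scheduling shown above; it assumes the System.Reactive package, and the class name NaturalsWithGenerate is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class NaturalsWithGenerate
{
    public static IObservable<int> Natural()
    {
        return Observable.Generate(
            0,            // initial state
            _ => true,    // the sequence never ends on its own
            i => i + 1,   // next state
            i => i);      // value emitted for each state
    }

    public static IList<int> FirstThree()
    {
        // Take(3) ends the otherwise infinite sequence; Wait() blocks until it completes.
        return Natural().Take(3).ToList().Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", FirstThree())); // 0, 1, 2
    }
}
```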

Monday, January 2, 2012

The difference between OfType and Cast in Reactive Extensions (RX).

OfType tries to convert an object to the defined type or ignores it if the value is not of the defined type.
public class Person
{
    public string Name { get; set; }
}

object value = new Person { Name = "Kinga" };

var source = Observable.Return(new object())
                       .Concat(Observable.Return(value))
                       .OfType<Person>()
                       .Select(p => p.Name);
==========
OnNext : Kinga
OnCompleted
==========
The Cast method propagates an OnError if the value is not convertible to the defined type and ends the subscription:
object value = new Person { Name = "Kinga" };

var source = Observable.Return(new object())
                       .Concat(Observable.Return(value))
                       .Cast<Person>()
                       .Select(p => p.Name);
==========
OnError : Unable to cast object of type 'System.Object' to type 'Person'.
==========
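The two operators can also be compared side by side on the same input. The sketch below assumes the System.Reactive package; the names OfTypeVersusCast, Mixed and Describe are invented for this example. Materialize is used so that the error becomes an ordinary value that can be printed:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class OfTypeVersusCast
{
    public static IObservable<object> Mixed()
    {
        return new object[] { 1, "two", 3 }.ToObservable();
    }

    // Turns every notification of the sequence into text and joins them.
    public static string Describe<T>(IObservable<T> source)
    {
        var parts = source
            .Materialize()
            .Select(n =>
                n.Kind == NotificationKind.OnNext ? "OnNext(" + n.Value + ")" :
                n.Kind == NotificationKind.OnError ? "OnError(" + n.Exception.GetType().Name + ")" :
                "OnCompleted")
            .ToList()
            .Wait();
        return string.Join(" ", parts);
    }

    public static void Main()
    {
        // OfType skips the string and carries on.
        Console.WriteLine(Describe(Mixed().OfType<int>()));
        // OnNext(1) OnNext(3) OnCompleted

        // Cast fails on the string, so the value 3 is never delivered.
        Console.WriteLine(Describe(Mixed().Cast<int>()));
        // OnNext(1) OnError(InvalidCastException)
    }
}
```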

Sunday, January 1, 2012

Combining observable sequences with Concat, Merge, Catch, OnErrorResumeNext and CombineLatest in Reactive Extensions (RX).

Concat concatenates observable sequences in the order you have provided the observables to it:

var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.Concat(source2)
       .Subscribe(Console.WriteLine);
So it first propagates all values from source1 and then all values from the source2 observable sequence:
==========
1
1
1
2
2
2
==========
An error calls the OnError action and ends the subscription:
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.Concat(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
==========
1
1
1
An error has occurred.
==========
The Merge extension method, on the other hand, subscribes to all provided observable sequences and forwards their values as they arrive, so the values of the sources can interleave:
var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.Merge(source2)
       .Subscribe(Console.WriteLine);
==========
1
2
1
2
1
2
==========
An error again calls the OnError action and ends the subscription:
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.Merge(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
==========
1
2
1
2
1
An error has occurred.
==========
Catch propagates the first sequence and ignores the second one if no error occurs. If the first sequence fails, Catch propagates its values up to the point where the error occurred and then continues with the values from the second observable.
var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.Catch(source2)
       .Subscribe(Console.WriteLine);    
==========
1
1
1
==========
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.Catch(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
OnError is not called in this case:
==========
1
1
1
2
2
2
==========
OnErrorResumeNext behaves like Concat as long as no error occurs and produces the values from both sequences:
var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);    
==========
1
1
1
2
2
2
==========
but if something goes wrong with the first sequence, it stops producing values from it and continues with the values from the second one:
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
OnError is not called in this case:
==========
1
1
1
2
2
2
==========
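To see the differences between Concat, Catch and OnErrorResumeNext at a glance, the following sketch runs all three against the same short sequences. It assumes the System.Reactive package; the class and helper names (ErrorHandlingOperators, Ok, Failing, Fallback, Describe) are hypothetical and exist only for this illustration:

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class ErrorHandlingOperators
{
    public static IObservable<int> Ok() { return Observable.Return(1); }

    public static IObservable<int> Failing()
    {
        return Observable.Return(1).Concat(Observable.Throw<int>(new Exception("boom")));
    }

    public static IObservable<int> Fallback() { return Observable.Return(2); }

    // Materialize turns OnError/OnCompleted into values, so they can be printed too.
    public static string Describe(IObservable<int> source)
    {
        var parts = source
            .Materialize()
            .Select(n => n.Kind == NotificationKind.OnNext ? n.Value.ToString() : n.Kind.ToString())
            .ToList()
            .Wait();
        return string.Join(" ", parts);
    }

    public static void Main()
    {
        Console.WriteLine(Describe(Failing().Concat(Fallback())));            // 1 OnError
        Console.WriteLine(Describe(Failing().Catch(Fallback())));             // 1 2 OnCompleted
        Console.WriteLine(Describe(Ok().Catch(Fallback())));                  // 1 OnCompleted
        Console.WriteLine(Describe(Failing().OnErrorResumeNext(Fallback()))); // 1 2 OnCompleted
        Console.WriteLine(Describe(Ok().OnErrorResumeNext(Fallback())));      // 1 2 OnCompleted
    }
}
```

The third and fifth lines show the only difference between Catch and OnErrorResumeNext: whether the second sequence is used when the first one completes without an error.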
Finally CombineLatest returns an observable sequence containing the result of combining elements of both sources using the specified result selector function.
var semaphore = new SemaphoreSlim(0);

var observer = Observer.Create<string>(
    value => Console.WriteLine(value.ToString()),
    error => { 
        Console.WriteLine(error.Message); 
        semaphore.Release(); 
    },
    () => { 
        Console.WriteLine("Completed"); 
        semaphore.Release(); 
    }
);    

var source1 = Observable.Interval(TimeSpan.FromSeconds(1))
                        .Timestamp()
                        .Take(3);

var source2 = Observable.Interval(TimeSpan.FromSeconds(3))
                        .Timestamp()
                        .Take(3);

var input = source1.CombineLatest(source2, (x, y) => { 
    return string.Format("{0} - {1}", x.ToString(), y.ToString());
});

using(input.Subscribe(observer))
{
    semaphore.Wait();
}
==========
1@27.12.2011 18:26:47 +01:00 - 0@27.12.2011 18:26:48 +01:00
2@27.12.2011 18:26:48 +01:00 - 0@27.12.2011 18:26:48 +01:00
2@27.12.2011 18:26:48 +01:00 - 1@27.12.2011 18:26:51 +01:00
2@27.12.2011 18:26:48 +01:00 - 2@27.12.2011 18:26:54 +01:00
Completed
==========

Saturday, December 31, 2011

How to subscribe to the observable which reacts first with Amb in Reactive Extensions.

In this example we create two observables. The first one, source1, starts producing values after a due time of 1 second. The second one, source2, produces 3 values, one per second, starting immediately after the subscription without a due time. Because we use the Amb extension method, which propagates only the observable sequence that reacts first, source1 never gets to produce a value. This is shown with the help of the Do method. Its purpose is to invoke an action for each element in the sequence. In our case it writes a message to the console.
var semaphore = new SemaphoreSlim(0);

var observer = Observer.Create<Timestamped<long>>(
    value => Console.WriteLine(value.ToString()),
    error => { 
        Console.WriteLine(error.Message); 
        semaphore.Release(); 
    },
    () => { 
        Console.WriteLine("Completed"); 
        semaphore.Release(); 
    }
);    

TimeSpan oneSecond = TimeSpan.FromSeconds(1);

var source1 = Observable.Timer(oneSecond, oneSecond)
            .Do(_ => Console.WriteLine("Value from first observable:"))
            .Timestamp().Take(5);
            
var source2 = Observable.Timer(TimeSpan.FromSeconds(0), oneSecond)
            .Do(_ => Console.WriteLine("Value from second observable:"))
            .Timestamp().Take(3);

using(source1.Amb(source2).Subscribe(observer))
{
    semaphore.Wait();
}
==========
Value from second observable:
0@27.12.2011 17:07:42 +01:00
Value from second observable:
1@27.12.2011 17:07:43 +01:00
Value from second observable:
2@27.12.2011 17:07:44 +01:00
Completed
==========
Another interesting aspect of this example is the use of SemaphoreSlim, which lets us Wait until it is released in the OnCompleted or OnError part of the observer. Without it, the using statement would dispose our subscription immediately.
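The same behaviour can be reproduced without timers by pushing values through two subjects by hand. This is a small sketch under the assumption that the System.Reactive package is referenced; the class name AmbWithSubjects is made up, and HasObservers is used only to show that Amb has dropped the losing source:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AmbWithSubjects
{
    public static List<string> Run()
    {
        var first = new Subject<string>();
        var second = new Subject<string>();
        var log = new List<string>();

        using (first.Amb(second).Subscribe(value => log.Add(value)))
        {
            second.OnNext("second: 0");   // second reacts first and wins
            log.Add("first still observed: " + first.HasObservers);
            first.OnNext("first: 0");     // ignored, Amb no longer listens to first
            second.OnNext("second: 1");
        }
        return log;
    }

    public static void Main()
    {
        Run().ForEach(Console.WriteLine);
        // second: 0
        // first still observed: False
        // second: 1
    }
}
```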

Friday, December 30, 2011

How to use ToAsync, FromAsyncPattern and Start in Reactive Extensions

This example demonstrates how to use the ToAsync and FromAsyncPattern methods from RX. First we create an observer and assign the GenerateNumbers function to a generic Func delegate. Then we pass its BeginInvoke and EndInvoke methods to the FromAsyncPattern method. It returns another Func delegate, which we call with an input parameter of 7. The result of this call is an observable you can subscribe to.
var observer = Observer.Create<IEnumerable<int>>(
    x => Console.WriteLine(
    "OnNext: {0}", x.Sum()),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted")
);

Func<int, IEnumerable<int>> func = GenerateNumbers;

var source = Observable.FromAsyncPattern<int, IEnumerable<int>>(func.BeginInvoke, func.EndInvoke);

using(source(7).Subscribe(observer))
{
    Console.ReadLine();
}
Here is the implementation of the GenerateNumbers function, which produces the numbers from 1 up to the given count:
public IEnumerable<int> GenerateNumbers(int count)
{
    int i = 1;
    while (i <= count)
    {
        yield return i++;
    }
}
Our observer sums the sequence and prints the result:
==========
OnNext: 28
OnCompleted
==========
The classic way to achieve the same result is:
Func<int, IEnumerable<int>> func = GenerateNumbers;
    
var asyncResult = func.BeginInvoke(7, callback => {
    IEnumerable<int> result = func.EndInvoke(callback);
    Console.WriteLine("Sum of numbers is: {0}", result.Sum());
}, null);
==========
Sum of numbers is: 28
==========
You can also convert a function to an asynchronous function with the ToAsync method. The difference is that the result of EndInvoke is an observable you can subscribe to:
Func<int, IObservable<IEnumerable<int>>> asyncFunc = func.ToAsync();

var asyncResult2 = asyncFunc.BeginInvoke(7, callback => {
    IObservable<IEnumerable<int>> result = asyncFunc.EndInvoke(callback);
    var disposable = result.Subscribe(value => Console.WriteLine(value.Sum()));
}, null);
==========
28
==========
Finally you can invoke the function asynchronously with the Start method:
Observable.Start(() => GenerateNumbers(4))
          .Subscribe(observer);
==========
OnNext: 10
OnCompleted
==========
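Delegate BeginInvoke and EndInvoke are not supported on .NET Core and later, so on those runtimes ToAsync and Start are the parts of this post that still apply. The sketch below calls the delegate returned by ToAsync directly instead of going through BeginInvoke. It assumes the System.Reactive package; the names AsyncWithoutBeginInvoke and SumUpTo are hypothetical:

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;

public static class AsyncWithoutBeginInvoke
{
    static int SumUpTo(int count)
    {
        return Enumerable.Range(1, count).Sum();
    }

    public static int ViaToAsync()
    {
        Func<int, int> sum = SumUpTo;
        // ToAsync wraps the function; calling the wrapper starts it in the background.
        Func<int, IObservable<int>> asyncSum = Observable.ToAsync<int, int>(sum);
        return asyncSum(7).Wait();   // Wait blocks until the single result arrives
    }

    public static int ViaStart()
    {
        return Observable.Start(() => SumUpTo(4)).Wait();
    }

    public static void Main()
    {
        Console.WriteLine(ViaToAsync()); // 28
        Console.WriteLine(ViaStart());   // 10
    }
}
```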

Thursday, December 29, 2011

How to create your own Buffer for Reactive Extensions

This post is intended for learning purposes. You can find a more complete solution in the RX library, which has many overloads of the Buffer method.
public static class ObservableExt
{
    public static IObservable<IList<T>> Buffer2<T>
    (
        this IObservable<T> source, 
        int count
    )
    {
        return Observable.Create<IList<T>>(observer => {
            IList<T> list = new List<T>();
            Action sendAndClearList = () => {
                if (list.Count != 0)
                {
                    observer.OnNext(list);
                    // Start a new list instead of clearing the one the observer already received.
                    list = new List<T>();
                }
            };
            int i = 0;
            return source.Subscribe(
                value => {
                    list.Add(value);
                    i++;
                    if (i % count == 0)
                    {
                        sendAndClearList();
                    }
                },
                error => {
                    observer.OnError(error);
                },
                () => {
                    sendAndClearList();
                    observer.OnCompleted();
                }
            );
        });
    }
}
Now you can subscribe to it. This will create an observable with buffers based on the count parameter. Every buffer will contain up to 3 elements in our case.
// you can use Console.WriteLine instead of Dump
var source = Observable.Repeat(1, 4)
.Buffer2(3)
.Subscribe(
    v => v.Dump(),
    e => e.Message.Dump(),
    () => "Completed".Dump()
);
This will print the following output into the LINQPad's results window:
==========
List (3 items) 
1
1
1
List (1 item) 
1
Completed
==========

Wednesday, December 28, 2011

When And Then in Reactive Extensions (RX).

The And method matches when both observables have an available value. The Then method projects a value when all observables have an available value. Finally the When method joins together the results from several patterns. The example contains an input observable which is created with the Generate method. You could achieve the same result with the Interval method, as the second observable shows, but I wanted to show you how to create a sequence generator with a time selector as the last argument. The results from the first observable are timestamped so we can see when each value was generated. The values from both observables are paired and sent to the subscriber every 2 seconds.
var input = Observable
    .Generate(0, _ => true, x => ++x, x => x, _ => TimeSpan.FromSeconds(2))
    .Timestamp()
    .Take(5); 

var pattern = input.And(Observable.Interval(TimeSpan.FromSeconds(1)));
            
var source = 
Observable.When(
          pattern
          .Then((left, right) => String.Format("{0} - {1}", left, right))
          );    
          
var observer = Observer.Create<string>(
    x => Console.WriteLine("OnNext: {0}", x),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted")
);    

source.Subscribe(observer);
==========
OnNext: 0@22.12.2011 20:16:37 +01:00 - 0
OnNext: 1@22.12.2011 20:16:39 +01:00 - 1
OnNext: 2@22.12.2011 20:16:41 +01:00 - 2
OnNext: 3@22.12.2011 20:16:43 +01:00 - 3
OnNext: 4@22.12.2011 20:16:45 +01:00 - 4
OnCompleted
==========
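The pairing is easier to follow when the values are pushed by hand. The following sketch uses two subjects instead of timers; it assumes the System.Reactive package, and the class name JoinPatternPairs is invented for this illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class JoinPatternPairs
{
    public static List<string> Run()
    {
        var numbers = new Subject<int>();
        var letters = new Subject<string>();
        var log = new List<string>();

        var source = Observable.When(
            numbers.And(letters).Then((n, l) => n + l));

        using (source.Subscribe(value => log.Add(value)))
        {
            numbers.OnNext(1);
            numbers.OnNext(2);     // waits until a second letter arrives
            letters.OnNext("a");   // pairs with 1
            letters.OnNext("b");   // pairs with 2
        }
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // 1a, 2b
    }
}
```

Values that have no partner yet are kept until one arrives, which is why the faster Interval in the example above is paired one to one with the slower Generate.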

How to create a parallel observer of two observables with the help of the visitor design pattern in RX.

This example is a little more complex, but it is a very dynamic solution for observing and influencing two or more observables with the help of a visitor. First we create an abstract class Direction and its two implementations LeftDirection and RightDirection. These two classes let us decide which observable a notification came from.
public abstract class Direction<T>
{
    public T Value { get; set; }
    
    public static Direction<T> FromLeft(T value)
    {
        return new LeftDirection<T>(value);
    }
    
    public static Direction<T> FromRight(T value)
    {
        return new RightDirection<T>(value);
    }
}

public class LeftDirection<T> : Direction<T>
{
    public LeftDirection(T value)
    {
        this.Value = value;
    }
}

public class RightDirection<T> : Direction<T>
{
    public RightDirection(T value)
    {
        this.Value = value;
    }
}

The next code snippet lets you derive a new class from the abstract DirectionVisitor class and override its VisitDirection methods in order to provide different functionality for each observable.
public abstract class DirectionVisitor<T>
{
    public Action<Notification<T>, IObserver<T>> Visit(Direction<Notification<T>> direction)
    {
        return VisitDirection(direction as dynamic);
    }
    
    protected abstract Action<Notification<T>, IObserver<T>> VisitDirection(Direction<Notification<T>> direction);
    
    protected virtual Action<Notification<T>, IObserver<T>> VisitDirection(LeftDirection<Notification<T>> left)
    {
        return VisitDirection(left as Direction<Notification<T>>);
    }
    
    protected virtual Action<Notification<T>, IObserver<T>> VisitDirection(RightDirection<Notification<T>> right)
    {
        return VisitDirection(right as Direction<Notification<T>>);
    }
    
}
The basic idea of the following implementation of the DirectionVisitor is to call the observer's OnCompleted only when both observables have already produced an OnCompleted notification. You could influence the generation of the notifications from both observables in a lot of different ways. One observable can also impact the other one.
public class ConvertDirectionToAction<T>: DirectionVisitor<T>
{
    private bool isLeftCompleted;
    private bool isRightCompleted;
    
    protected override Action<Notification<T>, IObserver<T>> VisitDirection(Direction<Notification<T>> direction)
    {
        Action<Notification<T>, IObserver<T>> action = (notif, observer) => {
            switch (notif.Kind)
            {
                case NotificationKind.OnNext: 
                    observer.OnNext(notif.Value);
                    break;
                case NotificationKind.OnError: 
                    observer.OnError(notif.Exception);
                    break;
                case NotificationKind.OnCompleted: 
                    if (this.isLeftCompleted && this.isRightCompleted)
                    {
                        observer.OnCompleted();
                    }
                    break;                    
            }    
        };
        return action;
    }
    
    protected override Action<Notification<T>, IObserver<T>> VisitDirection(LeftDirection<Notification<T>> left)
    {
        NotificationKind kind = left.Value.Kind;
        if (kind == NotificationKind.OnCompleted || 
            kind == NotificationKind.OnError)
        {
            this.isLeftCompleted = true;
        }
        
        return VisitDirection(left as Direction<Notification<T>>);
    }
    
    protected override Action<Notification<T>, IObserver<T>> VisitDirection(RightDirection<Notification<T>> right)
    {
        NotificationKind kind = right.Value.Kind;
        if (kind == NotificationKind.OnCompleted || 
            kind == NotificationKind.OnError)
        {
            this.isRightCompleted = true;
        }
        
        return VisitDirection(right as Direction<Notification<T>>);
    }    
}
Next we will need an implementation of IObserver which will be able to observe two observables and run a different action for a notification based on which one produced it. You can inject your own visitor into the Visitor property and change how the ParallelObserver will observe them.
public class ParallelObserver<T> : IObserver<Direction<Notification<T>>>
{
    IObserver<T> observer;
    
    public ParallelObserver(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }
        this.observer = observer;
    }
    
    public void OnNext(Direction<Notification<T>> value)
    {
        var action = this.Visitor.Visit(value);
        
        action(value.Value, this.observer);
    }
    
    void IObserver<Direction<Notification<T>>>.OnError(Exception error)
    {
    }
    
    void IObserver<Direction<Notification<T>>>.OnCompleted()
    {
    }
    
    private DirectionVisitor<T> visitor;

    public DirectionVisitor<T> Visitor 
    {
        get 
        { 
            return this.visitor ?? 
                   (this.visitor = new ConvertDirectionToAction<T>()); 
        }
        set 
        { 
            this.visitor = value; 
        }    
    }
}
Now you need an extension method which runs both observables in parallel and calls OnCompleted only when both observables have finished their work.
public static class ObservableExtensions
{
    public static IObservable<T> Run<T>(this IObservable<T> left, IObservable<T> right)
    {
        return Observable.Create<T>(observer => {
            var parallelObserver = new ParallelObserver<T>(observer);
            
            var disposable1 = left.Materialize()
                         .Select(v => Direction<Notification<T>>.FromLeft(v))
                         .Synchronize()
                         .Subscribe(parallelObserver);
                         
            var disposable2 = right.Materialize()
                          .Select(v => Direction<Notification<T>>.FromRight(v))
                          .Synchronize()
                          .Subscribe(parallelObserver);
                          
            return new CompositeDisposable(2) { disposable1, disposable2 };
        });
    }
}
Try it and test it as homework.
var source1 = Observable.Repeat(4, 3);
var source2 = Observable.Repeat(1, 3);
    
var input = source1.Run(source2);

input.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("Done"));
==========
4
1
4
1
4
1
Done
==========
Without this extension the observer wouldn't receive the same notifications. The first OnCompleted stops the observer, so it ignores everything the second observable produces:
var observer = Observer.Create<int>(
    Console.WriteLine, 
    Console.WriteLine, 
    () => Console.WriteLine("Done")
);

var source1 = Observable.Repeat(4, 3).Subscribe(observer);

var source2 = Observable.Repeat(1, 3).Subscribe(observer);
=========
4
4
4
Done
=========

Tuesday, December 27, 2011

How to use FromEventPattern in Reactive Extensions (RX).

This example shows how to convert .NET events into an observable sequence on which you can then perform standard LINQ operations. It demonstrates how to get removal notifications from an ObservableCollection in a more declarative manner.
var strings = new System.Collections.ObjectModel.ObservableCollection<string>()
{
    "Item1", "Item2", "Item3"
};

var removals = 
Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>
(
    handler => strings.CollectionChanged += handler,
    handler => strings.CollectionChanged -= handler
)
.Where(e => e.EventArgs.Action == NotifyCollectionChangedAction.Remove)
.SelectMany(c => c.EventArgs.OldItems.Cast<string>());
    
var disposable = removals.Subscribe(GetDefaultObserver<string>("Removed"));
==========
Removed : Item1
Removed : Item2
==========
The observer was generated with this helper function:
public IObserver<T> GetDefaultObserver<T>(string onNextText)
{
    return Observer.Create<T>(
        x => Console.WriteLine("{0} : {1}", onNextText, x),
        ex => Console.WriteLine("OnError: {0}", ex.Message),
        () => Console.WriteLine("OnCompleted")
    );
}
The classic way to achieve the same result with an event handler would be:
strings.CollectionChanged += (sender, ea) => {
    if(ea.Action == NotifyCollectionChangedAction.Remove)
    {
        foreach (var oldItem in ea.OldItems.Cast<string>())
        {
            Console.WriteLine("Removed {0}", oldItem);    
        }
    }
};
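The snippets above don't show the code that actually removes the items. Here is a self-contained sketch of the whole round trip, assuming the System.Reactive package; the class name RemovalNotifications and the concrete Remove and Add calls are my own additions for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Reactive.Linq;

public static class RemovalNotifications
{
    public static List<string> Run()
    {
        var strings = new ObservableCollection<string> { "Item1", "Item2", "Item3" };
        var removed = new List<string>();

        var removals = Observable
            .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                handler => strings.CollectionChanged += handler,
                handler => strings.CollectionChanged -= handler)
            .Where(e => e.EventArgs.Action == NotifyCollectionChangedAction.Remove)
            .SelectMany(e => e.EventArgs.OldItems.Cast<string>());

        using (removals.Subscribe(item => removed.Add(item)))
        {
            strings.Remove("Item1");   // raises a Remove event
            strings.Add("Item4");      // filtered out by Where
            strings.Remove("Item2");   // raises a Remove event
        }

        strings.Remove("Item3");       // not observed, the handler is already detached
        return removed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // Item1, Item2
    }
}
```

Disposing the subscription detaches the event handler, which is something the classic event handler version has to do by hand.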

Monday, December 26, 2011

How to convert a TPL Task to an IObservable in Reactive Extensions.

In this post we will explore how to create a TPL Task that runs on a background thread and how to convert it to an observable. First we start with a simple Task. The code snippet below runs the work in the background and returns a value after a simulated delay of 3 seconds.
Task<int> task = new Task<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    return 42;
});
    
task.Start();
    
Console.WriteLine(task.Result);
==========
42
==========
The next example simulates an error caused by cancellation of the background process. This is accomplished with the CancellationTokenSource's Cancel method after one second:
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;

Task<int> task = Task.Factory.StartNew<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    token.ThrowIfCancellationRequested();
    return 42;
}, token);

Thread.Sleep(1000);

tokenSource.Cancel();

try {
    Console.WriteLine(task.Result);
}
catch (AggregateException aggrEx)
{
    Console.WriteLine("Error: {0}", aggrEx.InnerException.GetType());
    aggrEx.Handle(ex => {
            if (ex is OperationCanceledException) {
                return true;
            }
            return false;
    });
}
==========
Error: System.Threading.Tasks.TaskCanceledException
==========
So let's convert the first example to an observable. You can use the ToObservable extension method of the static class TaskObservableExtensions from the System.Reactive.Threading.Tasks namespace.
Task<int> task = new Task<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    return 42;
});
    
IObservable<int> source = task.ToObservable();

source.Subscribe(Console.WriteLine);

task.Start();
==========
42
==========
Now cancel the operation to receive an OnError notification.
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;

Task<int> task = new Task<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    token.ThrowIfCancellationRequested();
    return 42;
}, token);
    
IObservable<int> source = task.ToObservable();

IDisposable disposable = source.Subscribe(
    Console.WriteLine,
    error => Console.WriteLine("Error: {0}", error.GetType()),
    () => "Completed".Dump()
);

task.Start();

Thread.Sleep(1000);

tokenSource.Cancel();

try {
    Task.WaitAll(task);
}
catch (AggregateException aggrEx)
{
    aggrEx.Handle(ex => {
            if (ex is OperationCanceledException) {
                return true;
            }
            return false;
    });
}

disposable.Dispose();
==========
Error: System.Threading.Tasks.TaskCanceledException
==========
You can also convert an observable to a task with the ToTask method:
var task = Observable.Return(2)
                     .Do(_ => Thread.Sleep(2000))
                     .ToTask();
task.Result.Dump();
==========
2
==========
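One detail worth knowing is that ToTask completes with the last value of the sequence, not the first one. The sketch below shows both directions of the conversion; it assumes the System.Reactive package on .NET 4.5 or later (for Task.Run), and the class name TaskConversions is hypothetical:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class TaskConversions
{
    public static int TaskToObservable()
    {
        Task<int> task = Task.Run(() => 42);
        // Wait blocks until the observable completes and returns its last value.
        return task.ToObservable().Wait();
    }

    public static int ObservableToTask()
    {
        Task<int> task = Observable.Range(1, 3).ToTask();
        return task.Result;   // the last value of the sequence
    }

    public static void Main()
    {
        Console.WriteLine(TaskToObservable()); // 42
        Console.WriteLine(ObservableToTask()); // 3
    }
}
```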

Sunday, December 25, 2011

How to create your own Window observable extension.

In this post I'm going to show you how to create a simple implementation of the Window extension method that can be found in the Reactive Extensions library. This method produces a new observable for each window defined by the count parameter.
public static class ObservableExt
{
    public static IObservable<IObservable<T>> Window<T>
    (
        this IObservable<T> source, 
        int count
    )
    {
        return Observable.Create<IObservable<T>>(observer => {
            int i = 0;
            ISubject<T> subject = new Subject<T>();
            observer.OnNext(subject);
            
            return source.Subscribe(value => {
                subject.OnNext(value);
                if((++i % count) == 0)
                {
                    subject.OnCompleted();
                    subject = new Subject<T>();
                    observer.OnNext(subject);
                }
                }, error => {
                    subject.OnError(error);
                    observer.OnError(error);
                }, () => {
                    subject.OnCompleted();
                    observer.OnCompleted();
                });
        });
    }
}    
Now you can use it like the other extension methods. The subscription part is more complex than usual, but the only difference is that in the OnNext action of the outer observer you have to subscribe to its input parameter, which is of type IObservable<T>:
Observable.Range(1, 10)
.Window(3)
.Subscribe(observable => {    
            observable.Subscribe(
                Console.WriteLine,
                error2 => error2.Message.Dump(),
                () => "Completed2".Dump()
                );
            },
            error => error.Message.Dump(),
            () => "Completed".Dump());
=============
1
2
3
Completed2
4
5
6
Completed2
7
8
9
Completed2
10
Completed2
Completed
=============

Saturday, December 24, 2011

Creating a Subject in Reactive Extensions and the difference between cold and hot observables.

The System.Reactive.Subjects namespace contains implementations of the ISubject interface. A subject, simply put, is both an observable and an observer:
public interface ISubject<in TSource, out TResult> 
: IObserver<TSource>, IObservable<TResult>
{ }

public interface ISubject<T> 
: ISubject<T, T>
{ }
You can create an instance of it with the static Subject.Create method and notify the injected observer with two new values:
var observable = Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Take(5);

var observer = Observer.Create<long>(
    x => Console.WriteLine("Value published to subject #1: {0}", x),
    () => Console.WriteLine("Sequence Completed."));

ISubject<long, long> subject = Subject.Create(observer, observable);

subject.OnNext(1);
subject.OnNext(2);
================
Value published to subject #1: 1
Value published to subject #1: 2
================
Then you can subscribe to the source observable through the subject. The source uses the Interval extension method, which produces a new value for each period given as its argument, in our case every second. The Take method then limits the infinite sequence to 5 values. When you subscribe to the subject, it internally subscribes to the injected observable. If you wait for two seconds in the current thread with the Sleep method and then subscribe to the subject once again, you get a subscription that is independent from the previous one: the Interval method starts producing values from the beginning. So you have just subscribed to a cold observable.
var d1 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed.")
    );
        
Thread.Sleep(2000);

var d2 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed.")
);
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 0
Value published to observer #1: 3
Value published to observer #2: 1
Value published to observer #1: 4
Sequence 1 Completed.
Value published to observer #2: 2
Value published to observer #2: 3
Value published to observer #2: 4
Sequence 2 Completed.
================
To create a hot observable behavior you have to create a new instance of the Subject class and subscribe with it to the observable:
ISubject<long> subject = new Subject<long>();

observable.Subscribe(subject);

var d1 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed.")
    );
        
Thread.Sleep(3000);

var d2 = subject.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed.")
);
In this case the new observer has subscribed to the already publishing observable:
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 2
Value published to observer #1: 3
Value published to observer #2: 3
Value published to observer #1: 4
Value published to observer #2: 4
Sequence 1 Completed.
Sequence 2 Completed.
================
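The timing above depends on Thread.Sleep, so here is a small sketch of the same hot behavior without any timers. It pushes values into a Subject by hand; the class name HotSubjectDemo and the Run helper are my own and only there for illustration, and I assume the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class HotSubjectDemo
{
    // Hypothetical helper: records what each observer receives.
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        subject.Subscribe(v => log.Add("observer #1: " + v));
        subject.OnNext(0);   // only observer #1 is subscribed at this point

        subject.Subscribe(v => log.Add("observer #2: " + v));
        subject.OnNext(1);   // both observers receive this value
        subject.OnCompleted();

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

The second observer never sees the value 0, because a Subject does not replay values that were published before the subscription.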
You can promote a cold observable to a hot observable by calling the Publish extension method. It returns an IConnectableObservable, which starts pushing values only after you call its Connect method. In this example Connect is called before the observers subscribe, so the second observer joins a sequence that is already running. The subject implementation is hidden from you internally.
IConnectableObservable<long> observable = 
                Observable
                .Interval(TimeSpan.FromSeconds(1))
                .Take(5)
                .Publish();

observable.Connect();

var d1 = observable.Subscribe(
    v => Console.WriteLine("Value published to observer #1: {0}", v),
    () => Console.WriteLine("Sequence 1 Completed.")
    );
        
Thread.Sleep(3000);

var d2 = observable.Subscribe(
    v => Console.WriteLine("Value published to observer #2: {0}", v),
    () => Console.WriteLine("Sequence 2 Completed.")
);
The results are the same as before:
================
Value published to observer #1: 0
Value published to observer #1: 1
Value published to observer #1: 2
Value published to observer #2: 2
Value published to observer #1: 3
Value published to observer #2: 3
Value published to observer #1: 4
Value published to observer #2: 4
Sequence 1 Completed.
Sequence 2 Completed.
================
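If you want every observer to see every value, you can also subscribe first and call Connect afterwards. The following is only a sketch: PublishDemo and Run are hypothetical names, and I rely on Observable.Range running synchronously on its default scheduler when Connect is called from plain code like this.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class PublishDemo
{
    // Hypothetical helper: subscribes two observers, then connects.
    public static List<string> Run()
    {
        var log = new List<string>();

        IConnectableObservable<int> published =
            Observable.Range(1, 2).Publish();

        published.Subscribe(v => log.Add("observer #1: " + v));
        published.Subscribe(v => log.Add("observer #2: " + v));

        // Nothing has been published so far; Connect subscribes
        // the hidden subject to the source.
        log.Add("connecting");
        using (published.Connect())
        {
        }

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Connect returns an IDisposable; disposing it disconnects the hidden subject from the source.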

Friday, December 23, 2011

IDisposable implementations in Reactive Extensions

The System.Reactive.Disposables namespace contains several implementations of the IDisposable interface that are meant to be used together with RX.
public interface IDisposable
{
    void Dispose();
}
The main reason for the existence of these classes is that they can be used in your implementations of the observable's Subscribe method which returns an IDisposable.
public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}
For example you can return an Empty disposable which actually does nothing:
var source = Observable.Create<int>(
    observer => {
        observer.OnNext(1);
        return Disposable.Empty;
    }
);
Or you can use the static Disposable.Create method and provide it with an action which will be called during disposal:
var source = Observable.Create<int>(
    observer => {
        observer.OnNext(1);
        return Disposable.Create(() => "Dispose".Dump());
    }
);

using (source.Subscribe(v => v.Dump()))
{
}
================
1
Dispose
================
In the following example we return an action instead of an IDisposable; the action will be wrapped in an IDisposable for us:
var source = Observable.Create<int>(
    observer => {
        observer.OnNext(1);
        return () => "Dispose".Dump();
    }
);
A BooleanDisposable will change its IsDisposed property to true after disposal:
var booleanDisposable = new BooleanDisposable();
if (!booleanDisposable.IsDisposed)
{
    booleanDisposable.Dispose();
}
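To make the state change visible, here is a minimal sketch that reads IsDisposed before and after the call to Dispose. The class name BooleanDisposableDemo and the Run helper are hypothetical; the System.Reactive package is assumed.

```csharp
using System;
using System.Reactive.Disposables;

public static class BooleanDisposableDemo
{
    // Hypothetical helper: returns IsDisposed before and after Dispose.
    public static bool[] Run()
    {
        var booleanDisposable = new BooleanDisposable();

        bool before = booleanDisposable.IsDisposed;
        booleanDisposable.Dispose();
        bool after = booleanDisposable.IsDisposed;

        return new[] { before, after };
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Before: {0}, after: {1}", result[0], result[1]);
    }
}
```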
The Disposable property of a SingleAssignmentDisposable can be set only once, as the name suggests; a second attempt raises an InvalidOperationException with the message "Disposable has already been assigned.". You can use it in two different ways. You can first assign your IDisposable to the Disposable property and then dispose the SingleAssignmentDisposable:
var sad = new SingleAssignmentDisposable();

sad.Disposable = 
    Disposable.Create(() => "Single disposed".Dump()); 
    
sad.Dispose();
================
Single disposed
================
Or you can first dispose the SingleAssignmentDisposable and then assign your IDisposable to it. In this case your IDisposable will be automatically disposed during the assignment to the Disposable property:
var sad = new SingleAssignmentDisposable();

sad.Dispose();

sad.Disposable = 
    Disposable.Create(() => "Single disposed".Dump()); 
================
Single disposed
================
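The exception on a second assignment can be observed with a short sketch like the one below. SingleAssignmentDemo and Run are hypothetical names; I only check the exception type here, not the exact message text.

```csharp
using System;
using System.Reactive.Disposables;

public static class SingleAssignmentDemo
{
    // Hypothetical helper: returns true if the second assignment throws.
    public static bool Run()
    {
        var sad = new SingleAssignmentDisposable();

        sad.Disposable = Disposable.Create(() => { });

        try
        {
            // Second assignment while the first one is still in place.
            sad.Disposable = Disposable.Create(() => { });
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Second assignment threw: {0}", Run());
    }
}
```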
SerialDisposable allows you to set the Disposable property multiple times. Each time you assign a new disposable, the current one is disposed and replaced by the new one.
var serialDisposable = new SerialDisposable();
serialDisposable.Disposable = Disposable.Create(() => "Disposed 1".Dump());
"Set a new disposable.".Dump();
serialDisposable.Disposable = Disposable.Create(() => "Disposed 2".Dump());
"Call serial disposable's dispose.".Dump();
serialDisposable.Dispose();
================
Set a new disposable.
Disposed 1
Call serial disposable's dispose.
Disposed 2
================
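One more detail, sketched below: once the SerialDisposable itself has been disposed, any disposable assigned afterwards is disposed right away. The names SerialDisposableDemo and Run are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

public static class SerialDisposableDemo
{
    // Hypothetical helper: records the order of disposal.
    public static List<string> Run()
    {
        var log = new List<string>();
        var serialDisposable = new SerialDisposable();

        serialDisposable.Disposable =
            Disposable.Create(() => log.Add("Disposed 1"));

        // Disposes the current inner disposable.
        serialDisposable.Dispose();

        // The container is already disposed, so this one is
        // disposed during the assignment.
        serialDisposable.Disposable =
            Disposable.Create(() => log.Add("Disposed 2"));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```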
If you want to dispose multiple disposables at once, you can use the CompositeDisposable class, which implements the ICollection<IDisposable> interface:
IDisposable disposable1 = 
            Observable.Return(1)
            .Subscribe(Console.WriteLine);
                            
IDisposable disposable2 = 
            Observable.Return(2)
            .Subscribe(Console.WriteLine);
                            
using (new CompositeDisposable(disposable1, disposable2))
{
}
================
1
2
================
Or you can fill it with a collection initializer and the Add method:
IDisposable disposable1 = Disposable.Create(() => "Disposed 1".Dump());
IDisposable disposable2 = Disposable.Create(() => "Disposed 2".Dump());
IDisposable disposable3 = Disposable.Create(() => "Disposed 3".Dump());
var compositeDisposable = new CompositeDisposable()
    {
        disposable1,
        disposable2
    };
compositeDisposable.Add(disposable3);
compositeDisposable.Dispose();
================
Disposed 1
Disposed 2
Disposed 3
================
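Because CompositeDisposable is a collection, you can also remove items. The sketch below shows two things that are easy to miss: Remove disposes the removed item, and Add on an already disposed composite disposes the new item immediately. CompositeDisposableDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

public static class CompositeDisposableDemo
{
    // Hypothetical helper: records the order of disposal.
    public static List<string> Run()
    {
        var log = new List<string>();

        IDisposable disposable1 = Disposable.Create(() => log.Add("Disposed 1"));
        IDisposable disposable2 = Disposable.Create(() => log.Add("Disposed 2"));
        IDisposable disposable3 = Disposable.Create(() => log.Add("Disposed 3"));

        var compositeDisposable = new CompositeDisposable
        {
            disposable1,
            disposable2
        };

        // Remove takes the item out of the collection and disposes it.
        compositeDisposable.Remove(disposable1);
        log.Add("Count: " + compositeDisposable.Count);

        // Disposes everything that is still in the collection.
        compositeDisposable.Dispose();

        // The composite is already disposed, so the new item
        // is disposed as soon as it is added.
        compositeDisposable.Add(disposable3);

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```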
The final example will use a CancellationDisposable which is disposed when you call the Cancel method on the injected CancellationTokenSource:
var cts = new CancellationTokenSource();

var cd = new CancellationDisposable(cts);

cts.Cancel();

cd.IsDisposed.Dump();
================
True
================
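It also works in the other direction. As a sketch (CancellationDisposableDemo and Run are hypothetical names), a CancellationDisposable created with the parameterless constructor exposes a Token, and disposing the CancellationDisposable cancels that token:

```csharp
using System;
using System.Reactive.Disposables;
using System.Threading;

public static class CancellationDisposableDemo
{
    // Hypothetical helper: returns the token state before and after Dispose.
    public static bool[] Run()
    {
        var cd = new CancellationDisposable();
        CancellationToken token = cd.Token;

        bool before = token.IsCancellationRequested;
        cd.Dispose();   // requests cancellation on the underlying source
        bool after = token.IsCancellationRequested;

        return new[] { before, after };
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Cancelled before: {0}, after: {1}", result[0], result[1]);
    }
}
```

This is handy when you need to hand a CancellationToken to task based code and still control its lifetime through an IDisposable.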
There are also other implementations of the IDisposable interface in this namespace, such as MultipleAssignmentDisposable, ScheduledDisposable and so on, but I'm not going to cover these in this topic.

Thursday, December 22, 2011

The Aggregate and Scan RX extension methods.

This post shows how to sum all numbers in a range from an observable sequence with the help of the Aggregate method. FirstOrDefault is then used to block until the result arrives and to convert the resulting IObservable<int> to an int.
var sumOfNumbers = Observable.Range(1, 10)
                   .Aggregate(0, (x, y) => x + y)
                   .FirstOrDefault();

Console.WriteLine("Sum of numbers from 1 to 10 is {0}", sumOfNumbers);
==========
Sum of numbers from 1 to 10 is 55
==========
Or you can use the Scan method to display the intermediate results with the help of the Do method, and finally store the result with LastOrDefault.
int sumOfNumbers = Observable.Range(1, 10)
                   .Scan(0, (x, y) => x + y)
                   .Do(Console.WriteLine)
                   .LastOrDefault();

Console.WriteLine("Sum of numbers from 1 to 10 is {0}", sumOfNumbers);
==========
1
3
6
10
15
21
28
36
45
55
Sum of numbers from 1 to 10 is 55
==========
You can use the Aggregate method to fill a collection with received values from the observable:
IList<int> collection = 
    Observable.Range(1, 5)
              .Aggregate(new List<int>(), (list, value) => {
                  list.Add(value);
                  return list;
              })
              .FirstOrDefault();

foreach (var element in collection)
{
    Console.WriteLine(element);
}
==========
1
2
3
4
5
==========
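If you would rather not block with FirstOrDefault or LastOrDefault, you can subscribe to the results instead. The sketch below puts Scan and Aggregate side by side; ScanAggregateDemo and Run are hypothetical names, and I rely on Observable.Range delivering its values synchronously on the default scheduler here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ScanAggregateDemo
{
    // Hypothetical helper: records what Scan and Aggregate emit.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Scan emits every intermediate accumulator value.
        Observable.Range(1, 4)
                  .Scan(0, (x, y) => x + y)
                  .Subscribe(v => log.Add("Scan: " + v));

        // Aggregate emits only the final value when the source completes.
        Observable.Range(1, 4)
                  .Aggregate(0, (x, y) => x + y)
                  .Subscribe(v => log.Add("Aggregate: " + v));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```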

Advancing the time with TestScheduler in Reactive Extensions.

In this post I'm going to show you how to advance time programmatically with the test scheduler, so you don't have to wait 5 days for an event in your test environment. First we prepare a testable cold observable. It schedules 4 OnNext notifications, one every 100 virtual ticks, and finally an OnCompleted notification at 500 ticks.
var testScheduler = new TestScheduler();            
    
var records = new Recorded<Notification<int>>[] {
    ReactiveTest.OnNext(100, 1),
    ReactiveTest.OnNext(200, 2),
    ReactiveTest.OnNext(300, 3),
    ReactiveTest.OnNext(400, 4),
    ReactiveTest.OnCompleted<int>(500)
};

ITestableObserver<int> testableObserver = 
    testScheduler.CreateObserver<int>();
    
ITestableObservable<int> testableObservable = 
    testScheduler.CreateColdObservable(records);

IDisposable d = testableObservable.Subscribe(testableObserver);
Now you can use AdvanceBy, which advances the test scheduler's Clock by the given relative time (testScheduler.Clock + time) and runs all work scheduled within that span. You can also use the AdvanceTo method to run all scheduled items up to the given absolute time and move the scheduler's Clock to that point. The Start method runs all remaining work, beginning at the current Clock time.
int subscrCount = testableObservable.Subscriptions.Count();
Console.WriteLine("Number of subscriptions to the test observable: {0}"
                  , subscrCount);

testScheduler.AdvanceBy(200);
Console.WriteLine("Messages sent({0}) until {1}"
                  , testableObserver.Messages.Count 
                  , testScheduler.Clock);

testScheduler.AdvanceTo(400);
Console.WriteLine("Messages sent({0}) until {1}"
                  , testableObserver.Messages.Count 
                  , testScheduler.Clock);

testScheduler.Start();
Console.WriteLine("Messages sent({0}) until {1}"
                  , testableObserver.Messages.Count 
                  , testScheduler.Clock);
                      
foreach (var message in testableObserver.Messages)
{
    Console.WriteLine("Value {0} at {1}", message.Value, message.Time);
}
At the end you can check that all scheduled items were run and sent to the mock observer.
================
Number of subscriptions to the test observable: 1
Messages sent(2) until 200
Messages sent(4) until 400
Messages sent(5) until 500
Value OnNext(1) at 100
Value OnNext(2) at 200
Value OnNext(3) at 300
Value OnNext(4) at 400
Value OnCompleted() at 500

================
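To come back to the "5 days" remark from the beginning, here is a rough sketch of a timer that is due in five days but fires instantly in virtual time. FiveDaysDemo and Run are hypothetical names, and I assume that both the System.Reactive and Microsoft.Reactive.Testing packages are referenced.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class FiveDaysDemo
{
    // Hypothetical helper: returns whether the timer had fired
    // after four and after five virtual days.
    public static bool[] Run()
    {
        var testScheduler = new TestScheduler();
        bool fired = false;

        // The timer is scheduled on the test scheduler, not on a real clock.
        Observable.Timer(TimeSpan.FromDays(5), testScheduler)
                  .Subscribe(_ => fired = true);

        testScheduler.AdvanceBy(TimeSpan.FromDays(4).Ticks);
        bool afterFourDays = fired;

        testScheduler.AdvanceBy(TimeSpan.FromDays(1).Ticks);
        bool afterFiveDays = fired;

        return new[] { afterFourDays, afterFiveDays };
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Fired after 4 days: {0}", result[0]);
        Console.WriteLine("Fired after 5 days: {0}", result[1]);
    }
}
```

The important part is passing the test scheduler to the time based operator; operators that use their default scheduler still run on real time.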

Wednesday, December 21, 2011

How to use the Notification class in Reactive Extensions

In this topic I will show you how you can use the three main Notification kinds in RX. They are OnNext, OnError and OnCompleted, and you will mainly use them for testing purposes.
public enum NotificationKind
{
    OnNext,
    OnError,
    OnCompleted
}
Let's first create a notification of NotificationKind.OnNext and explore its properties and methods:
// Create a NotificationKind.OnNext with value 1
Notification<int> notification = Notification.CreateOnNext(1);

// explore instance properties
Console.WriteLine("Value: {0}" , notification.Value);
Console.WriteLine("HasValue: {0}", notification.HasValue);
Console.WriteLine("Kind: {0}", notification.Kind);
// user friendly ToString
Console.WriteLine(notification.ToString());

var observer = Observer.Create<int>(Console.WriteLine);

// calls the observer's OnNext method 
// with the Value as input parameter
// observer.OnNext(notification.Value);
notification.Accept(observer);

// do the same thing through an observable
var source = notification.ToObservable();
source.Subscribe(observer.AsObserver());
The code is described in the comments and the output is as follows:
================
Value: 1
HasValue: True
Kind: OnNext
OnNext(1)
1
1
================
A NotificationKind.OnError notification can be created as follows:
// Create a NotificationKind.OnError with a new Exception
Notification<int> notification = Notification
    .CreateOnError<int>(new Exception("error"));

// explore instance properties
Console.WriteLine("Exception: {0}" , notification.Exception);
Console.WriteLine("HasValue: {0}", notification.HasValue);
Console.WriteLine("Kind: {0}", notification.Kind);
// user friendly ToString
Console.WriteLine(notification.ToString());

var observer = Observer.Create<int>(
        Console.WriteLine,
        error => Console.WriteLine(error.Message)
        );

// calls the observer's OnError method 
// with the Exception as input parameter
// observer.OnError(notification.Exception);
notification.Accept(observer.AsObserver());
================
Exception: System.Exception: error
HasValue: False
Kind: OnError
OnError(System.Exception)
error
================
A NotificationKind.OnCompleted notification can be created as follows:
// Create a NotificationKind.OnCompleted
Notification<int> notification = Notification
    .CreateOnCompleted<int>();

// explore instance properties
Console.WriteLine("HasValue: {0}", notification.HasValue);
Console.WriteLine("Kind: {0}", notification.Kind);
// user friendly ToString
Console.WriteLine(notification.ToString());

var observer = Observer.Create<int>(
        Console.WriteLine,
        () => Console.WriteLine("Completed")
        );

// calls the observer's OnCompleted method 
// observer.OnCompleted();
notification.Accept(observer.AsObserver());
================
HasValue: False
Kind: OnCompleted
OnCompleted()
Completed
================
You can convert an observable to a collection of notifications with the Materialize and ToEnumerable methods. In this case we Concat two observables to produce one sequence that raises an error after the Range ends.
var notifications = Observable.Range(1, 2)
.Concat(Observable.Throw<int>(new InvalidOperationException()))
.Materialize()
.ToEnumerable();

foreach (var notification in notifications)
{
    notification.ToString().Dump();
}
================
OnNext(1)
OnNext(2)
OnError(System.InvalidOperationException)
================
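The reverse direction is available as well: Dematerialize turns a sequence of notifications back into ordinary OnNext, OnError and OnCompleted calls. A minimal sketch, with the hypothetical names DematerializeDemo and Run:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class DematerializeDemo
{
    // Hypothetical helper: records the calls the observer receives.
    public static List<string> Run()
    {
        var log = new List<string>();

        var notifications = new[]
        {
            Notification.CreateOnNext(1),
            Notification.CreateOnNext(2),
            Notification.CreateOnCompleted<int>()
        };

        notifications
            .ToObservable()      // IObservable<Notification<int>>
            .Dematerialize()     // IObservable<int>
            .Subscribe(
                value => log.Add("OnNext: " + value),
                () => log.Add("Completed"));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```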
You can achieve the same result by creating an observer from a notification callback. The observer wraps every OnNext, OnError and OnCompleted call in a Notification of the appropriate NotificationKind and passes it to the provided callback method.
var notifications = Observable.Range(1, 2)
.Concat(Observable.Throw<int>(new InvalidOperationException()));

Action<Notification<int>> action = notification => {
    notification.ToString().Dump();
};

var observer = action.ToObserver();

notifications.Subscribe(observer);
================
OnNext(1)
OnNext(2)
OnError(System.InvalidOperationException)
================
And finally you can convert an observer back to an Action<Notification<T>> delegate with the ToNotifier method and call it like a method:
var notifier = Observer.Create<int>(
                Console.WriteLine,
                () => "Completed".Dump())
                       .ToNotifier();
                       
notifier(Notification.CreateOnNext(2));    
notifier(Notification.CreateOnCompleted<int>());
================
2
Completed
================