Thursday, January 5, 2012

How to use SelectMany in Reactive Extensions (RX).

You can use the SelectMany extension method in several different ways. The first example uses SelectMany to flatten the result of the Window method from IObservable<IObservable<T>> to IObservable<T>:
var source = Observable.Repeat(1, 3)
            .Window(2)
            .SelectMany(c => c);

source.Subscribe(
    value => value.Dump(),
    error => Console.WriteLine(error.Message),
    () => Console.WriteLine("Completed")
);    
==========
1
1
1
Completed
==========
You can also produce an observable sequence of values for each source value:
var source = Observable.Range(1, 3)
            .SelectMany(c => Observable.Repeat(c,3));
==========
1
1
2
1
2
3
2
3
3
Completed
==========
Instead of another observable you can return an enumerable and project each pair of source value and collection value with a result selector:
var source = Observable.Range(1, 3)
            .SelectMany(c => Enumerable.Repeat(c,3), (x, y) => x*y);
==========
1
1
1
4
4
4
9
9
9
Completed
==========
Finally, you can produce a new observable for each notification (OnNext, OnError and OnCompleted):
var source = Observable.Range(1, 3)
            .SelectMany(value => Observable.Return(1), 
                        error => Observable.Return(-1),
                        () => Observable.Return(0));
==========
1
1
1
0
Completed
==========

Tuesday, January 3, 2012

How to recursively call IScheduler.Schedule in Reactive Extensions.

The following example demonstrates how to call the scheduler's Schedule method recursively, so that the scheduled action reschedules itself. The result is a "lazy infinite loop":
public static class ObservableExt
{    

    public static IObservable<int> Natural()
    {
        return Natural(Scheduler.CurrentThread);
    }
    
    public static IObservable<int> Natural(IScheduler scheduler)
    {
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }
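        return Observable.Create<int>(observer => {
            // Keep the counter inside Create so that every subscription starts at 0.
            int i = 0;
            return scheduler.Schedule(action => {
                observer.OnNext(i++);
                // Reschedule this action; disposing the subscription stops the loop.
                action();
            });
        });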
    }
}
Now we can call the Natural method and chain it with other extension methods from the RX library. This example uses Take to stop the otherwise infinite sequence after 3 items:
IObservable<int> naturals = ObservableExt.Natural()
                                         .Take(3);

naturals.Subscribe(
    value => value.Dump(),
    error => Console.WriteLine(error.Message),
    () => Console.WriteLine("Completed")
);    
==========
0
1
2
Completed
==========

Monday, January 2, 2012

The difference between OfType and Cast in Reactive Extensions (RX).

OfType emits only the values that are of the specified type and silently ignores all others:
public class Person
{
    public string Name { get; set; }
}

object value = new Person { Name = "Kinga" };

var source = Observable.Return(new object())
                       .Concat(Observable.Return(value))
                       .OfType<Person>()
                       .Select(p => p.Name);
==========
OnNext : Kinga
OnCompleted
==========
The Cast method propagates an OnError if a value is not convertible to the specified type, which ends the subscription:
object value = new Person { Name = "Kinga" };

var source = Observable.Return(new object())
                       .Concat(Observable.Return(value))
                       .Cast<Person>()
                       .Select(p => p.Name);
==========
OnError : Unable to cast object of type 'System.Object' to type 'Person'.
==========

Sunday, January 1, 2012

Combining observable sequences with Concat, Merge, Catch, OnErrorResumeNext and CombineLatest in Reactive Extensions (RX).

Concat concatenates observable sequences in the order in which you provide them:

var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.Concat(source2)
       .Subscribe(Console.WriteLine);
So it first propagates all values from source1 and then all values from source2:
==========
1
1
1
2
2
2
==========
An error calls the OnError action and ends the subscription, so neither the rest of source1 nor source2 is propagated:
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.Concat(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
==========
1
1
1
An error has occurred.
==========
The Merge extension method, on the other hand, subscribes to all provided observable sequences and interleaves their values as they are produced:
var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.Merge(source2)
       .Subscribe(Console.WriteLine);
==========
1
2
1
2
1
2
==========
An error again calls the OnError action and ends the subscription:
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.Merge(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
==========
1
2
1
2
1
An error has occurred.
==========
Catch propagates the first sequence and ignores the second one if the first sequence completes without an error. If the first sequence fails, Catch propagates the values produced up to the error and then continues with the values from the second sequence.
var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.Catch(source2)
       .Subscribe(Console.WriteLine);    
==========
1
1
1
==========
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.Catch(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
OnError is not called in this case:
==========
1
1
1
2
2
2
==========
If no error occurs, OnErrorResumeNext behaves like Concat and produces the values from both sequences:
var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);    
==========
1
1
1
2
2
2
==========
If the first sequence fails, OnErrorResumeNext stops producing values from it, swallows the error and continues with the values from the second sequence:
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
OnError is not called in this case:
==========
1
1
1
2
2
2
==========
Finally, CombineLatest returns an observable sequence that applies the specified result selector function to the latest elements of both sources. It produces a result whenever either source produces a value, once both sources have produced at least one:
var semaphore = new SemaphoreSlim(0);

var observer = Observer.Create<string>(
    value => Console.WriteLine(value.ToString()),
    error => { 
        Console.WriteLine(error.Message); 
        semaphore.Release(); 
    },
    () => { 
        Console.WriteLine("Completed"); 
        semaphore.Release(); 
    }
);    

var source1 = Observable.Interval(TimeSpan.FromSeconds(1))
                        .Timestamp()
                        .Take(3);

var source2 = Observable.Interval(TimeSpan.FromSeconds(3))
                        .Timestamp()
                        .Take(3);

var input = source1.CombineLatest(source2, (x, y) => { 
    return string.Format("{0} - {1}", x.ToString(), y.ToString());
});

using(input.Subscribe(observer))
{
    semaphore.Wait();
}
==========
1@27.12.2011 18:26:47 +01:00 - 0@27.12.2011 18:26:48 +01:00
2@27.12.2011 18:26:48 +01:00 - 0@27.12.2011 18:26:48 +01:00
2@27.12.2011 18:26:48 +01:00 - 1@27.12.2011 18:26:51 +01:00
2@27.12.2011 18:26:48 +01:00 - 2@27.12.2011 18:26:54 +01:00
Completed
==========

Saturday, December 31, 2011

How to subscribe to the observable which reacts first with Amb in Reactive Extensions.

In this example we will create two observables. The first observable, source1, starts producing values after a due time of 1 second. The second observable, source2, produces 3 values at one-second intervals and starts immediately after the subscription, without a due time. Because we use the Amb extension method, which propagates the observable sequence that reacts first, the first observable never gets to produce a value: Amb subscribes to both sources and disposes the subscription to the slower one as soon as the faster one reacts. This is shown with the help of the Do method. Its purpose is to invoke an action for each element in the sequence. In our case it writes a message to the console.
var semaphore = new SemaphoreSlim(0);

var observer = Observer.Create<Timestamped<long>>(
    value => Console.WriteLine(value.ToString()),
    error => { 
        Console.WriteLine(error.Message); 
        semaphore.Release(); 
    },
    () => { 
        Console.WriteLine("Completed"); 
        semaphore.Release(); 
    }
);    

TimeSpan oneSecond = TimeSpan.FromSeconds(1);

var source1 = Observable.Timer(oneSecond, oneSecond)
            .Do(_ => Console.WriteLine("Value from first observable:"))
            .Timestamp().Take(5);
            
var source2 = Observable.Timer(TimeSpan.FromSeconds(0), oneSecond)
            .Do(_ => Console.WriteLine("Value from second observable:"))
            .Timestamp().Take(3);

using(source1.Amb(source2).Subscribe(observer))
{
    semaphore.Wait();
}
==========
Value from second observable:
0@27.12.2011 17:07:42 +01:00
Value from second observable:
1@27.12.2011 17:07:43 +01:00
Value from second observable:
2@27.12.2011 17:07:44 +01:00
Completed
==========
Another interesting aspect of this example is the use of SemaphoreSlim. The main thread waits on the semaphore until it is released in the OnCompleted or OnError handler of the observer. Without this, the using statement would dispose our subscription immediately.

Friday, December 30, 2011

How to use ToAsync, FromAsyncPattern and Start in Reactive Extensions

This example demonstrates how to use the ToAsync and FromAsyncPattern extension methods from RX. First we create an observer and assign the GenerateNumbers function to a generic Func delegate. Then we pass the delegate's BeginInvoke and EndInvoke methods to the FromAsyncPattern method. It returns another Func delegate, which we call with an input parameter of 7. The result of this call is an observable you can subscribe to.
var observer = Observer.Create<IEnumerable<int>>(
    x => Console.WriteLine(
    "OnNext: {0}", x.Sum()),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted")
);

Func<int, IEnumerable<int>> func = GenerateNumbers;

var source = Observable.FromAsyncPattern<int, IEnumerable<int>>(func.BeginInvoke, func.EndInvoke);

using(source(7).Subscribe(observer))
{
    Console.ReadLine();
}
Here is the implementation of the GenerateNumbers function, which produces the numbers from 1 up to the given count:
public IEnumerable<int> GenerateNumbers(int count)
{
    int i = 1;
    while (i <= count)
    {
        yield return i++;
    }
}
Our observer sums the sequence and prints the result:
==========
OnNext: 28
OnCompleted
==========
The classic way to achieve the same result is:
Func<int, IEnumerable<int>> func = GenerateNumbers;
    
var asyncResult = func.BeginInvoke(7, callback => {
    IEnumerable<int> result = func.EndInvoke(callback);
    Console.WriteLine("Sum of numbers is: {0}", result.Sum());
}, null);
==========
Sum of numbers is: 28
==========
You can also convert a function to an asynchronous function with the ToAsync method. The difference is that the result of EndInvoke is an observable you can subscribe to. (You can also call asyncFunc(7) directly and subscribe to the returned observable.)
Func<int, IObservable<IEnumerable<int>>> asyncFunc = func.ToAsync();

var asyncResult2 = asyncFunc.BeginInvoke(7, callback => {
    IObservable<IEnumerable<int>> result = asyncFunc.EndInvoke(callback);
    var disposable = result.Subscribe(value => Console.WriteLine(value.Sum()));
}, null);
==========
28
==========
Finally, you can invoke the function asynchronously with the Start method:
Observable.Start(() => GenerateNumbers(4))
          .Subscribe(observer);
==========
OnNext: 10
OnCompleted
==========

Thursday, December 29, 2011

How to create your own Buffer for Reactive Extensions

This post is intended for learning purposes. The RX library already contains a more complete solution with many overloads of the Buffer method.
public static class ObservableExt
{
    public static IObservable<IList<T>> Buffer2<T>
    (
        this IObservable<T> source, 
        int count
    )
    {
        return Observable.Create<IList<T>>(observer => {
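            IList<T> list = new List<T>();
            Action sendAndClearList = () => {
                if (list.Count != 0)
                {
                    observer.OnNext(list);
                    // Start a fresh list instead of clearing the emitted one,
                    // so observers that keep the buffer do not see it emptied.
                    list = new List<T>();
                }
            };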
            int i = 0;
            return source.Subscribe(
                value => {
                    list.Add(value);
                    i++;
                    if (i % count == 0)
                    {
                        sendAndClearList();
                    }
                },
                error => {
                    observer.OnError(error);
                },
                () => {
                    sendAndClearList();
                    observer.OnCompleted();
                }
            );
        });
    }
}
Now you can subscribe to it. This will create an observable with buffers based on the count parameter. Every buffer will contain up to 3 elements in our case.
// you can use Console.WriteLine instead of Dump
var subscription = Observable.Repeat(1, 4)
    .Buffer2(3)
    .Subscribe(
        v => v.Dump(),
        e => e.Message.Dump(),
        () => "Completed".Dump()
    );
This will print the following output in LINQPad's results window:
==========
List (3 items) 
1
1
1
List (1 item) 
1
Completed
==========