Saturday, December 31, 2011

How to subscribe to the observable which reacts first with Amb in Reactive Extensions.

In this example we will create two observables. The first observable, source1, starts producing values after a due time of 1 second. The second observable, source2, starts immediately after subscription, without a due time, and produces 3 values, one per second. Because we use the Amb extension method, which propagates the observable sequence that reacts first, the first observable never gets to produce a value: Amb subscribes to both sequences and unsubscribes from the slower one as soon as the other reacts. This is shown with the help of the Do method. Its purpose is to invoke an action for each element in the sequence. In our case it writes a message to the console.
var semaphore = new SemaphoreSlim(0);

var observer = Observer.Create<Timestamped<long>>(
    value => Console.WriteLine(value.ToString()),
    error => { 
        Console.WriteLine(error.Message); 
        semaphore.Release(); 
    },
    () => { 
        Console.WriteLine("Completed"); 
        semaphore.Release(); 
    }
);    

TimeSpan oneSecond = TimeSpan.FromSeconds(1);

var source1 = Observable.Timer(oneSecond, oneSecond)
            .Do(_ => Console.WriteLine("Value from first observable:"))
            .Timestamp().Take(5);
            
var source2 = Observable.Timer(TimeSpan.FromSeconds(0), oneSecond)
            .Do(_ => Console.WriteLine("Value from second observable:"))
            .Timestamp().Take(3);

using(source1.Amb(source2).Subscribe(observer))
{
    semaphore.Wait();
}
==========
Value from second observable:
0@27.12.2011 17:07:42 +01:00
Value from second observable:
1@27.12.2011 17:07:43 +01:00
Value from second observable:
2@27.12.2011 17:07:44 +01:00
Completed
==========
Another interesting aspect of this example is the use of SemaphoreSlim: the main thread Waits until the observer Releases the semaphore in its OnCompleted or OnError handler. Without this, the using statement would dispose our subscription immediately.
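The winner-takes-all behaviour of Amb can also be checked deterministically with subjects instead of timers. The following is a minimal sketch, not part of the original example: the class AmbSketch, the method Run and the subject names are hypothetical, and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, for illustration only.
public static class AmbSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var slow = new Subject<string>();
        var fast = new Subject<string>();

        using (slow.Amb(fast).Subscribe(
            value => log.Add(value),
            () => log.Add("Completed")))
        {
            fast.OnNext("fast-1");   // fast reacts first, so Amb picks it
            slow.OnNext("slow-1");   // ignored: Amb already unsubscribed from slow
            fast.OnNext("fast-2");
            fast.OnCompleted();
        }

        return log; // fast-1, fast-2, Completed
    }
}
```

Calling AmbSketch.Run() returns only the notifications of the sequence that reacted first. No semaphore is needed here because the subjects are driven synchronously inside the using block.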

Friday, December 30, 2011

How to use ToAsync, FromAsyncPattern and Start in Reactive Extensions

This example demonstrates how to use the ToAsync and FromAsyncPattern methods from RX. First we create an observer and assign the GenerateNumbers function to a generic Func delegate. Then we pass its BeginInvoke and EndInvoke methods to the FromAsyncPattern method, which returns another Func delegate. We call that delegate with an input parameter of 7. The result is an observable you can subscribe to.
var observer = Observer.Create<IEnumerable<int>>(
    x => Console.WriteLine(
    "OnNext: {0}", x.Sum()),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted")
);

Func<int, IEnumerable<int>> func = GenerateNumbers;

var source = Observable.FromAsyncPattern<int, IEnumerable<int>>(func.BeginInvoke, func.EndInvoke);

using(source(7).Subscribe(observer))
{
    Console.ReadLine();
}
Here is the implementation of the GenerateNumbers function, which produces the sequence of numbers from 1 up to count:
public IEnumerable<int> GenerateNumbers(int count)
{
    int i = 1;
    while (i <= count)
    {
        yield return i++;
    }
}
Our observer sums the sequence and prints the result:
==========
OnNext: 28
OnCompleted
==========
The classical way to achieve the same result is:
Func<int, IEnumerable<int>> func = GenerateNumbers;
    
var asyncResult = func.BeginInvoke(7, callback => {
    IEnumerable<int> result = func.EndInvoke(callback);
    Console.WriteLine("Sum of numbers is: {0}", result.Sum());
}, null);
==========
Sum of numbers is: 28
==========
You can also convert a function to an asynchronous function with the ToAsync method. The difference is that the result of EndInvoke is now an observable you can subscribe to:
Func<int, IObservable<IEnumerable<int>>> asyncFunc = func.ToAsync();

var asyncResult2 = asyncFunc.BeginInvoke(7, callback => {
    IObservable<IEnumerable<int>> result = asyncFunc.EndInvoke(callback);
    var disposable = result.Subscribe(value => Console.WriteLine(value.Sum()));
}, null);
==========
28
==========
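The delegate returned by ToAsync can also be invoked directly, without going through BeginInvoke and EndInvoke. The sketch below illustrates this under a few assumptions: the class ToAsyncSketch and the method SumViaToAsync are hypothetical names, GenerateNumbers is re-declared as a static method so the block is self-contained, and the blocking Wait call is used only to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class ToAsyncSketch
{
    public static IEnumerable<int> GenerateNumbers(int count)
    {
        for (int i = 1; i <= count; i++)
        {
            yield return i;
        }
    }

    public static int SumViaToAsync(int count)
    {
        Func<int, IEnumerable<int>> func = GenerateNumbers;

        // ToAsync wraps the function; each call runs it in the background
        // and returns an observable carrying the single result.
        Func<int, IObservable<IEnumerable<int>>> asyncFunc = Observable.ToAsync(func);

        // Wait blocks until the observable completes and returns its last value.
        return asyncFunc(count).Select(numbers => numbers.Sum()).Wait();
    }
}
```

ToAsyncSketch.SumViaToAsync(7) yields the same sum of 28 as the listings above.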
Finally, you can invoke the function asynchronously with the Start method:
Observable.Start(() => GenerateNumbers(4))
          .Subscribe(observer);
==========
OnNext: 10
OnCompleted
==========

Thursday, December 29, 2011

How to create your own Buffer for Reactive Extensions

This post is intended for learning purposes. The RX library already contains a more complete solution with many overloads of the Buffer method.
public static class ObservableExt
{
    public static IObservable<IList<T>> Buffer2<T>
    (
        this IObservable<T> source, 
        int count
    )
    {
        return Observable.Create<IList<T>>(observer => {
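            // Current buffer; it is replaced after each emission.
            IList<T> list = new List<T>();
            Action sendAndClearList = () => {
                if (list.Count != 0)
                {
                    observer.OnNext(list);
                    // Start a new list instead of clearing the one just sent,
                    // because the subscriber may still hold a reference to it.
                    list = new List<T>();
                }
            };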
            int i = 0;
            return source.Subscribe(
                value => {
                    list.Add(value);
                    i++;
                    if (i % count == 0)
                    {
                        sendAndClearList();
                    }
                },
                error => {
                    observer.OnError(error);
                },
                () => {
                    sendAndClearList();
                    observer.OnCompleted();
                }
            );
        });
    }
}
Now you can subscribe to it. This will create an observable with buffers based on the count parameter. Every buffer will contain up to 3 elements in our case.
// you can use Console.WriteLine instead of Dump
var source = Observable.Repeat(1, 4)
.Buffer2(3)
.Subscribe(
    v => v.Dump(),
    e => e.Message.Dump(),
    () => "Completed".Dump()
);
This will print the following output in LINQPad's results window:
==========
List (3 items) 
1
1
1
List (1 item) 
1
Completed
==========
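With distinct values the chunk boundaries are easier to see. The following sketch uses the built-in Buffer operator from the RX library rather than the learning version above. The class BufferSketch and the method Chunks are hypothetical names, and the blocking Wait call is only there to collect the result in one expression.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class BufferSketch
{
    public static IList<IList<int>> Chunks()
    {
        // 1..7 split into buffers of up to 3 elements: [1,2,3], [4,5,6], [7]
        return Observable.Range(1, 7)
                         .Buffer(3)
                         .ToList()   // collect all buffers into one list
                         .Wait();    // block until the sequence completes
    }
}
```

The last buffer holds the single leftover element, just as the "List (1 item)" entry does in the output above.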

Wednesday, December 28, 2011

When And Then in Reactive Extensions (RX).

The And method matches when both observables have an available value. The Then method projects a value when all observables in the pattern have an available value. Finally, the When method joins together the results from several patterns. The example contains an input observable created with the Generate method. You could achieve the same result with the Interval method, as the second observable shows, but I wanted to show how to create a sequence generator with a time selector as the last argument. The results from the first observable are timestamped so we can see when each value was generated. The results from both observables are joined every 2 seconds and sent to the subscriber.
var input = Observable
    .Generate(0, _ => true, x => ++x, x => x, _ => TimeSpan.FromSeconds(2))
    .Timestamp()
    .Take(5); 

var pattern = input.And(Observable.Interval(TimeSpan.FromSeconds(1)));
            
var source = 
Observable.When(
          pattern
          .Then((left, right) => String.Format("{0} - {1}", left, right))
          );    
          
var observer = Observer.Create<string>(
    x => Console.WriteLine("OnNext: {0}", x),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted")
);    

source.Subscribe(observer);
==========
OnNext: 0@22.12.2011 20:16:37 +01:00 - 0
OnNext: 1@22.12.2011 20:16:39 +01:00 - 1
OnNext: 2@22.12.2011 20:16:41 +01:00 - 2
OnNext: 3@22.12.2011 20:16:43 +01:00 - 3
OnNext: 4@22.12.2011 20:16:45 +01:00 - 4
OnCompleted
==========
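The pairing behaviour can be seen without timers by pushing values through subjects by hand. This is a minimal sketch under stated assumptions: the class JoinSketch, the method Run and the subject names are hypothetical, and only the two-source pattern from the example above is covered.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, for illustration only.
public static class JoinSketch
{
    public static List<string> Run()
    {
        var results = new List<string>();
        var numbers = new Subject<int>();
        var letters = new Subject<string>();

        // And builds the pattern, Then projects each matched pair.
        var plan = numbers.And(letters).Then((n, s) => n + s);

        using (Observable.When(plan).Subscribe(r => results.Add(r)))
        {
            numbers.OnNext(1);     // waits for a partner
            numbers.OnNext(2);     // queued behind 1
            letters.OnNext("a");   // pairs with 1 -> "1a"
            letters.OnNext("b");   // pairs with 2 -> "2b"
        }

        return results;
    }
}
```

Values from the faster source are queued until the other source delivers a partner, which is why the timestamped output above advances in steps of 2 seconds even though the Interval source ticks every second.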

How to create a parallel observer of two observables with the help of the visitor design pattern in RX.

This example is a little more complex, but it is a very flexible solution for observing and influencing two or more observables with the help of a visitor. First we create an abstract class Direction and its two implementations, LeftDirection and RightDirection. These two classes let us tell which observable a notification came from.
public abstract class Direction<T>
{
    public T Value { get; set; }
    
    public static Direction<T> FromLeft(T value)
    {
        return new LeftDirection<T>(value);
    }
    
    public static Direction<T> FromRight(T value)
    {
        return new RightDirection<T>(value);
    }
}

public class LeftDirection<T> : Direction<T>
{
    public LeftDirection(T value)
    {
        this.Value = value;
    }
}

public class RightDirection<T> : Direction<T>
{
    public RightDirection(T value)
    {
        this.Value = value;
    }
}

The next code snippet lets you inherit a new class from the abstract DirectionVisitor class and override its VisitDirection methods in order to provide different functionality for each observable.
public abstract class DirectionVisitor<T>
{
    public Action<Notification<T>, IObserver<T>> Visit(Direction<Notification<T>> direction)
    {
        return VisitDirection(direction as dynamic);
    }
    
    protected abstract Action<Notification<T>, IObserver<T>> VisitDirection(Direction<Notification<T>> direction);
    
    protected virtual Action<Notification<T>, IObserver<T>> VisitDirection(LeftDirection<Notification<T>> left)
    {
        return VisitDirection(left as Direction<Notification<T>>);
    }
    
    protected virtual Action<Notification<T>, IObserver<T>> VisitDirection(RightDirection<Notification<T>> right)
    {
        return VisitDirection(right as Direction<Notification<T>>);
    }
    
}
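The cast to dynamic in the Visit method is what selects the overload at run time from the actual type of the argument. The stand-alone sketch below shows the difference from ordinary compile-time overload resolution. All names in it (Animal, Cat, DynamicDispatchSketch) are hypothetical and unrelated to the classes above.

```csharp
using System;

// Hypothetical types, for illustration only.
public class Animal { }
public class Cat : Animal { }

public static class DynamicDispatchSketch
{
    private static string Describe(Animal animal) { return "animal"; }
    private static string Describe(Cat cat) { return "cat"; }

    // Overload chosen at compile time from the declared type (Animal).
    public static string ResolveStatically(Animal animal)
    {
        return Describe(animal);
    }

    // Overload chosen at run time from the actual type of the object.
    public static string ResolveDynamically(Animal animal)
    {
        return Describe(animal as dynamic);
    }
}
```

For a Cat instance, ResolveStatically returns "animal" while ResolveDynamically returns "cat". In the same way Visit reaches the LeftDirection or RightDirection overload even though its parameter is declared as the base Direction type.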
The basic idea of the following implementation of the DirectionVisitor is to call the observer's OnCompleted only when both observables have already produced an OnCompleted notification. You could influence the generation of the notifications from both observables in many different ways. One observable can also affect the other one.
public class ConvertDirectionToAction<T>: DirectionVisitor<T>
{
    private bool isLeftCompleted;
    private bool isRightCompleted;
    
    protected override Action<Notification<T>, IObserver<T>> VisitDirection(Direction<Notification<T>> direction)
    {
        Action<Notification<T>, IObserver<T>> action = (notif, observer) => {
            switch (notif.Kind)
            {
                case NotificationKind.OnNext: 
                    observer.OnNext(notif.Value);
                    break;
                case NotificationKind.OnError: 
                    observer.OnError(notif.Exception);
                    break;
                case NotificationKind.OnCompleted: 
                    if (this.isLeftCompleted && this.isRightCompleted)
                    {
                        observer.OnCompleted();
                    }
                    break;                    
            }    
        };
        return action;
    }
    
    protected override Action<Notification<T>, IObserver<T>> VisitDirection(LeftDirection<Notification<T>> left)
    {
        NotificationKind kind = left.Value.Kind;
        if (kind == NotificationKind.OnCompleted || 
            kind == NotificationKind.OnError)
        {
            this.isLeftCompleted = true;
        }
        
        return VisitDirection(left as Direction<Notification<T>>);
    }
    
    protected override Action<Notification<T>, IObserver<T>> VisitDirection(RightDirection<Notification<T>> right)
    {
        NotificationKind kind = right.Value.Kind;
        if (kind == NotificationKind.OnCompleted || 
            kind == NotificationKind.OnError)
        {
            this.isRightCompleted = true;
        }
        
        return VisitDirection(right as Direction<Notification<T>>);
    }    
}
Next we will need an implementation of IObserver which will be able to observe two observables and run a different action for a notification based on which one produced it. You can inject your own visitor into the Visitor property and change how the ParallelObserver will observe them.
public class ParallelObserver<T> : IObserver<Direction<Notification<T>>>
{
    IObserver<T> observer;
    
    public ParallelObserver(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }
        this.observer = observer;
    }
    
    public void OnNext(Direction<Notification<T>> value)
    {
        var action = this.Visitor.Visit(value);
        
        action(value.Value, this.observer);
    }
    
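    void IObserver<Direction<Notification<T>>>.OnError(Exception error)
    {
        // Intentionally empty: errors arrive as materialized notifications through OnNext.
    }
    
    void IObserver<Direction<Notification<T>>>.OnCompleted()
    {
        // Intentionally empty: the visitor forwards completion once both sources are done.
    }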
    
    private DirectionVisitor<T> visitor;

    public DirectionVisitor<T> Visitor 
    {
        get 
        { 
            return this.visitor ?? 
                   (this.visitor = new ConvertDirectionToAction<T>()); 
        }
        set 
        { 
            this.visitor = value; 
        }    
    }
}
Now you need an extension method which runs both observables in parallel and calls OnCompleted only when both observables have finished their work.
public static class ObservableExtensions
{
    public static IObservable<T> Run<T>(this IObservable<T> left, IObservable<T> right)
    {
        return Observable.Create<T>(observer => {
            var parallelObserver = new ParallelObserver<T>(observer);
            
            var disposable1 = left.Materialize()
                         .Select(v => Direction<Notification<T>>.FromLeft(v))
                         .Synchronize()
                         .Subscribe(parallelObserver);
                         
            var disposable2 = right.Materialize()
                          .Select(v => Direction<Notification<T>>.FromRight(v))
                          .Synchronize()
                          .Subscribe(parallelObserver);
                          
            return new CompositeDisposable(2) { disposable1, disposable2 };
        });
    }
}
Try it and test it as homework.
var source1 = Observable.Repeat(4, 3);
var source2 = Observable.Repeat(1, 3);
    
var input = source1.Run(source2);

input.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("Done"));
==========
4
1
4
1
4
1
Done
==========
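Without this extension, subscribing the same observer to both sources directly does not produce the same notifications: the observer stops after the first OnCompleted, so the values from the second source are never printed.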
var observer = Observer.Create<int>(
    Console.WriteLine, 
    Console.WriteLine, 
    () => Console.WriteLine("Done")
);

var source1 = Observable.Repeat(4, 3).Subscribe(observer);

var source2 = Observable.Repeat(1, 3).Subscribe(observer);
=========
4
4
4
Done
=========
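For comparison, the built-in Merge operator also signals completion only after both sources have completed, although it offers no hook for a custom visitor. The sketch below demonstrates that with two subjects. The class MergeSketch and the method Run are hypothetical names, and the sketch is not meant as a replacement for the visitor-based solution.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, for illustration only.
public static class MergeSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var left = new Subject<int>();
        var right = new Subject<int>();

        using (left.Merge(right).Subscribe(
            value => log.Add(value.ToString()),
            () => log.Add("Done")))
        {
            left.OnNext(4);
            left.OnCompleted();    // only one source is done, so no "Done" yet
            right.OnNext(1);       // still delivered
            right.OnCompleted();   // both sources are done, so "Done" is logged
        }

        return log; // 4, 1, Done
    }
}
```

The value from the right subject still arrives after the left subject has completed, which is the behaviour the plain double subscription above lacks.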

Tuesday, December 27, 2011

How to use FromEventPattern in Reactive Extensions (RX).

This example shows how to convert .NET events into an observable sequence on which you can then perform standard LINQ operations. It demonstrates how to get removal notifications from an ObservableCollection in a more declarative manner.
var strings = new System.Collections.ObjectModel.ObservableCollection<string>()
{
    "Item1", "Item2", "Item3"
};

var removals = 
Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>
(
    handler => strings.CollectionChanged += handler,
    handler => strings.CollectionChanged -= handler
)
.Where(e => e.EventArgs.Action == NotifyCollectionChangedAction.Remove)
.SelectMany(c => c.EventArgs.OldItems.Cast<string>());
    
var disposable = removals.Subscribe(GetDefaultObserver<string>("Removed"));
==========
Removed : Item1
Removed : Item2
==========
The observer was generated with this helper function:
public IObserver<T> GetDefaultObserver<T>(string onNextText)
{
    return Observer.Create<T>(
        x => Console.WriteLine("{0} : {1}", onNextText, x),
        ex => Console.WriteLine("OnError: {0}", ex.Message),
        () => Console.WriteLine("OnCompleted")
    );
}
The classical way to achieve the same with an event handler would be:
strings.CollectionChanged += (sender, ea) => {
    if(ea.Action == NotifyCollectionChangedAction.Remove)
    {
        foreach (var oldItem in ea.OldItems.Cast<string>())
        {
            Console.WriteLine("Removed {0}", oldItem);    
        }
    }
};
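The listings above do not show the calls that actually remove items from the collection. The following self-contained sketch adds them, as an assumption about how the output was produced. The class RemovalsSketch and the method Run are hypothetical names, and the removed items are collected in a list instead of being written to the console.

```csharp
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical helper, for illustration only.
public static class RemovalsSketch
{
    public static List<string> Run()
    {
        var removed = new List<string>();
        var strings = new ObservableCollection<string> { "Item1", "Item2", "Item3" };

        var removals = Observable
            .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                handler => strings.CollectionChanged += handler,
                handler => strings.CollectionChanged -= handler)
            .Where(e => e.EventArgs.Action == NotifyCollectionChangedAction.Remove)
            .SelectMany(e => e.EventArgs.OldItems.Cast<string>());

        using (removals.Subscribe(item => removed.Add(item)))
        {
            strings.Remove("Item1");   // reported
            strings.Add("Item4");      // filtered out: not a Remove action
            strings.Remove("Item2");   // reported
        }

        // The subscription is disposed and the handler detached, so this is not reported.
        strings.Remove("Item3");

        return removed;
    }
}
```

Disposing the subscription runs the remove-handler lambda, so the event handler does not stay attached to the collection.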

Monday, December 26, 2011

How to convert a TPL Task to an IObservable in Reactive Extensions.

In this post we will explore how to create a TPL Task running on a background thread and how to convert it to an observable. First, start with a simple Task. The code snippet below runs a computation in the background and returns a value after a simulated 3 seconds.
Task<int> task = new Task<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    return 42;
});
    
task.Start();
    
Console.WriteLine(task.Result);
==========
42
==========
The next example simulates an error caused by cancellation of the background process. This is accomplished with the CancellationTokenSource's Cancel method after one second:
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;

Task<int> task = Task.Factory.StartNew<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    token.ThrowIfCancellationRequested();
    return 42;
}, token);

Thread.Sleep(1000);

tokenSource.Cancel();

try {
    Console.WriteLine(task.Result);
}
catch (AggregateException aggrEx)
{
    Console.WriteLine("Error: {0}", aggrEx.InnerException.GetType());
    aggrEx.Handle(ex => {
            if (ex is OperationCanceledException) {
                return true;
            }
            return false;
    });
}
==========
Error: System.Threading.Tasks.TaskCanceledException
==========
So let's convert the first example to an observable. You can use the ToObservable extension method of the static class TaskObservableExtensions from the System.Reactive.Threading.Tasks namespace.
Task<int> task = new Task<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    return 42;
});
    
IObservable<int> source = task.ToObservable();

source.Subscribe(Console.WriteLine);

task.Start();
==========
42
==========
Now cancel the operation to receive an OnError notification.
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;

Task<int> task = new Task<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    token.ThrowIfCancellationRequested();
    return 42;
}, token);
    
IObservable<int> source = task.ToObservable();

IDisposable disposable = source.Subscribe(
    Console.WriteLine,
    error => Console.WriteLine("Error: {0}", error.GetType()),
    () => "Completed".Dump()
);

task.Start();

Thread.Sleep(1000);

tokenSource.Cancel();

try {
    Task.WaitAll(task);
}
catch (AggregateException aggrEx)
{
    aggrEx.Handle(ex => {
            if (ex is OperationCanceledException) {
                return true;
            }
            return false;
    });
}

disposable.Dispose();
==========
Error: System.Threading.Tasks.TaskCanceledException
==========
You can also convert an observable to a task with the ToTask method:
var task = Observable.Return(2)
                     .Do(_ => Thread.Sleep(2000))
                     .ToTask();
task.Result.Dump();
==========
2
==========
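Both conversions can be condensed into a small sketch without Thread.Sleep. It assumes the token is cancelled before the task starts, which differs from the timing in the listings above. The class TaskBridgeSketch and its methods are hypothetical names.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class TaskBridgeSketch
{
    // Task -> observable: a cancelled task surfaces as an OnError notification.
    public static string CancelledTaskError()
    {
        var tokenSource = new CancellationTokenSource();
        tokenSource.Cancel();   // assumption: cancel before the task can start

        Task<int> task = Task.Run(() => 42, tokenSource.Token);

        // Materialize turns the error into a value, so Wait does not throw.
        Notification<int> last = task.ToObservable().Materialize().Wait();

        return last.Kind == NotificationKind.OnError
            ? last.Exception.GetType().Name
            : last.Kind.ToString();
    }

    // Observable -> task: the task result is the last value of the sequence.
    public static int ObservableToTask()
    {
        return Observable.Return(2).ToTask().Result;
    }
}
```

CancelledTaskError returns the name of the same exception type that the cancellation example above prints, and ObservableToTask returns 2.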