Friday, December 30, 2011

How to use ToAsync, FromAsyncPattern and Start in Reactive Extensions

This example demonstrates how to use the ToAsync and FromAsyncPattern extension methods from RX. First we create an observer and assign the GenerateNumbers function to a generic Func delegate. Then we use its BeginInvoke and EndInvoke functions as input parameters for the FromAsyncPattern method. It will produce you another Func delegate and call it with an input parameter of 7. The result from this function is an observable you can subscribe to.
var observer = Observer.Create<IEnumerable<int>>(
    x => Console.WriteLine(
    "OnNext: {0}", x.Sum()),
    ex => Console.WriteLine("OnError: {0}", ex.Message),
    () => Console.WriteLine("OnCompleted")
);

Func<int, IEnumerable<int>> func = GenerateNumbers;

var source = Observable.FromAsyncPattern<int, IEnumerable<int>>(func.BeginInvoke, func.EndInvoke);

using(source(7).Subscribe(observer))
{
    Console.ReadLine();
}
Here is the implementation of the GenerateNumbers function which produces a sequence of numbers up to the defined number of values:
public IEnumerable<int> GetNumbers(int count)
{
    int i = 1;
    while (i <= count)
    {
        yield return i++;
    }
}
Our observer sums the sequence and prints the result:
==========
OnNext: 28
OnCompleted
==========
The classical way how to achieve the same is:
Func<int, IEnumerable<int>> func = GenerateNumbers;
    
var asyncResult = func.BeginInvoke(7, callback => {
    IEnumerable<int> result = func.EndInvoke(callback);
    Console.WriteLine("Sum of numbers is: {0}", result.Sum());
}, null);
==========
Sum of numbers is: 28
==========
You can also convert a function to an asynchronous function with the ToAsync method. The difference is that the result of the EndInvoke is an observable you can subscribe to:
Func<int, IObservable<IEnumerable<int>>> asyncFunc = func.ToAsync();

var asyncResult2 = asyncFunc.BeginInvoke(7, callback => {
    IObservable<IEnumerable<int>> result = asyncFunc.EndInvoke(callback);
    var disposable = result.Subscribe(value => Console.WriteLine(value.Sum()));
}, null);
==========
28
==========
Finally you can invoke the function asynchronously with the use of Start method:
Observable.Start(() => GetNumbers(4))
          .Subscribe(observer);
==========
OnNext: 10

OnCompleted
==========

No comments:

Post a Comment