Monday, December 26, 2011

How to convert a TPL Task to an IObservable in Reactive Extensions.

In this post we will explore how to create a TPL Task running in a new thread and how to convert it to an observable. First start with a simple Task. The code snippet bellow runs a new process in the background and after simulated 3 seconds returns a value.
Task<int> task = new Task<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    return 42;
});
    
task.Start();
    
Console.WriteLine(task.Result);
==========
42
==========
The next example simulates an error caused by cancellation of the background process. This is accomplished with the CancellationTokenSource's Cancel method after one second:
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;

Task<int> task = Task.Factory.StartNew<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    token.ThrowIfCancellationRequested();
    return 42;
}, token);

Thread.Sleep(1000);

tokenSource.Cancel();

try {
    Console.WriteLine(task.Result);
}
catch (AggregateException aggrEx)
{
    Console.WriteLine("Error: {0}", aggrEx.InnerException.GetType());
    aggrEx.Handle(ex => {
            if (ex is OperationCanceledException) {
                return true;
            }
            return false;
    });
}
==========
Error: System.Threading.Tasks.TaskCanceledException
==========
So let's convert the first example to an observable. You can use the ToObservable extension method of the static class TaskObservableExtensions from System.Reactive.Threading.Tasks namespace.
Task<int> task = new Task<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    return 42;
});
    
IObservable<int> source = task.ToObservable();

source.Subscribe(Console.WriteLine);

task.Start();
==========
42
==========
Now cancel the operation to receive an OnError notification.
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;

Task<int> task = new Task<int>(() => {
    //simulate a long running process
    Thread.Sleep(3000);
    token.ThrowIfCancellationRequested();
    return 42;
}, token);
    
IObservable<int> source = task.ToObservable();

IDisposable disposable = source.Subscribe(
    Console.WriteLine,
    error => Console.WriteLine("Error: {0}", error.GetType()),
    () => "Completed".Dump()
);

task.Start();

Thread.Sleep(1000);

tokenSource.Cancel();

try {
    Task.WaitAll(task);
}
catch (AggregateException aggrEx)
{
    aggrEx.Handle(ex => {
            if (ex is OperationCanceledException) {
                return true;
            }
            return false;
    });
}

disposable.Dispose();
==========
Error: System.Threading.Tasks.TaskCanceledException
==========
You can also convert an observable to a task with the ToTask method:
var task = Observable.Return(2)
                     .Do(_ => Thread.Sleep(2000))
                     .ToTask();
task.Result.Dump();
 ==========
2
==========

No comments:

Post a Comment