In this post we will explore how to create a TPL Task that runs on a background thread and how to convert it to an observable. Let's start with a simple Task. The code snippet below runs an operation in the background and, after a simulated three-second delay, returns a value.
Task<int> task = new Task<int>(() => {
    // simulate a long-running operation
    Thread.Sleep(3000);
    return 42;
});
task.Start();
// Result blocks the calling thread until the task has finished
Console.WriteLine(task.Result);
==========
42
==========
The next example simulates an error caused by cancelling the background operation. Cancellation is requested by calling the CancellationTokenSource's Cancel method after one second:
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
Task<int> task = Task.Factory.StartNew<int>(() => {
    // simulate a long-running operation
    Thread.Sleep(3000);
    token.ThrowIfCancellationRequested();
    return 42;
}, token);
Thread.Sleep(1000);
tokenSource.Cancel();
try {
    Console.WriteLine(task.Result);
}
catch (AggregateException aggrEx) {
    Console.WriteLine("Error: {0}", aggrEx.InnerException.GetType());
    // mark cancellation as handled; any other exception is rethrown
    aggrEx.Handle(ex => ex is OperationCanceledException);
}
==========
Error: System.Threading.Tasks.TaskCanceledException
==========
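Note that in the example above the task only checks the token after the full three-second sleep, so the cancellation requested after one second is not observed until the sleep ends. A minimal sketch of one way to react sooner is to wait on the token's WaitHandle instead of sleeping. The class name CancellationSketch and the helper StartWork are made up for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Hypothetical helper: simulates up to workMs of work,
    // but wakes up as soon as cancellation is requested.
    public static Task<int> StartWork(CancellationToken token, int workMs)
    {
        return Task.Factory.StartNew<int>(() =>
        {
            // WaitOne returns early when the token's handle is signalled
            token.WaitHandle.WaitOne(workMs);
            token.ThrowIfCancellationRequested();
            return 42;
        }, token);
    }

    public static void Main()
    {
        var tokenSource = new CancellationTokenSource();
        Task<int> task = StartWork(tokenSource.Token, 3000);
        Thread.Sleep(1000);
        tokenSource.Cancel();
        try
        {
            Console.WriteLine(task.Result);
        }
        catch (AggregateException aggrEx)
        {
            Console.WriteLine("Error: {0}", aggrEx.InnerException.GetType());
        }
    }
}
```

This should report the same TaskCanceledException as before, only after roughly one second instead of three.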
So let's convert the first example to an observable. You can use the ToObservable extension method of the static class TaskObservableExtensions from the System.Reactive.Threading.Tasks namespace.
Task<int> task = new Task<int>(() => {
    // simulate a long-running operation
    Thread.Sleep(3000);
    return 42;
});
IObservable<int> source = task.ToObservable();
source.Subscribe(Console.WriteLine);
task.Start();
==========
42
==========
Now cancel the operation to receive an OnError notification.
var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
Task<int> task = new Task<int>(() => {
    // simulate a long-running operation
    Thread.Sleep(3000);
    token.ThrowIfCancellationRequested();
    return 42;
}, token);
IObservable<int> source = task.ToObservable();
IDisposable disposable = source.Subscribe(
    Console.WriteLine,
    error => Console.WriteLine("Error: {0}", error.GetType()),
    () => Console.WriteLine("Completed")
);
task.Start();
Thread.Sleep(1000);
tokenSource.Cancel();
try {
    Task.WaitAll(task);
}
catch (AggregateException aggrEx) {
    // mark cancellation as handled; any other exception is rethrown
    aggrEx.Handle(ex => ex is OperationCanceledException);
}
disposable.Dispose();
==========
Error: System.Threading.Tasks.TaskCanceledException
==========
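One detail worth knowing about ToObservable is that the resulting observable reflects the task's outcome even for late subscribers, so subscribing after the task has already finished should still deliver the value followed by the completion notification. A minimal sketch, assuming the System.Reactive package is referenced. The class name LateSubscriptionSketch and its helper method are made up for illustration:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class LateSubscriptionSketch
{
    // Hypothetical helper: subscribes only after the task has completed
    // and records every notification it receives.
    public static List<string> SubscribeAfterCompletion()
    {
        var log = new List<string>();
        var done = new ManualResetEventSlim();

        Task<int> task = Task.Run(() => 42);
        task.Wait(); // the task is finished before anyone subscribes

        IObservable<int> source = task.ToObservable();
        source.Subscribe(
            value => log.Add("OnNext: " + value),
            error => { log.Add("OnError: " + error.GetType()); done.Set(); },
            () => { log.Add("OnCompleted"); done.Set(); });

        // wait for the terminal notification (bounded, in case it never arrives)
        done.Wait(TimeSpan.FromSeconds(5));
        return log;
    }

    public static void Main()
    {
        foreach (string line in SubscribeAfterCompletion())
        {
            Console.WriteLine(line);
        }
    }
}
```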
You can also convert an observable to a task with the ToTask method:
var task = Observable.Return(2)
    .Do(_ => Thread.Sleep(2000))
    .ToTask();
Console.WriteLine(task.Result);
==========
2
==========
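The example above uses a single-element sequence. For sequences with more elements, ToTask completes with the last value, and an OnError notification faults the task instead. A minimal sketch of both cases, assuming the System.Reactive package is referenced. The class name ToTaskSketch and its helper methods are made up for illustration:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class ToTaskSketch
{
    // The sequence 1, 2, 3 ends with 3, which becomes the task's result.
    public static Task<int> LastOfRange()
    {
        return Observable.Range(1, 3).ToTask();
    }

    // An OnError notification faults the task instead of producing a result.
    public static Task<int> Failing()
    {
        return Observable.Throw<int>(new InvalidOperationException("boom")).ToTask();
    }

    public static void Main()
    {
        Console.WriteLine(LastOfRange().Result);
        try
        {
            Console.WriteLine(Failing().Result);
        }
        catch (AggregateException aggrEx)
        {
            // Result wraps the original exception in an AggregateException
            Console.WriteLine("Error: {0}", aggrEx.InnerException.GetType());
        }
    }
}
```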