Sunday, January 1, 2012

Combining observable sequences with Concat, Merge, Catch, OnErrorResumeNext and CombineLatest in Reactive Extensions (RX).

Concat concatenates observable sequences in the order you have provided the observables to it:

var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.Concat(source2)
       .Subscribe(Console.WriteLine);
So it propagates first all values from the source1 and then from the source2 observable sequence:
==========
1
1
1
2
2
2
==========
An error calls the OnError action and ends the subscription:
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.Concat(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
==========
1
1
1
An error has occurred.
==========
On the other hand the Merge extension method takes periodically a value from each provided observable sequence:
var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.Merge(source2)
       .Subscribe(Console.WriteLine);
==========
1
2
1
2
1
2
==========
An error again calls the OnError action and ends the subscription:
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.Merge(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
==========
1
2
1
2
1
An error has occurred.
==========
Catch propagates the first sequence if everything went OK and ignores the second one, otherwise propagates also the values from the second observable and values from the first one up to the time an error occurred.
var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.Catch(source2)
       .Subscribe(Console.WriteLine);    
==========
1
1
1
==========
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.Catch(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
OnError is not called in this case:
==========
1
1
1
2
2
2
==========
OnErrorResumeNext produces values from both sequences if an error hasn't occurred like Concat:
var source1 = Observable.Repeat(1, 3);

var source2 = Observable.Repeat(2, 3);

source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine);    
==========
1
1
1
2
2
2
==========
but in case something went wrong with the first one it will stop producing values from it and concatenates values from the second one to it:
var source1 = Observable.Repeat(1, 3)
    .Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
    .Concat(Observable.Repeat(1, 3));

var source2 = Observable.Repeat(2, 3);

source1.OnErrorResumeNext(source2)
       .Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
OnError is not called in this case:
==========
1
1
1
2
2
2
==========
Finally CombineLatest returns an observable sequence containing the result of combining elements of both sources using the specified result selector function.
var semaphore = new SemaphoreSlim(0);

var observer = Observer.Create<string>(
    value => Console.WriteLine(value.ToString()),
    error => { 
        Console.WriteLine(error.Message); 
        semaphore.Release(); 
    },
    () => { 
        Console.WriteLine("Completed"); 
        semaphore.Release(); 
    }
);    

var source1 = Observable.Interval(TimeSpan.FromSeconds(1))
                        .Timestamp()
                        .Take(3);

var source2 = Observable.Interval(TimeSpan.FromSeconds(3))
                        .Timestamp()
                        .Take(3);

var input = source1.CombineLatest(source2, (x, y) => { 
    return string.Format("{0} - {1}", x.ToString(), y.ToString());
});

using(input.Subscribe(observer))
{
    semaphore.Wait();
}
==========
1@27.12.2011 18:26:47 +01:00 - 0@27.12.2011 18:26:48 +01:00
2@27.12.2011 18:26:48 +01:00 - 0@27.12.2011 18:26:48 +01:00
2@27.12.2011 18:26:48 +01:00 - 1@27.12.2011 18:26:51 +01:00
2@27.12.2011 18:26:48 +01:00 - 2@27.12.2011 18:26:54 +01:00
Completed
==========

No comments:

Post a Comment