Saturday, December 31, 2011

How to subscribe to the observable which reacts first with Amb in Reactive Extensions.

In this example we will create two observables. The first observable, source1, starts producing values after a due time of 1 second. The second observable, source2, has no due time: it starts producing values immediately after the subscription, one per second, and completes after 3 values. Because we use the Amb extension method, which propagates only the observable sequence that reacts first, source1 never gets the chance to produce a value: Amb subscribes to both sequences and unsubscribes from the slower one as soon as the faster one reacts. This is shown with the help of the Do method. Its purpose is to invoke an action for each element in the sequence. In our case it writes a message to the console, and the message of source1 never shows up in the output.
var semaphore = new SemaphoreSlim(0);

var observer = Observer.Create<Timestamped<long>>(
    value => Console.WriteLine(value.ToString()),
    error => { 
        Console.WriteLine(error.Message); 
        semaphore.Release(); 
    },
    () => { 
        Console.WriteLine("Completed"); 
        semaphore.Release(); 
    }
);    

TimeSpan oneSecond = TimeSpan.FromSeconds(1);

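// Fires for the first time after 1 second, then every second; limited to 5 values.
var source1 = Observable.Timer(oneSecond, oneSecond)
            .Do(_ => Console.WriteLine("Value from first observable:"))
            .Timestamp().Take(5);

// Fires immediately (due time 0), then every second; limited to 3 values.
var source2 = Observable.Timer(TimeSpan.FromSeconds(0), oneSecond)
            .Do(_ => Console.WriteLine("Value from second observable:"))
            .Timestamp().Take(3);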

using(source1.Amb(source2).Subscribe(observer))
{
    semaphore.Wait();
}
==========
Value from second observable:
0@27.12.2011 17:07:42 +01:00
Value from second observable:
1@27.12.2011 17:07:43 +01:00
Value from second observable:
2@27.12.2011 17:07:44 +01:00
Completed
==========
Another interesting aspect of this example is the use of SemaphoreSlim. The main thread calls Wait inside the using block and stays blocked until the observer calls Release in its OnCompleted or OnError handler. Without this, the using statement would immediately dispose our subscription.
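
To see the "first one to react wins" behavior of Amb without depending on timers, here is a minimal sketch that pushes values by hand through two Subject instances. The names first, second and received are only illustrative and are not part of the example above; the sketch assumes the System.Reactive.Linq and System.Reactive.Subjects namespaces are available.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class AmbSketch
{
    static void Main()
    {
        // Hypothetical sources: subjects let us decide by hand who reacts first.
        var first = new Subject<string>();
        var second = new Subject<string>();
        var received = new List<string>();

        using (first.Amb(second).Subscribe(received.Add))
        {
            second.OnNext("second: a"); // second reacts first, so Amb picks it
            first.OnNext("first: x");   // dropped: Amb no longer listens to first
            second.OnNext("second: b"); // still forwarded
        }

        Console.WriteLine(string.Join(", ", received));
        // prints "second: a, second: b"
    }
}
```

Because everything here runs synchronously on the calling thread, no semaphore is needed: all values have already been pushed by the time the using block disposes the subscription.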
