Tuesday, January 3, 2012

How to recursively call IScheduler.Schedule in Reactive Extensions.

The following example demonstrates how to recursively call the schedulers Schedule method in order to call itself. The result of this is a "lazy infinite loop":
public static class ObservableExt
{    

    public static IObservable<int> Natural()
    {
        return Natural(Scheduler.CurrentThread);
    }
    
    public static IObservable<int> Natural(IScheduler scheduler)
    {
        if (scheduler == null)
        {
            throw new ArgumentNullException("scheduler");
        }
        int i = 0;
            
        return Observable.Create<int>(observer => {
            return scheduler.Schedule(action => {
                observer.OnNext(i++);
                action();
            });
        });
    }
}
Now we can call the Natural extension method and chain it with other extension methods from RX library. This example uses Take to stop the production of infinite number of values after 5 items.
IObservable<int> naturals = ObservableExt.Natural()
                                         .Take(3);

naturals.Subscribe(
    value => value.Dump(),
    error => Console.WriteLine(error.Message),
    () => Console.WriteLine("Completed")
);    
==========
0
1
2
Completed
==========

No comments:

Post a Comment