Sunday, December 25, 2011

How to create your own Window observable extension.

In this post I'll show how to write a simple implementation of the Window extension method found in the Reactive Extensions (Rx) library. The method splits the source sequence into consecutive, non-overlapping windows of count elements each and emits every window as a new observable; the last window may hold fewer elements if the source completes early.
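using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableExt
{
    public static IObservable<IObservable<T>> Window<T>
    (
        this IObservable<T> source,
        int count
    )
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        // A count of 0 would otherwise throw DivideByZeroException in the modulo below.
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));

        return Observable.Create<IObservable<T>>(observer => {
            int i = 0;
            // The first window is opened before any value arrives.
            ISubject<T> subject = new Subject<T>();
            observer.OnNext(subject);

            return source.Subscribe(value => {
                subject.OnNext(value);
                if ((++i % count) == 0)
                {
                    // The window is full: close it and open the next one right away.
                    // If the source completes at this point, that last window stays empty.
                    subject.OnCompleted();
                    subject = new Subject<T>();
                    observer.OnNext(subject);
                }
            }, error => {
                // Propagate the error to the open window and to the outer sequence.
                subject.OnError(error);
                observer.OnError(error);
            }, () => {
                // Complete the open window first, then the outer sequence.
                subject.OnCompleted();
                observer.OnCompleted();
            });
        });
    }
}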
Now you can use it like any other extension method. Subscribing is slightly more involved than usual: in the OnNext action of the outer observer you have to subscribe to its argument, which is itself an IObservable<T> representing a single window. (The Dump calls below are LINQPad's output helper.)
Observable.Range(1, 10)
    .Window(3)
    .Subscribe(
        observable => {
            observable.Subscribe(
                Console.WriteLine,
                error2 => error2.Message.Dump(),
                () => "Completed2".Dump());
        },
        error => error.Message.Dump(),
        () => "Completed".Dump());
=============
1
2
3
Completed2
4
5
6
Completed2
7
8
9
Completed2
10
Completed2
Completed
=============
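
If the nested Subscribe feels heavy, one alternative is to flatten the windows before subscribing. The following is only a sketch, assuming the System.Reactive package is referenced; WindowDemo and CollectWindows are hypothetical names made up for this illustration. As written, with only System.Reactive.Linq imported, the Window call resolves to the operator that ships with Rx, and the version from this post should be usable in the same position. SelectMany, ToList and Wait are standard Rx operators.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class WindowDemo
{
    // Hypothetical helper: renders every window of `count` items
    // as one comma-separated line and returns all lines.
    public static IList<string> CollectWindows(int n, int count)
    {
        return Observable.Range(1, n)
            .Window(count)                             // IObservable<IObservable<int>>
            .SelectMany(window => window.ToList())     // one IList<int> per window
            .Select(items => string.Join(",", items))  // e.g. "1,2,3"
            .ToList()                                  // gather all lines
            .Wait();                                   // block until the source completes
    }

    public static void Main()
    {
        foreach (var line in CollectWindows(10, 3))
            Console.WriteLine(line);
        // prints:
        // 1,2,3
        // 4,5,6
        // 7,8,9
        // 10
    }
}
```

ToList turns each window into a single list that is emitted when the window completes, so the outer subscriber only ever sees finished windows. The trade-off is that values are no longer delivered one by one as they arrive.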
