Thursday, December 29, 2011

How to create your own Buffer for Reactive Extensions

This post is intended for learning purposes. You can find a more complex solution in the RX library with lot of overloaded versions of the Buffer method.
public static class ObservableExt
{
    public static IObservable<IList<T>> Buffer2<T>
    (
        this IObservable<T> source, 
        int count
    )
    {
        return Observable.Create<IList<T>>(observer => {
            IList<T> list = new List<T>();
            Action sendAndClearList = () => {
                if (list.Count != 0)
                {
                    observer.OnNext(list);
                    list.Clear();
                }
            };
            int i = 0;
            return source.Subscribe(
                value => {
                    list.Add(value);
                    i++;
                    if (i % count == 0)
                    {
                        sendAndClearList();
                    }
                },
                error => {
                    observer.OnError(error);
                },
                () => {
                    sendAndClearList();
                    observer.OnCompleted();
                }
            );
        });
    }
}
Now you can subscribe to it. This will create an observable with buffers based on the count parameter. Every buffer will contain up to 3 elements in our case.
// you can use Console.WriteLine instead of Dump
var source = Observable.Repeat(1, 4)
.Buffer(3)
.Subscribe(
    v => v.Dump(),
    e => e.Message.Dump(),
    () => "Completed".Dump()
);
This will print the following output into the LINQPad's results window:
==========
List (3 items) 
1
1
1
List (1 item) 
1
Completed
==========

No comments:

Post a Comment