This post is intended for learning purposes. The Rx library offers a more complete solution, with many overloaded versions of the Buffer method.
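using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ObservableExt
{
    public static IObservable<IList<T>> Buffer2<T>(this IObservable<T> source, int count)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            IList<T> list = new List<T>();

            // Emit the current buffer (if it is not empty) and start a fresh one.
            // A new list is allocated instead of clearing the emitted one, so an
            // observer that keeps a reference to a buffer still sees its items.
            Action sendAndResetList = () =>
            {
                if (list.Count != 0)
                {
                    observer.OnNext(list);
                    list = new List<T>();
                }
            };

            int i = 0;
            return source.Subscribe(
                value =>
                {
                    list.Add(value);
                    i++;
                    // Every count-th element closes the current buffer.
                    if (i % count == 0)
                    {
                        sendAndResetList();
                    }
                },
                error => observer.OnError(error),
                () =>
                {
                    // Flush the remaining partial buffer before completing.
                    sendAndResetList();
                    observer.OnCompleted();
                });
        });
    }
}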
Now you can subscribe to it. Buffer2 groups the source elements into buffers based on the count parameter. In our case every buffer contains up to 3 elements; the last buffer may hold fewer if the source completes before it is full.
// Outside LINQPad you can use Console.WriteLine instead of Dump.
var subscription = Observable.Repeat(1, 4)
    .Buffer2(3)
    .Subscribe(
        v => v.Dump(),
        e => e.Message.Dump(),
        () => "Completed".Dump()
    );
This will print the following output in LINQPad's results window:
==========
List (3 items)
1
1
1
List (1 item)
1
Completed
==========
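For comparison, here is a minimal sketch of the same scenario using the built-in Buffer operator from Rx, including one of its other overloads. It assumes the System.Reactive package is referenced; the BuiltInBufferDemo class and its Render helper are hypothetical names introduced only for this illustration, and Render relies on the source emitting synchronously during Subscribe, which should hold for Observable.Repeat and Observable.Range with their default schedulers.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BuiltInBufferDemo
{
    // Hypothetical helper: collects each emitted buffer as a comma-separated line.
    // Assumes the source emits all of its values synchronously during Subscribe.
    public static List<string> Render(IObservable<IList<int>> buffers)
    {
        var lines = new List<string>();
        buffers.Subscribe(
            buffer => lines.Add(string.Join(",", buffer)),
            error => lines.Add("Error: " + error.Message),
            () => lines.Add("Completed"));
        return lines;
    }

    public static void Main()
    {
        // Count-only overload: non-overlapping buffers, the case Buffer2 imitates.
        foreach (var line in Render(Observable.Repeat(1, 4).Buffer(3)))
        {
            Console.WriteLine(line); // prints "1,1,1", then "1", then "Completed"
        }

        // Count-and-skip overload: a new buffer opens every 2 elements,
        // so consecutive buffers of size 3 share one element.
        foreach (var line in Render(Observable.Range(1, 5).Buffer(3, 2)))
        {
            Console.WriteLine(line);
        }
    }
}
```

The count-only call should group the elements the same way as Buffer2(3) above, while the second call shows the kind of variation the extra overloads add.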