tag:blogger.com,1999:blog-29389054458441486732024-03-05T19:44:24.875-08:00SynchronicityProgramming with Reactive Extensions and Linq in .NET Framework with C#.Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.comBlogger22125tag:blogger.com,1999:blog-2938905445844148673.post-67590454314502789542012-01-05T02:22:00.000-08:002012-01-05T02:22:00.348-08:00How to use SelectMany in Reactive Extensions (RX).<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div style="text-align: justify;">You can use the <strong>SelectMany</strong> extension method in several different ways. The first example shows you that the SelectMany can be used to flatten the result of the Window method from IObservable<IObservable<T>> to IObservable<T>.</div></div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source = Observable.Repeat(1, 3)
.Window(2)
.SelectMany(c => c);
source.Subscribe(
value => value.Dump(),
error => Console.WriteLine(error.Message),
() => Console.WriteLine("Completed")
);
</code></pre>==========<br />
<em>1</em><br />
<em>1</em><br />
<em>1</em><br />
<em>Completed</em><br />
========== <br />
Or you can produce a collection of values for each produced value: </div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source = Observable.Range(1, 3)
.SelectMany(c => Observable.Repeat(c,3));
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>3</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>3</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>3</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Completed</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">========== </div><div dir="ltr" style="text-align: justify;" trbidi="on">Instead of another observable you can use an enumerable and project the resulting values with result selector:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source = Observable.Range(1, 3)
.SelectMany(c => Enumerable.Repeat(c,3), (x, y) => x*y);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">========== </div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>4</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>4</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>4</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>9</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>9</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>9</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Completed</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">========== </div><div dir="ltr" style="text-align: justify;" trbidi="on">Finally you could produce a new observable for each action - OnNext, OnError and OnCompleted:</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source = Observable.Range(1, 3)
.SelectMany(value => Observable.Return(1),
error => Observable.Return(-1),
() => Observable.Return(0));
</code></pre></div><div style="text-align: justify;">========== </div><div style="text-align: justify;"><em>1</em></div><div style="text-align: justify;"><em>1</em></div><div style="text-align: justify;"><em>1</em></div><div style="text-align: justify;"><em>0</em></div><div style="text-align: justify;"><em>Completed</em></div><div style="text-align: justify;">========== </div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com1tag:blogger.com,1999:blog-2938905445844148673.post-38840468586558282532012-01-03T01:47:00.000-08:002012-01-03T01:47:00.333-08:00How to recursively call IScheduler.Schedule in Reactive Extensions.<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on">The following example demonstrates how to recursively call the schedulers <strong>Schedule</strong> method in order to call itself. The result of this is a "lazy infinite loop":</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public static class ObservableExt
{
public static IObservable<int> Natural()
{
return Natural(Scheduler.CurrentThread);
}
public static IObservable<int> Natural(IScheduler scheduler)
{
if (scheduler == null)
{
throw new ArgumentNullException("scheduler");
}
int i = 0;
return Observable.Create<int>(observer => {
return scheduler.Schedule(action => {
observer.OnNext(i++);
action();
});
});
}
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">Now we can call the Natural extension method and chain it with other extension methods from RX library. This example uses Take to stop the production of infinite number of values after 5 items.</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>IObservable<int> naturals = ObservableExt.Natural()
.Take(3);
naturals.Subscribe(
value => value.Dump(),
error => Console.WriteLine(error.Message),
() => Console.WriteLine("Completed")
);
</code></pre></div><div style="text-align: justify;">==========</div><div style="text-align: justify;"><em>0</em></div><div style="text-align: justify;"><em>1</em></div><div style="text-align: justify;"><em>2</em></div><div style="text-align: justify;"><em>Completed</em></div><div style="text-align: justify;">==========</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-42321325432013108672012-01-02T03:13:00.000-08:002012-01-02T03:13:00.803-08:00The difference between OfType and Cast in Reactive Extensions (RX).<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on"><strong>OfType</strong> tries to convert an object to the defined type or ignores it if the value is not of the defined type.</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public class Person
{
public string Name { get; set; }
}
</code></pre></div><div style="text-align: justify;"><br />
</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>object value = new Person { Name = "Kinga" };
var source = Observable.Return(new object())
.Concat(Observable.Return(value))
.OfType<Person>()
.Select(p => p.Name);
</code></pre></div><div style="text-align: justify;">==========</div><div style="text-align: justify;"><em>OnNext : Kinga</em></div><div style="text-align: justify;"><em>OnCompleted</em></div><div style="text-align: justify;">==========</div><div style="text-align: justify;">The <strong>Cast</strong> method propagates an OnError if the value is not convertable to the defined type and stops the subscription</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>object value = new Person { Name = "Kinga" };
var source = Observable.Return(new object())
.Concat(Observable.Return(value))
.Cast<Person>()
.Select(p => p.Name);
</code></pre></div><div style="text-align: justify;">==========</div><div style="text-align: justify;"><em>OnError : Unable to cast object of type 'System.Object' to type 'Person'.</em></div><div style="text-align: justify;">==========</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-16943744067637609652012-01-01T08:29:00.000-08:002012-01-01T08:29:00.435-08:00Combining observable sequences with Concat, Merge, Catch, OnErrorResumeNext and CombineLatest in Reactive Extensions (RX).<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div style="text-align: justify;"></div><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on"><strong>Concat</strong> concatenates observable sequences in the order you have provided the observables to it:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><br />
</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source1 = Observable.Repeat(1, 3);
var source2 = Observable.Repeat(2, 3);
source1.Concat(source2)
.Subscribe(Console.WriteLine);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">So it propagates first all values from the source1 and then from the source2 observable sequence:</div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">An error calls the OnError action and ends the subscription:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source1 = Observable.Repeat(1, 3)
.Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
.Concat(Observable.Repeat(1, 3));
var source2 = Observable.Repeat(2, 3);
source1.Concat(source2)
.Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>An error has occurred.</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">On the other hand the <strong>Merge</strong> extension method takes periodically a value from each provided observable sequence:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source1 = Observable.Repeat(1, 3);
var source2 = Observable.Repeat(2, 3);
source1.Merge(source2)
.Subscribe(Console.WriteLine);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">An error again calls the OnError action and ends the subscription:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source1 = Observable.Repeat(1, 3)
.Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
.Concat(Observable.Repeat(1, 3));
var source2 = Observable.Repeat(2, 3);
source1.Merge(source2)
.Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>An error has occurred.</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><strong>Catch</strong> propagates the first sequence if everything went OK and ignores the second one, otherwise propagates also the values from the second observable and values from the first one up to the time an error occurred.</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source1 = Observable.Repeat(1, 3);
var source2 = Observable.Repeat(2, 3);
source1.Catch(source2)
.Subscribe(Console.WriteLine);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source1 = Observable.Repeat(1, 3)
.Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
.Concat(Observable.Repeat(1, 3));
var source2 = Observable.Repeat(2, 3);
source1.Catch(source2)
.Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">OnError is not called in this case:</div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><strong>OnErrorResumeNext</strong> produces values from both sequences if an error hasn't occurred like Concat:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source1 = Observable.Repeat(1, 3);
var source2 = Observable.Repeat(2, 3);
source1.OnErrorResumeNext(source2)
.Subscribe(Console.WriteLine);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">but in case something went wrong with the first one it will stop producing values from it and concatenates values from the second one to it:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source1 = Observable.Repeat(1, 3)
.Concat(Observable.Throw<int>(new Exception("An error has occurred.")))
.Concat(Observable.Repeat(1, 3));
var source2 = Observable.Repeat(2, 3);
source1.OnErrorResumeNext(source2)
.Subscribe(Console.WriteLine, error => Console.WriteLine(error.Message));
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">OnError is not called in this case:</div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">Finally <strong>CombineLatest</strong> returns an observable sequence containing the result of combining elements of both sources using the specified result selector function.</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var semaphore = new SemaphoreSlim(0);
var observer = Observer.Create<string>(
value => Console.WriteLine(value.ToString()),
error => {
Console.WriteLine(error.Message);
semaphore.Release();
},
() => {
Console.WriteLine("Completed");
semaphore.Release();
}
);
var source1 = Observable.Interval(TimeSpan.FromSeconds(1))
.Timestamp()
.Take(3);
var source2 = Observable.Interval(TimeSpan.FromSeconds(3))
.Timestamp()
.Take(3);
var input = source1.CombineLatest(source2, (x, y) => {
return string.Format("{0} - {1}", x.ToString(), y.ToString());
});
using(input.Subscribe(observer))
{
semaphore.Wait();
}
</code></pre></div><div style="text-align: justify;">==========</div><div style="text-align: justify;"><em>1@27.12.2011 18:26:47 +01:00 - 0@27.12.2011 18:26:48 +01:00</em></div><div style="text-align: justify;"><em>2@27.12.2011 18:26:48 +01:00 - 0@27.12.2011 18:26:48 +01:00</em></div><div style="text-align: justify;"><em>2@27.12.2011 18:26:48 +01:00 - 1@27.12.2011 18:26:51 +01:00</em></div><div style="text-align: justify;"><em>2@27.12.2011 18:26:48 +01:00 - 2@27.12.2011 18:26:54 +01:00</em></div><div style="text-align: justify;"><em>Completed</em></div><div style="text-align: justify;">==========</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-55650311382152260722011-12-31T07:54:00.000-08:002011-12-31T07:54:00.547-08:00How to subscribe to the observable which reacts first with Amb in Reactive Extensions.<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div dir="ltr" style="text-align: justify;" trbidi="on">In this example we will create two observables. The first observable source1 is set to start produce values after a due time set to 1 seconds. The second one observable called source2 starts producing 3 values every second immediately after the subscription without a due time. Due to the fact that we are going to use the <strong>Amb</strong> extension method which propagates the observable sequence that reacts first, the first observable will actually never be run. This is shown with the help of the Do method. It's purpose is to invoke an action for each element in the sequence. In our case it writes a message to the console.</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var semaphore = new SemaphoreSlim(0);
var observer = Observer.Create<Timestamped<long>>(
value => Console.WriteLine(value.ToString()),
error => {
Console.WriteLine(error.Message);
semaphore.Release();
},
() => {
Console.WriteLine("Completed");
semaphore.Release();
}
);
TimeSpan oneSecond = TimeSpan.FromSeconds(1);
var source1 = Observable.Timer(oneSecond, oneSecond)
.Do(_ => Console.WriteLine("Value from first observable:"))
.Timestamp().Take(5);
var source2 = Observable.Timer(TimeSpan.FromSeconds(0), oneSecond)
.Do(_ => Console.WriteLine("Value from second observable:"))
.Timestamp().Take(3);
using(source1.Amb(source2).Subscribe(observer))
{
semaphore.Wait();
}
</code></pre></div><div style="text-align: justify;">==========</div><div style="text-align: justify;"><em>Value from second observable:</em></div><div style="text-align: justify;"><em>0@27.12.2011 17:07:42 +01:00</em></div><div style="text-align: justify;"><em>Value from second observable:</em></div><div style="text-align: justify;"><em>1@27.12.2011 17:07:43 +01:00</em></div><div style="text-align: justify;"><em>Value from second observable:</em></div><div style="text-align: justify;"><em>2@27.12.2011 17:07:44 +01:00</em></div><div style="text-align: justify;"><em>Completed</em></div><div style="text-align: justify;">==========</div><div style="text-align: justify;">Another interesting aspect of this example I wanted to show you is the use of the SemaphoreSlim which enables us to <strong>Wait</strong> for a <strong>Release</strong> of it in the OnCompleted or OnError part of the observer. Without this the using statement would immediately dispose our subscription.</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-66395205508497471562011-12-30T07:17:00.000-08:002011-12-30T07:17:01.132-08:00How to use ToAsync, FromAsyncPattern and Start in Reactive Extensions<div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on">This example demonstrates how to use the <strong>ToAsync</strong> and <strong>FromAsyncPattern</strong> extension methods from RX. First we create an observer and assign the GenerateNumbers function to a generic Func delegate. Then we use its BeginInvoke and EndInvoke functions as input parameters for the FromAsyncPattern method. It will produce you another Func delegate and call it with an input parameter of 7. The result from this function is an observable you can subscribe to.</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var observer = Observer.Create<IEnumerable<int>>(
x => Console.WriteLine(
"OnNext: {0}", x.Sum()),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted")
);
Func<int, IEnumerable<int>> func = GenerateNumbers;
var source = Observable.FromAsyncPattern<int, IEnumerable<int>>(func.BeginInvoke, func.EndInvoke);
using(source(7).Subscribe(observer))
{
Console.ReadLine();
}
</code></pre><div dir="ltr" style="text-align: justify;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"></div></div><div dir="ltr" style="text-align: justify;" trbidi="on">Here is the implementation of the GenerateNumbers function which produces a sequence of numbers up to the defined number of values:</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public IEnumerable<int> GetNumbers(int count)
{
int i = 1;
while (i <= count)
{
yield return i++;
}
}
</code></pre></div><div style="text-align: justify;">Our observer sums the sequence and prints the result:<br />
==========</div><div style="text-align: justify;"><em>OnNext: 28</em><br />
<em>OnCompleted</em><br />
==========<br />
The classical way how to achieve the same is:</div></div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>Func<int, IEnumerable<int>> func = GenerateNumbers;
var asyncResult = func.BeginInvoke(7, callback => {
IEnumerable<int> result = func.EndInvoke(callback);
Console.WriteLine("Sum of numbers is: {0}", result.Sum());
}, null);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Sum of numbers is: 28</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">You can also convert a function to an asynchronous function with the ToAsync method. The difference is that the result of the EndInvoke is an observable you can subscribe to: </div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>Func<int, IObservable<IEnumerable<int>>> asyncFunc = func.ToAsync();
var asyncResult2 = asyncFunc.BeginInvoke(7, callback => {
IObservable<IEnumerable<int>> result = asyncFunc.EndInvoke(callback);
var disposable = result.Subscribe(value => Console.WriteLine(value.Sum()));
}, null);
</code></pre></div><div style="text-align: justify;">==========</div><div style="text-align: justify;"><em>28</em></div><div style="text-align: justify;">==========<br />
Finally you can invoke the function asynchronously with the use of <strong>Start</strong> method:</div></div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>Observable.Start(() => GetNumbers(4))
.Subscribe(observer);
</code></pre>==========<br />
<em>OnNext: 10</em><br />
<em></em><br />
<em>OnCompleted</em><br />
==========</div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-9074223953396392682011-12-29T06:39:00.000-08:002011-12-29T06:39:00.659-08:00How to create your own Buffer for Reactive Extensions<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on">This post is intended for learning purposes. You can find a more complex solution in the RX library with lot of overloaded versions of the <strong>Buffer</strong> method.</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public static class ObservableExt
{
public static IObservable<IList<T>> Buffer2<T>
(
this IObservable<T> source,
int count
)
{
return Observable.Create<IList<T>>(observer => {
IList<T> list = new List<T>();
Action sendAndClearList = () => {
if (list.Count != 0)
{
observer.OnNext(list);
list.Clear();
}
};
int i = 0;
return source.Subscribe(
value => {
list.Add(value);
i++;
if (i % count == 0)
{
sendAndClearList();
}
},
error => {
observer.OnError(error);
},
() => {
sendAndClearList();
observer.OnCompleted();
}
);
});
}
}
</code></pre><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div style="text-align: justify;">Now you can subscribe to it. This will create an observable with buffers based on the count parameter. Every buffer will contain up to 3 elements in our case.</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>// you can use Console.WriteLine instead of Dump
var source = Observable.Repeat(1, 4)
.Buffer(3)
.Subscribe(
v => v.Dump(),
e => e.Message.Dump(),
() => "Completed".Dump()
);
</code></pre></div><div class="spacer" style="text-align: justify;">This will print the following output into the LINQPad's results window:</div><div class="spacer" style="text-align: justify;">==========</div><div class="spacer" style="text-align: justify;"><em>List<int32> (3 items) <br />
1 <br />
1 <br />
1 </em></div><div class="spacer" style="text-align: justify;"><em>List<int32> (1 item) <br />
1 </em></div><div class="spacer" style="text-align: justify;"><em>Completed</em></div><div class="spacer" style="text-align: justify;">==========</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-88289812224087610372011-12-28T11:13:00.000-08:002011-12-28T11:13:00.806-08:00When And Then in Reactive Extensions (RX).<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div style="text-align: justify;">The <strong>And</strong> method matches when both observables have an available value. The <strong>Then</strong> method projects a value when all observables have an available value. Finally the <strong>When</strong> joins together the results from several patterns. The example contains an input observable which is created from the <strong>Generate method</strong>. You should achieve the same result with the <strong>Interval</strong> method instead as you can see with the second observable, but I wanted to show you how you can create a sequence generator with a time selector as a last argument. The results from the first observable are Timestamped so we can see when was the value generated. The results from both observables are joined together every 2 seconds and sent to the subscriber. </div><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var input = Observable
.Generate(0, _ => true, x => ++x, x => x, _ => TimeSpan.FromSeconds(2))
.Timestamp()
.Take(5);
var pattern = input.And(Observable.Interval(TimeSpan.FromSeconds(1)));
var source =
Observable.When(
pattern
.Then((left, right) => String.Format("{0} - {1}", left, right))
);
var observer = Observer.Create<string>(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted")
);
source.Subscribe(observer);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>OnNext: 0@22.12.2011 20:16:37 +01:00 - 0</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>OnNext: 1@22.12.2011 20:16:39 +01:00 - 1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>OnNext: 2@22.12.2011 20:16:41 +01:00 - 2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>OnNext: 3@22.12.2011 20:16:43 +01:00 - 3</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>OnNext: 4@22.12.2011 20:16:45 +01:00 - 4</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>OnCompleted</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-7731572968938830022011-12-28T09:32:00.000-08:002011-12-28T09:39:37.151-08:00How to create a parallel observer of two observables with the help of the visitor design pattern in RX.<div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div dir="ltr" style="text-align: justify;" trbidi="on">This example will be a little bit more complex, but very dynamic solution for observing and influencing two or more observables with the help of a visitor. First we create an abstract class Direction and its two implementations LeftDirection and RightDirection. These two classes will enable us to decide from which observable came the notification.</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public abstract class Direction<T>
{
public T Value { get; set; }
public static Direction<T> FromLeft(T value)
{
return new LeftDirection<T>(value);
}
public static Direction<T> FromRight(T value)
{
return new RightDirection<T>(value);
}
}
public class LeftDirection<T> : Direction<T>
{
public LeftDirection(T value)
{
this.Value = value;
}
}
public class RightDirection<T> : Direction<T>
{
public RightDirection(T value)
{
this.Value = value;
}
}
<div style="text-align: justify;"></div></code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">Next code snippet will enable you to inherit a new class from the abstract DirectionVisitor class and override its VisitDirection methods in order to provide a different functionality for both observables.</div><div dir="ltr" style="text-align: left;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public abstract class DirectionVisitor<T>
{
public Action<Notification<T>, IObserver<T>> Visit(Direction<Notification<T>> direction)
{
return VisitDirection(direction as dynamic);
}
protected abstract Action<Notification<T>, IObserver<T>> VisitDirection(Direction<Notification<T>> direction);
protected virtual Action<Notification<T>, IObserver<T>> VisitDirection(LeftDirection<Notification<T>> left)
{
return VisitDirection(left as Direction<Notification<T>>);
}
protected virtual Action<Notification<T>, IObserver<T>> VisitDirection(RightDirection<Notification<T>> right)
{
return VisitDirection(right as Direction<Notification<T>>);
}
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">The basic idea of the following implementation of the DirectionVisitor is to call the observer's OnCompleted only in case that both observables has already produced an OnCompleted notification. You could influence the generation of the notifications from both observables in a lot of different ways. One observable can also impact the other one.</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public class ConvertDirectionToAction<T>: DirectionVisitor<T>
{
private bool isLeftCompleted;
private bool isRightCompleted;
protected override Action<Notification<T>, IObserver<T>> VisitDirection(Direction<Notification<T>> direction)
{
Action<Notification<T>, IObserver<T>> action = (notif, observer) => {
NotificationKind kind = notif.Kind;
switch (notif.Kind)
{
case NotificationKind.OnNext:
observer.OnNext(notif.Value);
break;
case NotificationKind.OnError:
observer.OnError(notif.Exception);
break;
case NotificationKind.OnCompleted:
if (this.isLeftCompleted && this.isRightCompleted)
{
observer.OnCompleted();
}
break;
}
};
return action;
}
protected override Action<Notification<T>, IObserver<T>> VisitDirection(LeftDirection<Notification<T>> left)
{
NotificationKind kind = left.Value.Kind;
if (kind == NotificationKind.OnCompleted ||
kind == NotificationKind.OnError)
{
this.isLeftCompleted = true;
}
return VisitDirection(left as Direction<Notification<T>>);
}
protected override Action<Notification<T>, IObserver<T>> VisitDirection(RightDirection<Notification<T>> right)
{
NotificationKind kind = right.Value.Kind;
if (kind == NotificationKind.OnCompleted ||
kind == NotificationKind.OnError)
{
this.isRightCompleted = true;
}
return VisitDirection(right as Direction<Notification<T>>);
}
}
</code></pre>Next we will need an implementation of IObserver which will be able to observe two observables and run a different action for a notification based on which one produced it. You can inject your own visitor into the Visitor property and change how the ParallelObserver will observe them.</div></div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public class ParallelObserver<T> : IObserver<Direction<Notification<T>>>
{
IObserver<T> observer;
public ParallelObserver(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
this.observer = observer;
}
public void OnNext(Direction<Notification<T>> value)
{
var action = this.Visitor.Visit(value);
action(value.Value, this.observer);
}
void IObserver<Direction<Notification<T>>>.OnError(Exception error)
{
}
void IObserver<Direction<Notification<T>>>.OnCompleted()
{
}
private DirectionVisitor<T> visitor;
public DirectionVisitor<T> Visitor
{
get
{
return this.visitor ??
(this.visitor = new ConvertDirectionToAction<T>());
}
set
{
this.visitor = value;
}
}
}
</code></pre>Now you need an extension method which runs both observables in parallel and calls the OnCompleted only when both observables has already finished the work.</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public static class ObservableExtensions
{
public static IObservable<T> Run<T>(this IObservable<T> left, IObservable<T> right)
{
return Observable.Create<T>(observer => {
var parallelObserver = new ParallelObserver<T>(observer);
var disposable1 = left.Materialize()
.Select(v => Direction<Notification<T>>.FromLeft(v))
.Synchronize()
.Subscribe(parallelObserver);
var disposable2 = right.Materialize()
.Select(v => Direction<Notification<T>>.FromRight(v))
.Synchronize()
.Subscribe(parallelObserver);
return new CompositeDisposable(2) { disposable1, disposable2 };
});
}
}
</code></pre>Try it and test it as a homework.</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source1 = Observable.Repeat(4, 3);
var source2 = Observable.Repeat(1, 3);
var input = source1.Run(source2);
input.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("Done"));
</code></pre>==========<br />
<em>4</em><br />
<em>1</em><br />
<em>4</em><br />
<em>1</em><br />
<em>4</em><br />
<em>1</em><br />
<em>Done</em><br />
==========</div>Without this extension the observer wouldn't receive the same notifications.<br />
<pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var observer = Observer.Create<int>(
Console.WriteLine,
Console.WriteLine,
() => Console.WriteLine("Done")
);
var source1 = Observable.Repeat(4, 3).Subscribe(observer);
var source2 = Observable.Repeat(1, 3).Subscribe(observer);
</code></pre>=========<br />
<em>4</em><br />
<em>4</em><br />
<em>4</em><br />
<em>Done</em><br />
=========</div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-58926106504437594312011-12-27T10:11:00.000-08:002011-12-27T10:11:00.751-08:00How to use FromEventPattern in Reactive Extensions (RX).<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on">This example is about how to convert .NET events into an observable collection on which you can then done standard LINQ operations. The example demonstrates how to get removal notifications from an observable collection in a more declarative manner. </div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var strings = new System.Collections.ObjectModel.ObservableCollection<string>()
{
"Item1", "Item2", "Item3"
};
var removals =
Observable.FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>
(
handler => strings.CollectionChanged += handler,
handler => strings.CollectionChanged -= handler
)
.Where(e => e.EventArgs.Action == NotifyCollectionChangedAction.Remove)
.SelectMany(c => c.EventArgs.OldItems.Cast<string>());
var disposable = removals.Subscribe(GetDefaultObserver<string>("Removed"));
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Removed : Item1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Removed : Item2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">The observer was generated with this helper function:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public IObserver<T> GetDefaultObserver<T>(string onNextText)
{
return Observer.Create<T>(
x => Console.WriteLine("{0} : {1}", onNextText, x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted")
);
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div dir="ltr" style="text-align: justify;" trbidi="on">The classical way how to achieve the same with an event handler would be:</div><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>strings.CollectionChanged += (sender, ea) => {
if(ea.Action == NotifyCollectionChangedAction.Remove)
{
foreach (var oldItem in ea.OldItems.Cast<string>())
{
Console.WriteLine("Removed {0}", oldItem);
}
}
};
</code></pre></div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com2tag:blogger.com,1999:blog-2938905445844148673.post-33968193934259281162011-12-26T08:42:00.000-08:002011-12-22T10:34:19.242-08:00How to convert a TPL Task to an IObservable in Reactive Extensions.<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on"><div style="text-align: justify;"></div><div style="text-align: justify;">In this post we will explore how to create a TPL Task running in a new thread and how to convert it to an observable. First start with a simple Task. The code snippet bellow runs a new process in the background and after simulated 3 seconds returns a value.</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>Task<int> task = new Task<int>(() => {
//simulate a long running process
Thread.Sleep(3000);
return 42;
});
task.Start();
Console.WriteLine(task.Result);
</code></pre><div style="text-align: justify;">==========<br />
<em>42</em><br />
==========<br />
The next example simulates an error caused by cancellation of the background process. This is accomplished with the CancellationTokenSource's <strong>Cancel</strong> method after one second:</div></div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
Task<int> task = Task.Factory.StartNew<int>(() => {
//simulate a long running process
Thread.Sleep(3000);
token.ThrowIfCancellationRequested();
return 42;
}, token);
Thread.Sleep(1000);
tokenSource.Cancel();
try {
Console.WriteLine(task.Result);
}
catch (AggregateException aggrEx)
{
Console.WriteLine("Error: {0}", aggrEx.InnerException.GetType());
aggrEx.Handle(ex => {
if (ex is OperationCanceledException) {
return true;
}
return false;
});
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Error: System.Threading.Tasks.TaskCanceledException</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">========== </div><div dir="ltr" style="text-align: justify;" trbidi="on">So let's convert the first example to an observable. You can use the <strong>ToObservable</strong> extension method of the static class TaskObservableExtensions from System.Reactive.Threading.Tasks namespace. </div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>Task<int> task = new Task<int>(() => {
//simulate a long running process
Thread.Sleep(3000);
return 42;
});
IObservable<int> source = task.ToObservable();
source.Subscribe(Console.WriteLine);
task.Start();
</code></pre></div><div style="text-align: justify;"></div></div></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>42</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">Now cancel the operation to receive an OnError notification.</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var tokenSource = new CancellationTokenSource();
CancellationToken token = tokenSource.Token;
Task<int> task = new Task<int>(() => {
//simulate a long running process
Thread.Sleep(3000);
token.ThrowIfCancellationRequested();
return 42;
}, token);
IObservable<int> source = task.ToObservable();
IDisposable disposable = source.Subscribe(
Console.WriteLine,
error => Console.WriteLine("Error: {0}", error.GetType()),
() => "Completed".Dump()
);
task.Start();
Thread.Sleep(1000);
tokenSource.Cancel();
try {
Task.WaitAll(task);
}
catch (AggregateException aggrEx)
{
aggrEx.Handle(ex => {
if (ex is OperationCanceledException) {
return true;
}
return false;
});
}
disposable.Dispose();
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Error: System.Threading.Tasks.TaskCanceledException</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">You can also convert an observable to a task with the <strong>ToTask</strong> method:</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var task = Observable.Return(2)
.Do(_ => Thread.Sleep(2000))
.ToTask();
task.Result.Dump();
</code></pre><div style="text-align: justify;"> ==========</div><div style="text-align: justify;"><em>2</em></div><div style="text-align: justify;">==========</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-52614291208533278982011-12-25T06:00:00.000-08:002011-12-22T06:47:33.418-08:00How to create your own Window observable extension.<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;">In this post I'm going to show you how to create a simple implementation of the Window extension method that can be found in the Reactive Extensions library. This method produces a new observable for each window defined by the count parameter.</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public static class ObservableExt
{
public static IObservable<IObservable<T>> Window<T>
(
this IObservable<T> source,
int count
)
{
return Observable.Create<IObservable<T>>(observer => {
int i = 0;
ISubject<T> subject = new Subject<T>();
observer.OnNext(subject);
return source.Subscribe(value => {
subject.OnNext(value);
if((++i % count) == 0)
{
subject.OnCompleted();
subject = new Subject<T>();
observer.OnNext(subject);
}
}, error => {
subject.OnError(error);
observer.OnError(error);
}, () => {
subject.OnCompleted();
observer.OnCompleted();
});
});
}
}
</code></pre><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div style="text-align: justify;"></div><div style="text-align: justify;">Now you can use it as other extensions. The subscription part is more complex as usually, but the only difference is that in the OnNext action of the outer observer you have to subscribe to its input parameter which is of type IObservable:</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>Observable.Range(1, 10)
.Window(3)
.Subscribe(observable => {
observable.Subscribe(
Console.WriteLine,
error2 => error2.Message.Dump(),
() => "Completed2".Dump()
);
},
error => error.Message.Dump(),
() => "Completed".Dump());
</code></pre><div style="text-align: justify;"></div><div style="text-align: justify;">=============</div><div style="text-align: justify;"><em>1</em></div><div style="text-align: justify;"><em>2</em></div><div style="text-align: justify;"><em>3</em></div><div style="text-align: justify;"><em>Completed2</em></div><div style="text-align: justify;"><em>4</em></div><div style="text-align: justify;"><em>5</em></div><div style="text-align: justify;"><em>6</em></div><div style="text-align: justify;"><em>Completed2</em></div><div style="text-align: justify;"><em>7</em></div><div style="text-align: justify;"><em>8</em></div><div style="text-align: justify;"><em>9</em></div><div style="text-align: justify;"><em>Completed2</em></div><div style="text-align: justify;"><em>10</em></div><div style="text-align: justify;"><em>Completed2</em></div><div style="text-align: justify;"><em>Completed</em></div><div style="text-align: justify;">=============</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-9291034097762595062011-12-24T05:21:00.000-08:002011-12-21T23:38:13.556-08:00Creating a Subject in Reactive Extensions and the difference between cold and hot observables.<div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;">System.Reactive.Subjects namespace contains implementations for the ISubject interface. Subject simply put is both an observable and an observer:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border: 1px dashed rgb(153, 153, 153); color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding: 5px; width: 100%;"><code>public interface ISubject<in TSource, out TResult>
: IObserver<TSource>, IObservable<TResult>
{ }
public interface ISubject<T>
: ISubject<T, T>
{ }
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">You can create an instance of it with the static Subject.Create method and notify the injected observer with two new values:</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border: 1px dashed rgb(153, 153, 153); color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding: 5px; width: 100%;"><code>var observable = Observable
.Interval(TimeSpan.FromSeconds(1))
.Take(5);
var observer = Observer.Create<long>(
x => Console.WriteLine("Value published to subject #1: {0}", x),
() => Console.WriteLine("Sequence Completed."));
ISubject<long, long> subject = Subject.Create(observer, observable);
subject.OnNext(1);
subject.OnNext(2);
</code></pre></div><div style="text-align: justify;">================ </div><div style="text-align: justify;"></div><div style="text-align: justify;"><em>Value published to subject #1: 1</em></div><div style="text-align: justify;"><em>Value published to subject #1: 2</em></div><div style="text-align: justify;">================ </div><div style="text-align: justify;">Then you can subscribe to the source observable through the subject. It uses the <strong>Interval</strong> extension method which produces a new value for each period defined as its argument. In our case it will produce a new value every second. Then we use the <strong>Take</strong> method to limit the infinite sequence to 5 values. Now you can subscribe to the subject which will internally subscribe to the injected observable. But if you wait for two seconds in the current thread with the Sleep method and then subscribe once again to the subject you will get an independent subscription from the previous one. The Interval method will start producing values from the beginning. So you have just subscribed to a cold observable. </div></div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border: 1px dashed rgb(153, 153, 153); color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding: 5px; width: 100%;"><code>var d1 = subject.Subscribe(
v => Console.WriteLine("Value published to observer #1: {0}", v),
() => Console.WriteLine("Sequence 1 Completed.")
);
Thread.Sleep(2000);
var d2 = subject.Subscribe(
v => Console.WriteLine("Value published to observer #2: {0}", v),
() => Console.WriteLine("Sequence 2 Completed.")
);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ </div><div dir="ltr" style="text-align: justify;" trbidi="on"><br />
<div dir="ltr" style="text-align: justify;" trbidi="on"><em></em></div><em>Value published to observer #1: 0</em> </div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value published to observer #1: 1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value published to observer #1: 2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value published to observer #2: 0</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value published to observer #1: 3</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value published to observer #2: 1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value published to observer #1: 4</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Sequence 1 Completed.</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value published to observer #2: 2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value published to observer #2: 3</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value published to observer #2: 4</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Sequence 2 Completed.</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on">To create a hot observable behavior you have to create a new instance of the <strong>Subject</strong> class and subscribe with it to the observable:</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border: 1px dashed rgb(153, 153, 153); color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding: 5px; width: 100%;"><code>ISubject<long> subject = new Subject<long>();
observable.Subscribe(subject);
var d1 = subject.Subscribe(
v => Console.WriteLine("Value published to observer #1: {0}", v),
() => Console.WriteLine("Sequence Completed.")
);
Thread.Sleep(3000);
var d2 = subject.Subscribe(
v => Console.WriteLine("Value published to observer #2: {0}", v),
() => Console.WriteLine("Sequence Completed.")
);
</code></pre></div><div style="text-align: justify;">In this case the new observer has subscribed to the already publishing observable:</div><div style="text-align: justify;">================ </div><div style="text-align: justify;"></div><div style="text-align: justify;"><em>Value published to observer #1: 0</em></div><div style="text-align: justify;"><em>Value published to observer #1: 1</em></div><div style="text-align: justify;"><em>Value published to observer #1: 2</em></div><div style="text-align: justify;"><em>Value published to observer #2: 2</em></div><div style="text-align: justify;"><em>Value published to observer #1: 3</em></div><div style="text-align: justify;"><em>Value published to observer #2: 3</em></div><div style="text-align: justify;"><em>Value published to observer #1: 4</em></div><div style="text-align: justify;"><em>Value published to observer #2: 4</em></div><div style="text-align: justify;"><em>Sequence 1 Completed.</em></div><div style="text-align: justify;"><em>Sequence 2 Completed.</em></div><div style="text-align: justify;">================</div><div style="text-align: justify;">You can actually promote a cold observable to a hot observable by calling the <strong>Publish</strong> extension method. It will return you an <strong>IConnectableObservable</strong> on which you have to call the <strong>Connect</strong> method before you subscribe to it. In that case the subject implementation is internally hidden from you.</div></div><pre style="background-color: #eeeeee; border: 1px dashed rgb(153, 153, 153); color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding: 5px; width: 100%;"><code>IConnectableObservable<long> observable =
Observable
.Interval(TimeSpan.FromSeconds(1))
.Take(5)
.Publish();
observable.Connect();
var d1 = observable.Subscribe(
v => Console.WriteLine("Value published to observer #1: {0}", v),
() => Console.WriteLine("Sequence Completed.")
);
Thread.Sleep(3000);
var d2 = observable.Subscribe(
v => Console.WriteLine("Value published to observer #2: {0}", v),
() => Console.WriteLine("Sequence Completed.")
);
</code></pre>The results are the same as before:<br />
================<br />
<em>Value published to observer #1: 0</em><br />
<em>Value published to observer #1: 1</em><br />
<em>Value published to observer #1: 2</em><br />
<em>Value published to observer #2: 2</em><br />
<em>Value published to observer #1: 3</em><br />
<em>Value published to observer #2: 3</em><br />
<em>Value published to observer #1: 4</em><br />
<em>Value published to observer #2: 4</em><br />
<em>Sequence 1 Completed.</em><br />
<em>Sequence 2 Completed.</em><br />
================</div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com3tag:blogger.com,1999:blog-2938905445844148673.post-16760617565084794572011-12-23T03:56:00.000-08:002011-12-22T06:19:06.003-08:00IDisposable implementations in Reactive Extensions<div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: justify;" trbidi="on"><div dir="ltr" style="text-align: justify;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on">System.Reactive.Disposable namespace contains several implementations of the IDisposable interface in conjunction with RX. </div></div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public interface IDisposable
{
void Dispose();
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">The main reason for the existence of these classes is that they can be used in your implementations of the observable's Subscribe method which returns an IDisposable.</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">For example you can return an Empty disposable which actually does nothing:</div><div dir="ltr" style="text-align: left;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source = Observable.Create<int>(
observer => {
observer.OnNext(1);
return Disposable.Empty;
}
);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">Or you can use the static Disposable.Create method and provide it with an action which will be called during disposal:</div></div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source = Observable.Create<int>(
observer => {
observer.OnNext(1);
return Disposable.Create(() => "Dispose".Dump());
}
);
using (source.Subscribe(v => v.Dump()))
{
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ </div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Dispose</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on">In the following example we will return instead of an IDisposable an action which will be wrapped to an IDisposable:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var source = Observable.Create<int>(
observer => {
observer.OnNext(1);
return () => "Dispose".Dump();
}
);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">A BooleanDisposable will change it's IsDisposed property to false after disposal:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var booleanDisposable = new BooleanDisposable();
if (!booleanDisposable.IsDisposed)
{
booleanDisposable.Dispose();
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">The SingleAssignmentDisposable's Disposable property can be set only ones as the name suggest, for the second attempt it will raise an InvalidOperationException with the message "Disposable has already been assigned.". You can use it in two different ways. You can first assign your IDisposable to the Disposable property and then dispose the SingleAssignmentDisposable:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var sad = new SingleAssignmentDisposable();
sad.Disposable =
Disposable.Create(() => "Single disposed".Dump());
sad.Dispose();
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ </div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Single disposed</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ </div><div dir="ltr" style="text-align: justify;" trbidi="on">Or you can first dispose the SingleAssignmentDisposable and then assign your IDisposable to it. In this case your IDisposable will be automatically disposed during the assignment to the Disposable property: </div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var sad = new SingleAssignmentDisposable();
sad.Dispose();
sad.Disposable =
Disposable.Create(() => "Single disposed".Dump());
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ </div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Single disposed</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ <br />
SerialDisposable allows you to set multiple times the Disposable property. In that case the current Disposable will be disposed and then change to the new one.</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var serialDisposable = new SerialDisposable();
serialDisposable.Disposable = Disposable.Create(() => "Disposed 1".Dump());
"Set a new disposable.".Dump();
serialDisposable.Disposable = Disposable.Create(() => "Disposed 2".Dump());
"Call serial disposable's dispose.".Dump();
serialDisposable.Dispose();
</code></pre>================ <br />
<em>Set a new disposable.</em><br />
<em>Disposed 1</em><br />
<em>Call serial disposable dispose.</em><br />
<em>Disposed 2</em><br />
================ <br />
<div dir="ltr" style="text-align: justify;" trbidi="on">In case that you want to dispose multiple disposables at once you can use the CompositeDisposable class which implements the ICollection<T> interface:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>IDisposable disposable1 =
Observable.Return(1)
.Subscribe(Console.WriteLine);
IDisposable disposable2 =
Observable.Return(2)
.Subscribe(Console.WriteLine);
using(new CompositeDisposable(disposable1, disposable2))
{
};
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ </div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================<br />
or</div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>IDisposable disposable1 = Disposable.Create(() => "Disposed 1".Dump());
IDisposable disposable2 = Disposable.Create(() => "Disposed 2".Dump());
IDisposable disposable3 = Disposable.Create(() => "Disposed 3".Dump());
var compositeDisposable = new CompositeDisposable()
{
disposable1,
disposable2
};
compositeDisposable.Add(disposable3);
compositeDisposable.Dispose();
</code></pre>================<br />
<em>Disposed 1</em><br />
<em>Disposed 2</em><br />
<em>Disposed 3</em><br />
================<br />
<div dir="ltr" style="text-align: justify;" trbidi="on">The final example will use a CancellationDisposable which is disposed when you call the Cancel method on the injected CancellationTokenSource:</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: rgb(153,153,153) 1px dashed; border-left: rgb(153,153,153) 1px dashed; border-right: rgb(153,153,153) 1px dashed; border-top: rgb(153,153,153) 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var cts = new CancellationTokenSource();
var cd = new CancellationDisposable(cts);
cts.Cancel();
cd.IsDisposed.Dump();
</code></pre></div><div style="text-align: justify;">================ </div><div style="text-align: justify;"><em>True</em></div><div style="text-align: justify;">================</div><div style="text-align: justify;">There are also other implementations of the IDisposable interface in this namespace such as MultipleAssignmentDisposable, SchedulerDisposable and so on, but I'm not going to cover these in this topic.</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-70240826872751940692011-12-22T06:46:00.000-08:002011-12-22T10:01:05.322-08:00The Aggregate and Scan RX extension methods.<div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div style="text-align: justify;">This post is about how to sum all numbers in a range from an observable sequence with the help of the <strong>Aggregate</strong> method. Then the <strong>FirstOrDefault</strong> is used to convert the resulting IObservable to an int.</div><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var sumOfNumbers = Observable.Range(1, 10)
.Aggregate(0, (x, y) => x + y)
.FirstOrDefault();
Console.WriteLine("Sum of numbers from 1 to 10 is {0}", sumOfNumbers);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Sum of numbers from 1 to 10 is 55</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on">Or you can you the <strong>Scan</strong> method to display the intermediate results with the help of the <strong>Do</strong> method and finally store the result with the <strong>LastOrDefault</strong>.</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>int sumOfNumbers = Observable.Range(1, 10)
.Scan(0, (x, y) => x + y)
.Do(Console.WriteLine)
.LastOrDefault();
Console.WriteLine("Sum of numbers from 1 to 10 is {0}", sumOfNumbers);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>3</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>6</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>10</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>15</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>21</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>28</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>36</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>45</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>55</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Sum of numbers from 1 to 10 is 55</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">==========<br />
You can use the Aggregate method to fill a collection with received values from the observable:</div></div></div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>IList<int> collection =
Observable.Range(1, 5)
.Aggregate(new List<int>(), (list, value) => {
list.Add(value);
return list;
})
.FirstOrDefault();
foreach (var element in collection)
{
Console.WriteLine(element);
}
</code></pre>==========<br />
<em>1</em><br />
<em>2</em><br />
<em>3</em><br />
<em>4</em><br />
<em>5</em><br />
==========</div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-47740706878839080902011-12-22T02:05:00.000-08:002011-12-22T10:32:43.429-08:00Advancing the time with TestScheduler in Reactive Extensions.<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;">In this post I'm going to show you how you can programmatically advance the time with the test scheduler so you don't have to wait 5 days for an event in your test environment. First we will prepare a testable cold observable. It will schedule us 4 OnNext notifications every 100 virtual ticks and finally after 500 ticks an OnCompleted notification kind.<br />
<pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var testScheduler = new TestScheduler();
var records = new Recorded<Notification<int>>[] {
ReactiveTest.OnNext(100, 1),
ReactiveTest.OnNext(200, 2),
ReactiveTest.OnNext(300, 3),
ReactiveTest.OnNext(400, 4),
ReactiveTest.OnCompleted<int>(500)
};
ITestableObserver<int> testableObserver =
testScheduler.CreateObserver<int>();
ITestableObservable<int> testableObservable =
testScheduler.CreateColdObservable(records);
IDisposable d = testableObservable.Subscribe(testableObserver)
</code></pre>Now you can use <strong>AdvanceBy</strong> which advances the test schedulers Clock by this relative time (testScheduler.Clock+time) and runs all the scheduled work within it. You can also use the <strong>AdvanceTo</strong> method to run all scheduled items up to this absolute time and again advance the schedulers Clock to this point. The <strong>Start</strong> method runs the remaining work from the actual Clock time.<br />
<pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>int subscrCount = testableObservable.Subscriptions.Count();
Console.WriteLine("Number of subscriptions to the test observable: {0}"
, subscrCount);
testScheduler.AdvanceBy(200);
Console.WriteLine("Messages sent({0}) until {1}"
, testableObserver.Messages.Count
, testScheduler.Clock);
testScheduler.AdvanceTo(400);
Console.WriteLine("Messages sent({0}) until {1}"
, testableObserver.Messages.Count
, testScheduler.Clock);
testScheduler.Start();
Console.WriteLine("Messages sent({0}) until {1}"
, testableObserver.Messages.Count
, testScheduler.Clock);
foreach (var message in testableObserver.Messages)
{
Console.WriteLine("Value {0} at {1}", message.Value, message.Time);
}
</code></pre>At the end you can check that all scheduled items were run and sent to the mock observer.<br />
================ <br />
<div><div><em>Number of subscriptions to the test observable: 1<br />
Messages sent(2) until 200<br />
Messages sent(4) until 400<br />
Messages sent(5) until 500<br />
Value OnNext(1) at 100<br />
Value OnNext(2) at 200<br />
Value OnNext(3) at 300<br />
Value OnNext(4) at 400<br />
Value OnCompleted() at 500</em><br />
================</div></div></div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-88541765198871042852011-12-21T07:14:00.000-08:002011-12-21T07:14:32.055-08:00How to use the Notification class in Reactive Extensions<div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on">In this topic I will show you how you can use the main three Notification types in RX. They are OnNext, OnError and OnCompleted and you will mainly use them for testing purposes. </div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public enum NotificationKind
{
OnNext,
OnError,
OnCompleted
}
</code></pre><div dir="ltr" style="text-align: justify;" trbidi="on">Let's first create a notification of NotificationKind.OnNext and explore it's properties and methods:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>// Create a NotificationKind.OnNext with new Exception
Notification<int> notification = Notification.CreateOnNext(1);
// explore instance properties
Console.WriteLine("Value: {0}" , notification.Value);
Console.WriteLine("HasValue: {0}", notification.HasValue);
Console.WriteLine("Kind: {0}", notification.Kind);
// user friendly ToString
Console.WriteLine(notification.ToString());
var observer = Observer.Create<int>(Console.WriteLine);
// calls the observer's OnNext method
// with the Value as input parameter
// observer.OnNext(notification.Value);
notification.Accept(observer);
// do the same thing manually
var source = notification.ToObservable();
source.Subscribe(observer.AsObserver());
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">The code is describe with comments and the output is as follows:</div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value: 1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>HasValue: True</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Kind: OnNext</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>OnNext(1)</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on">A NotificationKind.OnError notification can be created as follows:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>// Create a NotificationKind.OnErrorwith value 1
Notification<int> notification = Notification
.CreateOnError<int>(new Exception("error"));
// explore instance properties
Console.WriteLine("Exception: {0}" , notification.Exception);
Console.WriteLine("HasValue: {0}", notification.HasValue);
Console.WriteLine("Kind: {0}", notification.Kind);
// user friendly ToString
Console.WriteLine(notification.ToString());
var observer = Observer.Create<int>(
Console.WriteLine,
error => Console.WriteLine(error.Message)
);
// calls the observer's OnError method
// with the Exception as input parameter
// observer.OnError(notification.Exception);
notification.Accept(observer.AsObserver());
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ </div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Exception: System.Exception: error</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>HasValue: False</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Kind: OnError</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>OnError(System.Exception)</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>error</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on">A NotificationKind.OnCompleted notification can be created as follows:</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>// Create a NotificationKind.OnCompleted
Notification<int> notification = Notification
.CreateOnCompleted<int>();
// explore instance properties
Console.WriteLine("HasValue: {0}", notification.HasValue);
Console.WriteLine("Kind: {0}", notification.Kind);
// user friendly ToString
Console.WriteLine(notification.ToString());
var observer = Observer.Create<int>(
Console.WriteLine,
() => Console.WriteLine("Completed")
);
// calls the observer's OnCompleted method
// observer.OnCompleted();
notification.Accept(observer.AsObserver());
</code></pre></div><div style="text-align: justify;">================</div><div style="text-align: justify;"><em>HasValue: False</em></div><div style="text-align: justify;"><em>Kind: OnCompleted</em></div><div style="text-align: justify;"><em>OnCompleted()</em></div><div style="text-align: justify;"><em>Completed</em></div><div style="text-align: justify;">================</div><div style="text-align: justify;">You can convert an observable to a collection of notifications with the Materialize and ToEnumerable methods. In this case we Concat to observables to produce one sequence and produce an error after the Range ends.</div></div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var notifications = Observable.Range(1, 2)
.Concat(Observable.Throw<int>(new InvalidOperationException()))
.Materialize()
.ToEnumerable();
foreach (var notification in notifications)
{
notification.ToString().Dump();
}
</code></pre></div>================<br />
<em>OnNext(1)</em><br />
<em>OnNext(2)</em><br />
<em>OnError(System.InvalidOperationException)</em><br />
================<br />
You can achieve the same result with creating an observer from a notification callback. Observer will convert every notification to an appropriate NotificationKind and pass it to the provided callback method.</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var notifications = Observable.Range(1, 2)
.Concat(Observable.Throw<int>(new InvalidOperationException()));
Action<Notification<int>> action = notification => {
notification.ToString().Dump();
};
var observer = action.ToObserver();
notifications.Subscribe(observer);
</code></pre></div><div style="text-align: justify;">================</div><div style="text-align: justify;"><em>OnNext(1)</em></div><div style="text-align: justify;"><em>OnNext(2)</em></div><div style="text-align: justify;"><em>OnError(System.InvalidOperationException)</em></div><div style="text-align: justify;">================</div><div style="text-align: justify;">And finally you can convert back an observer to an Action delegate with the <strong>ToNotifier</strong> and call it as a method:</div></div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>var notifier = Observer.Create<int>(
Console.WriteLine,
() => "Completed".Dump())
.ToNotifier();
notifier(Notification.CreateOnNext(2));
notifier(Notification.CreateOnCompleted<int>());
</code></pre>================<br />
<em>2</em><br />
<em>Completed</em><br />
================</div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-70315047717759755652011-12-21T00:49:00.000-08:002011-12-21T05:35:14.131-08:00Preparing for unit testing with testable observables<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div dir="ltr" style="text-align: justify;" trbidi="on"></div><div dir="ltr" style="text-align: justify;" trbidi="on">In the following example we are going to explore the basic functionality provided to help us with unit testing of RX observables. First we have to create an instance of the class <strong>TestScheduler</strong> which implements <strong>ISchudeler</strong> interface. This will enable us to schedule some work for the future. Then we call the <strong>CreateObserver</strong> method on it. It will return us an ITestableObserver. There is a property called <strong>Messages</strong> which is an addition to the classical IObservable interface. It will contain the actions sent to the previously created MockObserver after calling the Start method on the testScheduler. Next we will schedule two notifications. There are three possibilities in the <strong>NotificationKind</strong> enumeration (OnNext, OnError and OnCompleted). You can create these with Notification.CreateOnNext, Notification.CreateOnError and Notification.CreateOnCompleted static methods. These actions are scheduled to run after 100 and 200 virtual ticks. Schedule contains a callback which will be executed when the virtual time will be advanced to this point. In this case we are calling the notification's <strong>Accept</strong> method which invokes the mock observer's method corresponding to the notification. In our case it's OnNext for the our first scheduled item and for the second one it's OnCompleted. After we have subscribed to the testable observable we can start the scheduler. This will actually run our scheduled items an send them to the mock observer. This observer will record the received notifications to it's Messages collection.</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var testScheduler = new TestScheduler();
var testableObserver = testScheduler.CreateObserver<int>();
testScheduler.ScheduleAbsolute(Notification.CreateOnNext<int>(2), 100L, (scheduler, state) => {
state.Accept(testableObserver);
return Disposable.Empty;
});
testScheduler.ScheduleAbsolute(Notification.CreateOnCompleted<int>(), 200L, (scheduler, state) => {
state.Accept(testableObserver);
return Disposable.Empty;
});
testScheduler.Start();
foreach (var message in testableObserver.Messages)
{
Console.WriteLine("Value {0} at {1}", message.Value, message.Time);
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">Finally we will send the recorded messages to the output:</div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value OnNext(2) at 100</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Value OnCompleted() at 200</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on">We can achieve the same result with the use of <strong>CreateColdObservable</strong> method. It receives as an input a collection of notification records. Record's Value has to be a Notification which are in this case created with the help of static methods ReactiveTest.OnNext and ReactiveTest.OnCompleted (there also exists a ReactiveTest.OnError). In case of OnNext the first parameter is the relative schedule time and the value of the notification.</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var testScheduler = new TestScheduler();
var records = new Recorded<Notification<int>>[] {
ReactiveTest.OnNext(100, 1),
ReactiveTest.OnCompleted<int>(200)
};
var testableObserver = testScheduler.CreateObserver<int>();
var testableObservable = testScheduler.CreateColdObservable(records);
testableObservable.Subscribe(testableObserver);
testScheduler.Start();
foreach (var message in testableObserver.Messages)
{
Console.WriteLine("Value {0} at {1}", message.Value, message.Time);
}
</code></pre></div><div style="text-align: justify;">Yet another way how to achieve the same thing is to provide a create observable method and provide creation, subscription and disposal time to the Start method of the test scheduler. In this case the notifications are scheduled relatively to the subscription time. So now we don't have to create a mock observer manually and subscribe to the test observable, it will be managed by the scheduler.</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var testScheduler = new TestScheduler();
var records = new Recorded<Notification<int>>[] {
ReactiveTest.OnNext(100, 1),
ReactiveTest.OnCompleted<int>(200)
};
var testableObserver = testScheduler.Start(
() => testScheduler.CreateColdObservable(records),
0, 50, 300
);
foreach (var message in testableObserver.Messages)
{
Console.WriteLine("Value {0} at {1}", message.Value, message.Time);
}
</code></pre></div><div style="text-align: justify;">The results are: </div><div style="text-align: justify;">================</div><div style="text-align: justify;"><em>Value OnNext(1) at 150</em></div><div style="text-align: justify;"><em>Value OnCompleted() at 250</em></div><div style="text-align: justify;">================</div><div style="text-align: justify;">That's it for now. Next we will explore subjects and the difference between cold and hot observables.</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-65369213326740747722011-12-20T09:52:00.000-08:002011-12-21T11:11:11.358-08:00Creating basic observable collections with RX<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;">In the previous article we created some observables which produced only single notification. This time we will focus on more collection like notifications. We will start with the <strong>Repeat</strong> method which repeats the first argument n-times based on the second argument:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var source = Observable.Repeat(1, 3);
IDisposable d = source.Subscribe(
value => value.Dump(),
error => error.Message.Dump(),
() => "Completed".Dump()
);
</code></pre>The above mentioned code snippet uses an extension method defined in the System.ObservableExtensions static class. It provides you several overloaded versions of ObservableExtensions.<strong>Subscribe</strong> in case you don't want to create your own observer, but only provide the action methods.<br />
If you are using Visual Studio you have to use the subscribe method with System.Console instead of the Dump method.<br />
<pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> IDisposable d = source.Subscribe(
Console.WriteLine,
error => Console.WriteLine(error),
() => Console.WriteLine("Completed")
);
</code></pre>The output from LINQPad is as follows:<br />
================<br />
<em>1</em><br />
<em>1</em><br />
<em>1</em><br />
<em>Completed</em><br />
================<br />
Now we are going to create a <strong>Range</strong> of notifications. The first argument is the starting value and the second is how many values we will receive together:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var source = Observable.Range(1, 3);
IDisposable d = source.Subscribe(observer);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>2</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>3</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Completed</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on">The last example is the most generic one. The <strong>Generate</strong> extension method needs these five arguments: an initial state, a condition when to stop, an iterator how to get the next value and a result selector to shape the result:</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var source = Observable.Generate(
1,
value => value <= 3,
value => value + 1,
value => value
);
IDisposable d = source.Subscribe(observer);
</code></pre></div><div style="text-align: justify;">The initial state is set to 1. In every "iteration" the state is incremented by one while the condition is met. The resulting shape of the values won't be changed in this case. This time we have created the exactly same functionality as the Repeat method provides:</div><div style="text-align: justify;">================</div><div style="text-align: justify;"><em>1</em></div><div style="text-align: justify;"><em>2</em></div><div style="text-align: justify;"><em>3</em></div><div style="text-align: justify;"><em>Completed</em></div><div style="text-align: justify;">================</div><div style="text-align: justify;">Next time we will create a testable observable.</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-21768786019063093162011-12-20T09:18:00.000-08:002011-12-21T09:32:26.694-08:00How to create an observable with RX<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div style="text-align: justify;"></div><div style="text-align: justify;"></div><div dir="ltr" style="text-align: justify;" trbidi="on">IObservable<T> is defined in the System namespace and defines a provider for push based notifications. It looks like this:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">An observable has to implement this interface. Than you can call the <strong>Subscribe</strong> function on it via which you can inject an observer to obtain notifications from it. The function returns an IDisposable which defines the Dispose method.</div><div dir="ltr" style="text-align: justify;" trbidi="on">You can create a new observable in a lot of different ways. I'm going to show you the basic methods of the <strong>Observable</strong> static class from System.Reactive.Linq namespace for creating observables. This class also contains a lot of <strong>LINQ query operators</strong> implemented as extension methods such as Where, Take, etc.</div><div dir="ltr" style="text-align: justify;" trbidi="on">All examples in this article will use the following observer instance:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var observer = Observer.Create<int>(
value => value.Dump(),
error => error.Message.Dump(),
() => "Completed".Dump()
);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">The easiest way how to create an observable which notifies us about one value is the <strong>Next</strong> extension method and subscribe to it:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var source = Observable.Return(1);
source.Subscribe(observer);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">The results are:</div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>1</em></div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Completed</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ </div><div dir="ltr" style="text-align: justify;" trbidi="on">If you would like to simulate an error condition you can use the <strong>Throw</strong> extension method. In this case the OnCompleted is not called:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var source = Observable.Throw<int>(new Exception("Error has occurred."));
source.Subscribe(observer);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">The result is: </div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Error has occurred.</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on">An empty observable can be created with the <strong>Empty</strong> extension method. In this case only the OnCompleted method of the observer is called:</div><div dir="ltr" style="text-align: justify;" trbidi="on"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var source = Observable.Empty<int>();
source.Subscribe(observer);
</code></pre></div><div dir="ltr" style="text-align: justify;" trbidi="on">The result is: </div><div dir="ltr" style="text-align: justify;" trbidi="on">================</div><div dir="ltr" style="text-align: justify;" trbidi="on"><em>Completed</em></div><div dir="ltr" style="text-align: justify;" trbidi="on">================ </div><div dir="ltr" style="text-align: justify;" trbidi="on">Ok, so now it's time to create your first own IObservable implementation:</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>internal class DefaultObservable<T> : IObservable<T>
{
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
{
throw new ArgumentNullException("observer");
}
observer.OnNext(default(T));
observer.OnCompleted();
return Disposable.Empty;
}
}
</code></pre></div><div style="text-align: justify;">Now you can create an instance of the DefaultObservable class which notifies our observer with the default value of type T. </div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var source = new DefaultObservable<int>();
IDisposable d = source.Subscribe(observer);
</code></pre></div><div style="text-align: justify;">After you subscribe to it you will receive the following output:</div><div style="text-align: justify;">================</div><div style="text-align: justify;"><em>1</em></div><div style="text-align: justify;"><em>Completed</em></div><div style="text-align: justify;">================</div><div style="text-align: justify;">In the next topic we will focus on creating observables which are producing multiple notifications.</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-48629343711400522892011-12-19T08:04:00.000-08:002011-12-21T05:33:35.866-08:00How to create an observer with RX<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;"></div><div style="text-align: justify;">IObserver<t> interfaces which is used as a receiver for pushed based notifications is defined in the System namespace like this:</div><div style="text-align: justify;"><br />
</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>public interface IObserver<in T>
{
void OnNext(T value);
void OnError(Exception error);
void OnCompleted();
}
</code></pre></div><div style="text-align: justify;"><br />
</div><div style="text-align: justify;"><strong>OnNext</strong> - is used for notifying the observer about a new value.</div><div style="text-align: justify;"><strong>OnError</strong> - is a notification that an error occurred.</div><div style="text-align: justify;"><strong>OnCompleted</strong> - notifies our observer that the provider has finished with sending notifications.</div><div style="text-align: justify;"><br />
</div><div style="text-align: justify;">You can create a new observer with the help of the static class <strong>Observer </strong>defined in System.Reactive namespace which provides several overloaded <strong>Create</strong> methods:</div><div style="text-align: justify;"><br />
</div><div style="text-align: justify;">Create<T>(Action<T>, Action<Exception>, Action) - creates an observer from OnNext, OnError and OnCompleted actions:</div><div style="text-align: justify;"><br />
</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var observer = Observer.Create<int>(
value => value.Dump(),
error => error.Message.Dump(),
() => "Completed"
);
</code></pre></div><div style="text-align: justify;"><br />
</div><div style="text-align: justify;">Or you can use implicit values where OnError re-throws the exception and OnCompleted does nothing:</div><div style="text-align: justify;"><br />
</div><div style="text-align: justify;"><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code> var observer1 = Observer.Create<int>(
value => value.Dump(),
error => error.Message.Dump()
);
var observer2 = Observer.Create<int>(
value => value.Dump()
);
</code></pre></div><div style="text-align: justify;"><br />
You can also write your own class which implements the IObserver interface (in your implementation you should also check for null values in the constructor):</div><br />
<pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>internal class Observer<T> : IObserver<T>
{
private Action<T> onNext;
private Action<Exception> onError;
private Action onCompleted;
public Observer(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
this.onNext(value);
}
public void OnError(Exception error)
{
this.onError(error);
}
public void OnCompleted()
{
this.onCompleted();
}
}
</code></pre><br />
Then you can create an instance of the Observer class and notify it about three new values (1, 2 and than 3) and finish the notifications by calling OnCompleted:<br />
<br />
<pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>void Main()
{
var observer = new Observer<int>(
value => value.Dump(),
error => error.Message.Dump(),
() => "Completed".Dump()
);
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
observer.OnCompleted();
}
</code></pre><br />
The results are:<br />
================<br />
<em>1</em><br />
<em>2</em><br />
<em>3</em><br />
<em>Completed</em><br />
================<br />
<div style="text-align: justify;">In the next topic I will show you how you can create basic observables which will notify our observer.</div></div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0tag:blogger.com,1999:blog-2938905445844148673.post-29704923319075263452011-12-18T07:24:00.000-08:002011-12-21T05:33:20.298-08:00Preparing for Reactive Extensions (RX) with LinqPad<div dir="ltr" style="text-align: left;" trbidi="on"><div style="text-align: justify;">In the following series I will show you how to use Reactive Extensions with my favorite scratchpad LINQPad.</div><div style="text-align: justify;"><br />
<div></div></div><div style="text-align: justify;"><strong>Download and Installation</strong></div><div style="text-align: justify;">You can find instructions how to download and install RX on this <a href="http://msdn.microsoft.com/en-us/data/gg577610" target="_blank">page</a> in the "Download the Reactive Extensions for .NET" section.</div><div style="text-align: justify;">LINQPad is downloadable from this <a href="http://www.linqpad.net/" target="_blank">site</a>.</div>Open LINQPad.exe<br />
<div class="separator" style="clear: both; text-align: center;"></div><div class="separator" style="clear: both; text-align: center;"></div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhTHmdhulaws7AGOPBZIghg_4hU46P5v6hTzkuMdedRH50u-jB03ARkJheZ5yei6Vv4v7JEU2peHxKx-3-p7QmKzWTddhIbqQgucnIKccL4Ou3nym0vpcwtsbBC9ERyMyEZjiiNLWQROU36/s1600/LinqPad.JPG" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="213" oda="true" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhTHmdhulaws7AGOPBZIghg_4hU46P5v6hTzkuMdedRH50u-jB03ARkJheZ5yei6Vv4v7JEU2peHxKx-3-p7QmKzWTddhIbqQgucnIKccL4Ou3nym0vpcwtsbBC9ERyMyEZjiiNLWQROU36/s320/LinqPad.JPG" width="320" /></a></div><div style="text-align: justify;">press F4 and then click Browse.. to add new assemblies from the installation folder (Program Files\Microsoft Reactive Extensions SDK\v1.0.10621\Binaries\.NETFramework\v4.0) in the Additional References tab:</div><ul style="text-align: left;"><li>System.Reactive</li>
<li>Microsoft.Reactive.Testing</li>
</ul><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh7QQ-3dElhC-aXeNfz0GEIIIosSQTue9hi9qDpHXxBmZ0RGGVgLLi5ic2YOmM7tQu7KP4tivzbSXpdj-Ayab-ROWLTa-tNHzeH5o0iij0aKdev5I2jHiG5DQipwluZFOkCSmic5kOZxU0o/s1600/AdditionalReferences.JPG" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="214" oda="true" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh7QQ-3dElhC-aXeNfz0GEIIIosSQTue9hi9qDpHXxBmZ0RGGVgLLi5ic2YOmM7tQu7KP4tivzbSXpdj-Ayab-ROWLTa-tNHzeH5o0iij0aKdev5I2jHiG5DQipwluZFOkCSmic5kOZxU0o/s320/AdditionalReferences.JPG" width="320" /></a></div><div style="text-align: justify;">You will also need to add the following namespaces without the using statement in the Additional Namespace Imports Tab:</div><ul style="text-align: left;"><li>System.Threading.Tasks</li>
<li>Microsoft.Reactive.Testing</li>
<li>System.Reactive</li>
<li>System.Reactive.Linq</li>
<li>System.Reactive.Subjects</li>
<li>System.Reactive.Concurrency </li>
<li>System.Reactive.Disposables</li>
</ul><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiccb5gVmUgeD-Od3d_Wo3nvdvshKflsYfOwP0BPx7EChSn6WjYlARDN2qiGQQ1R7khmOOgc0Kd8YKxPHV6EJiRUXab49ySR_nuA2CFLAgsjorOdWJ32Ia96JJawI-N9W-dDFvD-0SOphcX/s1600/AdditionalNamespaceImports.JPG" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="214" oda="true" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEiccb5gVmUgeD-Od3d_Wo3nvdvshKflsYfOwP0BPx7EChSn6WjYlARDN2qiGQQ1R7khmOOgc0Kd8YKxPHV6EJiRUXab49ySR_nuA2CFLAgsjorOdWJ32Ia96JJawI-N9W-dDFvD-0SOphcX/s320/AdditionalNamespaceImports.JPG" width="320" /></a></div>Now we will try our first example which will be explained in the next articles. Type the source code as shown below:<br />
<br />
<div></div><pre style="background-color: #eeeeee; border-bottom: #999999 1px dashed; border-left: #999999 1px dashed; border-right: #999999 1px dashed; border-top: #999999 1px dashed; color: black; font-family: Andale Mono, Lucida Console, Monaco, fixed, monospace; font-size: 12px; line-height: 14px; overflow: auto; padding-bottom: 5px; padding-left: 5px; padding-right: 5px; padding-top: 5px; width: 100%;"><code>void Main()
{
var source = Observable.Range(1, 5);
source.Subscribe(value => value.Dump());
}
</code></pre>You should see the following results in the result window after pressing Execute (F5):<br />
<div>================<br />
<em>1</em><br />
<em>2</em><br />
<em>3</em><br />
<em>4</em><br />
<em>5</em><br />
================</div><div class="separator" style="clear: both; text-align: center;"><a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh6L8HJd7pbqtXkiE4Nsic9lkWx8q-LzgljV_uLexRL3lrW9cu2v32TdaF4dN3rAr7ZjJt4RKP3NQ4MF6pSZE7ElRMMpTwVuC3r1B_Df7k4aUhs8orz0SgFBdwWeaWYGRpDQKlcVO5dMwtF/s1600/Example.JPG" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="214" oda="true" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEh6L8HJd7pbqtXkiE4Nsic9lkWx8q-LzgljV_uLexRL3lrW9cu2v32TdaF4dN3rAr7ZjJt4RKP3NQ4MF6pSZE7ElRMMpTwVuC3r1B_Df7k4aUhs8orz0SgFBdwWeaWYGRpDQKlcVO5dMwtF/s320/Example.JPG" width="320" /></a></div><br />
<div></div>Save the code snippet (Ctrl+S) as a Linq query file. You can create new linq query files with the same settings by pressing Ctrl+Shift+N. I hope that now you are prepared for the upcoming posts.</div>Tomas F.http://www.blogger.com/profile/06569373675946430347noreply@blogger.com0