Wednesday, December 28, 2011

How to create a parallel observer of two observables with the help of the visitor design pattern in RX.

This example will be a little bit more complex, but very dynamic solution for observing and influencing two or more observables with the help of a visitor. First we create an abstract class Direction and its two implementations LeftDirection and RightDirection. These two classes will enable us to decide from which observable came the notification.
public abstract class Direction<T>
{
    public T Value { get; set; }
    
    public static Direction<T> FromLeft(T value)
    {
        return new LeftDirection<T>(value);
    }
    
    public static Direction<T> FromRight(T value)
    {
        return new RightDirection<T>(value);
    }
}

public class LeftDirection<T> : Direction<T>
{
    public LeftDirection(T value)
    {
        this.Value = value;
    }
}

public class RightDirection<T> : Direction<T>
{
    public RightDirection(T value)
    {
        this.Value = value;
    }
}

Next code snippet will enable you to inherit a new class from the abstract DirectionVisitor class and override its VisitDirection methods in order to provide a different functionality for both observables.
public abstract class DirectionVisitor<T>
{
    public Action<Notification<T>, IObserver<T>> Visit(Direction<Notification<T>> direction)
    {
        return VisitDirection(direction as dynamic);
    }
    
    protected abstract Action<Notification<T>, IObserver<T>> VisitDirection(Direction<Notification<T>> direction);
    
    protected virtual Action<Notification<T>, IObserver<T>> VisitDirection(LeftDirection<Notification<T>> left)
    {
        return VisitDirection(left as Direction<Notification<T>>);
    }
    
    protected virtual Action<Notification<T>, IObserver<T>> VisitDirection(RightDirection<Notification<T>> right)
    {
        return VisitDirection(right as Direction<Notification<T>>);
    }
    
}
The basic idea of the following implementation of the DirectionVisitor is to call the observer's OnCompleted only in case that both observables has already produced an OnCompleted notification. You could influence the generation of the notifications from both observables in a lot of different ways. One observable can also impact the other one.
public class ConvertDirectionToAction<T>: DirectionVisitor<T>
{
    private bool isLeftCompleted;
    private bool isRightCompleted;
    
    protected override Action<Notification<T>, IObserver<T>> VisitDirection(Direction<Notification<T>> direction)
    {
        Action<Notification<T>, IObserver<T>> action = (notif, observer) => {
            NotificationKind kind = notif.Kind;
            switch (notif.Kind)
            {
                case NotificationKind.OnNext: 
                    observer.OnNext(notif.Value);
                    break;
                case NotificationKind.OnError: 
                    observer.OnError(notif.Exception);
                    break;
                case NotificationKind.OnCompleted: 
                    if (this.isLeftCompleted && this.isRightCompleted)
                    {
                        observer.OnCompleted();
                    }
                    break;                    
            }    
        };
        return action;
    }
    
    protected override Action<Notification<T>, IObserver<T>> VisitDirection(LeftDirection<Notification<T>> left)
    {
        NotificationKind kind = left.Value.Kind;
        if (kind == NotificationKind.OnCompleted || 
            kind == NotificationKind.OnError)
        {
            this.isLeftCompleted = true;
        }
        
        return VisitDirection(left as Direction<Notification<T>>);
    }
    
    protected override Action<Notification<T>, IObserver<T>> VisitDirection(RightDirection<Notification<T>> right)
    {
        NotificationKind kind = right.Value.Kind;
        if (kind == NotificationKind.OnCompleted || 
            kind == NotificationKind.OnError)
        {
            this.isRightCompleted = true;
        }
        
        return VisitDirection(right as Direction<Notification<T>>);
    }    
}
Next we will need an implementation of IObserver which will be able to observe two observables and run a different action for a notification based on which one produced it. You can inject your own visitor into the Visitor property and change how the ParallelObserver will observe them.
public class ParallelObserver<T> : IObserver<Direction<Notification<T>>>
{
    IObserver<T> observer;
    
    public ParallelObserver(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }
        this.observer = observer;
    }
    
    public void OnNext(Direction<Notification<T>> value)
    {
        var action = this.Visitor.Visit(value);
        
        action(value.Value, this.observer);
    }
    
    void IObserver<Direction<Notification<T>>>.OnError(Exception error)
    {
    }
    
    void IObserver<Direction<Notification<T>>>.OnCompleted()
    {
    }
    
    private DirectionVisitor<T> visitor;

    public DirectionVisitor<T> Visitor 
    {
        get 
        { 
            return this.visitor ?? 
                   (this.visitor = new ConvertDirectionToAction<T>()); 
        }
        set 
        { 
            this.visitor = value; 
        }    
    }
}
Now you need an extension method which runs both observables in parallel and calls the OnCompleted only when both observables has already finished the work.
public static class ObservableExtensions
{
    public static IObservable<T> Run<T>(this IObservable<T> left, IObservable<T> right)
    {
        return Observable.Create<T>(observer => {
            var parallelObserver = new ParallelObserver<T>(observer);
            
            var disposable1 = left.Materialize()
                         .Select(v => Direction<Notification<T>>.FromLeft(v))
                         .Synchronize()
                         .Subscribe(parallelObserver);
                         
            var disposable2 = right.Materialize()
                          .Select(v => Direction<Notification<T>>.FromRight(v))
                          .Synchronize()
                          .Subscribe(parallelObserver);
                          
            return new CompositeDisposable(2) { disposable1, disposable2 };
        });
    }
}
Try it and test it as a homework.
var source1 = Observable.Repeat(4, 3);
var source2 = Observable.Repeat(1, 3);
    
var input = source1.Run(source2);

input.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("Done"));
==========
4
1
4
1
4
1
Done
==========
Without this extension the observer wouldn't receive the same notifications.
var observer = Observer.Create<int>(
    Console.WriteLine, 
    Console.WriteLine, 
    () => Console.WriteLine("Done")
);

var source1 = Observable.Repeat(4, 3).Subscribe(observer);

var source2 = Observable.Repeat(1, 3).Subscribe(observer);
=========
4
4
4
Done
=========

No comments:

Post a Comment