danielwertheim

danielwertheim


notes from a passionate developer

Share


Sections


Tags


Disclaimer

This is a personal blog. The opinions expressed here represent my own and not those of my employer, nor current or previous. All content is published "as is", without warranty of any kind and I don't take any responsibility and can't be liable for any claims, damages or other liabilities that might be caused by the content.

How to create a simple in memory aggregator using Routemeister

Routemeister in it self is designed to be small and solve one thing well. How-ever, the powers of it is still great. Lets presume you need a router that routes messages to one particular instance of an object. This could be useful for e.g. aggregating something based upon a bunch of events.

Define the handler interface and some messages

The definition of the handler is something you own in your domain. Lets create one and some messages that will be aggregated to cart contents.

public interface ICartEventHandler<in T> where T : ICartEvent
{
    Task HandleAsync(T ev);
}
public interface ICartEvent
{
    Guid CartId { get; }
}

public class ArticleAddedToCart : ICartEvent
{
    public Guid CartId { get; }
    public string ArticleNo { get; }
    public int Qty { get; }

    public ArticleAddedToCart(Guid cartId, string articleNo, int qty)
    {
        CartId = cartId;
        ArticleNo = articleNo;
        Qty = qty;
    }
}

public class ArticleRemovedFromCart : ICartEvent
{
    public Guid CartId { get; }
    public string ArticleNo { get; }
    public int Qty { get; }

    public ArticleRemovedFromCart(Guid cartId, string articleNo, int qty)
    {
        CartId = cartId;
        ArticleNo = articleNo;
        Qty = qty;
    }
}

Create a custom router

In Routemeister, the thing that is "expensive" is the MessagRoutes. Once created using the MessageRouteFactory, you should hold on to them. For this purpose, we "cache" them away per object instance.

public class SpecificObjectAsyncMessageRouter : IAsyncMessageRouter
{
    private static readonly ConcurrentDictionary<Type, MessageRoutes>
        RoutesCache = new ConcurrentDictionary<Type, MessageRoutes>();

    private readonly IAsyncMessageRouter _router;

    public SpecificObjectAsyncMessageRouter(
        object target,
        params Type[] messageHandlerMarkers)
    {
        if (target == null)
            throw new ArgumentNullException(nameof(target));

        if (messageHandlerMarkers == null)
            throw new ArgumentNullException(nameof(messageHandlerMarkers));

        if (!messageHandlerMarkers.Any())
            throw new ArgumentException(
                "Can not be empty.",
                nameof(messageHandlerMarkers));

        var messageRoutes = RoutesCache.GetOrAdd(
            target.GetType(),
            objectType =>
        {
            var assembly = objectType.Assembly;
            var routeFactory = new MessageRouteFactory();
            var routes = new MessageRoutes();
            foreach (var marker in messageHandlerMarkers)
                routes.Add(routeFactory.Create(assembly, marker));

            return routes;
        });

        _router = new SequentialAsyncMessageRouter(
            (type, envelope) => target,
            messageRoutes);
    }

    public Task RouteAsync<T>(T message)
    {
        return _router.RouteAsync(message);
    }
}

Implement a handler

Now, lets create something that represents a Cart that aggregates some events:

public class Cart :
    ICartEventsHandler<ArticleAddedToCart>,
    ICartEventsHandler<ArticleRemovedFromCart>
{
    private readonly ConcurrentDictionary<string, int> _articles;

    public Guid CartId { get; }

    public Cart()
    {
        CartId = Guid.NewGuid();
        _articles = new ConcurrentDictionary<string, int>();
    }

    public Task HandleAsync(ArticleAddedToCart ev)
    {
        Console.WriteLine(
            $"Adding {ev.Qty}pc of {ev.ArticleNo} to {ev.CartId}");

        _articles.AddOrUpdate(
            ev.ArticleNo,
            ev.Qty,
            (s, qty) => qty + ev.Qty);

        return Task.FromResult(0);
    }

    public Task HandleAsync(ArticleRemovedFromCart ev)
    {
        Console.WriteLine(
            $"Removing {ev.Qty}pc of {ev.ArticleNo} from {ev.CartId}");

        _articles.AddOrUpdate(
            ev.ArticleNo,
            0,
            (s, qty) => qty - ev.Qty);

        return Task.FromResult(0);
    }

    public IEnumerable<KeyValuePair<string, int>> GetArticles()
    {
        return _articles;
    } 
}

Dispatch some events

Now we can use an instance of the router to route messages to one particular instance. In our case an instance of a Cart.

static void Main(string[] args)
{
    var cart = new Cart();

    var router = new SpecificObjectAsyncMessageRouter(
        cart,
        typeof(ICartEventsHandler<>));

    //Demo code ahead...
    Task.WaitAll(
        router.RouteAsync(new ArticleAddedToCart(cart.CartId, "a1", 1)),
        router.RouteAsync(new ArticleAddedToCart(cart.CartId, "a2", 2)),
        router.RouteAsync(new ArticleRemovedFromCart(cart.CartId, "a2", 1)),
        router.RouteAsync(new ArticleAddedToCart(cart.CartId, "a1", 1)));

    foreach (var pair in cart.GetArticles())
    {
        Console.WriteLine($"Order {pair.Value}pc of {pair.Key}");
    }
}

View Comments