danielwertheim

danielwertheim


notes from a passionate developer

Share


Sections


Tags


Disclaimer

This is a personal blog. The opinions expressed here represent my own and not those of my employer, nor current or previous. All content is published "as is", without warranty of any kind and I don't take any responsibility and can't be liable for any claims, damages or other liabilities that might be caused by the content.

Using the OpParser with Nats to create a long running consumer in C#

I just published a post where I describe the OpParser and which lets you consume downstream ops from a NATS server. On demand, lets show how to use it with a NATS server to get a long running subscriber.

All posts in this series

Disclaimer this is still on a level where we are looking into ways of interacting with NATS from C#. When time comes there will be a packaged up GitHub repo and probably a NuGet that you can make use of. Can't wait? Look at the official one.

Get Nats running using Docker

Get a Docker environment up and running. I'm using virtual Ubuntu host where I run my Docker containers. For this excercise, all we need is one Nats server. I'll expose the port so that I can access it from my Windows dev machine:

docker run -d -p 4222:4222 --name nats1 nats

Ta-da! Done. You now have a Nats server running. You can of course also use the demo.nats.io server as described here.

Create a long running consumer

The consumer will just connect and subscribe to the subject=foo. If it receives a PING it replies with a PONG, so that the server doesn't treat it as a Stale Connection.

var cancellation = new CancellationTokenSource();

using (var socket = SocketFactory.Create())
{
    socket.Connect("ubuntu01", 4222);
    //socket.SendUtf8("CONNECT {\"verbose\": false}\r\n");
    socket.SendUtf8("SUB foo c1\r\n");

    Console.WriteLine("Now consuming subject 'foo'. Hit key to stop.");

    var consumingTask = Consume(socket, cancellation.Token, op =>
    {
        Console.WriteLine("===== RECEIVED =====");
        Console.Write(op.GetAsString());

        if (op is PingOp)
            socket.SendUtf8("PONG\r\n");

        var msg = op as MsgOp;
        if (msg != null)
        {
            Console.WriteLine("===== MSG =====");
            Console.WriteLine($"Subject: {msg.Subject}");
            Console.WriteLine($"QueueGroup: {msg.QueueGroup}");
            Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
            Console.WriteLine($"Payload: {Encoding.UTF8.GetString(msg.Payload)}");
        }
    });

    Console.ReadKey();
    cancellation.Cancel();
    consumingTask.Wait();

    socket.SendUtf8("UNSUB foo c1\r\n");
    socket.Close();
}

cancellation.Dispose();

If you don't want verbose output, like the server sending +OK to confirm ops, you could send a CONNECT {"verbose": false}\r\n message immediately after being connected.

socket.SendUtf8("CONNECT {\"verbose\": false}\r\n");

Now, lets look at the consume method which actually reads incoming payloads from the socket as they arrive. Most of the code is actually painful task cancellation stuff. But basically. Create a NetworkStream from the Socket, then use the OpParser and do something with the messages. In this case, just invoke onReceived.

private static Task Consume(
    Socket socket,
    CancellationToken cancellation,
    Action<IOp> onReceived)
{
    var opParser = new OpParser();

    return Task.Factory.StartNew(() =>
    {
        using (var ns = new NetworkStream(socket, FileAccess.Read, false))
        {
            Func<bool> hasData = () =>
                ns.DataAvailable &&
                !cancellation.IsCancellationRequested;

            while (!cancellation.IsCancellationRequested)
            {
                SpinWait.SpinUntil(() =>
                    ns.DataAvailable ||
                    cancellation.IsCancellationRequested);

                if (cancellation.IsCancellationRequested)
                    break;

                foreach (var op in opParser.Parse(ns, hasData)
                    .Take(2)
                    .TakeWhile(op => !cancellation.IsCancellationRequested))
                {
                    onReceived(op);
                }
            }
        }
    }, cancellation, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}

And a little extension method, since Nats wants UTF8 encoded messages:

public static class SocketExtensions
{
    public static void SendUtf8(this Socket socket, string data)
        => socket.Send(Encoding.UTF8.GetBytes(data));
}

With this in place, you can just start publishing messages to the subject foo and you should see it being shown in the consumer console. Silently answering to PING messages.

There's more to come. Like sending PING messages to the server to see it's still there. How to handle Request-Response messaging pattern. How to combine it with publishing over the same socket.

//Daniel

View Comments