danielwertheim

Time to construct a bundled NatsClient for C#

The time has come to package up the concepts from the previous posts and introduce a NatsClient: a client that lets you publish and subscribe to subjects and consume incoming messages using observables. We are still working against a single server node and have not yet introduced cluster-aware clients, but we will get there.

Disclaimer: this is still at a level where we are exploring ways of interacting with Nats from C#. When the time comes, there will be a packaged GitHub repo and probably a NuGet package that you can make use of. Can't wait? Look at the official client.

The NatsClient

The client makes use of the previously constructed NatsOpParser and NatsObservable to interact with a single server over a Socket.

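using System;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Text;
using System.Threading;

public class ConnectionInfo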
{
    public string ClientId { get; }
    public string Hostname { get; }
    public int Port { get; }
    public bool Verbose { get; set; }

    public ConnectionInfo(
        string clientId, string hostname, int port)
    {
        ClientId = clientId;
        Hostname = hostname;
        Port = port;
    }
}

public class NatsClient : IDisposable
{
    private const string Crlf = "\r\n";

    private readonly ConnectionInfo _connectionInfo;
    private CancellationTokenSource _cancellation;
    private Socket _socket;
    private NetworkStream _readStream;
    private NatsObservable _observable;

    protected bool IsDisposed { get; private set; }

    public string Id => _connectionInfo.ClientId;
    public IObservable<IOp> Observable => _observable;

    private NatsClient(
        ConnectionInfo connectionInfo,
        Socket socket,
        NetworkStream readStream)
    {
        _connectionInfo = connectionInfo;
        _socket = socket;
        _cancellation = new CancellationTokenSource();
        _readStream = readStream;
        _observable = new NatsObservable(_readStream, _cancellation.Token);
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
        IsDisposed = true;
    }

    protected virtual void Dispose(bool disposing)
    {
        if (IsDisposed || !disposing)
            return;

        _cancellation?.Cancel();
        _cancellation?.Dispose();
        _cancellation = null;

        _observable?.Dispose();
        _observable = null;

        _readStream?.Close();
        _readStream?.Dispose();
        _readStream = null;

        _socket?.Close();
        _socket?.Dispose();
        _socket = null;
    }

    protected void ThrowIfDisposed()
    {
        if (IsDisposed)
            throw new ObjectDisposedException(GetType().Name);
    }

    public static NatsClient Connect(ConnectionInfo connectionInfo)
    {
        var socket = SocketFactory.Create();
        socket.Connect(connectionInfo.Hostname, connectionInfo.Port);

        var readStream = new NetworkStream(socket, FileAccess.Read, false);
        var opParser = new NatsOpParser();

        // The server sends INFO as soon as the TCP connection is established.
        // FirstOrDefault lets the null check below catch an empty result.
        var op = opParser.Parse(readStream, () => readStream.DataAvailable).FirstOrDefault();
        if (op == null)
            throw new Exception("Expected INFO from the server after connecting. Got none.");
        if (!(op is InfoOp))
            throw new Exception($"Expected INFO from the server after connecting. Got {op.Code}");

        socket.SendUtf8($"CONNECT {{\"verbose\": {connectionInfo.Verbose.ToString().ToLowerInvariant()}}}{Crlf}");

        return new NatsClient(connectionInfo, socket, readStream);
    }

    public void Pub(string subject, string data)
    {
        ThrowIfDisposed();

        // The protocol expects the payload size in bytes, not in characters.
        Send($"PUB {subject} {Encoding.UTF8.GetByteCount(data)}{Crlf}{data}{Crlf}");
    }

    public void Sub(string subject, string subscriptionId)
    {
        ThrowIfDisposed();

        Send($"SUB {subject} {subscriptionId}@{Id}{Crlf}");
    }

    // UNSUB takes the subscription id used in SUB, not the subject.
    public void UnSub(string subscriptionId)
    {
        ThrowIfDisposed();

        Send($"UNSUB {subscriptionId}@{Id}{Crlf}");
    }

    public void Pong()
    {
        ThrowIfDisposed();

        Send($"PONG{Crlf}");
    }

    private void Send(string data)
    {
        _socket.SendUtf8(data);
    }
}
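
The strings that Pub, Sub and UnSub send follow the Nats text protocol, where the size in PUB is a byte count and UNSUB refers to a subscription id. Below is a minimal sketch of that framing, kept apart from the socket so it can be tried in isolation. The class NatsFrames and its method names are made up for this illustration and are not part of the client above.

```csharp
using System;
using System.Text;

// Hypothetical helper, not part of the NatsClient above: builds the
// text frames that Pub, Sub and UnSub write to the socket.
public static class NatsFrames
{
    private const string Crlf = "\r\n";

    // PUB <subject> <#bytes>\r\n<payload>\r\n
    // The size is the UTF-8 byte count of the payload, not the char count.
    public static string Pub(string subject, string data)
        => $"PUB {subject} {Encoding.UTF8.GetByteCount(data)}{Crlf}{data}{Crlf}";

    // SUB <subject> <sid>\r\n
    public static string Sub(string subject, string sid)
        => $"SUB {subject} {sid}{Crlf}";

    // UNSUB <sid>\r\n
    public static string UnSub(string sid)
        => $"UNSUB {sid}{Crlf}";
}

public static class Program
{
    public static void Main()
    {
        // "blå" is three chars but four bytes in UTF-8, so the frame says 4.
        Console.Write(NatsFrames.Pub("foo", "blå"));

        // The sid follows the client's "<subscriptionId>@<clientId>" convention.
        Console.Write(NatsFrames.Sub("foo", "s1@myclient1"));
        Console.Write(NatsFrames.UnSub("s1@myclient1"));
    }
}
```

Using data.Length instead of the byte count would report 3 for "blå", and the server would then read one byte too few for the payload.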

The Consumer

The consumer keeps getting smaller as infrastructure concepts are abstracted away, leaving only the parts that interact with Nats. Using the NatsClient looks like this:

// Requires: using System; using System.Reactive.Linq; using System.Text;
var connectionInfo = new ConnectionInfo("myclient1", "ubuntu01", 4222);
using (var client = NatsClient.Connect(connectionInfo))
{
    client.Observable.Subscribe(op =>
    {
        Console.WriteLine("===== RECEIVED =====");
        Console.Write(op.GetAsString());
    });

    client.Observable.OfType<PingOp>().Subscribe(ping => 
        client.Pong());

    client.Observable.OfType<MsgOp>().Subscribe(msg =>
    {
        Console.WriteLine("===== MSG =====");
        Console.WriteLine($"Subject: {msg.Subject}");
        Console.WriteLine($"QueueGroup: {msg.QueueGroup}");
        Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
        Console.WriteLine($"Payload: {Encoding.UTF8.GetString(msg.Payload)}");
    });

    client.Sub("foo", "s1");
    client.Sub("foo", "s2");
    Console.WriteLine("Hit key to stop consuming.");
    Console.ReadKey();
}

Notice that we have two subscriptions against the subject "foo":

client.Sub("foo", "s1");
client.Sub("foo", "s2");

Also note that we have two observers that will react to incoming MsgOp messages, one that receives every op and one that only receives MsgOp:

client.Observable.Subscribe(op =>
{
    Console.WriteLine("===== RECEIVED =====");
    Console.Write(op.GetAsString());
});

client.Observable.OfType<MsgOp>().Subscribe(msg =>
{
    Console.WriteLine("===== MSG =====");
    Console.WriteLine($"Subject: {msg.Subject}");
    Console.WriteLine($"QueueGroup: {msg.QueueGroup}");
    Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
    Console.WriteLine($"Payload: {Encoding.UTF8.GetString(msg.Payload)}");
});
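
How the two observers and the two subscriptions combine can be tried without a server. The sketch below is a simplified stand-in and not Rx: OpStream, SubscribeOfType and the stripped-down op types are invented for this illustration. It only mimics the fan-out, where every op is handed to every observer and a type filter decides whether the observer acts on it.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins for the op types from the earlier posts, reduced to a minimum.
public interface IOp { }
public sealed class PingOp : IOp { }
public sealed class MsgOp : IOp
{
    public string SubscriptionId { get; }
    public MsgOp(string subscriptionId) { SubscriptionId = subscriptionId; }
}

// Hypothetical push source: hands every emitted op to every handler.
public sealed class OpStream
{
    private readonly List<Action<IOp>> _handlers = new List<Action<IOp>>();

    // Handler that sees all ops.
    public void Subscribe(Action<IOp> handler) => _handlers.Add(handler);

    // Handler that only sees ops of type T, similar in spirit to OfType<T>().
    public void SubscribeOfType<T>(Action<T> handler) where T : IOp
        => _handlers.Add(op => { if (op is T) handler((T)op); });

    public void Emit(IOp op)
    {
        // Iterate over a copy so handlers may subscribe while we emit.
        foreach (var handler in _handlers.ToArray())
            handler(op);
    }
}

public static class Program
{
    public static void Main()
    {
        var stream = new OpStream();
        stream.Subscribe(op => Console.WriteLine($"RECEIVED {op.GetType().Name}"));
        stream.SubscribeOfType<MsgOp>(msg => Console.WriteLine($"MSG {msg.SubscriptionId}"));

        // Two subscriptions on the same subject: one published message
        // arrives as two MsgOp, one per subscription id.
        stream.Emit(new MsgOp("s1"));
        stream.Emit(new MsgOp("s2"));
        stream.Emit(new PingOp());
    }
}
```

Each MsgOp reaches both handlers, while the PingOp only reaches the catch-all handler.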

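With this in mind, if we fire up the sender, let it publish to "foo", and then start the consuming application, every message that arrives is delivered once per subscription (s1 and s2), and each delivery is written out by both observers.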

What's next

We are now at a level where we can start focusing on concepts that aren't purely infrastructure related, like looking into different messaging patterns or how to create a message protocol. But of course there is also some plumbing left, like how to become cluster aware with automatic failover.

Cheers,

//Daniel
