danielwertheim

danielwertheim


notes from a passionate developer

Share


Sections


Tags


Disclaimer

This is a personal blog. The opinions expressed here represent my own and not those of my employer, nor current or previous. All content is published "as is", without warranty of any kind and I don't take any responsibility and can't be liable for any claims, damages or other liabilities that might be caused by the content.

Introducing MyNatsClient

After my last performance tweaks (read more) I've finally become satisfied and have chosen to put MyNatsClient out on NuGet. There are some changes coming, like removing dependency on Newtonsoft's JSON.Net and adding support for SSL, but it's out and it's simple, fast and RX friendly!

Why a new one when there's an official project?

Because I wanted one that leaves .Net4.0 behind and therefore offers async constructs. And I also wanted one that is reactive and supports ReactiveExtensions, by being based around IObservable<T>. And I also wanted to keep as much of the domain language of NATS as possible, but not strictly be following/limited to the APIs of other NATS client, but instead offer one that fits the .NET domain.

Usage

The documentation can be found in README.md on GitHub, but lets show how to get started. Most API methods has one synchronous version and one asynchronous version.

First install the package:

Install-Package MyNatsClient

Then you need an instance of a NatsClient. It will require a ConnectionInfo, so lets look at that:

var cnInfo = new ConnectionInfo(new Host("192.168.1.176", 4222));

Now, we are ready to create a client:

var client = new NatsClient("fooBarClientId", cnInfo);

Consumer

If we want to consume incoming messages, we need to observe them. MyNatsClient makes use of IObservable<T>, so you could take advantage of Reactive Extensions.

client.MsgOpStream.Subscribe(msg =>
{
    Console.WriteLine("===== MSG =====");
    Console.WriteLine($"Subject: {msg.Subject}");
    Console.WriteLine($"ReplyTo: {msg.ReplyTo}");
    Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
    Console.WriteLine($"Payload: {msg.GetPayloadAsString()");
});

The Subscribe method returns an IDisposable, calling Dispose() would terminate the in-process subscription:

var inProcSub = client.MsgOpStream.Subscribe(handler);
inProcSub.Dispose(); //The handler will not be called

Now you need to tell the NATS-server that you want to subscribe to (listen on) messages on one or more subjects:

client.Sub("Subject1", "sid1");
client.Sub("Subject2", "sid2");
await client.SubAsync("Subject3", "sid3");
await client.SubAsync("Subject4", "sid4");

This would make your client receive a copy of any message published to any of the subjects: "Subject1, Subject2, Subject3, Subject4".

Finally, you need to Connect so that the Socket gets created and a connection is established to the remote NATS server.

client.Connect();

Publisher

NATS supports publish-subscribe messaging patterns, but you can also do request-reply kind of messaging. You can also create consumer groups and there by share the load within a certain subject consumer group, so only one of them will receive it.

To publish a message, you use any of the Pub or PubAsync methods:

client.Pub("Subject1", body);
await client.PubAsync("Subject1", body);

client.PubMany(async p => {
    p.Pub("Subject2", body);
    p.PubAsync("Subject2", body);
});

Stop receiving messages

To stop receiving messages, you make use of UnSub. You can pass an optional argument specifying the number of messages to wait for before automatically unsubscribing.

client.UnSub("sid1");
client.UnSub("sid2", 3); //Unsub automatically after three messages
await client.UnSub("sid3");
await client.UnSub("sid4", 3);

Am I limited to UTF8?

No you are not. There's an overload of client.Pub that takes body:byte[] and that is not touched anywhere. So to this you could potentially pass a Protobuf encoded byte array. Just ensure that you parse it correctly when you receive it as MsgOp in the subscriber.

Metrics

Read more about this in previous post.

Environment

Everything is running on a physical Windows 10 64bit, 32GB RAM i7-4790K quad core computer.

The Sender console and Consumer console is running as separate processes on this machine.

The NATS server is running on the same machine, in a Docker container, on an Ubuntu 14.04.4 LTS instance, with 8GB RAM, 2 virtual cores, via Hyper-V.

Results

That's it for now. Happy messaging.

Cheers,

//Daniel

View Comments