After my last performance tweaks (read more) I've finally become satisfied and have chosen to put MyNatsClient out on NuGet. There are some changes coming, like removing dependency on Newtonsoft's JSON.Net and adding support for SSL, but it's out and it's simple, fast and RX friendly!
Why a new one when there's an official project?
Because I wanted one that leaves .Net4.0 behind and therefore offers async
constructs. And I also wanted one that is reactive and supports ReactiveExtensions, by being based around IObservable<T>
. And I also wanted to keep as much of the domain language of NATS as possible, but not strictly be following/limited to the APIs of other NATS client, but instead offer one that fits the .NET domain.
Usage
The documentation can be found in README.md on GitHub, but lets show how to get started. Most API methods has one synchronous version and one asynchronous version.
First install the package:
Install-Package MyNatsClient
Then you need an instance of a NatsClient
. It will require a ConnectionInfo
, so lets look at that:
var cnInfo = new ConnectionInfo(new Host("192.168.1.176", 4222));
Now, we are ready to create a client:
var client = new NatsClient("fooBarClientId", cnInfo);
Consumer
If we want to consume incoming messages, we need to observe them. MyNatsClient
makes use of IObservable<T>
, so you could take advantage of Reactive Extensions.
client.MsgOpStream.Subscribe(msg =>
{
Console.WriteLine("===== MSG =====");
Console.WriteLine($"Subject: {msg.Subject}");
Console.WriteLine($"ReplyTo: {msg.ReplyTo}");
Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
Console.WriteLine($"Payload: {msg.GetPayloadAsString()");
});
The Subscribe
method returns an IDisposable
, calling Dispose()
would terminate the in-process subscription:
var inProcSub = client.MsgOpStream.Subscribe(handler);
inProcSub.Dispose(); //The handler will not be called
Now you need to tell the NATS-server that you want to subscribe to (listen on) messages on one or more subjects
:
client.Sub("Subject1", "sid1");
client.Sub("Subject2", "sid2");
await client.SubAsync("Subject3", "sid3");
await client.SubAsync("Subject4", "sid4");
This would make your client receive a copy of any message published to any of the subjects: "Subject1, Subject2, Subject3, Subject4".
Finally, you need to Connect
so that the Socket
gets created and a connection is established to the remote NATS server.
client.Connect();
Publisher
NATS supports publish-subscribe messaging patterns, but you can also do request-reply kind of messaging. You can also create consumer groups and there by share the load within a certain subject consumer group, so only one of them will receive it.
To publish a message, you use any of the Pub
or PubAsync
methods:
client.Pub("Subject1", body);
await client.PubAsync("Subject1", body);
client.PubMany(async p => {
p.Pub("Subject2", body);
p.PubAsync("Subject2", body);
});
Stop receiving messages
To stop receiving messages, you make use of UnSub
. You can pass an optional argument specifying the number of messages to wait for before automatically unsubscribing.
client.UnSub("sid1");
client.UnSub("sid2", 3); //Unsub automatically after three messages
await client.UnSub("sid3");
await client.UnSub("sid4", 3);
Am I limited to UTF8?
No you are not. There's an overload of client.Pub
that takes body:byte[]
and that is not touched anywhere. So to this you could potentially pass a Protobuf encoded byte array. Just ensure that you parse it correctly when you receive it as MsgOp
in the subscriber.
Metrics
Read more about this in previous post.
Environment
Everything is running on a physical Windows 10 64bit, 32GB RAM i7-4790K quad core computer.
The Sender console and Consumer console is running as separate processes on this machine.
The NATS server is running on the same machine, in a Docker container, on an Ubuntu 14.04.4 LTS instance, with 8GB RAM, 2 virtual cores, via Hyper-V.
Results
- The (old) metrics are old values for MyNatsClient after tweaks.
- The (new) metrics are the current values (
v0.1.0
) for MyNatsClient. - The (offical client) metrics are the values for
v0.5.0
.
That's it for now. Happy messaging.
Cheers,
//Daniel