I recently added support for .NET Core to MyNatsClient. I also introduced the first of a few simplifying components that are coming to MyNatsClient. All of these changes are meant to ease the construction of the upcoming NatsMessageBus, which will offer messaging over NATS using subject conventions, message formatters, in-process routing etc. But this is something you can build today if you want. Let's have a look.
What is MyNatsClient?
The short answer is that MyNatsClient is the reactive .NET/.NET-Core client for NATS.io. You can find more motivations around it in the README.
What is NATS?
It's a message broker written in Go by the people from Apcera. Head over to nats.io and read up. There it states:
NATS Server is a simple, high performance open source messaging system for cloud native applications, IoT messaging, and microservices architectures.
They have also released a first version of NATS-Streaming, which offers durability etc. I'll soon start to work on a client for that as well.
What is NatsConsumer?
NatsConsumer is a simple class that works on top of a NatsClient and simplifies the process of consuming messages from NATS servers. It makes it easier to subscribe by handling both the subscription against the broker and the subscription to the in-process message stream of the underlying NatsClient. It also helps with re-subscribing against the broker if the client goes down and reconnects etc.
Subscribe
var consumer = new NatsConsumer(client);
//Subscribe will create subscription both against the NATS-broker
//and the in-process observable message stream
consumer.Subscribe("mySubject", msg => {});
//or async
await consumer.SubscribeAsync("mySubject", msg => {});
You can also inject an IObserver<MsgOp> so that you can provide logic for OnException and OnCompleted.
var observer = new DelegatingObserver<MsgOp>(
msg => {},
ex => {},
() => {}
);
consumer.Subscribe("mySubject", observer);
//or async
await consumer.SubscribeAsync("mySubject", observer);
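To make the idea of a delegating observer concrete, here is a minimal sketch built only on the standard System.IObserver<T> interface. It is not the library's DelegatingObserver<MsgOp>; the class name SketchDelegatingObserver is hypothetical, and the sketch assumes the standard interface, where the error callback is named OnError.

```csharp
using System;

// Hypothetical stand-in (not MyNatsClient's own class): forwards each
// IObserver<T> callback to a delegate supplied by the caller.
public sealed class SketchDelegatingObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;
    private readonly Action<Exception> _onError;
    private readonly Action _onCompleted;

    public SketchDelegatingObserver(
        Action<T> onNext,
        Action<Exception> onError = null,
        Action onCompleted = null)
    {
        // A handler for messages is required; the other two are optional.
        _onNext = onNext ?? throw new ArgumentNullException(nameof(onNext));
        _onError = onError;
        _onCompleted = onCompleted;
    }

    public void OnNext(T value) => _onNext(value);

    // Missing delegates are simply skipped.
    public void OnError(Exception error) => _onError?.Invoke(error);

    public void OnCompleted() => _onCompleted?.Invoke();
}
```

The point of the pattern is that you can pass three lambdas instead of writing a full observer class for every subscription.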
Unsubscribe
To unsubscribe, you can do any of the following:
- Dispose the IConsumerSubscription returned by the consumer.Subscribe or consumer.SubscribeAsync methods.
- Dispose the NatsConsumer and it will take care of the subscriptions.
- Pass the IConsumerSubscription to either consumer.Unsubscribe(subscription) or consumer.UnsubscribeAsync(subscription).
Auto unsubscribe
When using any of the Subscribe|SubscribeAsync overloads that take a SubscriptionInfo, you can optionally specify maxMessages. This will send an Unsub to the broker with this argument, so that you receive at most that number of messages on that subscription.
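On the wire, the NATS text protocol expresses this as an UNSUB command carrying an optional message limit. The sketch below only builds the protocol lines to show their shape; the NatsProtocolSketch class and its method names are hypothetical and are not part of MyNatsClient, which handles this for you.

```csharp
using System;

// Hypothetical helper that formats NATS protocol lines for illustration only.
public static class NatsProtocolSketch
{
    // SUB <subject> <sid>\r\n (the optional queue group is left out here)
    public static string Sub(string subject, string sid)
        => $"SUB {subject} {sid}\r\n";

    // UNSUB <sid> [max_msgs]\r\n
    // With max_msgs, the server removes the subscription automatically
    // after that many messages have been delivered.
    public static string Unsub(string sid, int? maxMessages = null)
        => maxMessages.HasValue
            ? $"UNSUB {sid} {maxMessages.Value}\r\n"
            : $"UNSUB {sid}\r\n";
}
```

For example, NatsProtocolSketch.Unsub("1", 5) yields "UNSUB 1 5" followed by CRLF. Note that the command refers to the subscription id, not to the subject.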
NOTE: it's perfectly fine to do both, e.g. subscription.Dispose as well as consumer.Dispose, or e.g. consumer.Unsubscribe and then subscription.Dispose.
Simple sample
Let's look at a simple case. To simplify the code, I've left out the disposing using statements.
install-package MyNatsClient
Remember, if you want to make use of Rx (Reactive Extensions), you can bring that in by installing MyNatsClient.Rx instead:
install-package MyNatsClient.Rx
//Subject that we will publish messages to and consume messages from
const string subject = "test";
//NatsConsumer operates on a client, create one
var cnInfo = new ConnectionInfo(new Host("demo.nats.io"));
var client = new NatsClient(cnInfo);
//The consumer below will re-subscribe if the client reconnects
client.Connect();
//Define a handler we want for our messages
//You can of course have many different handlers
Action<MsgOp> handler = msg => {
Console.WriteLine("REC:" + msg.GetPayloadAsString());
};
//Create a consumer and subscribe.
//NOTE. Each consumer can handle many subscriptions
var consumer = new NatsConsumer(client);
await consumer.SubscribeAsync(subject, handler);
//Publish some messages
while (true) {
Console.WriteLine("Send: ");
var message = Console.ReadLine();
if (string.IsNullOrWhiteSpace(message))
break;
await client.PubAsync(subject, message);
}
Let's throw in Routemeister and commands etc.
Now let's bring in some simple code for JSON message formatting and in-process routing etc. For this I will make use of:
install-package Routemeister
install-package Newtonsoft.Json
The idea is to send a command, MySample.Commands.RegisterStudent:
public class RegisterStudent {
public string Student { get; set; }
public string Course { get; set; }
}
Using Routemeister, we define an interface that will be implemented by one or more command handlers:
public interface ICommandHandler<in T> {
//Can be named whatever.
//Must return a Task
//Must take one class argument only
//Can have generic constraints
Task HandleAsync(T message);
}
The handler will just dump it:
public class StudentCommandHandlers
    : ICommandHandler<RegisterStudent>
{
    public Task HandleAsync(RegisterStudent cmd) {
        Console.WriteLine($"Registering {cmd.Student} to {cmd.Course}");
        //Nothing to await here, so return an already completed task
        return Task.CompletedTask;
    }
}
Let's tackle the dispatching etc.
//Use Routemeister to create routes from everything that is
//implementing one or more ICommandHandler<>
var factory = new MessageRouteFactory();
var routes = factory.Create(
typeof(Program).Assembly,
typeof(ICommandHandler<>));
//Pick a router and define how handlers should be created
var router = new SequentialAsyncMessageRouter(
(handlerType, envelope) => Activator.CreateInstance(handlerType),
routes);
//Define a MyNatsClient handler for the messages
Action<MsgOp> handler = async msg =>
{
var tmp = JsonConvert.DeserializeObject(
msg.GetPayloadAsString(),
Type.GetType(msg.Subject));
//Route the message in-process
await router.RouteAsync(tmp);
};
//Create a client and the consumer
var cnInfo = new ConnectionInfo(new Host("demo.nats.io"));
var client = new NatsClient("test", cnInfo);
var consumer = new NatsConsumer(client);
client.Connect();
//Use the Routemeister routes to subscribe to NATS subjects using
//subject wildcards (> or *) for namespaces.
//You can subscribe on a higher level, like the root namespace.
//That is up to you.
foreach (var ns in routes.Select(r => r.MessageType.Namespace).Distinct())
await consumer.SubscribeAsync($"{ns}.>", handler);
//Send some messages
while (true)
{
Console.WriteLine("Student:");
var student = Console.ReadLine();
if (string.IsNullOrWhiteSpace(student))
break;
Console.WriteLine("Course:");
var course = Console.ReadLine();
if (string.IsNullOrWhiteSpace(course))
break;
var cmd = new RegisterStudent {
Student = student,
Course = course
};
//Subscription above was to namespace of cmd(s)
//So, MySample.Commands.> and we publish to
//MySample.Commands.RegisterStudent
await client.PubAsync(
cmd.GetType().FullName,
JsonConvert.SerializeObject(cmd));
}
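The subscription to MySample.Commands.> relies on NATS subject wildcards: subjects are dot-separated tokens, * matches exactly one token, and > matches one or more trailing tokens. As a rough illustration of those rules (the broker does the real matching; SubjectMatchSketch is a hypothetical helper, not part of any library), the matching could be sketched like this:

```csharp
using System;

// Hypothetical illustration of NATS wildcard semantics; not a library API.
public static class SubjectMatchSketch
{
    public static bool Matches(string pattern, string subject)
    {
        var p = pattern.Split('.');
        var s = subject.Split('.');

        for (var i = 0; i < p.Length; i++)
        {
            if (p[i] == ">")
            {
                // '>' is only valid as the last token and needs
                // at least one remaining subject token to match.
                return i == p.Length - 1 && s.Length > i;
            }

            // The subject ran out of tokens before the pattern did.
            if (i >= s.Length)
                return false;

            // '*' matches any single token; otherwise tokens must be equal.
            if (p[i] != "*" && p[i] != s[i])
                return false;
        }

        // Without a trailing '>', the token counts must be identical.
        return p.Length == s.Length;
    }
}
```

With these rules, MySample.Commands.> matches MySample.Commands.RegisterStudent but not MySample.Commands itself, which is why publishing to the full type name of the command reaches the handler.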
That's it. Some plumbing, but again, this is stuff that the upcoming message bus will take care of for you. I hope you find it useful or get inspired to build something.
Cheers,
//Daniel