danielwertheim

danielwertheim


notes from a passionate developer

Share


Sections


Tags


Disclaimer

This is a personal blog. The opinions expressed here represent my own and not those of my employer, nor current or previous. All content is published "as is", without warranty of any kind and I don't take any responsibility and can't be liable for any claims, damages or other liabilities that might be caused by the content.

Introducing NATSConsumer and .NET Core support

Recently I just added support for .NET Core in MyNatsClient. And I also introduced the first of a few simplifying components that is coming to the MyNatsClient. All changes will be done to ease the construction of the upcoming NatsMessageBus which offer messaging over NATS, using subject conventions and message formatters and in-process routing etc. But this is something you can build today if you want. Lets have a look.

What is MyNatsClient?

The short answer is that MyNatsClient is the reactive .NET/.NET-Core client for NATS.io. You can find more motivations around it in the README.

What is NATS?

It's a message broker written in Go by the people from Apcera. Head over to nats.io and read up. There it states:

NATS Server is a simple, high performance open source messaging system for cloud native applications, IoT messaging, and microservices architectures.

They have also released a first version of NATS-Streaming, which offers durability etc. I'll soon start to work on a client for that as well.

What is NATSConsumer?

NatsConsumer is a simple class that works on-top of a NatsClient and simplifies the process of consuming messages from NATS servers. It makes it easier to subscribe by handling both the process of subscribing to the broker as well as to the in-process message stream of the underlying NatsClient. It also helps with re-subscribing against the broker if the client goes down and reconnects etc.

Subscribe

var consumer = new NatsConsumer(client);

//Subscribe will create subscription both against the NATS-broker
//and the in-process observable message stream
consumer.Subscribe("mySubject", msg => {});

//or async
await consumer.SubscribeAsync("mysubject", msg => {});

You can also inject an IObserver<MsgOp> so that you could inject logic for OnException and OnCompleted.

var observer = new DelegatingObserver<MsgOp>(
    msg => {},
    ex => {},
    () => {}
);

consumer.Subscribe("mySubject", observer);

//or async
await consumer.SubscribeAsync("mysubject", observer);

Unsubscribe

To unsubscribe, you can do any of the following:

Auto unsubscribe

When using any of the Subscribe|SubscribeAsyncoverloads that take a SubscriptionInfo, then you can optionally specify maxMessages, which will send an Unsub to the broker with this argument for the subject, so that you only receive max number of messages on that subscription.

NOTE it's perfectly fine to do both e.g. subscription.Dispose as well as consumer.Dispose or e.g. consumer.Unsubscribe and then subscription.Dispose.

Simple sample

Lets look at a simple case. To simplify the code, I've left out the disposing using statements.

install-package MyNatsClient

Remember if you want to make use of RX (reactive extensions) you can bring that in by instead installing MyNatsClient.Rx

install-package MyNatsClient.Rx
//Subject to where we will publish and consume messages from
const string subject = "test";

//NatsConsumer operates on a client, create one
var cnInfo = new ConnectionInfo(new Host("demo.nats.io"));
var client = new NatsClient(cnInfo);

//The consumer below will re-subscribe if the client reconnects
client.Connect();

//Define a handler we want for our messages
//You can of course have many different handlers
Action<MsgOp> handler = msg => {
  Console.WriteLine("REC:" + msg.GetPayloadAsString());
};

//Create a consumer and subscribe.
//NOTE. Each consumer can handle many subscriptions
var consumer = new NatsConsumer(client);
await consume.SubscribeAsync(subject, handler);

//Publish some messages
while (true) {
  Console.WriteLine("Send: ");
  var message = Console.ReadLine();
  if (string.IsNullOrWhiteSpace(message))
    break;

  await client.PubAsync(subject, message);
}

Lets throw in Routemeister and commands etc.

Now. Lets bring in some simple code for JSON message formatting and in-process routing etc. For this I will make use of:

install-package Routemeister
install-package Newtonsoft.Json

The idea is to send a command: MySample.Commands.RegisterStudent:

public class RegisterStudent {
  public string Student { get; set; }
  public string Course { get; set; }
}

Using Routemeister we define a interface that will be implemented in one or more command handlers:

public interface ICommandHandler<in T> {
    //Can be named whatever.
    //Must return a Task
    //Must take one class argument only
    //Can have generic constraints
    Task HandleAsync(T message);
}

The handler will just dump it:

public class StudentCommandHandlers
  : ICommandHandler<RegisterStudent>
{
  public Task HandleAsync(RegisterStudent cmd) {
    Console.WriteLine($"Registrering {cmd.Student} to {cmd.Course}");
  }
}  

Lets tackle the dispatching etc.

//Use Routemeister to create routes from everything that is
//implementing one or more ICommandHandler<>
var factory = new MessageRouteFactory();
var routes = factory.Create(
  typeof(Program).Assembly,
  typeof(ICommandHandler<>));

//Pick a router and define how handlers should be created
var router = new SequentialAsyncMessageRouter(
  (handlerType, envelope) => Activator.CreateInstance(handlerType),
  routes);

//Define a MyNatsClient handler for the messages
Action<MsgOp> handler = async msg =>
{
  var tmp = JsonConvert.DeserializeObject(
    msg.GetPayloadAsString(),
    Type.GetType(msg.Subject));
  
  //Route the message in-process
  await router.RouteAsync(tmp);
};

//Create a client and the consumer
var cnInfo = new ConnectionInfo(new Host("demo.nats.io"));
var client = new NatsClient("test", cnInfo);
var consumer = new NatsConsumer(client);
client.Connect();

//Use the Routemeister routes to register to NATS subjects using
//subject wildcards (> or *)
//for namespaces. You can subscriibe on a higher level like root namespace.
//That is up to you.
foreach (var ns in routes.Select(r => r.MessageType.Namespace).Distinct())
  await consumer.SubscribeAsync($"{ns}.>", handler);

//Send some messages
while (true)
{
  Console.WriteLine("Student:");
  var student = Console.ReadLine();
  if (string.IsNullOrWhiteSpace(student))
    break;

  Console.WriteLine("Course:");
  var course = Console.ReadLine();
  if (string.IsNullOrWhiteSpace(course))
    break;

  var cmd = new RegisterStudent {
    Student = student,
    Course = course
  };

  //Subscription above was to namespace of cmd(s)
  //So, MySample.Commands.> and we publish to
  //MySample.Commands.RegisterStudent
  await client.PubAsync(
    cmd.GetType().FullName,
    JsonConvert.SerializeObject(cmd));
}

That's it. Some plumbing, but again. This is stuff that the upcoming message bus will take care of for you. Hope you can find it useful or got inspired to do something.

Cheers,

//Daniel

View Comments