danielwertheim

danielwertheim


notes from a passionate developer

Share


Sections


Tags


Disclaimer

This is a personal blog. The opinions expressed here represent my own and not those of my employer, nor current or previous. All content is published "as is", without warranty of any kind and I don't take any responsibility and can't be liable for any claims, damages or other liabilities that might be caused by the content.

Request and response with C# and NATS

I'm currently adding support for request-response message pattern in MyNatsClient. And as it turns out, it's really simple, much thanks to having it being based around Observables and thereby supporting Reactive Extensions. And, Yes. It does support DotNetCore.

This will soon reach the client itself but for now lets look at how easy it would be to set something up. To keep it simple I'll stick to request and response of pure strings.

Getting connected

This is the same for both ends. Both the requesting and as well as the replying end.

var cnInfo = new ConnectionInfo(new Host("192.168.2.20"));
var client = new NatsClient("testid", cnInfo);
client.Connect();

The Replying server

The server listens to subject "getTemp" and uses an external service to retrieve the temperature for a country code and city.

client.MsgOpStream
  .Where(msg => msg.Subject == "getTemp")
  .Subscribe(msg =>
{
  var parts = msg.GetPayloadAsString().Split('@');
  var city = parts[0];
  var country = parts[1];
  _client.Pub(msg.ReplyTo, $"Temp is {TempService.Get(city, country)}C");
});
client.Sub("getTemp", "someSubscriptionId");

The Requesting client (the manual way)

NATS has the concept of being able to say: replyTo="mySubject" and then you have the sender subcribe to the subject you pass in the replyTo param. You can also have it auto unsubscribe after receiving e.g. one message on this subject, which then completes the request-response message loop. This would look something like this:

//Get a unique request id that we use as reply to
var requestId = Guid.NewGuid().ToString("N");

//Subscribe to the in-process message stream
//and act on the reply
client.MsgOpStream
  .Where(msg => msg.Subject == requestId)
  .Subscribe(msg =>
  {
    Console.WriteLine(
      $"Got reply on request: '{msg.GetPayloadAsString()}'");
  });

//Subscribe the client to the subject
client.Sub(requestId, "someOtherSubscriptionId");

//Auto unsub after 1 reply
client.Unsub("someOtherSubscriptionId", 1);

//Send the request, using requestId as replyTo
client.Pub("getTemp", "STOCKHOLM@SWEDEN", requestId);

The Requesting client (a better way)

The above solution has one major flaw, which is the fact that the NATS-server will need to manage one subscription for each request. To get rid of this, the idea is to make use of NATS wildcard subcription possibilites. With this knowledge we can setup ONE subscription that is for the NatsClient and have sub-subjects for each request.

NatsRequester a simple abstraction

This simple abstraction gives you an idea of how this will be implemented in the client. The sample is simplified, e.g. using only strings.

public class NatsRequester : IDisposable
{
  private readonly string _clientInbox;
  private readonly INatsClient _client;

  public NatsRequester(INatsClient client)
  {
    _client = client;
    _clientInbox = Guid.NewGuid().ToString("N");
    _client.Sub($"{_clientInbox}.>", _clientInbox);
  }

  public async Task<string> RequestAsync(string subject, string body)
  {
    var requestId = Guid.NewGuid().ToString("N");
    var subject = $"{_clientInbox}.{requestId}";
    var taskComp = new TaskCompletionSource<string>();
    _client.MsgOpStream
      .Where(msg => msg.Subject == subject)
      .Take(1)
      .Subscribe(msg =>
      {
        taskComp.SetResult(msg.GetPayloadAsString());
      });

    await _client.PubAsync(subject, body, subject);

    return await taskComp.Task;
  }

  public void Dispose()
  {
    _client?.Unsub(_clientInbox);
  }
}

Issuing requests

Now we can make many requests, and only have one subcription against the NATS-server.

using (var requester = new NatsRequester(_client))
{
  while (true)
  {
    Console.WriteLine("Query? (y=yes;n=no)");
    if (Console.ReadKey().KeyChar == 'n')
      break;

    Console.WriteLine();

    var result = await requester.RequestAsync(
      "getTemp", "STOCKHOLM@SWEDEN");

    Console.WriteLine($"Got reply: {result}");
  }
}

Hope it gives you an idea of how the simplicity of NATS and the flexibility of MyNatsClient can be combined to provide effective solutions.

Cheers,

//Daniel

View Comments