Time has come to package the previous writings and concepts up and introduce a NatsClient
. A client which lets you publish and subscribe to subjects and consume them using observables etc. We are still working against one single server node, and have not yet introduced cluster aware clients. But we will get there.
All posts in this series
- NATS, What a beautiful protocol
- Simple incoming OP parser for NATS in C#
- Using the OpParser with NATS to create a long running consumer in C#
- Continuing with C# and NATS, now looking at NatsObservable
- Time to construct a bundled NATS client for C#
Disclaimer this is still on a level where we are looking into ways of interacting with Nats from C#. When time comes there will be a packaged up GitHub repo and probably a NuGet that you can make use of. Can't wait? Look at the official one.
The NatsClient
The client makes use of the previous constructed NatsOpParser
and NatsObservable
to interact with a single server over a Socket
.
public class ConnectionInfo
{
public string ClientId { get; }
public string Hostname { get; }
public int Port { get; }
public bool Verbose { get; set; }
public ConnectionInfo(
string clientId, string hostname, int port)
{
ClientId = clientId;
Hostname = hostname;
Port = port;
}
}
public class NatsClient : IDisposable
{
private const string Crlf = "\r\n";
private readonly ConnectionInfo _connectionInfo;
private CancellationTokenSource _cancellation;
private Socket _socket;
private NetworkStream _readStream;
private NatsObservable _observable;
protected bool IsDisposed { get; private set; }
public string Id => _connectionInfo.ClientId;
public IObservable<IOp> Observable => _observable;
private NatsClient(
ConnectionInfo connectionInfo,
Socket socket,
NetworkStream readStream)
{
_connectionInfo = connectionInfo;
_socket = socket;
_cancellation = new CancellationTokenSource();
_readStream = readStream;
_observable = new NatsObservable(_readStream, _cancellation.Token);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
IsDisposed = true;
}
protected virtual void Dispose(bool disposing)
{
if (IsDisposed || !disposing)
return;
_cancellation?.Cancel();
_cancellation?.Dispose();
_cancellation = null;
_observable?.Dispose();
_observable = null;
_readStream?.Close();
_readStream?.Dispose();
_readStream = null;
_socket?.Close();
_socket?.Dispose();
_socket = null;
}
protected void ThrowIfDisposed()
{
if (IsDisposed)
throw new ObjectDisposedException(GetType().Name);
}
public static NatsClient Connect(ConnectionInfo connectionInfo)
{
var socket = SocketFactory.Create();
socket.Connect(connectionInfo.Hostname, connectionInfo.Port);
var readStream = new NetworkStream(socket, FileAccess.Read, false);
var opParser = new NatsOpParser();
var op = opParser.Parse(readStream, () => readStream.DataAvailable).First();
if (op == null)
throw new Exception("Expected response after CONNECT. Got none.");
if (!(op is InfoOp))
throw new Exception($"Expected response of INFO after CONNECT. Got {op.Code}");
socket.SendUtf8($"CONNECT {{\"verbose\": {connectionInfo.Verbose.ToString().ToLower()}}}{Crlf}");
return new NatsClient(connectionInfo, socket, readStream);
}
public void Pub(string subject, string data)
{
ThrowIfDisposed();
Send($"PUB {subject} {data.Length}{Crlf}{data}{Crlf}");
}
public void Sub(string subject, string subscriptionId)
{
ThrowIfDisposed();
Send($"SUB {subject} {subscriptionId}@{Id}{Crlf}");
}
public void UnSub(string subject)
{
ThrowIfDisposed();
Send($"UNSUB {subject} {Id}{Crlf}");
}
public void Pong()
{
ThrowIfDisposed();
Send($"PONG{Crlf}");
}
private void Send(string data)
{
_socket.SendUtf8(data);
}
}
The Consumer
The Consumer is getting smaller and smaller as infrastructure concepts are being abstracted away, so that we only get the Nats interactivity parts left. Using the NatsClient
would look like this:
var connectionInfo = new ConnectionInfo("myclient1", "ubuntu01", 4222);
using (var client = NatsClient.Connect(connectionInfo))
{
client.Observable.Subscribe(op =>
{
Console.WriteLine("===== RECEIVED =====");
Console.Write(op.GetAsString());
});
client.Observable.OfType<PingOp>().Subscribe(ping =>
client.Pong());
client.Observable.OfType<MsgOp>().Subscribe(msg =>
{
Console.WriteLine("===== MSG =====");
Console.WriteLine($"Subject: {msg.Subject}");
Console.WriteLine($"QueueGroup: {msg.QueueGroup}");
Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
Console.WriteLine($"Payload: {Encoding.UTF8.GetString(msg.Payload)}");
});
client.Sub("foo", "s1");
client.Sub("foo", "s2");
Console.WriteLine("Hit key to stop consuming.");
Console.ReadKey();
}
Notice that we have two subscriptions against the subject "foo"
:
client.Sub("foo", "s1");
client.Sub("foo", "s2");
Also note that we have two observers that will react on incoming MsgOp
messages:
client.Observable.Subscribe(op =>
{
Console.WriteLine("===== RECEIVED =====");
Console.Write(op.GetAsString());
});
client.Observable.OfType<MsgOp>().Subscribe(msg =>
{
Console.WriteLine("===== MSG =====");
Console.WriteLine($"Subject: {msg.Subject}");
Console.WriteLine($"QueueGroup: {msg.QueueGroup}");
Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
Console.WriteLine($"Payload: {Encoding.UTF8.GetString(msg.Payload)}");
});
With this in mind, if we fire up the sender and let it send and then the consuming application, we will see something like this:
What's next
We are now at a level where we can start focusing on concepts that aren't purely infrastructure related. Like looking into different messaging patterns or how-to create a message protocol. But of course, also some plumbing stuff like, how-to become cluster aware with automatic fail overs, etc.
Cheers,
//Daniel