I just published a post describing the OpParser, which lets you consume downstream ops from a NATS server. By request, let's look at how to use it against a NATS server to build a long-running subscriber.
All posts in this series
- NATS, What a beautiful protocol
- Simple incoming OP parser for NATS in C#
- Using the OpParser with NATS to create a long running consumer in C#
- Continuing with C# and NATS, now looking at NatsObservable
- Time to construct a bundled NATS client for C#
Disclaimer: this is still at a level where we are exploring ways of interacting with NATS from C#. When the time comes there will be a packaged GitHub repo and probably a NuGet package that you can make use of. Can't wait? Look at the official one.
Get NATS running using Docker
Get a Docker environment up and running. I'm using a virtual Ubuntu host where I run my Docker containers. For this exercise, all we need is one NATS server. I'll expose the port so that I can access it from my Windows dev machine:
docker run -d -p 4222:4222 --name nats1 nats
Ta-da! Done. You now have a NATS server running. You can of course also use the demo.nats.io server, as described here.
Create a long running consumer
The consumer will just connect and subscribe to the subject foo. If it receives a PING it replies with a PONG, so that the server doesn't treat it as a Stale Connection.
var cancellation = new CancellationTokenSource();

using (var socket = SocketFactory.Create())
{
    socket.Connect("ubuntu01", 4222);
    //socket.SendUtf8("CONNECT {\"verbose\": false}\r\n");

    // SUB <subject> <sid>: subscribe to "foo" using subscription id "c1".
    socket.SendUtf8("SUB foo c1\r\n");

    Console.WriteLine("Now consuming subject 'foo'. Hit key to stop.");

    var consumingTask = Consume(socket, cancellation.Token, op =>
    {
        Console.WriteLine("===== RECEIVED =====");
        Console.Write(op.GetAsString());

        // Answer PING so the server doesn't drop us as a stale connection.
        if (op is PingOp)
            socket.SendUtf8("PONG\r\n");

        var msg = op as MsgOp;
        if (msg != null)
        {
            Console.WriteLine("===== MSG =====");
            Console.WriteLine($"Subject: {msg.Subject}");
            Console.WriteLine($"QueueGroup: {msg.QueueGroup}");
            Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
            Console.WriteLine($"Payload: {Encoding.UTF8.GetString(msg.Payload)}");
        }
    });

    Console.ReadKey();
    cancellation.Cancel();
    consumingTask.Wait();

    // UNSUB takes the subscription id (sid), not the subject.
    socket.SendUtf8("UNSUB c1\r\n");
    socket.Close();
}

cancellation.Dispose();
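The strings passed to SendUtf8 above are plain NATS protocol lines. As a minimal sketch, assuming the protocol formats SUB <subject> [queue group] <sid> and UNSUB <sid> [max_msgs], a small helper could build them. The SubFrames class and its method names are hypothetical and not part of the OpParser code:

```csharp
// Hypothetical helper for illustration; not part of the code from the post.
public static class SubFrames
{
    // SUB <subject> [queue group] <sid>\r\n
    public static string Sub(string subject, string sid, string queueGroup = null)
        => queueGroup == null
            ? $"SUB {subject} {sid}\r\n"
            : $"SUB {subject} {queueGroup} {sid}\r\n";

    // UNSUB <sid> [max_msgs]\r\n
    // Keyed by subscription id; the subject is not part of the line.
    public static string Unsub(string sid, int? maxMsgs = null)
        => maxMsgs.HasValue
            ? $"UNSUB {sid} {maxMsgs.Value}\r\n"
            : $"UNSUB {sid}\r\n";
}
```

With this, socket.SendUtf8(SubFrames.Sub("foo", "c1")) would send the same subscribe line as in the consumer, and SubFrames.Unsub("c1") would cancel that subscription.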
If you don't want verbose output, like the server sending +OK to confirm ops, you could send a CONNECT {"verbose": false}\r\n message immediately after connecting.
socket.SendUtf8("CONNECT {\"verbose\": false}\r\n");
Now, let's look at the Consume method, which reads incoming payloads from the socket as they arrive. Most of the code is painful task-cancellation plumbing. The essence is simple: create a NetworkStream from the Socket, run the OpParser over it, and do something with the resulting ops. In this case, just invoke onReceived.
private static Task Consume(
    Socket socket,
    CancellationToken cancellation,
    Action<IOp> onReceived)
{
    var opParser = new OpParser();

    return Task.Factory.StartNew(() =>
    {
        // ownsSocket: false, so disposing the stream leaves the socket open.
        using (var ns = new NetworkStream(socket, FileAccess.Read, false))
        {
            Func<bool> hasData = () =>
                ns.DataAvailable &&
                !cancellation.IsCancellationRequested;

            while (!cancellation.IsCancellationRequested)
            {
                // Wait until there is something to read or we are cancelled.
                SpinWait.SpinUntil(() =>
                    ns.DataAvailable ||
                    cancellation.IsCancellationRequested);

                if (cancellation.IsCancellationRequested)
                    break;

                // Handle at most two ops per pass, then re-check the loop condition.
                foreach (var op in opParser.Parse(ns, hasData)
                    .Take(2)
                    .TakeWhile(op => !cancellation.IsCancellationRequested))
                {
                    onReceived(op);
                }
            }
        }
    }, cancellation, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
And a little extension method, since NATS wants UTF8 encoded messages:
public static class SocketExtensions
{
    public static void SendUtf8(this Socket socket, string data)
        => socket.Send(Encoding.UTF8.GetBytes(data));
}
With this in place, you can start publishing messages to the subject foo and you should see them show up in the consumer console, while the consumer silently answers PING messages.
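If you have no other client at hand, a test message can be published over a second raw socket. Here is a minimal sketch, assuming the protocol format PUB <subject> [reply-to] <#bytes>\r\n[payload]\r\n. The PubFrame class is a hypothetical helper and not part of the OpParser code. Note that the length in the header is the payload's byte count, which differs from the character count for non-ASCII text:

```csharp
using System;
using System.Text;

// Hypothetical helper for illustration; not part of the code from the post.
public static class PubFrame
{
    // Builds: PUB <subject> <#bytes>\r\n<payload>\r\n
    public static byte[] Build(string subject, string payload)
    {
        var body = Encoding.UTF8.GetBytes(payload);
        // The header carries the byte length of the UTF-8 encoded payload.
        var header = Encoding.UTF8.GetBytes($"PUB {subject} {body.Length}\r\n");
        var crlf = new byte[] { (byte)'\r', (byte)'\n' };

        var frame = new byte[header.Length + body.Length + crlf.Length];
        Buffer.BlockCopy(header, 0, frame, 0, header.Length);
        Buffer.BlockCopy(body, 0, frame, header.Length, body.Length);
        Buffer.BlockCopy(crlf, 0, frame, header.Length + body.Length, crlf.Length);
        return frame;
    }
}
```

Sending socket.Send(PubFrame.Build("foo", "hello")) from a separate connection should make the consumer print a MSG for subject foo.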
There's more to come, such as sending PING messages to the server to check that it's still there, handling the Request-Response messaging pattern, and combining consuming with publishing over the same socket.
//Daniel