Instead of one big blog post, I'll continue to write smaller parts about my journey with NATS. It feels like that rhymes better with the philosophy of NATS. So, this time, let's look at a simple parser, written in C#, that parses the downward stream from a NATS server to a client.
The implementations shown will change in future posts and change before everything is packaged up as a "client", with support for observables and Rx, hosted on GitHub. For now, it's just me learning and having fun.
All posts in this series
- NATS, What a beautiful protocol
- Simple incoming OP parser for NATS in C#
- Using the OpParser with NATS to create a long running consumer in C#
- Continuing with C# and NATS, now looking at NatsObservable
- Time to construct a bundled NATS client for C#
The protocol of NATS is really simple to understand (see the previous post), and the downstream ops that a client needs to parse are few:
- INFO
- PING
- PONG
- MSG
- +OK
- -ERR
In order to just test this out without actually running against a NATS server, I'll make the OpParser
work against a Stream
. The consumer of the OpParser
will provide a strategy for letting the parser know when there's more data or not. When using a MemoryStream
you could, for example, just try to peek one char further ahead using PeekChar
, or compare the current position with the total length. With a NetworkStream
this is not possible, but there we can instead make use of the stream.DataAvailable
property.
The parser works with iterators and yields IOp
implementations. Either:
- InfoOp
- PingOp
- PongOp
- MsgOp
- OkOp
- ErrOp
The IOp
is really simple:
/// <summary>
/// A single protocol operation (op) as yielded by the parser.
/// </summary>
public interface IOp
{
    /// <summary>
    /// The marker that identifies the kind of op, e.g. "INFO", "MSG" or "+OK".
    /// </summary>
    string Code { get; }

    /// <summary>
    /// Returns the op rendered as text; the implementations in this post
    /// terminate the text with CRLF.
    /// </summary>
    string GetAsString();
}
Since the parser yields ops on demand and works against a stream, you can, for instance, read the first two (Take(2)
), then one more (Take(1)
), and then the rest.
Let's fake some downstream ops from a NATS server by the use of a MemoryStream
:
// The fake downstream data: every op the parser knows about, in wire format.
var wireData = string.Concat(
    "INFO {\"server_id\":\"H8RgvFtiq2zlQTA5dB0deh\"}\r\n",
    "+OK\r\n",
    "MSG foo siddw1 4\r\ntest\r\n",
    "+OK\r\n",
    "MSG foo grp1 siddw1 4\r\ntest\r\n",
    "PING\r\n",
    "MSG foo grp1 siddw1 4\r\ntest\r\n",
    "PONG\r\n",
    "-ERR 'Unknown Protocol Operation'\r\n");

var opParser = new OpParser();
using (var source = new MemoryStream(Encoding.UTF8.GetBytes(wireData)))
{
    // A MemoryStream has more data for as long as we have not reached its end.
    Func<bool> moreToRead = () => source.Position < source.Length;

    // Each Parse call continues from wherever the previous one left the stream.
    Console.WriteLine("===== TAKE 2 =====");
    foreach (var parsed in opParser.Parse(source, moreToRead).Take(2))
    {
        Console.Write(parsed.GetAsString());
    }

    Console.WriteLine("===== TAKE 1 =====");
    foreach (var parsed in opParser.Parse(source, moreToRead).Take(1))
    {
        Console.Write(parsed.GetAsString());
    }

    Console.WriteLine("===== All the remaining =====");
    foreach (var parsed in opParser.Parse(source, moreToRead))
    {
        Console.Write(parsed.GetAsString());
    }
}
Run this and you should see:
Let's look only at the MsgOp
using OfType
:
// The fake downstream data: every op the parser knows about, in wire format.
var wireData = string.Concat(
    "INFO {\"server_id\":\"H8RgvFtiq2zlQTA5dB0deh\"}\r\n",
    "+OK\r\n",
    "MSG foo siddw1 4\r\ntest\r\n",
    "+OK\r\n",
    "MSG foo grp1 siddw1 4\r\ntest\r\n",
    "PING\r\n",
    "MSG foo grp1 siddw1 4\r\ntest\r\n",
    "PONG\r\n",
    "-ERR 'Unknown Protocol Operation'\r\n");

var opParser = new OpParser();
using (var source = new MemoryStream(Encoding.UTF8.GetBytes(wireData)))
{
    // A MemoryStream has more data for as long as we have not reached its end.
    Func<bool> moreToRead = () => source.Position < source.Length;

    // Every op is still parsed; OfType just filters out everything but the messages.
    var messages = opParser.Parse(source, moreToRead).OfType<MsgOp>();
    foreach (var msg in messages)
    {
        Console.WriteLine("===== MSG =====");
        Console.WriteLine($"Subject: {msg.Subject}");
        Console.WriteLine($"QueueGroup: {msg.QueueGroup}");
        Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
        Console.WriteLine($"Payload: {Encoding.UTF8.GetString(msg.Payload)}");
    }
}
Running this would give:
To get this to work, you of course need the different IOp
implementations:
/// <summary>
/// The INFO op, carrying the text that followed the INFO marker.
/// </summary>
public class InfoOp : IOp
{
    /// <summary>Creates an INFO op carrying <paramref name="message"/>.</summary>
    /// <param name="message">The text that follows the INFO marker.</param>
    public InfoOp(string message)
    {
        Message = message;
    }

    /// <summary>The op marker, always "INFO".</summary>
    public string Code => "INFO";

    /// <summary>The text that follows the INFO marker.</summary>
    public string Message { get; }

    /// <summary>Renders the op as the marker, a blank, the message and CRLF.</summary>
    public string GetAsString() => $"INFO {Message}\r\n";
}
/// <summary>
/// The PING op. It carries no data, so one shared instance is exposed
/// and the constructor is private.
/// </summary>
public class PingOp : IOp
{
    /// <summary>The single shared instance.</summary>
    public static readonly PingOp Instance = new PingOp();

    private PingOp()
    {
    }

    /// <summary>The op marker, always "PING".</summary>
    public string Code => "PING";

    /// <summary>Renders the op as the marker followed by CRLF.</summary>
    public string GetAsString() => "PING\r\n";
}
/// <summary>
/// The PONG op. It carries no data, so one shared instance is exposed
/// and the constructor is private.
/// </summary>
public class PongOp : IOp
{
    /// <summary>The single shared instance.</summary>
    public static readonly PongOp Instance = new PongOp();

    private PongOp()
    {
    }

    /// <summary>The op marker, always "PONG".</summary>
    public string Code => "PONG";

    /// <summary>Renders the op as the marker followed by CRLF.</summary>
    public string GetAsString() => "PONG\r\n";
}
/// <summary>
/// The MSG op: a subject, an optional queue group, a subscription id and a
/// binary payload.
/// </summary>
public class MsgOp : IOp
{
    // Pre-computed capacity for the StringBuilder used in GetAsString.
    private readonly int _size;

    /// <summary>The op marker, always "MSG".</summary>
    public string Code => "MSG";

    /// <summary>The subject of the message.</summary>
    public string Subject { get; }

    /// <summary>The optional queue group; <c>null</c> when none was given.</summary>
    public string QueueGroup { get; }

    /// <summary>The id of the subscription.</summary>
    public string SubscriptionId { get; }

    /// <summary>The raw payload bytes.</summary>
    public byte[] Payload { get; }

    /// <summary>Creates a MSG op.</summary>
    /// <param name="subject">The subject; must not be <c>null</c>.</param>
    /// <param name="subscriptionId">The subscription id; must not be <c>null</c>.</param>
    /// <param name="payload">The payload bytes; must not be <c>null</c> (may be empty).</param>
    /// <param name="queueGroup">The optional queue group.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="subject"/>, <paramref name="subscriptionId"/> or
    /// <paramref name="payload"/> is <c>null</c>.
    /// </exception>
    public MsgOp(
        string subject,
        string subscriptionId,
        byte[] payload,
        string queueGroup = null)
    {
        // Fail with a descriptive exception instead of a NullReferenceException
        // from the size computation below.
        if (subject == null)
        {
            throw new ArgumentNullException(nameof(subject));
        }

        if (subscriptionId == null)
        {
            throw new ArgumentNullException(nameof(subscriptionId));
        }

        if (payload == null)
        {
            throw new ArgumentNullException(nameof(payload));
        }

        Subject = subject;
        QueueGroup = queueGroup;
        SubscriptionId = subscriptionId;
        Payload = payload;

        _size = 3 + //MSG
                1 + //BLANK
                Subject.Length +
                1 + //BLANK
                (QueueGroup?.Length + 1 ?? 0) + //Optional GRP + BLANK
                SubscriptionId.Length +
                1 + //BLANK
                Payload.Length.ToString().Length +
                2 + //CRLF
                Payload.Length +
                2; //CRLF
    }

    /// <summary>Decodes the payload bytes as UTF-8 text.</summary>
    public string GetPayloadAsString()
    {
        return Encoding.UTF8.GetString(Payload);
    }

    /// <summary>
    /// Renders the op as a header line (marker, subject, optional queue group,
    /// subscription id and payload size, separated by blanks), CRLF, the payload
    /// decoded as UTF-8, and a closing CRLF.
    /// </summary>
    public string GetAsString()
    {
        var sb = new StringBuilder(_size);

        sb.Append("MSG");
        sb.Append(" ");
        sb.Append(Subject);
        sb.Append(" ");

        if (QueueGroup != null)
        {
            sb.Append(QueueGroup);
            sb.Append(" ");
        }

        sb.Append(SubscriptionId);
        sb.Append(" ");
        sb.Append(Payload.Length);
        sb.Append("\r\n");
        sb.Append(GetPayloadAsString());
        sb.Append("\r\n");

        return sb.ToString();
    }
}
/// <summary>
/// The +OK op. It carries no data, so one shared instance is exposed
/// and the constructor is private.
/// </summary>
public class OkOp : IOp
{
    /// <summary>The single shared instance.</summary>
    public static readonly OkOp Instance = new OkOp();

    private OkOp()
    {
    }

    /// <summary>The op marker, always "+OK".</summary>
    public string Code
    {
        get { return "+OK"; }
    }

    /// <summary>Renders the op as the marker followed by CRLF.</summary>
    public string GetAsString()
    {
        return "+OK\r\n";
    }
}
/// <summary>
/// The -ERR op, carrying the error text that followed the -ERR marker.
/// </summary>
public class ErrOp : IOp
{
    /// <summary>The op marker, always "-ERR".</summary>
    public string Code => "-ERR";

    /// <summary>The error text that follows the -ERR marker.</summary>
    public string Message { get; }

    /// <summary>Creates a -ERR op carrying <paramref name="message"/>.</summary>
    /// <param name="message">The error text that follows the -ERR marker.</param>
    public ErrOp(string message)
    {
        Message = message;
    }

    /// <summary>Renders the op as the marker, a blank, the message and CRLF.</summary>
    public string GetAsString()
    {
        // The marker must be "-ERR" (same as Code); the previous "-ERROR" is not
        // an op marker the parser recognises, so the output could not be parsed back.
        return $"-ERR {Message}\r\n";
    }
}
Finally, the OpParser
itself:
/// <summary>
/// Parses a downstream of ops read from a <see cref="Stream"/> into
/// <see cref="IOp"/> instances, one at a time, as they are requested.
/// </summary>
public class OpParser
{
    // Maps an op marker to the function that parses the remainder of that op.
    private static readonly Dictionary<string, Func<BinaryReader, IOp>> Ops;

    private const char DelimMarker = ' ';
    private const char Cr = '\r';
    private const char Lf = '\n';

    static OpParser()
    {
        Ops = new Dictionary<string, Func<BinaryReader, IOp>>
        {
            { "+OK", ParseOkOp },
            { "MSG", ParseMsgOp },
            { "-ERR", ParseErrorOp },
            { "INFO", ParseInfoOp },
            { "PING", ParsePingOp },
            { "PONG", ParsePongOp }
        };
    }

    /// <summary>
    /// Lazily parses ops from <paramref name="stream"/> for as long as
    /// <paramref name="hasData"/> reports that more data is available.
    /// The stream is left open, so a later call continues where this one stopped.
    /// </summary>
    /// <param name="stream">The stream to read from.</param>
    /// <param name="hasData">
    /// Called before each op marker character is read; parsing stops when it
    /// returns <c>false</c>.
    /// </param>
    /// <returns>A lazily evaluated sequence of the parsed ops.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="stream"/> or <paramref name="hasData"/> is <c>null</c>.
    /// </exception>
    /// <exception cref="Exception">
    /// Thrown while enumerating when an unknown op marker or an unexpected
    /// character is found, or when the data ends in the middle of an op marker.
    /// </exception>
    public IEnumerable<IOp> Parse(Stream stream, Func<bool> hasData)
    {
        // Validate here, outside the iterator, so bad arguments fail at the call
        // site rather than on first enumeration.
        if (stream == null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        if (hasData == null)
        {
            throw new ArgumentNullException(nameof(hasData));
        }

        return ParseOps(stream, hasData);
    }

    // Iterator behind Parse: reads an op marker, then dispatches to the op's parser.
    private static IEnumerable<IOp> ParseOps(Stream stream, Func<bool> hasData)
    {
        // Nothing to read: don't even create a reader.
        if (!hasData())
        {
            yield break;
        }

        var opMarker = new StringBuilder();

        // leaveOpen: true, the stream belongs to the caller.
        using (var reader = new BinaryReader(stream, Encoding.UTF8, true))
        {
            while (hasData())
            {
                var c = reader.ReadChar();
                if (c != DelimMarker && c != Cr)
                {
                    opMarker.Append(c);
                    continue;
                }

                // A blank or CR terminates the op marker.
                var op = opMarker.ToString();
                opMarker.Clear();

                Func<BinaryReader, IOp> parseOp;
                if (!Ops.TryGetValue(op, out parseOp))
                {
                    throw CreateUnsupportedOpException(op);
                }

                yield return parseOp(reader);
            }
        }

        // The data ended in the middle of an op marker. Dropping those characters
        // silently would make the next Parse call start mid-marker, so fail loudly.
        if (opMarker.Length > 0)
        {
            throw CreateIncompleteOpException(opMarker.ToString());
        }
    }

    private static Exception CreateUnsupportedOpException(string foundOp)
        => new Exception($"Unsupported OP, don't know how to parse OP '{foundOp}'.");

    private static Exception CreateIncompleteOpException(string partialOp)
        => new Exception($"Incomplete OP, ran out of data after reading '{partialOp}'.");

    private static Exception CreateParserException(string op, char expected, char got)
        => new Exception(
            $"Error while parsing {op}. Expected char code " +
            $"'{(byte)expected}' got '{(byte)got}'.");

    private static Exception CreateMalformedMsgException(int segmentCount)
        => new Exception(
            $"Error while parsing {nameof(MsgOp)}. Expected 2 or 3 header segments " +
            $"before the payload size, got '{segmentCount}'.");

    // Reads one char and throws a parser exception unless it is the expected one.
    private static void Expect(BinaryReader reader, string op, char expected)
    {
        var got = reader.ReadChar();
        if (got != expected)
        {
            throw CreateParserException(op, expected, got);
        }
    }

    // Reads up to (not including) the next CR and consumes the LF that must follow it.
    private static string ReadRestOfLine(BinaryReader reader, string op)
    {
        var text = new StringBuilder();
        for (var c = reader.ReadChar(); c != Cr; c = reader.ReadChar())
        {
            text.Append(c);
        }

        Expect(reader, op, Lf);

        return text.ToString();
    }

    private static InfoOp ParseInfoOp(BinaryReader reader)
        => new InfoOp(ReadRestOfLine(reader, nameof(InfoOp)));

    // For +OK, PING and PONG the CR was consumed as the op marker terminator,
    // so only the LF remains to be read.
    private static OkOp ParseOkOp(BinaryReader reader)
    {
        Expect(reader, nameof(OkOp), Lf);
        return OkOp.Instance;
    }

    private static PingOp ParsePingOp(BinaryReader reader)
    {
        Expect(reader, nameof(PingOp), Lf);
        return PingOp.Instance;
    }

    private static PongOp ParsePongOp(BinaryReader reader)
    {
        Expect(reader, nameof(PongOp), Lf);
        return PongOp.Instance;
    }

    private static ErrOp ParseErrorOp(BinaryReader reader)
        => new ErrOp(ReadRestOfLine(reader, nameof(ErrOp)));

    // Parses the remainder of a MSG op: blank-separated header segments, the
    // payload size, CRLF, the payload bytes, and a closing CRLF.
    // NOTE(review): the optional middle segment is mapped to QueueGroup and the
    // last one to SubscriptionId. The public NATS protocol docs appear to
    // describe the header as "<subject> <sid> [reply-to] <#bytes>"; confirm.
    private static MsgOp ParseMsgOp(BinaryReader reader)
    {
        var segments = new List<string>();
        var segment = new StringBuilder();

        for (var c = reader.ReadChar(); c != Cr; c = reader.ReadChar())
        {
            if (c != DelimMarker)
            {
                segment.Append(c);
            }
            else
            {
                segments.Add(segment.ToString());
                segment.Clear();
            }
        }

        // What is left in the buffer when the CR is hit is the payload size.
        var payloadSize = int.Parse(segment.ToString());
        Expect(reader, nameof(MsgOp), Lf);

        // Subject and subscription id are mandatory, the queue group is optional.
        if (segments.Count < 2 || segments.Count > 3)
        {
            throw CreateMalformedMsgException(segments.Count);
        }

        var msg = new MsgOp(
            segments[0],
            segments[segments.Count - 1],
            reader.ReadBytes(payloadSize),
            segments.Count > 2 ? segments[1] : null);

        Expect(reader, nameof(MsgOp), Cr);
        Expect(reader, nameof(MsgOp), Lf);

        return msg;
    }
}
That's it. There's more to come really soon.
//Daniel