danielwertheim

danielwertheim


notes from a passionate developer

Developer that lives by the mantra "code is meant to be shared".

Share


Tags


Disclaimer

This is a personal blog. The opinions expressed here represent my own and not those of my employer, nor current or previous. All content is published "as is", without warranty of any kind and I don't take any responsibility and can't be liable for any claims, damages or other liabilities that might be caused by the content.

Simple incoming OP parser for Nats in C#

Daniel WertheimDaniel Wertheim

Instead of a big blog post I'll continue to write smaller parts in my journey with Nats. Feels like it rimes better with the philosophy of Nats. So, this time, lets look at a simple parser that parses a downward stream from a Nats server to a client in C#.

The implementations shown will change in future posts and change before everything is packaged up as a "client", with support for observables and Rx, hosted on GitHub. For now, it's just me learning and having fun.

All posts in this series

The protocol of NATS is really simple to understand (previous post) and the downwards op that a client needs to parse are few:

In order to just test this out without actually running against a NATS server, I'll make the OpParser work against a Stream. The consumer of the OpParser will provide a strategy for letting the parser know when there's more data or not. When using a MemoryStream you could e.g just try and peek at a char further ahead using PeekChar, or look at the current position compared to the total length. But when using NetworkStream this is not possible. But then we can make use of the stream.DataAvailable property.

The parser works with iterators and yields IOp implementations. Either:

The IOp is really simple:

public interface IOp  
{
    string Code { get; }
    string GetAsString();
}

Since the parser yields operators per request, and works against a stream, you can read for instance two first (Take(2)) , then one (Take(1)), then the rest.

Lets fake some downstream ops from a Nats server by the use of a MemoryStream:

var opParser = new OpParser();  
using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(  
    "INFO {\"server_id\":\"H8RgvFtiq2zlQTA5dB0deh\"}\r\n"
    + "+OK\r\n"
    + "MSG foo siddw1 4\r\ntest\r\n"
    + "+OK\r\n"
    + "MSG foo grp1 siddw1 4\r\ntest\r\n"
    + "PING\r\n"
    + "MSG foo grp1 siddw1 4\r\ntest\r\n"
    + "PONG\r\n"
    + "-ERR 'Unknown Protocol Operation'\r\n")))
{
    Func<bool> hasData = () => stream.Position < stream.Length;

    Console.WriteLine("===== TAKE 2 =====");
    foreach (var op in opParser.Parse(stream, hasData).Take(2))
        Console.Write(op.GetAsString());

    Console.WriteLine("===== TAKE 1 =====");
    foreach (var op in opParser.Parse(stream, hasData).Take(1))
        Console.Write(op.GetAsString());

    Console.WriteLine("===== All the remaining =====");
    foreach (var op in opParser.Parse(stream, hasData))
        Console.Write(op.GetAsString());
}

Run this and you should see:

Lets look at MsgOp using OfType:

var opParser = new OpParser();  
using (var stream = new MemoryStream(Encoding.UTF8.GetBytes(  
    "INFO {\"server_id\":\"H8RgvFtiq2zlQTA5dB0deh\"}\r\n"
    + "+OK\r\n"
    + "MSG foo siddw1 4\r\ntest\r\n"
    + "+OK\r\n"
    + "MSG foo grp1 siddw1 4\r\ntest\r\n"
    + "PING\r\n"
    + "MSG foo grp1 siddw1 4\r\ntest\r\n"
    + "PONG\r\n"
    + "-ERR 'Unknown Protocol Operation'\r\n")))
{
    Func<bool> hasData = () => stream.Position < stream.Length;
    foreach (var op in opParser.Parse(stream, hasData).OfType<MsgOp>())
    {
        Console.WriteLine("===== MSG =====");
        Console.WriteLine($"Subject: {op.Subject}");
        Console.WriteLine($"QueueGroup: {op.QueueGroup}");
        Console.WriteLine($"SubscriptionId: {op.SubscriptionId}");
        Console.WriteLine($"Payload: {Encoding.UTF8.GetString(op.Payload)}");
    }
}

Running this would give:

To get this to work, you of course need the different IOp implementations:

public class InfoOp : IOp  
{
    public string Code => "INFO";
    public string Message { get; }

    public InfoOp(string message)
    {
        Message = message;
    }

    public string GetAsString()
    {
        return $"INFO {Message}\r\n";
    }
}

public class PingOp : IOp  
{
    public static readonly PingOp Instance = new PingOp();

    public string Code => "PING";

    private PingOp() { }

    public string GetAsString()
    {
        return "PING\r\n";
    }
}

public class PongOp : IOp  
{
    public static readonly PongOp Instance = new PongOp();

    public string Code => "PONG";

    private PongOp() { }

    public string GetAsString()
    {
        return "PONG\r\n";
    }
}

public class MsgOp : IOp  
{
    public string Code => "MSG";
    public string Subject { get; }
    public string QueueGroup { get; }
    public string SubscriptionId { get; }
    public byte[] Payload { get; }

    private readonly int _size;

    public MsgOp(
        string subject,
        string subscriptionId,
        byte[] payload,
        string queueGroup = null)
    {
        Subject = subject;
        QueueGroup = queueGroup;
        SubscriptionId = subscriptionId;
        Payload = payload;
        _size = 3 + //MSG
                1 + //BLANK
                Subject.Length +
                1 + //BLANK
                (QueueGroup?.Length + 1 ?? 0) + //Optinal GRP + BLANK
                SubscriptionId.Length +
                1 + //BLANK
                Payload.Length.ToString().Length +
                2 + //CRLF
                Payload.Length +
                2; //CRLF
    }

    public string GetPayloadAsString()
    {
        return Encoding.UTF8.GetString(Payload);
    }

    public string GetAsString()
    {
        var sb = new StringBuilder(_size);
        sb.Append("MSG");
        sb.Append(" ");
        sb.Append(Subject);
        sb.Append(" ");
        if (QueueGroup != null)
        {
            sb.Append(QueueGroup);
            sb.Append(" ");
        }

        sb.Append(SubscriptionId);
        sb.Append(" ");
        sb.Append(Payload.Length);
        sb.Append("\r\n");
        sb.Append(GetPayloadAsString());
        sb.Append("\r\n");

        return sb.ToString();
    }
}

public class OkOp : IOp  
{
    public static readonly OkOp Instance = new OkOp();

    public string Code => "+OK";

    private OkOp() { }

    public string GetAsString() => "+OK\r\n";
}

public class ErrOp : IOp  
{
    public string Code => "-ERR";
    public string Message { get; }

    public ErrOp(string message)
    {
        Message = message;
    }

    public string GetAsString()
    {
        return $"-ERROR {Message}\r\n";
    }
}

Finally the OpParser it self:

public class OpParser  
{
  private static readonly Dictionary<string, Func<BinaryReader, IOp>> Ops;
  private const char DelimMarker = ' ';
  private const char Cr = '\r';
  private const char Lf = '\n';

  static OpParser()
  {
    Ops = new Dictionary<string, Func<BinaryReader, IOp>>
    {
      { "+OK", ParseOkOp },
      { "MSG", ParseMsgOp },
      { "-ERR", ParseErrorOp },
      { "INFO", ParseInfoOp },
      { "PING", ParsePingOp },
      { "PONG", ParsePongOp }
    };
  }

  public IEnumerable<IOp> Parse(Stream stream, Func<bool> hasData)
  {
    if (!hasData())
      yield break;

    var opMarkerChars = new List<char>();
    using (var reader = new BinaryReader(stream, Encoding.UTF8, true))
    {
      while (hasData())
      {
        var c = reader.ReadChar();
        if (c != DelimMarker && c != Cr)
        {
          opMarkerChars.Add(c);
          continue;
        }

        var op = new string(opMarkerChars.ToArray());
        opMarkerChars.Clear();

        if (Ops.ContainsKey(op))
          yield return Ops[op](reader);
        else
          throw CreateUnsupportedOpException(op);
      }
      reader.Close();
    }
  }

  private static Exception CreateUnsupportedOpException(string foundOp)
    => new Exception($"Unsupported OP, don't know how to parse OP '{foundOp}'.");

  private static Exception CreateParserException(string op, char expected, char got)
    => new Exception($"Error while parsing {op}. Expected char code
                     '{(byte)expected}' got '{(byte)got}'.");

  private static InfoOp ParseInfoOp(BinaryReader reader)
  {
    var msg = new StringBuilder();
    while (true)
    {
      var c = reader.ReadChar();
      if (c == Cr)
      {
        var burn = reader.ReadChar();
        if (burn != Lf)
          throw CreateParserException(nameof(InfoOp), Lf, burn);
        break;
      }

      msg.Append(c);
    }

    return new InfoOp(msg.ToString());
  }

  private static OkOp ParseOkOp(BinaryReader reader)
  {
    var burn = reader.ReadChar();
    if (burn != Lf)
      throw CreateParserException(nameof(OkOp), Lf, burn);

    return OkOp.Instance;
  }

  private static PingOp ParsePingOp(BinaryReader reader)
  {
    var burn = reader.ReadChar();
    if (burn != Lf)
      throw CreateParserException(nameof(PingOp), Lf, burn);

    return PingOp.Instance;
  }

  private static PongOp ParsePongOp(BinaryReader reader)
  {
    var burn = reader.ReadChar();
    if (burn != Lf)
      throw CreateParserException(nameof(PongOp), Lf, burn);

    return PongOp.Instance;
  }

  private static ErrOp ParseErrorOp(BinaryReader reader)
  {
    var msg = new StringBuilder();
    while (true)
    {
      var c = reader.ReadChar();
      if (c == Cr)
      {
        var burn = reader.ReadChar();
        if (burn != Lf)
          throw CreateParserException(nameof(ErrOp), Lf, burn);
        break;
      }

      msg.Append(c);
    }

    return new ErrOp(msg.ToString());
  }

  private static MsgOp ParseMsgOp(BinaryReader reader)
  {
    var segments = new List<char[]>();
    var segment = new List<char>();
    int payloadSize;
    char burn;

    while (true)
    {
      var c = reader.ReadChar();
      if (c == Cr)
      {
        payloadSize = int.Parse(new string(segment.ToArray()));
        burn = reader.ReadChar();
        if (burn != Lf)
          throw CreateParserException(nameof(MsgOp), Lf, burn);
        break;
      }

      if (c != DelimMarker)
        segment.Add(c);
      else
      {
        segments.Add(segment.ToArray());
        segment.Clear();
      }
    }

    var msg = new MsgOp(
      new string(segments.First()),
      new string(segments.Last()),
      reader.ReadBytes(payloadSize),
      segments.Count > 2 ? new string(segments[1]) : null);

    burn = reader.ReadChar();
    if (burn != Cr)
      throw CreateParserException(nameof(MsgOp), Cr, burn);

    burn = reader.ReadChar();
    if (burn != Lf)
      throw CreateParserException(nameof(MsgOp), Lf, burn);

    return msg;
  }
}

That's it. There's more to come really soon.

//Daniel

Developer that lives by the mantra "code is meant to be shared".

Comments