danielwertheim

danielwertheim


notes from a passionate developer

Share


Sections


Tags


Disclaimer

This is a personal blog. The opinions expressed here represent my own and not those of my employer, nor current or previous. All content is published "as is", without warranty of any kind and I don't take any responsibility and can't be liable for any claims, damages or other liabilities that might be caused by the content.

Continuing with C# and Nats, now looking at NatsObservable

Time has come to encapsulate the consuming part a bit, and in this post we will move this to something called NatsObservable which extends IObservable<IOp> and therefore lets you observe incoming ops from a Nats server. Using ReactiveExtensions.Net you could do all kinds of fun stuff. How-ever, I'm not sure if the project is still alive. So in a future post I'll cover another "alternative" or solution to do in-memory routing. Where we also look into constructing an application message protocol.

All posts in this series

The consuming code

The consuming code is beeing more and more reduced. This time we make use of the NatsObservable (source code for it below).

var cancellation = new CancellationTokenSource();

using (var socket = SocketFactory.Create())
{
    socket.Connect("192.168.1.176", 4222);
    
    //Tell the server to e.g. not confirm client OPs with +OK
    //socket.SendUtf8("CONNECT {\"verbose\": false}\r\n");
    
    //Subscribe to subhect "foo"
    socket.SendUtf8("SUB foo c1\r\n");

    using (var observable = NatsObservable.Consuming(
        socket,
        cancellation.Token))
    {
        //Subscribe to all ops anb dump out
        observable.Subscribe(op =>
        {
            Console.WriteLine("===== RECEIVED =====");
            Console.Write(op.GetAsString());
        });

        //Subscribe to PingOps. Respond with
        //Pong to tell server we are still here
        observable.OfType<PingOp>().Subscribe(ping => socket.SendUtf8("PONG\r\n"));

        //Subscribe to MsgOp and see what we have extracted
        observable.OfType<MsgOp>().Subscribe(msg =>
        {
            Console.WriteLine("===== MSG =====");
            Console.WriteLine($"Subject: {msg.Subject}");
            Console.WriteLine($"QueueGroup: {msg.QueueGroup}");
            Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
            Console.WriteLine($"Payload: {Encoding.UTF8.GetString(msg.Payload)}");
        });

        Console.ReadKey();
        cancellation.Cancel();
    }

    socket.SendUtf8("UNSUB foo c1\r\n");
    socket.Close();
}

cancellation.Dispose();

Tip the Subscribe method returns an IDisposable which you could use to stop a subscription.

In the code above, I'm making use of the Reactive-Extensions NuGet to get support for Observable.OfType<T>. So, install the package:

install-package rx-main

If we publish some message to the subject "foo", it will now look something like this:

The NatsObservable

Currently it's working against the Socket to construct its own read-only NetworkStream. This is work in progress. Feel free to redesign.

public class NatsObservable : IObservable<IOp>, IDisposable
{
    private readonly ConcurrentDictionary<Guid, Subscription> _subscriptions;
    private NetworkStream _stream;
    private Func<bool> _streamHasData;
    private Task _worker;

    private NatsObservable(Socket socket, CancellationToken cancellation)
    {
        _subscriptions = new ConcurrentDictionary<Guid, Subscription>();
        _stream = new NetworkStream(socket, FileAccess.Read, false);
        _streamHasData = () =>
            _stream.DataAvailable &&
            !cancellation.IsCancellationRequested;

        _worker = Task.Factory.StartNew(() =>
        {
            var opParser = new OpParser();

            while (!cancellation.IsCancellationRequested)
            {
                SpinWait.SpinUntil(() =>
                    cancellation.IsCancellationRequested ||
                    (_stream != null &&
                    _stream.DataAvailable));

                if (cancellation.IsCancellationRequested)
                    break;

                foreach (var op in opParser
                    .Parse(_stream, _streamHasData)
                    .TakeWhile(op => !cancellation.IsCancellationRequested))
                {
                    Trigger(op);
                }
            }
        }, cancellation,
           TaskCreationOptions.LongRunning,
           TaskScheduler.Default);
    }

    public void Dispose()
    {
        _streamHasData = () => false;

        if (_worker != null && _worker.IsCompleted)
        {
            _worker.Dispose();
            _worker = null;
        }

        _stream?.Close();
        _stream?.Dispose();
        _stream = null;
    }

    public static NatsObservable Consuming(
        Socket socket,
        CancellationToken cancellation)
    {
        return new NatsObservable(socket, cancellation);
    }

    public IDisposable Subscribe(IObserver<IOp> observer)
    {
        var subscription = new Subscription(observer, s =>
        {
            if (_subscriptions.TryRemove(s.Id, out s))
                s.OnCompleted();
        });

        if (_subscriptions.TryAdd(subscription.Id, subscription))
            return subscription;

        throw new InvalidOperationException("Could not register observer.");
    }

    private void Trigger(IOp value)
    {
        foreach (var subscription in _subscriptions.Values)
        {
            try
            {
                subscription.OnNext(value);
            }
            catch (Exception ex)
            {
                subscription.OnError(ex);
            }
        }
    }
}

Then the Subscription:

public class Subscription : IDisposable
{
    public readonly Guid Id;
    private readonly IObserver<IOp> _observer;
    private readonly Action<Subscription> _onDispose;

    public int ValueCount { get; private set; }
    public int TotalErrorCount { get; private set; }

    public Subscription(IObserver<IOp> observer, Action<Subscription> onDispose)
    {
        Id = Guid.NewGuid();

        _observer = observer;
        _onDispose = onDispose;
    }

    public void Dispose()
    {
        _onDispose(this);
    }

    public void OnError(Exception ex)
    {
        TotalErrorCount++;
        if (TotalErrorCount > 3)
        {
            Dispose();
            _observer.OnError(ex);
        }
    }

    public void OnNext(IOp value)
    {
        ValueCount++;
        _observer.OnNext(value);
    }

    public void OnCompleted()
    {
        _observer.OnCompleted();
    }
}

That's all for now. There's more coming up.

Cheers,

//Daniel

View Comments