The time has come to encapsulate the consuming part a bit, and in this post we will move it into something called NatsObservable, which implements IObservable<IOp> and therefore lets you observe incoming ops from a NATS server. Using Reactive Extensions for .NET you could do all kinds of fun stuff. However, I'm not sure if the project is still alive, so in a future post I'll cover an alternative solution for in-memory routing, where we will also look into constructing an application message protocol.
All posts in this series
- NATS, What a beautiful protocol
- Simple incoming OP parser for NATS in C#
- Using the OpParser with NATS to create a long running consumer in C#
- Continuing with C# and NATS, now looking at NatsObservable
- Time to construct a bundled NATS client for C#
The consuming code
The consuming code keeps shrinking. This time we make use of the NatsObservable (source code for it below).
//Namespaces used: System, System.Reactive.Linq, System.Text, System.Threading
var cancellation = new CancellationTokenSource();

using (var socket = SocketFactory.Create())
{
    socket.Connect("192.168.1.176", 4222);

    //Tell the server to e.g. not confirm client OPs with +OK
    //socket.SendUtf8("CONNECT {\"verbose\": false}\r\n");

    //Subscribe to subject "foo" using subscription id "c1"
    socket.SendUtf8("SUB foo c1\r\n");

    using (var observable = NatsObservable.Consuming(
        socket,
        cancellation.Token))
    {
        //Subscribe to all ops and dump them out
        observable.Subscribe(op =>
        {
            Console.WriteLine("===== RECEIVED =====");
            Console.Write(op.GetAsString());
        });

        //Subscribe to PingOps. Respond with
        //Pong to tell server we are still here
        observable.OfType<PingOp>().Subscribe(ping => socket.SendUtf8("PONG\r\n"));

        //Subscribe to MsgOp and see what we have extracted
        observable.OfType<MsgOp>().Subscribe(msg =>
        {
            Console.WriteLine("===== MSG =====");
            Console.WriteLine($"Subject: {msg.Subject}");
            Console.WriteLine($"QueueGroup: {msg.QueueGroup}");
            Console.WriteLine($"SubscriptionId: {msg.SubscriptionId}");
            Console.WriteLine($"Payload: {Encoding.UTF8.GetString(msg.Payload)}");
        });

        Console.ReadKey();
        cancellation.Cancel();
    }

    //UNSUB takes the subscription id (not the subject): UNSUB <sid> [max_msgs]
    socket.SendUtf8("UNSUB c1\r\n");
    socket.Close();
}

cancellation.Dispose();
Tip: the Subscribe method returns an IDisposable, which you can use to stop a subscription.
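As a minimal sketch of that tip, the snippet below uses a `Subject<string>` from the Rx package as a stand-in for the NatsObservable. The class name `DisposeSubscriptionDemo` and the string values are made up for illustration; only `Subscribe`, `OnNext` and `Dispose` are the point here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class DisposeSubscriptionDemo
{
    // Hypothetical demo: returns the values the observer received.
    public static List<string> Run()
    {
        var received = new List<string>();

        // Subject<string> stands in for the NatsObservable (assumption).
        using (var source = new Subject<string>())
        {
            // Subscribe returns an IDisposable representing the subscription.
            IDisposable subscription = source.Subscribe(value => received.Add(value));

            source.OnNext("first");   // delivered to the observer

            // Disposing the handle stops this subscription only.
            subscription.Dispose();

            source.OnNext("second");  // no longer delivered
        }

        return received;
    }
}
```

Keeping the returned handle around lets you stop one observer without touching the others or the underlying socket.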
In the code above, I'm making use of the Reactive Extensions NuGet package to get support for Observable.OfType<T>. So, install the package:
install-package rx-main
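To show what OfType<T> does in isolation, here is a small sketch that assumes the Rx package is installed. The types `IOp`, `PingOp` and `MsgOp` below are simplified, hypothetical stand-ins for the real op types from the earlier posts, and a `Subject<IOp>` plays the role of the NatsObservable.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical, simplified stand-ins for the op types of this series.
public interface IOp { }
public sealed class PingOp : IOp { }
public sealed class MsgOp : IOp
{
    public string Subject { get; set; }
}

public static class OfTypeDemo
{
    // Returns a log of what each filtered observer saw, in order.
    public static List<string> Run()
    {
        var log = new List<string>();

        using (var ops = new Subject<IOp>())
        // Only PingOp instances reach this observer.
        using (ops.OfType<PingOp>().Subscribe(_ => log.Add("ping")))
        // Only MsgOp instances reach this observer.
        using (ops.OfType<MsgOp>().Subscribe(msg => log.Add("msg:" + msg.Subject)))
        {
            ops.OnNext(new PingOp());
            ops.OnNext(new MsgOp { Subject = "foo" });
            ops.OnNext(new PingOp());
        }

        return log;
    }
}
```

Each filtered stream is an independent subscription on the same source, which is why the consuming code can handle PingOp and MsgOp in separate places.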
If we publish some message to the subject "foo", the consumer now prints the op under "===== RECEIVED =====", followed by the extracted fields under "===== MSG =====".
The NatsObservable
Currently it's working against the Socket to construct its own read-only NetworkStream. This is a work in progress. Feel free to redesign.
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

public class NatsObservable : IObservable<IOp>, IDisposable
{
    private readonly ConcurrentDictionary<Guid, Subscription> _subscriptions;
    private NetworkStream _stream;
    private Func<bool> _streamHasData;
    private Task _worker;

    private NatsObservable(Socket socket, CancellationToken cancellation)
    {
        _subscriptions = new ConcurrentDictionary<Guid, Subscription>();

        //Read-only stream that does not own (and will not close) the socket
        _stream = new NetworkStream(socket, FileAccess.Read, false);
        _streamHasData = () =>
            _stream.DataAvailable &&
            !cancellation.IsCancellationRequested;

        //Long running worker that parses incoming ops and pushes them to observers
        _worker = Task.Factory.StartNew(() =>
        {
            var opParser = new OpParser();

            while (!cancellation.IsCancellationRequested)
            {
                //Wait until there is data to read or we are cancelled
                SpinWait.SpinUntil(() =>
                    cancellation.IsCancellationRequested ||
                    (_stream != null &&
                     _stream.DataAvailable));

                if (cancellation.IsCancellationRequested)
                    break;

                foreach (var op in opParser
                    .Parse(_stream, _streamHasData)
                    .TakeWhile(_ => !cancellation.IsCancellationRequested))
                {
                    Trigger(op);
                }
            }
        }, cancellation,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
    }

    public void Dispose()
    {
        //Make the parser stop reading
        _streamHasData = () => false;

        //A Task can only be disposed once it has completed
        if (_worker != null && _worker.IsCompleted)
        {
            _worker.Dispose();
            _worker = null;
        }

        _stream?.Close();
        _stream?.Dispose();
        _stream = null;
    }

    public static NatsObservable Consuming(
        Socket socket,
        CancellationToken cancellation)
    {
        return new NatsObservable(socket, cancellation);
    }

    public IDisposable Subscribe(IObserver<IOp> observer)
    {
        //Disposing the subscription removes it and completes the observer
        var subscription = new Subscription(observer, s =>
        {
            if (_subscriptions.TryRemove(s.Id, out s))
                s.OnCompleted();
        });

        if (_subscriptions.TryAdd(subscription.Id, subscription))
            return subscription;

        throw new InvalidOperationException("Could not register observer.");
    }

    private void Trigger(IOp value)
    {
        //A failing observer must not affect the other observers
        foreach (var subscription in _subscriptions.Values)
        {
            try
            {
                subscription.OnNext(value);
            }
            catch (Exception ex)
            {
                subscription.OnError(ex);
            }
        }
    }
}
Then the Subscription:
using System;

public class Subscription : IDisposable
{
    public readonly Guid Id;
    private readonly IObserver<IOp> _observer;
    private readonly Action<Subscription> _onDispose;

    public int ValueCount { get; private set; }
    public int TotalErrorCount { get; private set; }

    public Subscription(IObserver<IOp> observer, Action<Subscription> onDispose)
    {
        Id = Guid.NewGuid();
        _observer = observer;
        _onDispose = onDispose;
    }

    public void Dispose()
    {
        _onDispose(this);
    }

    public void OnError(Exception ex)
    {
        TotalErrorCount++;

        //Tolerate a few failures; after more than three, drop the observer
        if (TotalErrorCount > 3)
        {
            Dispose();
            _observer.OnError(ex);
        }
    }

    public void OnNext(IOp value)
    {
        ValueCount++;
        _observer.OnNext(value);
    }

    public void OnCompleted()
    {
        _observer.OnCompleted();
    }
}
That's all for now. There's more coming up.
Cheers,
//Daniel