This week I held a talk at Swetugg, a two day Swedish conference for .NET developers. My title was: "NATS, what a beautiful message protocol!". In order to show a good use-case of NATS I put together a small demo where I read my heart rate in real time using a sensor and ANT+ SDK, which was published via NATS. I then had two different services (apps) running. Both subscribed to the NATS subject to which the heart rate readings were published. One service wrote the data to InfluxDB so that it could be visualised in Grafana. In another subscriber I took the data and matched it to a certain colour and then used the HTTP-API of the Philips Hue bridge to make a Philips Hue Go indicate if I was "calm" or "stressed".
I got comments about it both during and after the talk:
Oh! Now it changed!
or...
When you stood behind the computer you where more calm. Then it turned green. At one point even blue.
It looked something like this:
"NATS, what a beautiful message protocol!" med @danielwertheim pic.twitter.com/hu0jD95QIE
— swetugg (@swetugg) January 30, 2017
The overall solution
I put together a small sketch to give a quick overview of how the pieces worked together
Some Docker containers
I should have used Docker compose but anyway. All I needed was:
- NATS (only 5.7mb)
- InfluxDB
- Grafana
docker run -d -p 4222:4222 -p 8222:8222
--name my-nats nats -m 8222
docker run -d -p 8083:8083 -p 8086:8086
-v /docker-data/my-influx:/var/lib/influxdb
-v /docker-conf/influxdb.conf:/etc/influxdb/influxdb.conf:ro
--name my-influx influxdb
-config /etc/influxdb/influxdb.conf
docker run -d -p 3000:3000
-v /docker-data/my-grafana:/var/lib/grafana
-v /docker-conf/my-grafana:/etc/grafana
--name my-grafana grafana/grafana
The "message" HeartRateReceived
The solution only uses one message which is distributed via a NATS subject: "heartratereceived"
.
public class HeartRateReceived
{
public static readonly string NatsSubject
= nameof(HeartRateReceived).ToLower();
public string User { get; private set; }
public byte BeatPerMinute { get; private set; }
public DateTime TimeStamp { get; private set; }
private HeartRateReceived() { }
public static HeartRateReceived Parse(string data)
{
//See ToString below
//Data is sent over NATS as simple string
//with values separated by @
var parts = data.Split('@');
return new HeartRateReceived
{
User = parts[0],
BeatPerMinute = byte.Parse(parts[1]),
TimeStamp = DateTime.Parse(parts[2])
};
}
public static HeartRateReceived Create(
string user,
byte beatPerMinute)
=> new HeartRateReceived
{
User = user,
BeatPerMinute = beatPerMinute,
TimeStamp = DateTime.UtcNow
};
//Generates payload representation,
//which is sent over NATS
public override string ToString()
=> $"{User}@{BeatPerMinute}@{TimeStamp:O}";
}
ANT+ SDK
Due to licence restrictions and agreements, I'm not allowed to show all bits and pieces here. But if you head over to: https://www.thisisant.com/developer/ you will be able to signup and become an adopter and thereby gain access to SDKs and sensor/device profiles and keys.
That is what I did to be able to use a small Garmin USB receiver to access my Garmin heart rate sensor. The high level code looked like this:
var config = new ReceiverConfig(
"someuser",
NatsConfig.GetConnectionInfo(),
NetworkKey,
ushort.Parse(ConfigurationManager.AppSettings["device_number"]));
var receiver = new Receiver(config);
receiver.Start();
public class Receiver : IDisposable
{
//Device uses USB receiver to read from
//heart rate sensor in real time
//Due to license restrictions I'm not allowed to
//show underlying code.
private HeartRateDevice _heartRateDevice;
private NatsClient _natsClient;
public Receiver(ReceiverConfig config)
{
_natsClient = new NatsClient("HeartRateReceiver", config.NatsConnectionInfo);
_heartRateDevice = new HeartRateDevice(config.NetworkKey, config.DeviceNumber)
{
OnHeartRateDataFrameReceived = dataFrame =>
{
//Received heart rate data frame from sensor.
//Create unified message and dispatch via NATS
var msg = HeartRateReceived.Create(config.User, dataFrame.BeatPerMinute);
_natsClient.Pub(HeartRateReceived.NatsSubject, msg.ToString());
}
};
}
public void Dispose()
{
_heartRateDevice?.Disconnect();
_heartRateDevice?.Dispose();
_heartRateDevice = null;
_natsClient?.Dispose();
_natsClient = null;
}
public void Start()
{
_natsClient.Connect();
_heartRateDevice.Connect();
}
public void Stop()
{
_heartRateDevice.Disconnect();
_natsClient.Disconnect();
}
}
The Indexer
The Indexer that wrote the data to InfluxDB was a simple service/app that looked like this:
var config = new IndexerConfig(
NatsConfig.GetConnectionInfo(),
"http://192.168.2.17:8086",
"healthdata");
var indexer = new Indexer(config);
await indexer.StartAsync();
public class Indexer : IDisposable
{
private readonly IndexerConfig _config;
private NatsClient _natsClient;
private InfluxDbClient _influxDbClient;
private ISubscription _natsSubscription;
public Indexer(IndexerConfig config)
{
_config = config;
_natsClient = new NatsClient("HeartRateIndexer", config.NatsConnectionInfo);
_influxDbClient = new InfluxDbClient(config.InfluxDbHost);
//Shows that you don't have to subscribe against
//NATS and MsgOpStream in one go.
//Here we subscribe to the in-process message dispatching
//via observable stream of MsgOp
//The NATS subscription is configured in StartAsync
_natsClient.MsgOpStream
.Subscribe(async msg => await HandleHeartRateReceived(msg));
}
private async Task HandleHeartRateReceived(MsgOp msg)
{
//Parse out the message
var heartRateReceived = HeartRateReceived.Parse(msg.GetPayloadAsString());
//Create points for writing to time series health database
var points = new InfluxPoints()
.Add(new InfluxPoint(nameof(HeartRateReceived).ToLower())
.AddTag("user", heartRateReceived.User)
.AddField("beatperminute", heartRateReceived.BeatPerMinute)
.AddTimeStamp(heartRateReceived.TimeStamp, new SecondsResolution()));
//Write points to InfluxDB so that they can be visualized in Grafana
await _influxDbClient
.WriteAsync(_config.InfluxDbName, points)
.ConfigureAwait(false);
}
public void Dispose()
{
_natsClient?.Dispose();
_natsClient = null;
_influxDbClient.Dispose();
_influxDbClient = null;
}
public async Task StartAsync()
{
//Ensure time series database exists
await _influxDbClient
.CreateDatabaseAsync(_config.InfluxDbName)
.ConfigureAwait(false);
//Connect to NATS and subscribe to subject for
//HeartRateReceived messages
_natsClient.Connect();
_natsSubscription = await _natsClient
.SubAsync(HeartRateReceived.NatsSubject)
.ConfigureAwait(false);
}
public Task StopAsync()
{
//Be a good citizen and Unsub from NATS subject
_natsSubscription?.Dispose();
_natsSubscription = null;
//Can also do...
//await _natsClient
// .UnsubAsync(_natsSubscription.SubscriptionInfo)
// .ConfigureAwait(false);
_natsClient.Disconnect();
return Task.CompletedTask;
}
}
Which allowed my to visualise this in Grafana looking like this:
The HueController
The last piece was a service/app that got a copy of the published message and updated the colour of the lamp. If you are interested, you can get more information about the API of the Philps Hue bridge: https://www.developers.meethue.com/documentation/getting-started
The service/app entry point, built up some configuration for available lamps and the heart rate zones:
private const byte Brightness = 85;
private const byte Saturation = 254;
...
var config = new HueControllerConfig(
NatsConfig.GetConnectionInfo(),
"http://192.168.2.2",
"some api key",
new List<HueLamp>
{
new HueLamp(1, new List<HeartBeatColor>
{
new HeartBeatColor(0, 60, HueColor.Blue(Brightness, Saturation)),
new HeartBeatColor(61, 75, HueColor.Green(Brightness, Saturation)),
new HeartBeatColor(76, 80, HueColor.Orange(Brightness, Saturation)),
new HeartBeatColor(81, 110, HueColor.Red(Brightness, Saturation)),
new HeartBeatColor(111, 200, HueColor.Pink(Brightness, Saturation))
})
});
var controller = new HueController(config);
await controller.StartAsync();
The lamp and colours:
public class HueLamp
{
public int Id { get; }
public List<HeartBeatColor> ColorRanges { get; }
public HueLamp(int id, List<HeartBeatColor> colorRanges)
{
Id = id;
ColorRanges = colorRanges;
}
}
public class HeartBeatColor
{
public int MinBpm { get; }
public int MaxBpm { get; }
public HueColor HueColor { get; }
public HeartBeatColor(int minBpm, int maxBpm, HueColor hueColor)
{
MinBpm = minBpm;
MaxBpm = maxBpm;
HueColor = hueColor;
}
public bool CoversBpmOf(byte bpm)
=> MinBpm <= bpm && bpm <= MaxBpm;
}
public class HueColor
{
public static readonly HueColor Default = new HueColor(45, 117, 8501);
public byte Bri { get; }
public byte Sat { get; }
public ushort Hue { get; }
public HueColor(byte bri, byte sat, ushort hue)
{
Bri = bri;
Sat = sat;
Hue = hue;
}
public static HueColor Green(byte bri, byte sat)
=> new HueColor(bri, sat, 24200);
public static HueColor Orange(byte bri, byte sat)
=> new HueColor(bri, sat, 4812);
public static HueColor Red(byte bri, byte sat)
=> new HueColor(bri, sat, 0);
public static HueColor Blue(byte bri, byte sat)
=> new HueColor(bri, sat, 42152);
public static HueColor Pink(byte bri, byte sat)
=> new HueColor(bri, sat, 59526);
}
And finally the actual controller. Here I make use of the RX capabilities of MyNatsClient, to sample out a value each 5s to not update the lamp colour to often.
public class HueController : IDisposable
{
private static readonly TimeSpan SamplingFreq = TimeSpan.FromSeconds(5);
private readonly HueControllerConfig _config;
private NatsClient _natsClient;
private HttpRequester _requester;
private ISubscription _natsSubscription;
private byte _lastKnownBpm;
private readonly Dictionary<int, HueColor> _lastUsedColorByLamp;
public HueController(HueControllerConfig config)
{
_config = config;
_natsClient = new NatsClient("LightsController", config.NatsConnectionInfo);
_requester = HttpRequester.Create($"{config.HueHost}/api/{config.HueUser}");
_lastUsedColorByLamp = config.HueLamps.ToDictionary(l => l.Id, _ => null as HueColor);
}
public void Dispose()
{
_natsClient?.Dispose();
_natsClient = null;
_requester.Dispose();
_requester = null;
}
public async Task StartAsync()
{
//Turn on lamps
foreach (var lamps in _config.HueLamps)
{
await _requester.PutEntityAsJsonAsync(
new { On = true },
$"/lights/{lamps.Id}/state").ConfigureAwait(false);
}
//Subscribe to HeartRateReceived, use sampling to react at most each 5s
_natsClient.Connect();
_natsSubscription = await _natsClient
.SubWithObservableSubscriptionAsync(
HeartRateReceived.NatsSubject,
msgOps => msgOps
.Sample(SamplingFreq)
.Select(msgOp => HeartRateReceived.Parse(msgOp.GetPayloadAsString()))
.Where(ev => _lastKnownBpm != ev.BeatPerMinute)
.Subscribe(async ev => await AdjustColorAsync(ev)))
.ConfigureAwait(false);
}
private async Task AdjustColorAsync(HeartRateReceived heartRateReceived)
{
_lastKnownBpm = heartRateReceived.BeatPerMinute;
foreach (var lamp in _config.HueLamps)
{
var newColor = GetNewColor(lamp, heartRateReceived);
var lastUsedColor = _lastUsedColorByLamp[lamp.Id];
if (lastUsedColor != null && lastUsedColor.Hue == newColor.Hue)
continue;
var response = await _requester.PutEntityAsJsonAsync(
newColor,
$"/lights/{lamp.Id}/state").ConfigureAwait(false);
if (response.IsSuccess)
_lastUsedColorByLamp[lamp.Id] = newColor;
}
}
private static HueColor GetNewColor(
HueLamp hueLamp,
HeartRateReceived heartRateReceived)
{
var bpm = heartRateReceived.BeatPerMinute;
return hueLamp.ColorRanges
.FirstOrDefault(cr => cr.CoversBpmOf(bpm))?.HueColor ?? HueColor.Default;
}
public async Task StopAsync()
{
//Unsubscribe
if (_natsSubscription != null)
{
await _natsClient
.UnsubAsync(_natsSubscription.SubscriptionInfo)
.ConfigureAwait(false);
//It is actually enough to just do this Dispose, as it will unsub
_natsSubscription.Dispose();
}
_natsClient.Disconnect();
}
}
This demo worked well for me. And shows a very good use-case for NATS. Pumping frequent, small sized sensor data, where transactional guarantees is not needed and "at most once" delivery is OK.
That's it. Hope it can inspire you to build something as well.
Cheers,
//Daniel