MyNatsClient just reached v0.7.0, and the focus has been on simplifying its usage. The client now keeps track of subscriptions and auto-subscribes when it gets connected or reconnected. It also lets you set up a subscription against both the NATS server and the internal observable stream of messages in one call. The final bigger change is that it now offers convenience methods for performing requests. Let's have a look at some short code samples.
Simplest pub-sub example
Before this release, you had to both subscribe to the in-process client.MsgOpStream and send a sub command to the NATS server via client.Sub(...) or client.SubAsync(...). You still can, but you don't have to anymore. You can now do both in one call instead:
var connectionInfo = new ConnectionInfo("192.168.1.20")
{
    AutoReconnectOnFailure = true
};
using (var client = new NatsClient("clientId1", connectionInfo))
{
    await client.SubWithHandlerAsync("clock.ticked", msg => {
        Dump($"Clock ticked. Tick is {msg.GetPayloadAsString()}");
    });
    await client.PubAsync("clock.ticked", getNextTick());
}
Use an observer to get exceptions etc
You can also make use of client.SubWithObserver(...) or client.SubWithObserverAsync(...) instead. This lets you hook in an Action<Exception> callback for handling exceptions.
using (var client = new NatsClient("clientId1", connectionInfo))
{
    var observer = new DelegatingObserver<MsgOp>(
        msg => {
            // On message
        },
        ex => {
            // On exception
        });
    await client.SubWithObserverAsync("clock.ticked", observer);
    await client.PubAsync("clock.ticked", getNextTick());
}
Request-response
using (var client = new NatsClient("clientId1", connectionInfo))
{
    // The response producer
    await client.SubWithHandlerAsync("getTemp", msg => {
        client.Pub(msg.ReplyTo, getTemp(msg.GetPayloadAsString()));
    });
    // The requesting party
    var response = await client.RequestAsync(
        "getTemp",
        "stockholm@sweden");
    Dump($"Temp in Stockholm is {response.GetPayloadAsString()}");
}
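If the role of msg.ReplyTo in the sample above is unclear, the following is a minimal, synchronous, in-memory sketch of the request-reply pattern in general: the requester listens on a unique inbox subject, publishes the request with that inbox as the reply-to subject, and the responder publishes its answer to it. Msg, InMemoryBus and RequestReplyDemo are hypothetical names made up for this illustration; they are not part of MyNatsClient and say nothing about how RequestAsync is implemented. Timeouts and networking are not modeled.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message type for the sketch (not MyNatsClient's MsgOp).
public sealed class Msg
{
    public string Subject { get; }
    public string ReplyTo { get; }
    public string Payload { get; }

    public Msg(string subject, string replyTo, string payload)
    {
        Subject = subject;
        ReplyTo = replyTo;
        Payload = payload;
    }
}

// Hypothetical in-memory stand-in for a server; delivery is synchronous.
public sealed class InMemoryBus
{
    private readonly Dictionary<string, List<Action<Msg>>> _handlers =
        new Dictionary<string, List<Action<Msg>>>();

    public void Sub(string subject, Action<Msg> handler)
    {
        if (!_handlers.TryGetValue(subject, out var list))
            _handlers[subject] = list = new List<Action<Msg>>();
        list.Add(handler);
    }

    public void Unsub(string subject) => _handlers.Remove(subject);

    public void Pub(string subject, string payload, string replyTo = null)
    {
        if (!_handlers.TryGetValue(subject, out var list)) return;
        // Iterate over a copy so handlers may subscribe or unsubscribe meanwhile.
        foreach (var handler in list.ToArray())
            handler(new Msg(subject, replyTo, payload));
    }

    // Request = subscribe to a unique inbox, publish with ReplyTo set, keep the reply.
    // Returns null if nobody answered.
    public string Request(string subject, string payload)
    {
        var inbox = "_INBOX." + Guid.NewGuid().ToString("N");
        string response = null;
        Sub(inbox, msg => response = msg.Payload);
        Pub(subject, payload, inbox);
        Unsub(inbox);
        return response;
    }
}

public static class RequestReplyDemo
{
    public static void Main()
    {
        var bus = new InMemoryBus();
        // The response producer answers on whatever subject the request names.
        bus.Sub("getTemp", msg => bus.Pub(msg.ReplyTo, "temp for " + msg.Payload));
        // The requesting party
        Console.WriteLine(bus.Request("getTemp", "stockholm@sweden"));
    }
}
```

Running the demo prints "temp for stockholm@sweden". The responder never needs to know who asked; it only publishes to the reply-to subject it was given.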
Subscribing & Unsubscribing
The client keeps track of subscriptions, and you can set them up before connecting. Once it gets connected, it registers the subscriptions against the NATS server. If you make use of ConnectionInfo.AutoReconnectOnFailure, it will also re-subscribe when it reconnects after an exception.
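To make the bookkeeping idea concrete, here is a minimal sketch of how a client could remember subscriptions and replay them on every connect. It is only an illustration under assumptions: TrackingClient, its members and ResubscribeDemo are hypothetical names, and the "SUB subject" string is a simplified stand-in for the real command (which also carries a subscription id). It is not MyNatsClient's actual implementation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of the bookkeeping only.
public sealed class TrackingClient
{
    private readonly List<string> _subjects = new List<string>();
    private readonly Action<string> _sendToServer;
    private bool _connected;

    public TrackingClient(Action<string> sendToServer)
    {
        _sendToServer = sendToServer;
    }

    // Remember the subscription; only talk to the server when connected.
    public void Sub(string subject)
    {
        _subjects.Add(subject);
        if (_connected) _sendToServer("SUB " + subject);
    }

    // Called on the first connect and on every reconnect:
    // replay all tracked subscriptions against the server.
    public void Connect()
    {
        _connected = true;
        foreach (var subject in _subjects)
            _sendToServer("SUB " + subject);
    }

    public void Disconnect()
    {
        _connected = false;
    }
}

public static class ResubscribeDemo
{
    public static void Main()
    {
        var sent = new List<string>();
        var client = new TrackingClient(sent.Add);

        client.Sub("clock.ticked"); // set up before connecting: nothing is sent yet
        client.Connect();           // sends the tracked subscription
        client.Disconnect();        // e.g. a connection failure
        client.Connect();           // reconnect: the subscription is sent again

        Console.WriteLine(string.Join(", ", sent));
    }
}
```

The demo prints "SUB clock.ticked, SUB clock.ticked": once for the initial connect and once for the reconnect, without the caller subscribing a second time.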
When subscribing to a subject using the client, you will be returned an ISubscription. The methods for subscribing are:
- client.Sub(subscriptionInfo)
- client.SubAsync(subscriptionInfo)
- client.SubWithHandler(subscriptionInfo, msg => {})
- client.SubWithHandlerAsync(subscriptionInfo, msg => {})
- client.SubWithObserver(subscriptionInfo, observer)
- client.SubWithObserverAsync(subscriptionInfo, observer)
- client.SubWithObservableSubscription(subscriptionInfo, msgs => msgs.Subscribe(...))
- client.SubWithObservableSubscriptionAsync(subscriptionInfo, msgs => msgs.Subscribe(...))
To unsubscribe, you can do any of the following:
- Dispose the ISubscription returned by any of the subscribing methods listed above.
- Dispose the NatsClient and it will take care of the subscriptions.
- Pass the ISubscription.SubscriptionInfo to any of the client.Unsub|UnsubAsync methods.
- Create the subscription using a SubscriptionInfo with MaxMessages, then it will auto unsubscribe after receiving the messages.
NOTE: it's perfectly fine to do both, e.g. subscription.Dispose as well as consumer.Dispose, or consumer.Unsubscribe and then subscription.Dispose.
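One common way to make repeated unsubscribing harmless is an idempotent Dispose, where only the first call does any work. The sketch below shows that general technique; OneShotSubscription and UnsubscribeDemo are hypothetical names, and this is an assumption about how such safety can be achieved, not a description of how MyNatsClient's ISubscription is implemented.

```csharp
using System;
using System.Threading;

// Hypothetical sketch of an idempotent subscription handle.
public sealed class OneShotSubscription : IDisposable
{
    private Action _unsubscribe;

    public OneShotSubscription(Action unsubscribe)
    {
        _unsubscribe = unsubscribe;
    }

    public void Dispose()
    {
        // Swap the callback out atomically so that only the first call runs it,
        // even if Dispose is called from several threads.
        var unsubscribe = Interlocked.Exchange(ref _unsubscribe, null);
        unsubscribe?.Invoke();
    }
}

public static class UnsubscribeDemo
{
    public static void Main()
    {
        var unsubCount = 0;
        var subscription = new OneShotSubscription(() => unsubCount++);

        subscription.Dispose();
        subscription.Dispose(); // harmless: the unsubscribe has already run

        Console.WriteLine(unsubCount);
    }
}
```

The demo prints 1, since the second Dispose finds no callback left to invoke.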
That's it for now. One step closer to a stable release.
Merry Christmas.
//Daniel