[nanomsg] Re: Broadcast protocol

  • From: Martin Sustrik <sustrik@xxxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Thu, 12 Sep 2013 13:02:15 +0200

Ok,

The underlying goal is to keep consumers synchronised with the state on the producer. Given that the history of the state is not needed, only the current state, we can avoid the problem of overflowing queues when a consumer is slow/stuck.

To simplify it a bit, let's now assume that there's only one topic involved, i.e. there's only one message cached on the producer side. It also means that there's no need for explicit subscriptions. Connecting would imply subscribing.

So, from the consumer's point of view, you just connect and wait for updates. Note that you can connect to only one upstream node, as several would overwrite each other's state.
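
To illustrate what "only the current state matters" could mean on the consumer side, here is a minimal sketch of a single-slot mailbox in Python. It is not nanomsg code: the class LatestValueSlot and its put/get methods are hypothetical names made up for this illustration, and the real implementation would live inside the socket.

```python
import threading


class LatestValueSlot:
    """Single-slot mailbox: a new message replaces an unread old one."""

    def __init__(self):
        self._cond = threading.Condition()
        self._value = None
        self._fresh = False  # True while an unread value is waiting

    def put(self, message):
        # Called by the (hypothetical) network side on every arrival.
        with self._cond:
            self._value = message  # overwrite, never queue
            self._fresh = True
            self._cond.notify()

    def get(self, timeout=None):
        # Called by the application; blocks until an unread value exists.
        with self._cond:
            if not self._cond.wait_for(lambda: self._fresh, timeout):
                raise TimeoutError("no update received")
            self._fresh = False
            return self._value


slot = LatestValueSlot()
for state in (b"v1", b"v2", b"v3"):  # a slow consumer: three arrivals, no reads
    slot.put(state)
latest = slot.get(timeout=1)  # only the newest state is handed over
```

Because the slot holds at most one message, its memory use stays constant no matter how far the consumer falls behind.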

From the publisher's point of view there's no need for the ugly "two threads handle the same socket" solution as originally proposed. Instead, the single cached message will be set to an empty (0-byte) message when the socket is created and updated every time the producer calls nn_send().

Internally, the following algorithm will apply:

1. When a new consumer connects, the cached message is sent to it.
2. Each time the cached message is updated, it's sent to all the consumers.
3. If, because of pushback, we've had to drop messages on any particular connection, then once the connection becomes writeable again, the current cached message will be sent to it.
4. On the consumer side, if a new message arrives and there's still an old message queued for consumption, the old one will be replaced by the new one.
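
The producer-side steps (1 to 3) can be sketched as a small in-memory simulation. This is only an illustration under assumptions: Connection, Broadcaster, on_connect and on_writable are hypothetical names, and pushback is simulated with a plain flag rather than a real transport.

```python
class Connection:
    """Hypothetical stand-in for one consumer connection."""

    def __init__(self):
        self.writable = True
        self.stale = False    # True if an update was dropped due to pushback
        self.delivered = []   # what this consumer has actually been sent

    def try_send(self, message):
        if not self.writable:
            self.stale = True  # drop it; the cache will catch us up later
            return False
        self.delivered.append(message)
        self.stale = False
        return True


class Broadcaster:
    def __init__(self):
        self.cached = b""      # empty (0-byte) message at socket creation
        self.connections = []

    def on_connect(self, conn):
        # Step 1: a new consumer immediately gets the cached message.
        self.connections.append(conn)
        conn.try_send(self.cached)

    def send(self, message):
        # Step 2: update the cache and push it to every consumer.
        self.cached = message
        for conn in self.connections:
            conn.try_send(message)

    def on_writable(self, conn):
        # Step 3: once pushback ends, resend only the current state.
        conn.writable = True
        if conn.stale:
            conn.try_send(self.cached)


b = Broadcaster()
b.send(b"ABC")                 # set the state before anyone connects
fast, slow = Connection(), Connection()
b.on_connect(fast)
b.on_connect(slow)
slow.writable = False          # simulate pushback on one connection
b.send(b"DEF")
b.send(b"GHI")
b.on_writable(slow)
```

In this run the fast consumer sees every update, while the slow one skips the intermediate state and receives only the latest one after recovering. No per-connection queue grows in the meantime.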

When writing the producer application, care should be taken to avoid delivering the "empty" message to the consumers. Presumably by setting the initial state before binding, this way:

s = nn_socket (AF_SP, NN_BROADCAST);
nn_send (s, "ABC", 3, 0);  /* set the cached state first */
nn_bind (s, "tcp://eth0:5555");  /* only then start accepting consumers */

All that being said, the protocol looks almost like a trivial case of the pub/sub protocol with leaky-bucket style of buffering implemented. Can we possibly think of an abstraction that unifies both protocols? And is it even desirable?

Martin

On 11/09/13 15:13, Paul Colomiets wrote:
Hi,

I would like to propose a new protocol for nanomsg. I call it
"broadcast" just so we have some name to discuss. Better name is
appreciated.


Overview
--------------

The protocol can be defined as follows:

1. There are many watchers

2. A watcher subscribes to a channel at the start

3. When subscribing, the watcher gets the current data on the channel

4. When channel data changes, all its watchers get the new value
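
The four points above can be modelled with a small in-memory registry. This is a rough sketch, not a proposed implementation: ChannelRegistry, subscribe, update and outbox are hypothetical names, and delivery is modelled as appending to a per-watcher list.

```python
from collections import defaultdict


class ChannelRegistry:
    def __init__(self):
        self.data = {}                     # channel -> current value
        self.watchers = defaultdict(set)   # channel -> watcher ids
        self.subscription = {}             # watcher id -> its single channel
        self.outbox = defaultdict(list)    # watcher id -> (channel, value) sent

    def subscribe(self, watcher, channel):
        # A new subscription replaces the watcher's previous one.
        old = self.subscription.get(watcher)
        if old is not None:
            self.watchers[old].discard(watcher)
        self.subscription[watcher] = channel
        self.watchers[channel].add(watcher)
        # Point 3: the watcher gets the current data right away, if any.
        if channel in self.data:
            self.outbox[watcher].append((channel, self.data[channel]))

    def update(self, channel, value):
        # Point 4: every watcher of the channel gets the new value.
        self.data[channel] = value
        for watcher in self.watchers[channel]:
            self.outbox[watcher].append((channel, value))


reg = ChannelRegistry()
reg.update("foo", "1")
reg.subscribe("w1", "foo")   # receives ("foo", "1") immediately
reg.update("foo", "2")
reg.subscribe("w1", "bar")   # drops the "foo" subscription
reg.update("foo", "3")       # no longer delivered to w1
reg.update("bar", "x")
```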


A combination of pubsub and reqrep can't be used here, because when a
pubsub connection is temporarily lost, some updates can be missed.


API
-----

There are two endpoints: WATCHER and BROADCASTER

The WATCHER (pseudocode, probably python):

     sock = nn_socket(AF_SP, NN_WATCHER)
     sock.send("channel:foo")  # subscription
     while True:
         print(sock.recv())  # receives updated value

     sock.send("channel:bar")  # cancels previous subscription

The BROADCASTER:

     sock = nn_socket(AF_SP, NN_BROADCASTER)
     while True:
         sub = sock.recv()  # get subscription request from watcher
         _literal, channel = sub.split(':')
         assert _literal == 'channel'
         sock.send(channel + '\0' + data[channel], 0)

     # probably in another thread
     data['foo'] = 'newvalue'
     sock.send('foo' + '\0' + 'newvalue', NN_BROADCAST)
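
The pseudocode above implies two small wire formats: a "channel:<name>" subscription request and a "<name>\0<value>" update. A possible way to encode and parse them is sketched below; the helper names are hypothetical, and the use of bytes and UTF-8 channel names is an assumption, not something the proposal specifies.

```python
def parse_subscription(request: bytes) -> str:
    """Extract the channel name from a b'channel:<name>' request."""
    literal, sep, channel = request.partition(b":")
    if literal != b"channel" or not sep or not channel:
        raise ValueError("malformed subscription request")
    return channel.decode("utf-8")


def encode_update(channel: str, value: bytes) -> bytes:
    """Build a b'<name>\\0<value>' update frame."""
    name = channel.encode("utf-8")
    if b"\0" in name:
        raise ValueError("channel name must not contain NUL")
    return name + b"\0" + value


def decode_update(frame: bytes):
    """Split an update frame at the first NUL into (channel, value)."""
    name, sep, value = frame.partition(b"\0")
    if not sep:
        raise ValueError("missing NUL separator")
    return name.decode("utf-8"), value


frame = encode_update(parse_subscription(b"channel:foo"), b"newvalue")
channel, value = decode_update(frame)
```

Splitting at the first separator only, rather than with split(':'), means a channel name containing a colon or a value containing a NUL byte still parses.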

Important points are:

1. The request is distinct from the channel that the watcher ends up
subscribed to

2. The NN_BROADCAST flag says that the message is not a reply but a
broadcast message


To keep the message short, I do not discuss all the apparent issues; I'm
just asking for general agreement.


Thoughts?


P.S.: This protocol is the result of the discussion of the name service.
I hope we will return through the "monitoring data -> name service ->
broadcast proto" chain shortly :)

--
Paul

