[nanomsg] Generalized subscription upstreaming

  • From: Alex Elsayed <eternaleye@xxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Fri, 19 Sep 2014 16:47:35 -0700

This is an attempt to define a general system for upstreaming subscriptions 
on bidirectional transports. All comments welcome and deeply appreciated.

Note that every message from a subscriber (S) to a producer (P) expects a
confirmation, so when I say that S sends something, it is assumed that S
will resend the message if a timeout elapses. To make this safe, repeated
messages have idempotent effects on the producer.
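A minimal sketch of the resend-until-confirmed rule, in Python. It assumes a hypothetical `RetransmitQueue` keyed by (message kind, prefix) and a clock value `now` supplied by the caller; neither is part of nanomsg's API.

```python
from dataclasses import dataclass, field


@dataclass
class RetransmitQueue:
    """Hypothetical table of control messages still awaiting a CONFIRM."""
    timeout: float
    # (kind, prefix) -> time at which the message should be resent
    outstanding: dict[tuple[str, str], float] = field(default_factory=dict)

    def send(self, kind: str, prefix: str, now: float) -> None:
        """Record that (kind, prefix) was just sent upstream."""
        self.outstanding[(kind, prefix)] = now + self.timeout

    def confirm(self, kind: str, prefix: str) -> None:
        """A matching CONFIRM arrived; a duplicate CONFIRM is harmless."""
        self.outstanding.pop((kind, prefix), None)

    def due(self, now: float) -> list[tuple[str, str]]:
        """Messages whose timeout elapsed; their deadlines are re-armed."""
        expired = [key for key, deadline in self.outstanding.items() if deadline <= now]
        for key in expired:
            self.outstanding[key] = now + self.timeout
        return expired


q = RetransmitQueue(timeout=1.0)
q.send("SUBSCRIBE", "foo", now=0.0)
print(q.due(1.0))  # [('SUBSCRIBE', 'foo')]
```

Because repeats are idempotent on the producer, a resend whose original CONFIRM was merely delayed does no harm.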

The subscriber maintains a trie of subscriptions, where each leaf carries a
bitfield of the following flags:
POSITIVE (when absent, is a "negative" or "ignore" subscription)
SYNCHRONIZED (Has the upstream acknowledged the subscription?)
PENDING (...have we asked them?)
CANONICAL (Is this a subscription by the user, or an internal 
synchronization artifact?)
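One way to pack these four flags, sketched with Python's `enum.IntFlag`. The bit values and the `without` helper are illustrative choices, not something the proposal specifies.

```python
from enum import IntFlag


class Flag(IntFlag):
    """Per-entry flags on the subscriber's trie, named as in the proposal."""
    POSITIVE = 1      # cleared means a negative ("ignore") subscription
    SYNCHRONIZED = 2  # the upstream has acknowledged the entry
    PENDING = 4       # a request is in flight, awaiting a CONFIRM
    CANONICAL = 8     # a user subscription, not a synchronization artifact


def without(flags: Flag, bit: Flag) -> Flag:
    """Return `flags` with `bit` cleared."""
    return Flag(flags & ~bit)


# The initial "" entry: POSITIVE, SYNCHRONIZED, CANONICAL, not PENDING.
root = Flag.POSITIVE | Flag.SYNCHRONIZED | Flag.CANONICAL
print(Flag.PENDING in root)  # False
```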

The subscriber S's trie begins with a single subscription to "", which is
POSITIVE, SYNCHRONIZED, and CANONICAL, but not PENDING.

When the user explicitly subscribes to a topic, the CANONICAL flag is 
cleared from the empty-string subscription. The inserted entry for the 
explicit subscription is POSITIVE and CANONICAL, but neither SYNCHRONIZED 
nor PENDING.

If the empty-string subscription is not present, a SUBSCRIBE message for the
new entry is sent upstream immediately, and its PENDING flag _is_ set.
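The initial state and the two subscribe cases could look like this in Python. This is a sketch under assumptions: a plain dict keyed by prefix stands in for the trie, `outbox` is a hypothetical list of (kind, prefix) messages queued for upstream, and re-subscribing to an existing CANONICAL topic is treated as a no-op.

```python
from enum import IntFlag


class Flag(IntFlag):
    POSITIVE = 1
    SYNCHRONIZED = 2
    PENDING = 4
    CANONICAL = 8


class Subscriber:
    """Hypothetical subscriber state; prefix -> Flag in place of a trie."""

    def __init__(self):
        self.entries = {"": Flag.POSITIVE | Flag.SYNCHRONIZED | Flag.CANONICAL}
        self.outbox = []  # (kind, prefix) control messages for upstream

    def subscribe(self, topic):
        if Flag.CANONICAL in self.entries.get(topic, Flag(0)):
            return  # already a user subscription
        if "" in self.entries:
            # "" is now only a sync artifact; no message is sent yet.
            self.entries[""] = Flag(self.entries[""] & ~Flag.CANONICAL)
            self.entries[topic] = Flag.POSITIVE | Flag.CANONICAL
        else:
            self.entries[topic] = Flag.POSITIVE | Flag.CANONICAL | Flag.PENDING
            self.outbox.append(("SUBSCRIBE", topic))


s = Subscriber()
s.subscribe("foo")
print(s.outbox)  # []
```

While "" is still in place, the new entry waits for incoming traffic to trigger its SUBSCRIBE; only without "" is it sent at once.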

On receiving a normal message, S will look up the topic in the trie.

If no prefix of the topic is found in the trie, a leaf is created for the
shortest prefix of the topic that has no entry, with PENDING set and all
other flags cleared. The subscriber then sends a REJECT message upstream for
that prefix.
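A sketch of the REJECT path in Python, under one reading of "shortest prefix for which there was no entry": the point where a character trie holding the existing entries would run out of nodes, so the negative entry is as broad as possible without covering any existing subscription. That reading, the dict standing in for the trie, and the helper names are all assumptions.

```python
from enum import IntFlag


class Flag(IntFlag):
    POSITIVE = 1
    SYNCHRONIZED = 2
    PENDING = 4
    CANONICAL = 8


def shortest_unmatched_prefix(entries, topic):
    """Shortest prefix of `topic` that no entry extends, or None if the
    whole topic lies on the path to some longer entry."""
    for i in range(1, len(topic) + 1):
        candidate = topic[:i]
        if not any(p.startswith(candidate) for p in entries):
            return candidate
    return None


def on_unmatched_message(entries, outbox, topic):
    """Hypothetical handler for a message that no entry is a prefix of."""
    if any(topic.startswith(p) for p in entries):
        return  # some prefix matched; the per-leaf walk handles this case
    prefix = shortest_unmatched_prefix(entries, topic)
    if prefix is not None:
        entries[prefix] = Flag.PENDING  # negative, unsynchronized, internal
        outbox.append(("REJECT", prefix))


entries = {"foo/a": Flag.POSITIVE | Flag.CANONICAL | Flag.SYNCHRONIZED}
outbox = []
on_unmatched_message(entries, outbox, "bar/x")
print(outbox)  # [('REJECT', 'b')]
```

This path only fires once the "" entry is gone, since "" is a prefix of every topic.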

Otherwise, over the list of all leaves that are prefixes of the topic (root
first, longest last), the following occurs:

for leaf in list:
    if leaf is SYNCHRONIZED or PENDING:
        continue
    if leaf is CANONICAL and POSITIVE:
        mark leaf PENDING
        send SUBSCRIBE message upstream
        break

This places a natural throttle on the rate of subscriptions: at most one
SUBSCRIBE is sent per received message.
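The loop above in runnable Python, as a sketch: a dict keyed by prefix stands in for the trie, `outbox` is a hypothetical upstream queue, and `break` sits inside the CANONICAL/POSITIVE branch so at most one SUBSCRIBE leaves per message.

```python
from enum import IntFlag


class Flag(IntFlag):
    POSITIVE = 1
    SYNCHRONIZED = 2
    PENDING = 4
    CANONICAL = 8


def on_message(entries, outbox, topic):
    """Walk entries that prefix `topic`, root first, and upstream at most
    one CANONICAL, POSITIVE entry that is neither SYNCHRONIZED nor PENDING."""
    matching = sorted((p for p in entries if topic.startswith(p)), key=len)
    for prefix in matching:
        flags = entries[prefix]
        if flags & (Flag.SYNCHRONIZED | Flag.PENDING):
            continue
        if Flag.CANONICAL in flags and Flag.POSITIVE in flags:
            entries[prefix] = flags | Flag.PENDING
            outbox.append(("SUBSCRIBE", prefix))
            break


entries = {
    "": Flag.POSITIVE | Flag.SYNCHRONIZED,
    "foo": Flag.POSITIVE | Flag.CANONICAL,
    "foo/bar": Flag.POSITIVE | Flag.CANONICAL,
}
outbox = []
on_message(entries, outbox, "foo/bar/baz")
print(outbox)  # [('SUBSCRIBE', 'foo')]
```

A second message on the same topic would upstream "foo/bar", and a third would send nothing.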

The producer P also keeps a trie, but a much simpler one: the value is a
single bit, indicating POSITIVE vs. NEGATIVE.

When P receives a REJECT message, it inserts a NEGATIVE entry; when it
receives a SUBSCRIBE message, it inserts a POSITIVE entry. P then sends S a
CONFIRM REJECT or CONFIRM SUBSCRIBE message as appropriate.
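A sketch of the producer side in Python, including the RESET handling described further on. Several points are assumptions not stated in the proposal: a dict stands in for the trie, P starts with a POSITIVE "" entry (mirroring the subscriber's SYNCHRONIZED "" entry), and the longest matching prefix decides whether a message is forwarded.

```python
class Producer:
    """Hypothetical producer-side state: one bit per prefix."""

    def __init__(self):
        # prefix -> True (POSITIVE) or False (NEGATIVE)
        self.entries = {"": True}

    def on_control(self, kind, prefix):
        """Apply a control message idempotently and return the CONFIRM."""
        if kind == "SUBSCRIBE":
            self.entries[prefix] = True
        elif kind == "REJECT":
            self.entries[prefix] = False
        elif kind == "RESET":
            self.entries.pop(prefix, None)
        else:
            raise ValueError(f"unknown control message: {kind!r}")
        return ("CONFIRM " + kind, prefix)

    def wants(self, topic):
        """Assumed forwarding rule: the longest matching prefix decides."""
        matches = [p for p in self.entries if topic.startswith(p)]
        if not matches:
            return False
        return self.entries[max(matches, key=len)]


p = Producer()
p.on_control("REJECT", "b")
print(p.wants("bar"), p.wants("foo"))  # False True
```

Each handler overwrites or deletes a single key, so applying a retransmitted message twice leaves the same state.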

On receiving a CONFIRM (REJECT or SUBSCRIBE) message, S:
1.) Marks the appropriate node N as SYNCHRONIZED and clears PENDING
2.) Walks from the node towards the root until it encounters a node X that 
is not CANONICAL
3.) If all entries in the tree under X either:
    a.) Are SUBSCRIBED or
    b.) Have a SUBSCRIBED node on the path between them and X,
  then S sets the PENDING flag on X, and sends a RESET message upstream for 
X.

When P receives a RESET message, it deletes the relevant node from its trie
and sends a CONFIRM RESET message to S.

When S receives a CONFIRM RESET message, it removes the node from its trie
and then repeats steps 2 and 3 above.
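The subscriber's CONFIRM handling (steps 1 to 3, plus CONFIRM RESET) might be sketched in Python as follows. The proposal leaves some details open, so this sketch picks one reading and labels it: the walk in step 2 starts at N's nearest ancestor rather than N itself, "SUBSCRIBED" in step 3 is read as POSITIVE and SYNCHRONIZED (the flag list defines no SUBSCRIBED flag), and step 3 checks only CANONICAL entries under X. A dict stands in for the trie, and all names are hypothetical.

```python
from enum import IntFlag


class Flag(IntFlag):
    POSITIVE = 1
    SYNCHRONIZED = 2
    PENDING = 4
    CANONICAL = 8


def ancestors(entries, prefix):
    """Entries that are strict prefixes of `prefix`, nearest first."""
    found = [p for p in entries if p != prefix and prefix.startswith(p)]
    return sorted(found, key=len, reverse=True)


def subscribed(flags):
    # Assumption: "SUBSCRIBED" means POSITIVE and SYNCHRONIZED.
    return Flag.POSITIVE in flags and Flag.SYNCHRONIZED in flags


def find_x(entries, prefix):
    """Step 2, starting at N's nearest ancestor (an assumption)."""
    for p in ancestors(entries, prefix):
        if Flag.CANONICAL not in entries[p]:
            return p
    return None


def ready_to_reset(entries, x):
    """Step 3, checking only CANONICAL entries under X (an assumption)."""
    for p, flags in entries.items():
        if p == x or not p.startswith(x) or Flag.CANONICAL not in flags:
            continue
        between = [a for a in ancestors(entries, p) if len(a) > len(x)]
        if not subscribed(flags) and not any(subscribed(entries[a]) for a in between):
            return False
    return True


def on_confirm(entries, outbox, kind, prefix):
    """Hypothetical subscriber handler for the three CONFIRM messages."""
    if prefix not in entries:
        return  # duplicate CONFIRM for work already done
    if kind == "CONFIRM RESET":
        del entries[prefix]
    elif kind in ("CONFIRM SUBSCRIBE", "CONFIRM REJECT"):  # step 1
        entries[prefix] = Flag((entries[prefix] | Flag.SYNCHRONIZED) & ~Flag.PENDING)
    else:
        raise ValueError(f"unknown confirmation: {kind!r}")
    x = find_x(entries, prefix)
    if x is not None and Flag.PENDING not in entries[x] and ready_to_reset(entries, x):
        entries[x] |= Flag.PENDING
        outbox.append(("RESET", x))


entries = {
    "": Flag.POSITIVE | Flag.SYNCHRONIZED,
    "foo": Flag.POSITIVE | Flag.CANONICAL | Flag.PENDING,
    "bar": Flag.POSITIVE | Flag.CANONICAL | Flag.PENDING,
}
outbox = []
on_confirm(entries, outbox, "CONFIRM SUBSCRIBE", "foo")
on_confirm(entries, outbox, "CONFIRM SUBSCRIBE", "bar")
print(outbox)  # [('RESET', '')]
```

The RESET for "" only goes out once both canonical entries are confirmed, and the catch-all then disappears on CONFIRM RESET.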


With this, the subscription state of the producer converges on that of the
subscriber, while limiting subscription traffic to at most one message per
producer message. Negative subscriptions suppress broad categories of useless
messages, and are then removed once all canonical subscriptions have been
upstreamed. If a peer becomes unresponsive, disconnecting is viable, because
reconnecting will not cause a flood of subscription messages.

A subscriber wishing to subscribe to multiple producers would keep one trie 
for each producer, and insert CANONICAL entries into all tries.

Adding a producer after tries already exist also works, as one can simply 
walk any of the existing tries and insert the CANONICAL nodes into the new 
trie.
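A Python sketch of the per-producer tries, assuming producer ids are arbitrary hashable keys and reusing the single-trie subscribe rules. `MultiSubscriber` and `subscribe_into` are hypothetical names, and a dict stands in for each trie.

```python
from enum import IntFlag


class Flag(IntFlag):
    POSITIVE = 1
    SYNCHRONIZED = 2
    PENDING = 4
    CANONICAL = 8


def subscribe_into(trie, outbox, topic):
    """Insert a CANONICAL entry into one per-producer trie."""
    if Flag.CANONICAL in trie.get(topic, Flag(0)):
        return  # already a user subscription in this trie
    if "" in trie:
        trie[""] = Flag(trie[""] & ~Flag.CANONICAL)
        trie[topic] = Flag.POSITIVE | Flag.CANONICAL
    else:
        trie[topic] = Flag.POSITIVE | Flag.CANONICAL | Flag.PENDING
        outbox.append(("SUBSCRIBE", topic))


class MultiSubscriber:
    """Hypothetical subscriber holding one trie per producer."""

    def __init__(self):
        self.tries = {}     # producer id -> {prefix: Flag}
        self.outboxes = {}  # producer id -> control messages for upstream

    def subscribe(self, topic):
        for pid, trie in self.tries.items():
            subscribe_into(trie, self.outboxes[pid], topic)

    def add_producer(self, pid):
        trie = {"": Flag.POSITIVE | Flag.SYNCHRONIZED | Flag.CANONICAL}
        self.outboxes[pid] = []
        # Every trie holds all CANONICAL entries, so walking one suffices.
        # In this sketch, subscriptions made before any producer exists
        # are not remembered.
        for existing in list(self.tries.values())[:1]:
            for prefix, flags in existing.items():
                if Flag.CANONICAL in flags:
                    subscribe_into(trie, self.outboxes[pid], prefix)
        self.tries[pid] = trie


m = MultiSubscriber()
m.add_producer("p1")
m.subscribe("foo")
m.add_producer("p2")
print(m.tries["p2"]["foo"] == Flag.POSITIVE | Flag.CANONICAL)  # True
```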

Messages are only passed to the user if they match a POSITIVE, CANONICAL 
entry in the trie.
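The delivery rule as a one-function Python sketch, again with a dict standing in for the trie and a hypothetical `deliver_to_user` name. Messages that fail this check are still used to drive the synchronization walk; they are just not handed to the user.

```python
from enum import IntFlag


class Flag(IntFlag):
    POSITIVE = 1
    SYNCHRONIZED = 2
    PENDING = 4
    CANONICAL = 8


def deliver_to_user(entries, topic):
    """True iff some entry that prefixes `topic` is POSITIVE and CANONICAL."""
    wanted = Flag.POSITIVE | Flag.CANONICAL
    return any(topic.startswith(p) and flags & wanted == wanted
               for p, flags in entries.items())


entries = {
    "": Flag.POSITIVE | Flag.SYNCHRONIZED,  # sync artifact, not canonical
    "foo": Flag.POSITIVE | Flag.CANONICAL | Flag.SYNCHRONIZED,
    "b": Flag.PENDING,                      # pending negative entry
}
print(deliver_to_user(entries, "foo/1"), deliver_to_user(entries, "zzz"))  # True False
```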
