[nanomsg] patterns for complex applications, pub/sub filtering, tls+tcp://, and tunneling transports

  • From: Roman Puls <puls@xxxxxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Wed, 27 Aug 2014 11:00:40 +0200

Hi all,

Since 2010, approximately once a year, I have revisited zmq and friends. After my initial enthusiasm, I felt that the provided communication patterns were not suitable for my area of applications, I struggled with some implementation details and threading models of zmq, and I put it aside again.

However, these days I found nanomsg (thanks Martin!), as well as Achille's excellent nanomsgxx C++11 wrappers around it. That really encouraged me to try again, but this time to get in touch with experienced people (you) first, and ask. So please excuse my lengthy posting, but I want to be complete, and I hope that other people can also learn from the answers and suggestions made here on the list.

I) Real-World Application and complex messaging patterns

First let me introduce a real-life network application which, in some variation, seems to be a recurring pattern I see frequently.

Producer
I have a number of "producers" that create different kinds of jobs (job types a, b, c, ...) that need to be processed somewhere in the network. The producer just collects the jobs from different sub-systems, providing a unified interface for workers in the network. This currently happens by polling a few spool directories, pushing results into a local database, and handing out jobs in a FIFO pattern, based on a timestamp that is encoded in the jobs. When a job is handed out to a worker, we update the database with the information "job is being processed by worker xyz". If that takes too long, we remove that information, making the job available again for other workers (or the same one).
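To make the hand-out bookkeeping concrete, here is a minimal sketch of the lease-and-requeue logic in plain C. All names (job_t, LEASE_TIMEOUT, claim, requeue_if_stale) are my own illustration, not nanomsg API; in the real setup this state lives in the database.

```c
#include <string.h>
#include <time.h>

/* Hypothetical job record: the producer tracks who is processing a job
 * and when the lease was handed out. */
typedef struct {
    char worker[32];    /* "" means: not handed out */
    time_t claimed_at;  /* lease start */
} job_t;

#define LEASE_TIMEOUT 30 /* seconds before a job becomes available again */

/* Hand a job out to a worker and start the lease clock. */
static void claim(job_t *job, const char *worker, time_t now) {
    strncpy(job->worker, worker, sizeof job->worker - 1);
    job->worker[sizeof job->worker - 1] = '\0';
    job->claimed_at = now;
}

/* "If that takes too long, we remove that information": requeue a job
 * whose lease has expired. Returns 1 if the job became available again. */
static int requeue_if_stale(job_t *job, time_t now) {
    if (job->worker[0] != '\0' && now - job->claimed_at > LEASE_TIMEOUT) {
        job->worker[0] = '\0';
        return 1;
    }
    return 0;
}
```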

Worker
Then we have a bunch of "workers" sitting in the network, each being able to process a specific job type, specifically one of a, b, or c. A worker, after processing a job, needs to push its results into a database and notify the *initiating* producer that it succeeded or failed (failure might happen, e.g., if the database is offline or some other error occurs). In the failing case, a producer may re-schedule the job later, or move it to a special queue that needs inspection by an administrator. Finally, it may happen that a worker needs additional information from the producer to complete job processing, e.g. it requests an additional file. Most likely, all workers of a given class (job type) are interested in that, so they would take a copy of that information if it is not already there.

Store
Eventually part of the worker process (as a fan-in), accepting processed jobs and storing them in a database backend. A store can get blocked, e.g. due to database repair, an outage, etc.


Topology
- each worker should be able to receive jobs from each producer
- each worker needs to report back to the initiating producer
  about whether or not the job could be processed
- eventually, a worker needs to fetch additional information (files)
  from the initiating producer. However, most likely all workers of
  that job-type are interested in that.

[now let's hope that we have a mono-spaced font here]:


 prod1  prod2    prod3
   /\  /  /       /  |
  /  \/  /   +---+   |
 /   /\  |  /        |
w1(a) w2(b)          |
|                    |
+--------------------+

Of course, the processing time of job type a is completely different from the processing times of job types b and c (processing times range from milliseconds to seconds). Also, the workers have different capabilities (CPU, single- or multi-threaded), so a strict round-robin push from the producers to the workers will fail in the sense that you get "blocked" on the worker with the least capabilities.

Also, letting the workers create a req socket and connecting every producer to that socket does not help: as soon as one producer runs out of work, we are blocked. In addition, we would need to ask the producer for a specific job type (a, b, or c), so blocking on a particular job type would also block other workers with different job types. However, that pattern *could* be solved by using an nn_socket(AF_SP_RAW, NN_REP) on the producer side.

BTW: binding different ports on the producers to represent different job types is not a preferred solution, and neither is source-port inspection (which is most likely impossible anyway?).

Concurrency and latency
Assuming that job type b is expensive in terms of CPU and that processing takes a few seconds, we have workers that use every CPU of the host, providing parallel processing. In my eyes, it would be quite useless to connect each of the worker threads to the producers (assume we have 5x 12-core machines, each connecting to 5 producers, resulting in at least 300 TCP connections).

Instead, if we have 'super-scalar' workers, they should have some sort of fan-in queue (an instance collecting jobs from all producers), and worker threads would take work from that local queue. The larger the queue (up to some reasonable limit), the lower the latency, IMO.
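A minimal sketch of such a fan-in queue, assuming a fixed-size ring buffer protected by a mutex and condition variables (plain C/pthreads, nothing nanomsg-specific; all names are made up):

```c
#include <pthread.h>

/* A bounded fan-in queue: one instance collects jobs from all producers,
 * and local worker threads pop from it. Job payload is just an int here. */
#define QCAP 64

typedef struct {
    int buf[QCAP];
    int head, tail, count;
    pthread_mutex_t mtx;
    pthread_cond_t not_empty, not_full;
} queue_t;

static void q_init(queue_t *q) {
    q->head = q->tail = q->count = 0;
    pthread_mutex_init(&q->mtx, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/* Producers block here when the queue is full: this is the "natural
 * flow control" back-pressure towards the producers. */
static void q_push(queue_t *q, int job) {
    pthread_mutex_lock(&q->mtx);
    while (q->count == QCAP)
        pthread_cond_wait(&q->not_full, &q->mtx);
    q->buf[q->tail] = job;
    q->tail = (q->tail + 1) % QCAP;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mtx);
}

/* Worker threads block here when no work is queued. */
static int q_pop(queue_t *q) {
    pthread_mutex_lock(&q->mtx);
    while (q->count == 0)
        pthread_cond_wait(&q->not_empty, &q->mtx);
    int job = q->buf[q->head];
    q->head = (q->head + 1) % QCAP;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->mtx);
    return job;
}
```

The bounded capacity (QCAP) is what keeps latency low while still buffering enough work to keep all cores busy.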

General Requirements
- re-connecting sockets at high frequency shall be avoided
- polling producers shall be avoided, but some sort of natural
  flow control should happen
- the number of tcp channels between all parties shall be minimized


I'd now be curious to see your ideas about which patterns are applicable here.


II) PUB/SUB Filtering

In the context of I), where some files are needed, pub/sub came to my mind to push the missing piece of information from a producer to all workers. Am I right that the pub/sub subscription is currently host-local (receiver-side) filtering, or do we have sender-side filtering in nanomsg?
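For reference, the SUB side in nanomsg (as in zmq) treats a subscription as a plain byte prefix of the incoming message. A sketch of that check, as my own illustration rather than the library's actual code:

```c
#include <stddef.h>
#include <string.h>

/* Receiver-side subscription check: a message is delivered if the
 * subscription string is a byte prefix of the message body.
 * An empty subscription matches every message. */
static int sub_matches(const char *subscription, const void *msg, size_t len) {
    size_t slen = strlen(subscription);
    return slen <= len && memcmp(subscription, msg, slen) == 0;
}
```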

If there is no pub-side filtering: what are the reasons to do that on the receiver side only, and are there plans to change it to pub-side filtering?


III) Transports

I have read a few postings here about creating new transports. The first thing I'd be interested in (of course) would be a "tcp+tls://" socket. Is somebody working on that one? If not, do we have a trivial example of how to create new transports, e.g. a loop-back transport?

As a second topic: did somebody ever consider writing a multiplexing transport, which would allow you to run, e.g., a req/rep and a pub/sub over the same tcp or tls+tcp connection? Head-of-line blocking is a known issue, but it can be solved here IMO. The usage pattern would be something like:


[code]
--- client ---
auto tunnel = socket(AF_SP_RAW, NN_TUNNEL);
tunnel.connect("tcp+tls://127.0.0.1:1234?name=tunnel1");

auto req = socket(AF_SP, NN_REQ);
req.connect("tunnel://tunnel1/req-endpoint");

auto pub = socket(AF_SP, NN_PUB);
pub.connect("tunnel://tunnel1/pub-endpoint");

--- server ---
auto tunnel = socket(AF_SP_RAW, NN_TUNNEL);
tunnel.bind("tcp+tls://127.0.0.1:1234");

auto rep = socket(AF_SP, NN_REP);
rep.bind("ipc://reqrep");

auto sub = socket(AF_SP, NN_SUB);
sub.bind("ipc://pubsub");

tunnel.setsockopt(NN_SOL_TUNNEL, OPT_CONNECT, {rep, "req-endpoint"});
tunnel.setsockopt(NN_SOL_TUNNEL, OPT_CONNECT, {sub, "pub-endpoint"});
[/code]

IV) Endianness

I have found no information on that topic, and grepping the sources for the hton* functions and the endian keyword did not show me any entry points, so here is yet another question: is nanomsg endianness-aware? I don't mean the user payload, but are the headers swapped to network byte order, so that you can connect a big-endian system with a little-endian one?
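For comparison, this is the usual portable approach with the standard hton/ntoh functions: if a protocol's header fields go through these on the way in and out, big- and little-endian hosts interoperate. This is an illustration of the question, not nanomsg code:

```c
#include <arpa/inet.h>
#include <stdint.h>

/* Encode a header field for the wire (network byte order = big-endian)
 * and decode it again on the receiving host. Both sides see the same
 * value regardless of their native endianness. */
uint32_t encode_header(uint32_t host_value) { return htonl(host_value); }
uint32_t decode_header(uint32_t wire_value) { return ntohl(wire_value); }
```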


So far my questions. As closing words: if you're not familiar with zmq, it's quite hard to understand what nanomsg does, as nanomsg does not come with extensive documentation about the patterns. Tim's "Getting Started with 'nanomsg'" is a great introduction, but it is missing the fan-in/fan-out relationships and the raw socket types (I just found that topic on the list).

@Martin: do you have a road map for nanomsg?

Thanks in advance, and have a good day,
  Roman


