[nanomsg] Re: Planning nanomsg on multithreaded and distributed applications

  • From: Roberto Fichera <kernel@xxxxxxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Tue, 10 Jan 2017 19:35:45 +0100

On 01/05/2017 12:46 PM, Roberto Fichera wrote:

Hi Garrett,

On 01/05/2017 11:39 AM, Garrett D'Amore wrote:
nanomsg works quite well for many use cases, and lots of people are using 
it.  (I think mangos works even better, but
it is only applicable for folks using Go.  It is wire compatible with 
nanomsg, so you can mix and match the two
easily.  Mangos does have some extra capabilities though — especially for 
websocket and TLS based security.)

[...]

I’m not that passionate about this, even though I’ve written one of the only 
wire protocol implementations of the SP
protocols (mangos) and am the current nanomsg maintainer/BDFL.  For me, 
ZeroMQ’s heavy weight approach to everything,
and its reliance on C++, made it unpleasant for me to work with, which is 
what drew me to nanomsg originally.  I like
the protocols, and I am super proud of Mangos which I think is a very robust 
messaging framework that I’ve used in a
variety of commercial settings (and I know others have too).  But my 
feelings won’t be hurt if you choose ZeroMQ.  I
do hope in a couple of weeks, if you have time, you’ll come back and have a 
look at libnng.  I think by then it will
really be worth looking at.  (nng stands for nanomsg next-gen.)

Best of luck to you!
Thanks Garrett for the explanation. Most likely I will give it a try starting 
from now to
the next couple of weeks. I don't have any problem to wait the upcoming 
libnng.

As said, a feature I really like to have is the notification of topic that a 
subscriber is interested at.



I've almost finished the C implementation of my logic, but it seems there some 
"strange" errors regarding TCP transport.
IPC seems working fine. First of all the flow is the following:

           +------+     +------+
           | Sub1 | ... | SubN |
           +------+     +------+
              ^             ^
               \           /
                \         /
                +--+ +--+
                    | |
                    | |
                  +-----+
                  | Pub | TCP or IPC Transport
 ***------------+-----+---------------***
                  | Sub |
                  +-----+
                    ^ ^
                    | |
                 +--+ +--+
                /         \  INPROC Transport
               /           \
              /             \
           +------+     +------+
           | Pub1 | ... | PubN |  DataObserver Thread
           +------+     +------+
              ^             ^
              |             |
          +-------+     +-------+
          | Data1 | ... | DataN | data generator
          +-------+     +-------+

A test case of this flow can be represented by this simple python code

------------------- Publisher.py

import time
import random
import string
import threading

from nanomsg import SUB, SUB_SUBSCRIBE, PUB, Socket, Device

INPROC_URL = 'inproc://{}'
URL = 'ipc:///tmp/publisher.ipc'
#URL = 'tcp://*:2345'


def in_publisher( name, stop ):
    s = Socket(PUB)
    s.bind(INPROC_URL.format(name))
    print "Enabling Data:", name

    while True:
        data =''.join(random.choice(string.ascii_uppercase + string.digits) for 
_ in range(8))
        s.send("Data|{}|{}".format(name, data))
        time.sleep(1)
        if stop():
            break


def in_subscriber( names, stop ):
    s = Socket(PUB)
    s.bind(URL)

    s1 = Socket(SUB)
    s1.set_string_option(SUB, SUB_SUBSCRIBE, 'Data|') 

    for name in names:
        s1.connect(INPROC_URL.format(name))
        print "Subscribing Data:", name

    #d = Device( s1, s )
    #d.start()
    while True:
        # route the data from internal to external socket
        s.send(s1.recv())
        if stop():
            break


names = ['foo', 'bar', 'baz', 'bax']
threads = []
stop_threads = False

for name in names:
    t = threading.Thread(name=name, target=in_publisher, args=(name, lambda: 
stop_threads ))
    threads.append(t)
    t.start()

t = threading.Thread(target=in_subscriber, args=(names, lambda: stop_threads ))
threads.append(t)
t.start()

while True:
    a = raw_input()
    if a == 'q':
        stop_threads = True
        break

print "Quitting"

for t in threads:
    t.join()

print "Finish"

------------------- Subscriber.py

import time

from nanomsg import SUB, SUB_SUBSCRIBE, Socket

URL = 'ipc:///tmp/publisher.ipc'
#URL = 'tcp://*:2345'

with Socket(SUB) as s:
    s.set_string_option(SUB, SUB_SUBSCRIBE, 'Data|') 
    s.connect(URL)
    while True:
        a = s.recv()
        print "Received:", a
        time.sleep(1)

Everything works fine when using a IPC transport

Received: Data|foo|PT2U7TTC
Received: Data|bax|O5IW8MYF
Received: Data|bar|O2HUB4JF
Received: Data|baz|D01N4V72
Received: Data|bax|RGLBK4XL
Received: Data|foo|3CVJ2PZB
Received: Data|baz|G53IGSSK
Received: Data|bar|CWOCJGII
Received: Data|bax|LBJZF3AA
Received: Data|foo|79S9T0FD
Received: Data|bar|0RD7QVRG
Received: Data|baz|N1QAYK2B
Received: Data|bax|X3EMTKLN
Received: Data|bar|YWOSMW04

But as soon as I switch to TCP:

Traceback (most recent call last):
  File "Subscriber.py", line 20, in <module>
    s.connect(URL)
  File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py", 
line 284, in connect
    wrapper.nn_connect(self.fd, address)
  File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py", 
line 63, in _nn_check_positive_rtn
    raise NanoMsgAPIError()
nanomsg.NanoMsgAPIError: Invalid argument

Finally, regarding usage of the Device, if you uncomment the Device part in the 
publisher and comment the while
you get this

Traceback (most recent call last):
  File "Subscriber.py", line 20, in <module>
    s.connect(URL)
  File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py", 
line 284, in connect
    wrapper.nn_connect(self.fd, address)
  File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py", 
line 63, in _nn_check_positive_rtn
    raise NanoMsgAPIError()
nanomsg.NanoMsgAPIError: Invalid argument

Do you have an idea why this is happening? C implementation is showing the same 
problem.

Other related posts: