[nanomsg] Re: Planning nanomsg on multithreaded and distributed applications

  • From: Garrett D'Amore <garrett@xxxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Tue, 10 Jan 2017 11:27:27 -0800

Your TCP URL is invalid for connect. You need to specify a specific IP address 
such as 127.0.0.1 - the use of a wild card IP address means INADDR_ANY and only 
works on the bind side. 

Sent from my iPhone

On Jan 10, 2017, at 10:35 AM, Roberto Fichera <kernel@xxxxxxxxxxxxx> wrote:

On 01/05/2017 12:46 PM, Roberto Fichera wrote:

Hi Garrett,

On 01/05/2017 11:39 AM, Garrett D'Amore wrote:
nanomsg works quite well for many use cases, and lots of people are using 
it.  (I think mangos works even better, but
it is only applicable for folks using Go.  It is wire compatible with 
nanomsg, so you can mix and match the two
easily.  Mangos does have some extra capabilities though — especially for 
websocket and TLS based security.)

[...]

I’m not that passionate about this, even though I’ve written one of the 
only wire protocol implementations of the SP
protocols (mangos) and am the current nanomsg maintainer/BDFL.  For me, 
ZeroMQ’s heavy weight approach to everything,
and its reliance on C++, made it unpleasant for me to work with, which is 
what drew me to nanomsg originally.  I like
the protocols, and I am super proud of Mangos which I think is a very 
robust messaging framework that I’ve used in a
variety of commercial settings (and I know others have too).  But my 
feelings won’t be hurt if you choose ZeroMQ.  I
do hope in a couple of weeks, if you have time, you’ll come back and have a 
look at libnng.  I think by then it will
really be worth looking at.  (nng stands for nanomsg next-gen.)

Best of luck to you!
Thanks Garrett for the explanation. Most likely I will give it a try 
starting from now to
the next couple of weeks. I don't have any problem to wait the upcoming 
libnng.

As said, a feature I really like to have is the notification of topic that a 
subscriber is interested at.



I've almost finished the C implementation of my logic, but it seems there 
some "strange" errors regarding TCP transport.
IPC seems working fine. First of all the flow is the following:

          +------+     +------+
          | Sub1 | ... | SubN |
          +------+     +------+
             ^             ^
              \           /
               \         /
               +--+ +--+
                   | |
                   | |
                 +-----+
                 | Pub | TCP or IPC Transport
***------------+-----+---------------***
                 | Sub |
                 +-----+
                   ^ ^
                   | |
                +--+ +--+
               /         \  INPROC Transport
              /           \
             /             \
          +------+     +------+
          | Pub1 | ... | PubN |  DataObserver Thread
          +------+     +------+
             ^             ^
             |             |
         +-------+     +-------+
         | Data1 | ... | DataN | data generator
         +-------+     +-------+

A test case of this flow can be represented by this simple python code

------------------- Publisher.py

import time
import random
import string
import threading

from nanomsg import SUB, SUB_SUBSCRIBE, PUB, Socket, Device

INPROC_URL = 'inproc://{}'
URL = 'ipc:///tmp/publisher.ipc'
#URL = 'tcp://*:2345'


def in_publisher( name, stop ):
   s = Socket(PUB)
   s.bind(INPROC_URL.format(name))
   print "Enabling Data:", name

   while True:
       data =''.join(random.choice(string.ascii_uppercase + string.digits) 
for _ in range(8))
       s.send("Data|{}|{}".format(name, data))
       time.sleep(1)
       if stop():
           break


def in_subscriber( names, stop ):
   s = Socket(PUB)
   s.bind(URL)

   s1 = Socket(SUB)
   s1.set_string_option(SUB, SUB_SUBSCRIBE, 'Data|') 

   for name in names:
       s1.connect(INPROC_URL.format(name))
       print "Subscribing Data:", name

   #d = Device( s1, s )
   #d.start()
   while True:
       # route the data from internal to external socket
       s.send(s1.recv())
       if stop():
           break


names = ['foo', 'bar', 'baz', 'bax']
threads = []
stop_threads = False

for name in names:
   t = threading.Thread(name=name, target=in_publisher, args=(name, lambda: 
stop_threads ))
   threads.append(t)
   t.start()

t = threading.Thread(target=in_subscriber, args=(names, lambda: stop_threads 
))
threads.append(t)
t.start()

while True:
   a = raw_input()
   if a == 'q':
       stop_threads = True
       break

print "Quitting"

for t in threads:
   t.join()

print "Finish"

------------------- Subscriber.py

import time

from nanomsg import SUB, SUB_SUBSCRIBE, Socket

URL = 'ipc:///tmp/publisher.ipc'
#URL = 'tcp://*:2345'

with Socket(SUB) as s:
   s.set_string_option(SUB, SUB_SUBSCRIBE, 'Data|') 
   s.connect(URL)
   while True:
       a = s.recv()
       print "Received:", a
       time.sleep(1)

Everything works fine when using a IPC transport

Received: Data|foo|PT2U7TTC
Received: Data|bax|O5IW8MYF
Received: Data|bar|O2HUB4JF
Received: Data|baz|D01N4V72
Received: Data|bax|RGLBK4XL
Received: Data|foo|3CVJ2PZB
Received: Data|baz|G53IGSSK
Received: Data|bar|CWOCJGII
Received: Data|bax|LBJZF3AA
Received: Data|foo|79S9T0FD
Received: Data|bar|0RD7QVRG
Received: Data|baz|N1QAYK2B
Received: Data|bax|X3EMTKLN
Received: Data|bar|YWOSMW04

But as soon as I switch to TCP:

Traceback (most recent call last):
 File "Subscriber.py", line 20, in <module>
   s.connect(URL)
 File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py", 
line 284, in connect
   wrapper.nn_connect(self.fd, address)
 File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py", 
line 63, in _nn_check_positive_rtn
   raise NanoMsgAPIError()
nanomsg.NanoMsgAPIError: Invalid argument

Finally, regarding usage of the Device, if you uncomment the Device part in 
the publisher and comment the while
you get this

Traceback (most recent call last):
 File "Subscriber.py", line 20, in <module>
   s.connect(URL)
 File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py", 
line 284, in connect
   wrapper.nn_connect(self.fd, address)
 File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py", 
line 63, in _nn_check_positive_rtn
   raise NanoMsgAPIError()
nanomsg.NanoMsgAPIError: Invalid argument

Do you have an idea why this is happening? C implementation is showing the 
same problem.


Other related posts: