[nanomsg] Re: Planning nanomsg on multithreaded and distributed applications

  • From: "Garrett D'Amore" <garrett@xxxxxxxxxx>
  • To: "nanomsg@xxxxxxxxxxxxx" <nanomsg@xxxxxxxxxxxxx>
  • Date: Tue, 10 Jan 2017 23:40:46 -0800

Excellent, glad to hear it!

On Tue, Jan 10, 2017 at 11:38 PM, Roberto Fichera <kernel@xxxxxxxxxxxxx>
wrote:

On 01/10/2017 08:27 PM, Garrett D'Amore wrote:
Your TCP URL is invalid for connect. You need to specify a specific IP
address such as 127.0.0.1 - the use of a wild card IP address means
INADDR_ANY and only works on the bind side.

Right! And indeed worked fine!


Sent from my iPhone

On Jan 10, 2017, at 10:35 AM, Roberto Fichera <kernel@xxxxxxxxxxxxx>
wrote:

On 01/05/2017 12:46 PM, Roberto Fichera wrote:

Hi Garrett,

On 01/05/2017 11:39 AM, Garrett D'Amore wrote:
nanomsg works quite well for many use cases, and lots of people are
using it.  (I think mangos works even better, but
it is only applicable for folks using Go.  It is wire compatible with
nanomsg, so you can mix and match the two
easily.  Mangos does have some extra capabilities though — especially
for websocket and TLS based security.)

[...]

I’m not that passionate about this, even though I’ve written one of
the only wire protocol implementations of the SP
protocols (mangos) and am the current nanomsg maintainer/BDFL.  For
me, ZeroMQ’s heavy weight approach to everything,
and its reliance on C++, made it unpleasant for me to work with,
which is what drew me to nanomsg originally.  I like
the protocols, and I am super proud of Mangos which I think is a very
robust messaging framework that I’ve used in a
variety of commercial settings (and I know others have too).  But my
feelings won’t be hurt if you choose ZeroMQ.  I
do hope in a couple of weeks, if you have time, you’ll come back and
have a look at libnng.  I think by then it will
really be worth looking at.  (nng stands for nanomsg next-gen.)

Best of luck to you!
Thanks Garrett for the explanation. Most likely I will give it a try
starting from now to
the next couple of weeks. I don't have any problem to wait the
upcoming libnng.

As said, a feature I really like to have is the notification of topic
that a subscriber is interested at.


I've almost finished the C implementation of my logic, but it seems
there some "strange" errors regarding TCP transport.
IPC seems working fine. First of all the flow is the following:

          +------+     +------+
          | Sub1 | ... | SubN |
          +------+     +------+
             ^             ^
              \           /
               \         /
               +--+ +--+
                   | |
                   | |
                 +-----+
                 | Pub | TCP or IPC Transport
***------------+-----+---------------***
                 | Sub |
                 +-----+
                   ^ ^
                   | |
                +--+ +--+
               /         \  INPROC Transport
              /           \
             /             \
          +------+     +------+
          | Pub1 | ... | PubN |  DataObserver Thread
          +------+     +------+
             ^             ^
             |             |
         +-------+     +-------+
         | Data1 | ... | DataN | data generator
         +-------+     +-------+

A test case of this flow can be represented by this simple python code

------------------- Publisher.py

import time
import random
import string
import threading

from nanomsg import SUB, SUB_SUBSCRIBE, PUB, Socket, Device

INPROC_URL = 'inproc://{}'
URL = 'ipc:///tmp/publisher.ipc'
#URL = 'tcp://*:2345'


def in_publisher( name, stop ):
   s = Socket(PUB)
   s.bind(INPROC_URL.format(name))
   print "Enabling Data:", name

   while True:
       data =''.join(random.choice(string.ascii_uppercase +
string.digits) for _ in range(8))
       s.send("Data|{}|{}".format(name, data))
       time.sleep(1)
       if stop():
           break


def in_subscriber( names, stop ):
   s = Socket(PUB)
   s.bind(URL)

   s1 = Socket(SUB)
   s1.set_string_option(SUB, SUB_SUBSCRIBE, 'Data|')

   for name in names:
       s1.connect(INPROC_URL.format(name))
       print "Subscribing Data:", name

   #d = Device( s1, s )
   #d.start()
   while True:
       # route the data from internal to external socket
       s.send(s1.recv())
       if stop():
           break


names = ['foo', 'bar', 'baz', 'bax']
threads = []
stop_threads = False

for name in names:
   t = threading.Thread(name=name, target=in_publisher, args=(name,
lambda: stop_threads ))
   threads.append(t)
   t.start()

t = threading.Thread(target=in_subscriber, args=(names, lambda:
stop_threads ))
threads.append(t)
t.start()

while True:
   a = raw_input()
   if a == 'q':
       stop_threads = True
       break

print "Quitting"

for t in threads:
   t.join()

print "Finish"

------------------- Subscriber.py

import time

from nanomsg import SUB, SUB_SUBSCRIBE, Socket

URL = 'ipc:///tmp/publisher.ipc'
#URL = 'tcp://*:2345'

with Socket(SUB) as s:
   s.set_string_option(SUB, SUB_SUBSCRIBE, 'Data|')
   s.connect(URL)
   while True:
       a = s.recv()
       print "Received:", a
       time.sleep(1)

Everything works fine when using a IPC transport

Received: Data|foo|PT2U7TTC
Received: Data|bax|O5IW8MYF
Received: Data|bar|O2HUB4JF
Received: Data|baz|D01N4V72
Received: Data|bax|RGLBK4XL
Received: Data|foo|3CVJ2PZB
Received: Data|baz|G53IGSSK
Received: Data|bar|CWOCJGII
Received: Data|bax|LBJZF3AA
Received: Data|foo|79S9T0FD
Received: Data|bar|0RD7QVRG
Received: Data|baz|N1QAYK2B
Received: Data|bax|X3EMTKLN
Received: Data|bar|YWOSMW04

But as soon as I switch to TCP:

Traceback (most recent call last):
 File "Subscriber.py", line 20, in <module>
   s.connect(URL)
 File 
"/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py",
line 284, in connect
   wrapper.nn_connect(self.fd, address)
 File 
"/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py",
line 63, in _nn_check_positive_rtn
   raise NanoMsgAPIError()
nanomsg.NanoMsgAPIError: Invalid argument

Finally, regarding usage of the Device, if you uncomment the Device
part in the publisher and comment the while
you get this

Traceback (most recent call last):
 File "Subscriber.py", line 20, in <module>
   s.connect(URL)
 File 
"/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py",
line 284, in connect
   wrapper.nn_connect(self.fd, address)
 File 
"/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py",
line 63, in _nn_check_positive_rtn
   raise NanoMsgAPIError()
nanomsg.NanoMsgAPIError: Invalid argument

Do you have an idea why this is happening? C implementation is showing
the same problem.





Other related posts: