[nanomsg] Re: Planning nanomsg on multithreaded and distributed applications
- From: Roberto Fichera <kernel@xxxxxxxxxxxxx>
- To: nanomsg@xxxxxxxxxxxxx
- Date: Tue, 10 Jan 2017 19:35:45 +0100
On 01/05/2017 12:46 PM, Roberto Fichera wrote:
Hi Garrett,
On 01/05/2017 11:39 AM, Garrett D'Amore wrote:
nanomsg works quite well for many use cases, and lots of people are using
it. (I think mangos works even better, but
it is only applicable for folks using Go. It is wire compatible with
nanomsg, so you can mix and match the two
easily. Mangos does have some extra capabilities though — especially for
websocket and TLS based security.)
[...]
I’m not that passionate about this, even though I’ve written one of the only
wire protocol implementations of the SP
protocols (mangos) and am the current nanomsg maintainer/BDFL. For me,
ZeroMQ’s heavy weight approach to everything,
and its reliance on C++, made it unpleasant for me to work with, which is
what drew me to nanomsg originally. I like
the protocols, and I am super proud of Mangos which I think is a very robust
messaging framework that I’ve used in a
variety of commercial settings (and I know others have too). But my
feelings won’t be hurt if you choose ZeroMQ. I
do hope in a couple of weeks, if you have time, you’ll come back and have a
look at libnng. I think by then it will
really be worth looking at. (nng stands for nanomsg next-gen.)
Best of luck to you!
Thanks Garrett for the explanation. Most likely I will give it a try starting
from now to
the next couple of weeks. I don't have any problem to wait the upcoming
libnng.
As said, a feature I really like to have is the notification of topic that a
subscriber is interested at.
I've almost finished the C implementation of my logic, but it seems there some
"strange" errors regarding TCP transport.
IPC seems working fine. First of all the flow is the following:
+------+ +------+
| Sub1 | ... | SubN |
+------+ +------+
^ ^
\ /
\ /
+--+ +--+
| |
| |
+-----+
| Pub | TCP or IPC Transport
***------------+-----+---------------***
| Sub |
+-----+
^ ^
| |
+--+ +--+
/ \ INPROC Transport
/ \
/ \
+------+ +------+
| Pub1 | ... | PubN | DataObserver Thread
+------+ +------+
^ ^
| |
+-------+ +-------+
| Data1 | ... | DataN | data generator
+-------+ +-------+
A test case of this flow can be represented by this simple python code
------------------- Publisher.py
import time
import random
import string
import threading
from nanomsg import SUB, SUB_SUBSCRIBE, PUB, Socket, Device
INPROC_URL = 'inproc://{}'
URL = 'ipc:///tmp/publisher.ipc'
#URL = 'tcp://*:2345'
def in_publisher( name, stop ):
s = Socket(PUB)
s.bind(INPROC_URL.format(name))
print "Enabling Data:", name
while True:
data =''.join(random.choice(string.ascii_uppercase + string.digits) for
_ in range(8))
s.send("Data|{}|{}".format(name, data))
time.sleep(1)
if stop():
break
def in_subscriber( names, stop ):
s = Socket(PUB)
s.bind(URL)
s1 = Socket(SUB)
s1.set_string_option(SUB, SUB_SUBSCRIBE, 'Data|')
for name in names:
s1.connect(INPROC_URL.format(name))
print "Subscribing Data:", name
#d = Device( s1, s )
#d.start()
while True:
# route the data from internal to external socket
s.send(s1.recv())
if stop():
break
names = ['foo', 'bar', 'baz', 'bax']
threads = []
stop_threads = False
for name in names:
t = threading.Thread(name=name, target=in_publisher, args=(name, lambda:
stop_threads ))
threads.append(t)
t.start()
t = threading.Thread(target=in_subscriber, args=(names, lambda: stop_threads ))
threads.append(t)
t.start()
while True:
a = raw_input()
if a == 'q':
stop_threads = True
break
print "Quitting"
for t in threads:
t.join()
print "Finish"
------------------- Subscriber.py
import time
from nanomsg import SUB, SUB_SUBSCRIBE, Socket
URL = 'ipc:///tmp/publisher.ipc'
#URL = 'tcp://*:2345'
with Socket(SUB) as s:
s.set_string_option(SUB, SUB_SUBSCRIBE, 'Data|')
s.connect(URL)
while True:
a = s.recv()
print "Received:", a
time.sleep(1)
Everything works fine when using a IPC transport
Received: Data|foo|PT2U7TTC
Received: Data|bax|O5IW8MYF
Received: Data|bar|O2HUB4JF
Received: Data|baz|D01N4V72
Received: Data|bax|RGLBK4XL
Received: Data|foo|3CVJ2PZB
Received: Data|baz|G53IGSSK
Received: Data|bar|CWOCJGII
Received: Data|bax|LBJZF3AA
Received: Data|foo|79S9T0FD
Received: Data|bar|0RD7QVRG
Received: Data|baz|N1QAYK2B
Received: Data|bax|X3EMTKLN
Received: Data|bar|YWOSMW04
But as soon as I switch to TCP:
Traceback (most recent call last):
File "Subscriber.py", line 20, in <module>
s.connect(URL)
File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py",
line 284, in connect
wrapper.nn_connect(self.fd, address)
File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py",
line 63, in _nn_check_positive_rtn
raise NanoMsgAPIError()
nanomsg.NanoMsgAPIError: Invalid argument
Finally, regarding usage of the Device, if you uncomment the Device part in the
publisher and comment the while
you get this
Traceback (most recent call last):
File "Subscriber.py", line 20, in <module>
s.connect(URL)
File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py",
line 284, in connect
wrapper.nn_connect(self.fd, address)
File "/home/roberto/.local/lib/python2.7/site-packages/nanomsg/__init__.py",
line 63, in _nn_check_positive_rtn
raise NanoMsgAPIError()
nanomsg.NanoMsgAPIError: Invalid argument
Do you have an idea why this is happening? C implementation is showing the same
problem.
Other related posts: