[nanomsg] Problem with simple pub/sub app

  • From: Immanuel Weber <immanuel.weber@xxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Tue, 1 Oct 2013 17:23:26 +0200

Hi all,
I'm trying to setup a simple test app for the pub/sub pattern, where one
thread is spawned to publish 10 messages and a second thread is spawned to
receive 5 of them. (Source code at the end of the mail)
If I use the inproc transport, that works fine. But when I switch to tcp, I
get an assertion after the subscriber received the last (fifth) message:
Assertion failed: self->state == NN_USOCK_STATE_ACTIVE
(c:\users\iweber\documents\gitrepos\nanomsg\src\aio\usock_win.inc:326)
The state actually is NN_USOCK_STATE_DONE, hence the assertion.
The callstack is
0 NMSG_WRITE MSVCR110D 0x7fed99d5692
1 abort MSVCR110D 0x7fed9afe8b4
2 nn_err_abort err.c 34 0x7fee9b0436b
3 nn_usock_send usock_win.inc 326 0x7fee9afe29b
4 nn_stcp_send stcp.c 146 0x7fee9b2219a
5 nn_pipe_send pipe.c 154 0x7fee9af594c
6 nn_dist_send dist.c 95 0x7fee9b074e8
7 nn_pub_send pub.c 159 0x7fee9b0b809
8 nn_sock_send sock.c 524 0x7fee9af6a3b
9 nn_send global.c 522 0x7fee9af373c
10 nn::socket::send nn.hpp 143 0x13fe846b4
11 publisher main.cpp 37 0x13fe64d24

So the assertion is not produced by the subscriber, but by the publisher,
which is kinda strange, I think. Why should its socket be in the done
state? I would expect the subscriber socket to be done after its shutdown...
I can however prevent that, if I add a Sleep(1000) statement after the loop
of the subscriber. Which delays the shutdown of the subscriber socket.

Any ideas?

Immanuel

p.s. I know that the thread spawning mechanism is maybe not the best way to
do, but actually I'm implementing the app using Qt and just created a
minimal example without Qt dependencies. The Qt version however shows exact
the same problem.

#include <nn.hpp>
#include <nanomsg/pubsub.h>
#include <Windows.h>
#include <future>
#include <memory>
#include <iostream>

int publisher();
int subscriber();

int main(int argc, char *argv[])
{
    std::future<int> pubFut = std::async(std::launch::async, []{
publisher(); });
    std::future<int> subFut = std::async(std::launch::async, []{
subscriber(); });
    auto retPub = pubFut.get();
    auto retSub = subFut.get();
}

int publisher() {
    auto pub = std::unique_ptr<nn::socket>(new nn::socket(AF_SP, NN_PUB));
    auto ret = pub->bind("tcp://127.0.0.1:5560");
    auto n = 0;
    while (n++ < 10) {
        char msg[] = {"TestMsg"};
        auto msgLen = strlen(msg) + 1;
        auto bytesSent = pub->send(msg, msgLen, 0);
        std::cout << "Publisher: sent message \"" << msg << "\" (" <<
bytesSent << ")" << std::endl;
        Sleep(500);
    }
    pub->shutdown(ret);
    return 0;
}

int subscriber() {
    auto sub = std::unique_ptr<nn::socket>(new nn::socket(AF_SP, NN_SUB));
    sub->setsockopt(NN_SUB, NN_SUB_SUBSCRIBE, "", 0);
    auto ret = sub->connect("tcp://127.0.0.1:5560");
    auto n = 0;
    while (n++ < 5) {
        char * buf = nullptr;
        auto bytesRecv = sub->recv(&buf, NN_MSG, 0);
        std::cout << "Subscriber: received message \"" << buf << "\" (" <<
bytesRecv << ")" << std::endl;
        if (bytesRecv > 0) {
            auto msg = QString::fromUtf8(buf);
            nn::freemsg(buf);
        }
    }
    // Sleep(1000);
    sub->shutdown(ret);
    return 0;
}

Other related posts: