[nanomsg] nn_recv

  • From: "Engelbert Roidl" <dmarc-noreply-outsider@xxxxxxxxxxxxx> (Redacted sender "engelbert.roidl@xxxxxxxxxxxxxx" for DMARC)
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Tue, 28 Jul 2015 22:40:41 +0200

Hi,

I'm trying the followin:

One dispatcherprocess. Bus topology. Processes sends Data to this Bus.
Other Processes subscribe this data.

Source:

"bus","Adresse Bus tcp://127.0.0.1:7000"
"pub","Adresse Bus tcp://127.0.0.1:7001"

-----------------------------------------------

int bussock = nn_socket (AF_SP, NN_BUS);
nn_bind (bussock, (char*)bus.c_str());
sleep (1); // wait for connections

int pubsock = nn_socket (AF_SP, NN_PUB);
nn_bind (pubsock, (char*)pub.c_str());
sleep (1); // wait for connections

int to = 100;
nn_setsockopt (bussock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to));
nn_setsockopt (pubsock, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to));

while (1)
{
// RECV
//char *buf = NULL;
char *buf=(char*)nn_allocmsg(4096,0);
char buf1[4096];
memset(buf, 0, sizeof(buf));
memset(buf1, 0, sizeof(buf1));

int recv = nn_recv (bussock, &buf, NN_MSG, 0);
if (recv >= 0)
{
strncpy(buf1, buf, recv);

printf ("RECEIVED %ld - %d '%s' FROM BUS sizeof %d\n",
(unsigned)time(NULL), recv, buf1, strlen(buf), strlen(buf1));
int send = nn_send (pubsock, buf1, strlen(buf1)+1, 0);
send = nn_send (bussock, buf1, strlen(buf1)+1, 0);
printf ("SENT %ld - %d '%s' TO PUB\n", (unsigned)time(NULL),
send, buf1);

nn_freemsg (buf);
}
}

-----------------------------------------------

My Clients uses a class BusConnection.

-------------------------------------------

BusConnection::BusConnection(char *addr, int port, int type)
{
socket = nn_socket (AF_SP, type);
char adr[100];
sprintf(adr, "tcp://%s:%d",addr,port);
int rc = nn_connect (socket, adr); // "tcp://127.0.0.1:7777");
nn_strerror (errno);
printf("%d connect(%d): %s - %d\n", socket, rc, adr, type);

int to = 100;
int sndto = 1;

size_t sz = sizeof(fd);
nn_setsockopt (socket, NN_SOL_SOCKET, NN_RCVTIMEO, &to, sizeof (to));
nn_setsockopt (socket, NN_SOL_SOCKET, NN_SNDTIMEO, &sndto, sizeof
(sndto));

nn_getsockopt (socket, NN_SOL_SOCKET, NN_RCVFD, &fd, &sz);
}

void BusConnection::SendToBus(char *msg)
{
int sz_msg = strlen (msg) + 1; // +\0
int send = nn_send (socket, msg, sz_msg, NN_DONTWAIT);

struct timeval tv;
gettimeofday(&tv,NULL);
unsigned long time_in_micros = 1000000 * tv.tv_sec + tv.tv_usec;

if(send >= 0)
{
printf("%ld BusConnection::SendToBus (%d) - %s - ", time_in_micros,
sz_msg, msg);
} else {
if(errno == EAGAIN)
printf("Error: %s\n", nn_strerror(errno));
else
printf("\n");
}
}

void BusConnection::Subscribe(char *sub)
{
int sz_sub = strlen (sub); // +\0
printf("%d subscribe: %s\n", socket, sub);
int r = nn_setsockopt (socket, NN_SUB, NN_SUB_SUBSCRIBE, sub, sz_sub);
}

--------------------------------------------------------------------------------

Modul is derived from BusConnection

--------------------------------------------------------------------------------

class Modul
{
public:
.......
protected:

private:
BusConnection Channel_bus;
BusConnection Channel_pub;
................
};



Modul::Modul(char *channeladdr, int chport) :
Channel_bus(channeladdr, chport, NN_BUS),
Channel_pub(channeladdr, chport+1, NN_SUB)
{
}

void Modul::SubscribeChannel (char *ch)
{
Channel_pub.Subscribe(ch);
}

void Modul::SendChannel(char *message)
{
Channel_bus.SendToBus (message);
}

int Modul::PollBusses()
{
time_t rawtime;
struct tm * timeinfo;

time (&rawtime);
timeinfo = localtime (&rawtime);

int recv=0;
struct nn_pollfd pfd [1];
int nbytes;

char *buf=(char*)nn_allocmsg(4096,0);
char buf1[4096];
memset(buf, 0, sizeof(buf));
memset(buf1, 0, sizeof(buf1));

pfd [0].fd = Channel_pub.GetSocket();
pfd [0].events = NN_POLLIN;

int rc = nn_poll (pfd, 1, 1);
printf("Modul::PollBusses - %d=poll()\n", rc);
if(rc>0) // 0=timeout -1=Error
{
if (pfd[0].revents & NN_POLLIN) // Message can be received from
cmddatabus
{
// RECV
recv = nn_recv (pfd[0].fd, &buf, NN_MSG, NN_DONTWAIT);

if (recv >= 0)
{
strncpy(buf1, buf, recv);
printf ("RECEIVED %d '%s' FROM ChannelBUS sizeof %d\n", recv,
(char*)buf1, strlen(buf1));
OnChannelReceived((char*)buf1);
} else {
switch(errno)
{
case EAGAIN:
printf("Modul::PollBusses - %d=nn_recv()- EAGAIN Error
%d %s \n",recv, errno, nn_strerror(errno));
break;
default:
printf("Modul::PollBusses - %d=nn_recv()- Unknown Error
%d %s \n",recv, errno, nn_strerror(errno));
break;
}

}
nn_freemsg (buf);
}
}
return(recv);
}

int Modul::OnChannelReceived(char *buf)
{
}

------------------------------------------------------------------------------

If I start the dispatcher and clients connection and subscription works
fine.
If Data is sent to the bus, it is received by the subscriber.... but if i
send 10 datapackets, which are not subscribed
the client (poll) will recieve something, but no data is there.

Here is an example output from the classes:


papa@debian:~/meghas/src/build/ahrs2$ ./ahrs2 -i AHRS2 -c 127.0.0.1
0 connect(1): tcp://127.0.0.1:7000 - 112
1 connect(1): tcp://127.0.0.1:7001 - 33
1 subscribe: 100|
1 subscribe: 2|

Modul::PollBusses - 0=poll()
Modul::PollBusses - 0=poll()
Modul::PollBusses - 0=poll()
.........................
Modul::PollBusses - 0=poll()
Modul::PollBusses - 1=poll()

RECEIVED 8 '100|8|1' FROM ChannelBUS sizeof 7
Equipment::OnChannelReceived 100|8|1
Equipment switched ON
Modul::PollBusses - 0=poll()
Event: (0.000000 / 0 / 1) Equipment::INIT
1438113659903252 BusConnection::SendToBus (9) - 18|123|1 -
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable

Modul::PollBusses - 0=poll()
Modul::PollBusses - 0=poll()

Event: (5.000000 / 5 / 1) Equipment::RUNNING
1438113664899077 BusConnection::SendToBus (9) - 18|123|2 -
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113664996903 BusConnection::SendToBus (64) - 10|10|1438113664996699 -
Tue Jul 28 22:01:04 2015
- 1438113664 - 1438113664997064 BusConnection::SendToBus (64) -
10|10|1438113664996699 - Tue Jul 28 22:01:04 2015
- 1438113664 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665096841 BusConnection::SendToBus (64) - 10|10|1438113665096668 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665096991 BusConnection::SendToBus (64) -
10|10|1438113665096668 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665196847 BusConnection::SendToBus (64) - 10|10|1438113665196672 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665196997 BusConnection::SendToBus (64) -
10|10|1438113665196672 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665296812 BusConnection::SendToBus (64) - 10|10|1438113665296642 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665296952 BusConnection::SendToBus (64) -
10|10|1438113665296642 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665396847 BusConnection::SendToBus (64) - 10|10|1438113665396674 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665396999 BusConnection::SendToBus (64) -
10|10|1438113665396674 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665496829 BusConnection::SendToBus (64) - 10|10|1438113665496651 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665496987 BusConnection::SendToBus (64) -
10|10|1438113665496651 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665596925 BusConnection::SendToBus (64) - 10|10|1438113665596700 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665597093 BusConnection::SendToBus (64) -
10|10|1438113665596700 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665696868 BusConnection::SendToBus (64) - 10|10|1438113665696695 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665697019 BusConnection::SendToBus (64) -
10|10|1438113665696695 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665796872 BusConnection::SendToBus (64) - 10|10|1438113665796699 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665797029 BusConnection::SendToBus (64) -
10|10|1438113665796699 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665896848 BusConnection::SendToBus (64) - 10|10|1438113665896685 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665897047 BusConnection::SendToBus (64) -
10|10|1438113665896685 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113665996851 BusConnection::SendToBus (64) - 10|10|1438113665996689 -
Tue Jul 28 22:01:05 2015
- 1438113665 - 1438113665997010 BusConnection::SendToBus (64) -
10|10|1438113665996689 - Tue Jul 28 22:01:05 2015
- 1438113665 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666096828 BusConnection::SendToBus (64) - 10|10|1438113666096681 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666096973 BusConnection::SendToBus (64) -
10|10|1438113666096681 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666196865 BusConnection::SendToBus (64) - 10|10|1438113666196710 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666197002 BusConnection::SendToBus (64) -
10|10|1438113666196710 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666296794 BusConnection::SendToBus (64) - 10|10|1438113666296649 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666296934 BusConnection::SendToBus (64) -
10|10|1438113666296649 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666396829 BusConnection::SendToBus (64) - 10|10|1438113666396676 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666396966 BusConnection::SendToBus (64) -
10|10|1438113666396676 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666496827 BusConnection::SendToBus (64) - 10|10|1438113666496676 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666496964 BusConnection::SendToBus (64) -
10|10|1438113666496676 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666596818 BusConnection::SendToBus (64) - 10|10|1438113666596669 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666596956 BusConnection::SendToBus (64) -
10|10|1438113666596669 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666696856 BusConnection::SendToBus (64) - 10|10|1438113666696708 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666696991 BusConnection::SendToBus (64) -
10|10|1438113666696708 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666796860 BusConnection::SendToBus (64) - 10|10|1438113666796703 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666797014 BusConnection::SendToBus (64) -
10|10|1438113666796703 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666896864 BusConnection::SendToBus (64) - 10|10|1438113666896708 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666897010 BusConnection::SendToBus (64) -
10|10|1438113666896708 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113666996872 BusConnection::SendToBus (64) - 10|10|1438113666996725 -
Tue Jul 28 22:01:06 2015
- 1438113666 - 1438113666997008 BusConnection::SendToBus (64) -
10|10|1438113666996725 - Tue Jul 28 22:01:06 2015
- 1438113666 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113667096875 BusConnection::SendToBus (64) - 10|10|1438113667096721 -
Tue Jul 28 22:01:07 2015
- 1438113667 - 1438113667097021 BusConnection::SendToBus (64) -
10|10|1438113667096721 - Tue Jul 28 22:01:07 2015
- 1438113667 - Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
1438113667196865 BusConnection::SendToBus (64) - 10|10|1438113667196710 -
Tue Jul 28 22:01:07 2015
- 1438113667 - 1438113667197012 BusConnection::SendToBus (64) -
10|10|1438113667196710 - Tue Jul 28 22:01:07 2015
- 1438113667 - Modul::PollBusses - 1=poll()
RECEIVED 8 '100|8|0' FROM ChannelBUS sizeof 7
Equipment::OnChannelReceived 100|8|0
Equipment switched OFF
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 1=poll()
Modul::PollBusses - -1=nn_recv()- EAGAIN Error 11 Resource temporarily
unavailable
Modul::PollBusses - 0=poll()
Modul::PollBusses - 0=poll()

I don't find my error. Why are as many wrong -1=nn_recv() EAGAIN Errors as
I receive correct data packages?

The problem seems to exist if my client sends 1 datapacket to the bus....
if it sends nothing recieving data seems to work.

Can anyone help me please?! :)

Other related posts:

  • » [nanomsg] nn_recv - Engelbert Roidl