[nanomsg] Best practices to high availability pubsub

  • From: Pierre Salmon <pierre.salmon@xxxxxxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Wed, 12 Nov 2014 17:36:55 +0100

Hi,

For a project, I am creating a subscriber system that consumes data from multiple pub processes:
In one thread, I retrieve data from a nanomsg sub socket using the poll functions and push it into a lock-free queue. I don't use subscription filtering and, for testing, I use just one pub process and one subscriber system.

My pseudo code is:

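/* create the SUB socket once, outside the loop */
struct nn_pollfd entries[1];
entries[0].fd = nn_socket(AF_SP, NN_SUB);
entries[0].events = NN_POLLIN;

while (1)
{
    nn_poll(entries, 1, 5000);
    if (entries[0].revents & NN_POLLIN)
    {
        /* nn_recv returns an int, -1 on error; storing it in a size_t
           would turn -1 into a huge value and the loop would never end */
        int size = nn_recv(entries[0].fd, buffer, capacity, NN_DONTWAIT);
        int tick = 0;
        while (size > 0)
        {
            push_to_queue(buffer, size);
            size = nn_recv(entries[0].fd, buffer, capacity, NN_DONTWAIT);
            ++tick;
        }
    }
}
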
I publish data packets of 200 bytes from the publisher process with this pseudocode:
nn_send(s, buffer, size, 0);

When the input data flow is slow (~10 Mbytes/s), it works fine, but if I send data at maximum speed I have some problems:
 - I lose packets (10~50% packet loss). I put a sequence number in each packet and check it on reception.
 - The maximum data flow is 60 Mbytes/s on both ipc and tcp.
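
For reference, the sequence-number check on the receiving side could look roughly like the sketch below. It is only an illustration: `seq_tracker`, `seq_tracker_init` and `seq_tracker_update` are hypothetical names, not part of nanomsg, and it assumes a single publisher that numbers its packets with a counter increasing by one.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical receiver-side loss counter; not part of nanomsg. */
struct seq_tracker {
    uint64_t expected;  /* next sequence number we expect to see */
    uint64_t received;  /* packets seen so far */
    uint64_t lost;      /* packets inferred missing from gaps */
    int started;        /* 0 until the first packet arrives */
};

static void seq_tracker_init(struct seq_tracker *t)
{
    assert(t != NULL);
    t->expected = 0;
    t->received = 0;
    t->lost = 0;
    t->started = 0;
}

/* Record one packet; returns how many packets were skipped just before it. */
static uint64_t seq_tracker_update(struct seq_tracker *t, uint64_t seq)
{
    uint64_t gap = 0;

    assert(t != NULL);
    /* The first packet only sets the baseline, so it never counts as a gap. */
    if (t->started && seq > t->expected)
        gap = seq - t->expected;

    t->started = 1;
    t->received++;
    t->lost += gap;
    if (seq >= t->expected)
        t->expected = seq + 1;
    return gap;
}
```

The loss ratio is then lost / (lost + received), computed once the test run is over.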

In this case, I have some questions:
 - When I retrieve data (the while (size > 0) loop), I noticed that the tick value can be very large (> 500000). Should I stop retrieving data to release the thread and restart the poll?
 - If push_to_queue takes too long, is it possible to lose packets? How can I check this in nanomsg? Debug env does not provide this information.
 - Is it better to run one thread per pub socket connection?
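
To make the first question concrete, a bounded version of the receive loop could be sketched as below. This is an assumption about what "stop and restart poll" would mean, not a confirmed fix for the packet loss. `drain_bounded`, `try_one_fn`, `fake_source` and `fake_try_one` are hypothetical names; in the real subscriber the callback would wrap nn_recv() with NN_DONTWAIT followed by push_to_queue().

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical callback: receive and enqueue one message without blocking.
   Returns 1 if a message was handled, 0 if none was pending. */
typedef int (*try_one_fn)(void *ctx);

/* Handle at most max_batch messages, then return so the caller can
   go back to polling. Returns the number of messages handled. */
static int drain_bounded(try_one_fn try_one, void *ctx, int max_batch)
{
    int handled = 0;

    assert(try_one != NULL);
    while (handled < max_batch && try_one(ctx))
        ++handled;
    return handled;
}

/* Stand-in source for demonstration only: pretends that
   `pending` messages are waiting on the socket. */
struct fake_source {
    int pending;
};

static int fake_try_one(void *ctx)
{
    struct fake_source *src = ctx;

    if (src->pending == 0)
        return 0;
    src->pending--;
    return 1;
}
```

With 2500 pending messages and max_batch set to 1000, three successive calls handle 1000, 1000 and 500 messages, so the thread returns to poll between batches instead of staying in a single long loop.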

Pierre
