[nanomsg] Re: Best practices to high availability pubsub

  • From: Pierre Salmon <pierre.salmon@xxxxxxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Wed, 12 Nov 2014 18:03:38 +0100

Thx Garrett,

In this case, the better way is to use another pattern (like BUS). But, to increase performance, should I handle every socket in a separate thread ?

Pierre

On 11/12/2014 05:43 PM, Garrett D'Amore wrote:
pub/sub handling is best effort.  At high data rates, “backpressure” will 
result in events being dropped.   This is by design.  If you need reliable 
delivery, you need a full duplex protocol like req/rep.

I don’t believe that there are any exposed APIs to notice queue full 
conditions.  And sadly, your queue full condition may occur at other points on 
the network path (e.g. the remove subscribers).

Again, if you’re needing reliable delivery, pub/sub is the wrong pattern for 
you.   You can use req/rep, and you could even use pub/sub to do a “wakeup” 
(for example to wake a program that would otherwise sleep for a while to poll 
on a state change), but I wouldn’t convey mission critical data exclusively 
through pub/sub, and you need to expect that it can happen that you’ll miss 
events in pub/sub.

   - Garrett

On Nov 12, 2014, at 8:36 AM, Pierre Salmon <pierre.salmon@xxxxxxxxxxxxx> wrote:

Hi,

For a project, i create a subscriber system for multiple pub processing:
In one thread, i retrieve data from sub nanomsg socket with poll functions to 
push it in lock free queue. I dont used subscribing and, for test, i just used 
one pub processing and one subcriber system.

My pseudo code is:

while (1)
{
    entries[0].fs = socket(NN_SUB);
    entries[0].events = NN_POLLIN;

    poll(entries, 1, 5000);
    if (pfs[0].revents & NN_POLL_IN)
    {
        size_t size = nn_recv(entries[0].fs, buffer, capacity, NN_DONT_WAIT);
        int tick = 0;
        while (size > 0)
        {
            push_to_queue(buffer, size);
            size = nn_recv(entries[0].fs, buffer, capacity, NN_DONT_WAIT);
            ++tick;
        }
    }
}
I publish data packet of 200 bytes from publish process with pseudocode:
nn_send(s, buffer, size, 0);

When input flow data is slow (~10Mbytes/s), it's work fine but if i send data 
to max speed i have some pb:
- i lose packet (10~50% of packet lose) - i send in packet a sequence number 
and i check in reception
- max data flow is 60 Mbytes/s on ipc and tcp

In this case, i have some questions:
- when i retrieve data ( loop while(size > 1)), i noticed tick value can be very 
large (>500000). should be stop retrieve data to release thread an restart poll ?
- if push to queue is to long, is it possible to lose packet. How can i check 
in nanomsg ? Debug env does not provide this information
- Does it better to run a thread by socket pub connection ?

Pierre

Pierre




Other related posts: