Hi,

For a project, I am building a subscriber system that handles multiple publisher processes. In one thread, I retrieve data from a nanomsg SUB socket using the poll function and push it into a lock-free queue. I do not use subscription filtering and, for testing, I use just one publisher process and one subscriber.
My pseudocode is:

    while (1) {
        entries[0].fd = sub_socket;   /* the NN_SUB socket */
        entries[0].events = NN_POLLIN;
        nn_poll(entries, 1, 5000);
        if (entries[0].revents & NN_POLLIN) {
            /* nn_recv returns an int: the message size, or -1 on error */
            int size = nn_recv(entries[0].fd, buffer, capacity, NN_DONTWAIT);
            int tick = 0;
            while (size > 0) {
                push_to_queue(buffer, size);
                size = nn_recv(entries[0].fd, buffer, capacity, NN_DONTWAIT);
                ++tick;
            }
        }
    }

I publish 200-byte data packets from the publisher process with this pseudocode:

    nn_send(s, buffer, size, 0);

When the input data rate is low (~10 MB/s), it works fine, but if I send data at maximum speed I have some problems:
- I lose packets (10~50% packet loss)
- I put a sequence number in each packet and check it on reception
- the maximum data rate is 60 MB/s on both ipc and tcp

In this case, I have some questions:
- When I retrieve data (the while (size > 0) loop), I noticed that the tick value can become very large (>500000). Should I stop retrieving data to release the thread and restart the poll?
- If pushing to the queue takes too long, is it possible to lose packets? How can I check this in nanomsg? The debug environment does not provide this information.
- Is it better to run one thread per pub socket connection?

Pierre
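
P.S. To make the sequence check and the "stop and go back to poll" idea concrete, here is a minimal sketch in C. It is only an illustration under assumptions, not the actual project code: seq_tracker, seq_tracker_update, drain_bounded, try_recv_fn, push_fn and the budget parameter are hypothetical names that are not part of nanomsg. The try_recv callback stands in for nn_recv(..., NN_DONTWAIT) and is assumed to return a negative value when nothing is ready; push stands in for push_to_queue.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Counts packets that appear to be missing, based on gaps in the
 * sequence number carried by each packet (hypothetical helper). */
struct seq_tracker {
    uint64_t expected;  /* next sequence number we expect to see */
    uint64_t received;  /* packets actually seen */
    uint64_t lost;      /* packets inferred missing from gaps */
    int started;        /* 0 until the first packet arrives */
};

void seq_tracker_update(struct seq_tracker *t, uint64_t seq)
{
    t->received++;
    if (t->started && seq < t->expected)
        return;                        /* duplicate or late packet: do not rewind */
    if (t->started)
        t->lost += seq - t->expected;  /* adds 0 when seq == expected */
    t->started = 1;
    t->expected = seq + 1;
}

/* Hypothetical stand-ins for nn_recv(..., NN_DONTWAIT) and push_to_queue. */
typedef int (*try_recv_fn)(void *ctx, void *buf, size_t cap);
typedef void (*push_fn)(void *ctx, const void *buf, size_t len);

/* Drains at most `budget` messages, then returns so the caller can go
 * back to polling. Returns the number of messages forwarded. */
int drain_bounded(try_recv_fn try_recv, push_fn push, void *ctx,
                  void *buf, size_t cap, int budget)
{
    assert(budget > 0);
    int n = 0;
    while (n < budget) {
        int size = try_recv(ctx, buf, cap);
        if (size < 0)
            break;                     /* nothing ready, or an error */
        push(ctx, buf, (size_t)size);
        n++;
    }
    return n;
}
```

With something like this, the receive thread would call drain_bounded after each successful poll and feed the sequence number of every packet into seq_tracker_update, so that tracker.lost gives a loss count on the subscriber side independently of what nanomsg reports.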