Hi Martin, Yes, I'm. Best, - A On Jul 9, 2014 3:54 AM, "Martin Sustrik" <sustrik@xxxxxxxxxx> wrote: > -----BEGIN PGP SIGNED MESSAGE----- > Hash: SHA1 > > Hi Ark, > > We'll have to split the patch into it's constituent patches before > applying it to the mainline. > > In the meantime, can you state that you are submitting the patch under > MIT/X11 license? > > Thanks! > Martin > > On 09/07/14 09:50, Ark Degtiarov wrote: > > Hi, this is combined patch which attempts to address a few issues I > > have encountered while evaluating nanomsg: 1. 'make diagrams': a. > > Handle ValueError exception in Visitor::visit() b. Slightly > > adjusted src/aio/usock_posix.inc to eliminate warnings 2. Race > > conditions while stressing IPC usock life cycle a. race conditions > > due to *->state modifications after nn_worker_execute() b. race > > conditions in nn_sock_send and nn_sock_recv c. occasional unclean > > usock termination due to un-served task_recv d. added > > tests/ipc_stress.c to validate fixes for 2.a-2.c. > > > > All testing was done on FC20-latest machines only. > > > > On tests/ipc_stress.c: This test runs longer than others. Perhaps > > longer than necessary and could be trimmed down if desired. > > > > Thanks for very solid piece of architecture and design! > > > > --A > > > > > > diff --git .gitignore .gitignore index e1575c1..cc4c2ef 100644 --- > > .gitignore +++ .gitignore @@ -64,6 +64,7 @@ tests/inproc_shutdown > > tests/iovec tests/ipc tests/ipc_shutdown +tests/ipc_stress > > tests/list tests/msg tests/pair diff --git Makefile.am Makefile.am > > index e140e09..79f2be1 100644 --- Makefile.am +++ Makefile.am @@ > > -436,6 +436,7 @@ TRANSPORT_TESTS = \ tests/inproc_shutdown \ > > tests/ipc \ tests/ipc_shutdown \ + tests/ipc_stress \ tests/tcp > > \ tests/tcp_shutdown > > > > @@ -489,7 +490,7 @@ nanocat_SOURCES = \ > > > > if NANOCAT bin_PROGRAMS += nanocat -endif NANOCAT +endif #NANOCAT > > > > > ################################################################################ > > > > > # > > RFCs > > # @@ -542,8 +543,25 @@ install-exec-hook: $(LN_S) -f nanocat > > nn_bus$(EXEEXT) && \ $(LN_S) -f nanocat nn_pair$(EXEEXT) > > > > -endif SYMLINKS -endif NANOCAT +uninstall-hook: + cd > > $(DESTDIR)$(bindir) && \ + rm -f nn_push$(EXEEXT) ; \ + > > rm -f nn_pull$(EXEEXT) ; \ + rm -f nn_pub$(EXEEXT) ; \ + > > rm -f nn_sub$(EXEEXT) ; \ + rm -f nn_req$(EXEEXT) ; \ + > > rm -f nn_rep$(EXEEXT) ; \ + rm -f nn_surveyor$(EXEEXT) ; > > \ + rm -f nn_respondent$(EXEEXT) ; \ + rm -f > > nn_bus$(EXEEXT) ; \ + rm -f nn_pair$(EXEEXT) + cd > > $(DESTDIR)$(libdir) && \ + rm -f libnanomsg.* + cd > > $(DESTDIR)$(includedir) && \ + rm -rf nanomsg + +endif > > #SYMLINKS +endif #NANOCAT > > > > > > EXTRA_DIST += \ diff --git doc/diag.py doc/diag.py index > > e004431..3b01ba5 100755 --- doc/diag.py +++ doc/diag.py @@ -144,7 > > +144,10 @@ class Visitor(object): self.visit(cursor) > > > > def visit(self, cursor): - name = cursor.kind.name + > > try: + name = cursor.kind.name + except > > ValueError: + name = 'VERY_BAD_NAME' meth = > > getattr(self, 'enter_' + name, None) if meth is not None: res = > > meth(cursor) diff --git src/aio/timer.c src/aio/timer.c index > > 9a45880..0416c9c 100644 --- src/aio/timer.c +++ src/aio/timer.c @@ > > -95,8 +95,8 @@ static void nn_timer_shutdown (struct nn_fsm *self, > > int src, int type, timer = nn_cont (self, struct nn_timer, fsm); > > > > if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { - > > nn_worker_execute (timer->worker, &timer->stop_task); timer->state > > = NN_TIMER_STATE_STOPPING; + nn_worker_execute > > (timer->worker, &timer->stop_task); return; } if (nn_slow > > (timer->state == NN_TIMER_STATE_STOPPING)) { @@ -131,8 +131,8 @@ > > static void nn_timer_handler (struct nn_fsm *self, int src, int > > type, case NN_FSM_START: > > > > /* Send start event to the worker thread. */ - > > nn_worker_execute (timer->worker, &timer->start_task); timer->state > > = NN_TIMER_STATE_ACTIVE; + nn_worker_execute > > (timer->worker, &timer->start_task); return; default: > > nn_fsm_bad_action (timer->state, src, type); diff --git > > src/aio/usock_posix.inc src/aio/usock_posix.inc index > > 893105f..baee3f7 100644 --- src/aio/usock_posix.inc +++ > > src/aio/usock_posix.inc @@ -130,6 +130,9 @@ void nn_usock_term > > (struct nn_usock *self) nn_fsm_event_term (&self->event_received); > > nn_fsm_event_term (&self->event_sent); nn_fsm_event_term > > (&self->event_established); + + nn_worker_maybe_cancel > > (self->worker, &self->task_recv); + nn_worker_task_term > > (&self->task_stop); nn_worker_task_term (&self->task_recv); > > nn_worker_task_term (&self->task_send); @@ -858,8 +861,8 @@ error: > > usock->state = NN_USOCK_STATE_LISTENING; return; case > > NN_USOCK_ACTION_CANCEL: - nn_worker_execute > > (usock->worker, &usock->task_stop); usock->state = > > NN_USOCK_STATE_CANCELLING; + nn_worker_execute > > (usock->worker, &usock->task_stop); return; default: > > nn_fsm_bad_action (usock->state, src, type); @@ -952,14 +955,18 @@ > > error: case NN_USOCK_STATE_CANCELLING: switch (src) { case > > NN_USOCK_SRC_TASK_STOP: - nn_assert (type == > > NN_WORKER_TASK_EXECUTE); - nn_worker_rm_fd > > (usock->worker, &usock->wfd); - usock->state = > > NN_USOCK_STATE_LISTENING; + switch (type) { + > > case NN_WORKER_TASK_EXECUTE: + nn_worker_rm_fd > > (usock->worker, &usock->wfd); + usock->state = > > NN_USOCK_STATE_LISTENING; > > > > - /* Notify the accepted socket that it was stopped. > > */ - nn_fsm_action (&usock->asock->fsm, > > NN_USOCK_ACTION_DONE); + /* Notify the accepted > > socket that it was stopped. */ + nn_fsm_action > > (&usock->asock->fsm, NN_USOCK_ACTION_DONE); > > > > - return; + return; + > > default: + nn_fsm_bad_action (usock->state, src, > > type); + } case NN_USOCK_SRC_FD: switch (type) { case > > NN_WORKER_FD_IN: diff --git src/aio/worker.h src/aio/worker.h index > > adceae3..1c05dd9 100644 --- src/aio/worker.h +++ src/aio/worker.h > > @@ -58,6 +58,7 @@ struct nn_worker; int nn_worker_init (struct > > nn_worker *self); void nn_worker_term (struct nn_worker *self); > > void nn_worker_execute (struct nn_worker *self, struct > > nn_worker_task *task); +void nn_worker_maybe_cancel (struct > > nn_worker *self, struct nn_worker_task *task); > > > > void nn_worker_add_timer (struct nn_worker *self, int timeout, > > struct nn_worker_timer *timer); diff --git src/aio/worker_posix.inc > > src/aio/worker_posix.inc index 36867d4..2caf70a 100644 --- > > src/aio/worker_posix.inc +++ src/aio/worker_posix.inc @@ -145,6 > > +145,13 @@ void nn_worker_execute (struct nn_worker *self, struct > > nn_worker_task *task) nn_mutex_unlock (&self->sync); } > > > > +void nn_worker_maybe_cancel (struct nn_worker *self, struct > > nn_worker_task *task) +{ + nn_mutex_lock (&self->sync); + > > nn_queue_maybe_remove (&self->tasks, &task->item); + > > nn_mutex_unlock (&self->sync); +} + static void nn_worker_routine > > (void *arg) { int rc; diff --git src/core/sock.c src/core/sock.c > > index 100fa43..8466bfd 100644 --- src/core/sock.c +++ > > src/core/sock.c @@ -597,7 +597,12 @@ int nn_sock_send (struct > > nn_sock *self, struct nn_msg *msg, int flags) return -EINTR; > > errnum_assert (rc == 0, rc); nn_ctx_enter (&self->ctx); - > > self->flags |= NN_SOCK_FLAG_OUT; + /* + * Double > > check if pipes are still available for sending + */ + > > if (!nn_efd_wait (&self->rcvfd, 0)) { + self->flags |= > > NN_SOCK_FLAG_OUT; + } > > > > /* If needed, re-compute the timeout to reflect the time that > > have already elapsed. */ @@ -670,7 +675,12 @@ int nn_sock_recv > > (struct nn_sock *self, struct nn_msg *msg, int flags) return > > -EINTR; errnum_assert (rc == 0, rc); nn_ctx_enter (&self->ctx); - > > self->flags |= NN_SOCK_FLAG_IN; + /* + * Double > > check if pipes are still available for receiving + */ + > > if (!nn_efd_wait (&self->rcvfd, 0)) { + self->flags |= > > NN_SOCK_FLAG_IN; + } > > > > /* If needed, re-compute the timeout to reflect the time that > > have already elapsed. */ diff --git src/utils/queue.c > > src/utils/queue.c index 080a7b7..6a7cd99 100644 --- > > src/utils/queue.c +++ src/utils/queue.c @@ -54,6 +54,39 @@ void > > nn_queue_push (struct nn_queue *self, struct nn_queue_item *item) > > self->tail = item; } > > > > +void nn_queue_maybe_remove (struct nn_queue *self, struct > > nn_queue_item *item) +{ + if (item->next == NN_QUEUE_NOTINQUEUE) > > + return; + + if (self->head == item) { + self->head > > = self->head->next; + if (!self->head) + self->tail > > = NULL; + item->next = NN_QUEUE_NOTINQUEUE; + } else { + > > struct nn_queue_item *prev = self->head; + + while (1) { + > > if (!prev) break; + + struct nn_queue_item *curr = > > prev->next; + + if (curr == item) { + > > prev->next = curr->next; + if (self->tail == curr) + > > self->tail = prev; + + item->next = > > NN_QUEUE_NOTINQUEUE; + break; + } + > > prev = curr; + } + } + + nn_assert (item->next == > > NN_QUEUE_NOTINQUEUE); +} + struct nn_queue_item *nn_queue_pop > > (struct nn_queue *self) { struct nn_queue_item *result; diff --git > > tests/ipc_stress.c tests/ipc_stress.c new file mode 100644 index > > 0000000..4433564 --- /dev/null +++ tests/ipc_stress.c @@ -0,0 +1,99 > > @@ +/* + Copyright (c) 2012 250bpm s.r.o. All rights reserved. > > + + Permission is hereby granted, free of charge, to any person > > obtaining a copy + of this software and associated documentation > > files (the "Software"), + to deal in the Software without > > restriction, including without limitation + the rights to use, > > copy, modify, merge, publish, distribute, sublicense, + and/or > > sell copies of the Software, and to permit persons to whom + the > > Software is furnished to do so, subject to the following > > conditions: + + The above copyright notice and this permission > > notice shall be included + in all copies or substantial portions > > of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT > > WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT > > LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A > > PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE > > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR > > OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR > > OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE > > SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + > > +#include "../src/nn.h" +#include "../src/pair.h" +#include > > "../src/pubsub.h" +#include "../src/pipeline.h" +#include > > "../src/ipc.h" + +#include "testutil.h" +#include > > "../src/utils/thread.c" +#include "../src/utils/atomic.h" + +/* > > Stress test the IPC transport. */ + +#define THREAD_COUNT 100 > > +#define TEST_LOOPS 10000 +#define SOCKET_ADDRESS > > "ipc://test-stress.ipc" + +struct nn_atomic active; + +static void > > server(NN_UNUSED void *arg) +{ + int sock = nn_socket(AF_SP, > > NN_PULL); + nn_assert(sock >= 0); + nn_assert(nn_bind(sock, > > SOCKET_ADDRESS) >= 0); + while (1) + { + char *buf = > > NULL; + if (!active.n) break; + const int bytes = > > nn_recv(sock, &buf, NN_MSG, 0); + nn_assert(bytes >= 0); + > > nn_freemsg(buf); + } + nn_close(sock); +} + +static void > > client(NN_UNUSED void *arg) +{ + char msg[] = "0"; + int > > sz_msg = strlen (msg) + 1; // '\0' too + int i; + + for (i = > > 0; i < TEST_LOOPS; i++) { + int cli_sock = nn_socket(AF_SP, > > NN_PUSH); + nn_assert(cli_sock >= 0); + > > nn_assert(nn_connect(cli_sock, SOCKET_ADDRESS) >= 0); + > > const int bytes = nn_send(cli_sock, msg, sz_msg, 0); + > > nn_assert(bytes == sz_msg); + nn_close(cli_sock); + } + > > nn_atomic_dec(&active, 1); +} + +int main() +{ + int sb; + > > int i; + struct nn_thread srv_thread; + struct nn_thread > > cli_threads[THREAD_COUNT]; + nn_atomic_init (&active, > > THREAD_COUNT); + /* Stress the shutdown algorithm. */ + > > nn_thread_init(&srv_thread, server, NULL); + + for (i = 0; i != > > THREAD_COUNT; ++i) + nn_thread_init(&cli_threads[i], client, > > NULL); + for (i = 0; i != THREAD_COUNT; ++i) + > > nn_thread_term(&cli_threads[i]); + + active.n = 0; + int > > cli_sock = nn_socket(AF_SP, NN_PUSH); + nn_assert(cli_sock >= > > 0); + nn_assert(nn_connect(cli_sock, SOCKET_ADDRESS) >= 0); + > > const int bytes = nn_send(cli_sock, &i, sizeof(i), 0); + > > nn_assert(bytes == sizeof(i)); + nn_close(cli_sock); + > > nn_thread_term(&srv_thread); + return 0; +} + > > > > -----BEGIN PGP SIGNATURE----- > Version: GnuPG v1.4.11 (GNU/Linux) > Comment: Using GnuPG with Thunderbird - http://www.enigmail.net/ > > iQEcBAEBAgAGBQJTvPT3AAoJENTpVjxCNN9YvHsH/Rs9L9a+zwVIv05Jco8e1wId > lPS07X0ubbkjpsbYaGR2EPjPi6VC8aoUcFv7vGQbA3KBvpct++aRP0kEQylUdKKD > Diw9sjd13G645xx70hiZxn0P+cQqODRouj5ahtze+/YKIOO6MqsGX60XlNESrTZM > ikWjk9x3oFgsRGW00vcsxjMZy5AEeZmq2m/X4kT3YvYXBDAFD55LNqyezH3z38H3 > ktVUGWBFtkDo6nPu1GwsDsV79GMjaRFX9ET7YySPhudc9e6fsGxcJbDCguxLBlUF > WUTpmTwZ4zkFL5PC+LtAH5VkupH2UyI9RFgdi9foc9QZad7y2TNafQG7wFeyJWY= > =vTD4 > -----END PGP SIGNATURE----- > >