Hi,

this is a combined patch that addresses a few issues I encountered while
evaluating nanomsg:

1. 'make diagrams':
   a. Handle a ValueError exception in Visitor.visit() (doc/diag.py).
   b. Slightly adjusted src/aio/usock_posix.inc to eliminate warnings.
2. Race conditions while stressing the IPC usock life cycle:
   a. Race conditions due to *->state being modified after
      nn_worker_execute().
   b. Race conditions in nn_sock_send() and nn_sock_recv().
   c. Occasional unclean usock termination due to an unserved task_recv.
   d. Added tests/ipc_stress.c to validate the fixes for 2.a-2.c.
3. Build: added an uninstall-hook that removes the nn_* symlinks created
   by install-exec-hook.

All testing was done on FC20-latest machines only.

Note: this patch does not touch src/utils/queue.h, so
nn_queue_maybe_remove() (defined in src/utils/queue.c and called from
src/aio/worker_posix.inc) still needs a prototype there.

On tests/ipc_stress.c: this test runs longer than the others, perhaps
longer than necessary; THREAD_COUNT and TEST_LOOPS can be lowered if
desired.

Thanks for a very solid piece of architecture and design!

--A

diff --git .gitignore .gitignore index e1575c1..cc4c2ef 100644 --- .gitignore +++ .gitignore @@ -64,6 +64,7 @@ tests/inproc_shutdown tests/iovec tests/ipc tests/ipc_shutdown +tests/ipc_stress tests/list tests/msg tests/pair diff --git Makefile.am Makefile.am index e140e09..79f2be1 100644 --- Makefile.am +++ Makefile.am @@ -436,6 +436,7 @@ TRANSPORT_TESTS = \ tests/inproc_shutdown \ tests/ipc \ tests/ipc_shutdown \ + tests/ipc_stress \ tests/tcp \ tests/tcp_shutdown @@ -489,7 +490,7 @@ nanocat_SOURCES = \ if NANOCAT bin_PROGRAMS += nanocat -endif NANOCAT +endif #NANOCAT ################################################################################ # RFCs # @@ -542,8 +543,16 @@ install-exec-hook: $(LN_S) -f nanocat nn_bus$(EXEEXT) && \ $(LN_S) -f nanocat nn_pair$(EXEEXT) -endif SYMLINKS -endif NANOCAT +uninstall-hook: + cd $(DESTDIR)$(bindir) && \ + rm -f nn_push$(EXEEXT) nn_pull$(EXEEXT) \ + nn_pub$(EXEEXT) nn_sub$(EXEEXT) \ + nn_req$(EXEEXT) nn_rep$(EXEEXT) \ + nn_surveyor$(EXEEXT) nn_respondent$(EXEEXT) \ + nn_bus$(EXEEXT) nn_pair$(EXEEXT) + +endif #SYMLINKS +endif #NANOCAT EXTRA_DIST += \ diff --git doc/diag.py doc/diag.py index e004431..3b01ba5 100755 --- doc/diag.py +++ doc/diag.py @@ -144,7 +144,10 @@ class Visitor(object): self.visit(cursor) def visit(self, cursor): - name = cursor.kind.name + try: + name = cursor.kind.name + except ValueError: + name = 'VERY_BAD_NAME' meth = getattr(self, 'enter_' + name, None) if meth is not None: res = meth(cursor) diff --git src/aio/timer.c src/aio/timer.c index 9a45880..0416c9c 100644 --- src/aio/timer.c +++ src/aio/timer.c @@ -95,8 +95,8 @@ static void nn_timer_shutdown (struct nn_fsm *self, int src, int type, timer = nn_cont (self, struct nn_timer, fsm); if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { - nn_worker_execute (timer->worker, &timer->stop_task); timer->state = NN_TIMER_STATE_STOPPING; + nn_worker_execute (timer->worker, &timer->stop_task); return; } if (nn_slow (timer->state == NN_TIMER_STATE_STOPPING)) { @@ -131,8 +131,8 @@ static void nn_timer_handler (struct nn_fsm *self, int src, int type, case NN_FSM_START: /* Send start event to the worker thread.
*/ - nn_worker_execute (timer->worker, &timer->start_task); timer->state = NN_TIMER_STATE_ACTIVE; + nn_worker_execute (timer->worker, &timer->start_task); return; default: nn_fsm_bad_action (timer->state, src, type); diff --git src/aio/usock_posix.inc src/aio/usock_posix.inc index 893105f..baee3f7 100644 --- src/aio/usock_posix.inc +++ src/aio/usock_posix.inc @@ -130,6 +130,9 @@ void nn_usock_term (struct nn_usock *self) nn_fsm_event_term (&self->event_received); nn_fsm_event_term (&self->event_sent); nn_fsm_event_term (&self->event_established); + + nn_worker_maybe_cancel (self->worker, &self->task_recv); + nn_worker_task_term (&self->task_stop); nn_worker_task_term (&self->task_recv); nn_worker_task_term (&self->task_send); @@ -858,8 +861,8 @@ error: usock->state = NN_USOCK_STATE_LISTENING; return; case NN_USOCK_ACTION_CANCEL: - nn_worker_execute (usock->worker, &usock->task_stop); usock->state = NN_USOCK_STATE_CANCELLING; + nn_worker_execute (usock->worker, &usock->task_stop); return; default: nn_fsm_bad_action (usock->state, src, type); @@ -952,14 +955,18 @@ error: case NN_USOCK_STATE_CANCELLING: switch (src) { case NN_USOCK_SRC_TASK_STOP: - nn_assert (type == NN_WORKER_TASK_EXECUTE); - nn_worker_rm_fd (usock->worker, &usock->wfd); - usock->state = NN_USOCK_STATE_LISTENING; + switch (type) { + case NN_WORKER_TASK_EXECUTE: + nn_worker_rm_fd (usock->worker, &usock->wfd); + usock->state = NN_USOCK_STATE_LISTENING; - /* Notify the accepted socket that it was stopped. */ - nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE); + /* Notify the accepted socket that it was stopped.
*/ + nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE); - return; + return; + default: + nn_fsm_bad_action (usock->state, src, type); + } case NN_USOCK_SRC_FD: switch (type) { case NN_WORKER_FD_IN: diff --git src/aio/worker.h src/aio/worker.h index adceae3..1c05dd9 100644 --- src/aio/worker.h +++ src/aio/worker.h @@ -58,6 +58,8 @@ struct nn_worker; int nn_worker_init (struct nn_worker *self); void nn_worker_term (struct nn_worker *self); void nn_worker_execute (struct nn_worker *self, struct nn_worker_task *task); +/* Removes 'task' from the worker's task queue if it is still queued; otherwise does nothing. */ +void nn_worker_maybe_cancel (struct nn_worker *self, struct nn_worker_task *task); void nn_worker_add_timer (struct nn_worker *self, int timeout, struct nn_worker_timer *timer); diff --git src/aio/worker_posix.inc src/aio/worker_posix.inc index 36867d4..2caf70a 100644 --- src/aio/worker_posix.inc +++ src/aio/worker_posix.inc @@ -145,6 +145,13 @@ void nn_worker_execute (struct nn_worker *self, struct nn_worker_task *task) nn_mutex_unlock (&self->sync); } +void nn_worker_maybe_cancel (struct nn_worker *self, struct nn_worker_task *task) +{ + nn_mutex_lock (&self->sync); + nn_queue_maybe_remove (&self->tasks, &task->item); + nn_mutex_unlock (&self->sync); +} + static void nn_worker_routine (void *arg) { int rc; diff --git src/core/sock.c src/core/sock.c index 100fa43..8466bfd 100644 --- src/core/sock.c +++ src/core/sock.c @@ -597,7 +597,12 @@ int nn_sock_send (struct nn_sock *self, struct nn_msg *msg, int flags) return -EINTR; errnum_assert (rc == 0, rc); nn_ctx_enter (&self->ctx); - self->flags |= NN_SOCK_FLAG_OUT; + /* + * Double check via sndfd if pipes are still available for sending + */ + if (!nn_efd_wait (&self->sndfd, 0)) { + self->flags |= NN_SOCK_FLAG_OUT; + } /* If needed, re-compute the timeout to reflect the time that have already elapsed.
*/ @@ -670,7 +675,12 @@ int nn_sock_recv (struct nn_sock *self, struct nn_msg *msg, int flags) return -EINTR; errnum_assert (rc == 0, rc); nn_ctx_enter (&self->ctx); - self->flags |= NN_SOCK_FLAG_IN; + /* + * Double check if pipes are still available for receiving + */ + if (!nn_efd_wait (&self->rcvfd, 0)) { + self->flags |= NN_SOCK_FLAG_IN; + } /* If needed, re-compute the timeout to reflect the time that have already elapsed. */ diff --git src/utils/queue.c src/utils/queue.c index 080a7b7..6a7cd99 100644 --- src/utils/queue.c +++ src/utils/queue.c @@ -54,6 +54,34 @@ void nn_queue_push (struct nn_queue *self, struct nn_queue_item *item) self->tail = item; } +void nn_queue_maybe_remove (struct nn_queue *self, struct nn_queue_item *item) +{ + struct nn_queue_item *prev; + struct nn_queue_item *it; + + /* Nothing to do if the item is not in any queue. */ + if (item->next == NN_QUEUE_NOTINQUEUE) + return; + + prev = NULL; + for (it = self->head; it != NULL; it = it->next) { + if (it == item) { + if (prev) + prev->next = it->next; + else + self->head = it->next; + if (self->tail == it) + self->tail = prev; + item->next = NN_QUEUE_NOTINQUEUE; + break; + } + prev = it; + } + + /* The item was marked as enqueued, so it must have been found here. */ + nn_assert (item->next == NN_QUEUE_NOTINQUEUE); +} + struct nn_queue_item *nn_queue_pop (struct nn_queue *self) { struct nn_queue_item *result; diff --git tests/ipc_stress.c tests/ipc_stress.c new file mode 100644 index 0000000..4433564 --- /dev/null +++ tests/ipc_stress.c @@ -0,0 +1,116 @@ +/* + Copyright (c) 2012 250bpm s.r.o. All rights reserved.
+ + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), + to deal in the Software without restriction, including without limitation + the rights to use, copy, modify, merge, publish, distribute, sublicense, + and/or sell copies of the Software, and to permit persons to whom + the Software is furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included + in all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL + THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + IN THE SOFTWARE. +*/ + +#include "../src/nn.h" +#include "../src/pair.h" +#include "../src/pubsub.h" +#include "../src/pipeline.h" +#include "../src/ipc.h" + +#include "testutil.h" +#include "../src/utils/thread.c" +#include "../src/utils/atomic.h" +
+/* Stress test the IPC transport: THREAD_COUNT client threads each connect, + send one message and disconnect TEST_LOOPS times while a single server + thread keeps receiving until all the clients are done. */ + +#define THREAD_COUNT 100 +#define TEST_LOOPS 10000 +#define SOCKET_ADDRESS "ipc://test-stress.ipc" + +/* Number of client threads that are still running. */ +static struct nn_atomic active; + +/* Returns non-zero while any client is still running. Adding zero makes + this an atomic read of the shared counter. */ +static int clients_running (void) +{ + return nn_atomic_inc (&active, 0) != 0; +} + +static void server (NN_UNUSED void *arg) +{ + int sock; + int bytes; + char *buf; + + sock = nn_socket (AF_SP, NN_PULL); + nn_assert (sock >= 0); + nn_assert (nn_bind (sock, SOCKET_ADDRESS) >= 0); + while (clients_running ()) { + buf = NULL; + bytes = nn_recv (sock, &buf, NN_MSG, 0); + nn_assert (bytes >= 0); + nn_freemsg (buf); + } + nn_close (sock); +} + +static void client (NN_UNUSED void *arg) +{ + char msg [] = "0"; + int sz_msg = (int) sizeof (msg); /* Including the trailing '\0'. */ + int cli_sock; + int bytes; + int i; + + for (i = 0; i < TEST_LOOPS; i++) { + cli_sock = nn_socket (AF_SP, NN_PUSH); + nn_assert (cli_sock >= 0); + nn_assert (nn_connect (cli_sock, SOCKET_ADDRESS) >= 0); + bytes = nn_send (cli_sock, msg, sz_msg, 0); + nn_assert (bytes == sz_msg); + nn_close (cli_sock); + } + nn_atomic_dec (&active, 1); +} + +int main () +{ + int i; + int cli_sock; + int bytes; + struct nn_thread srv_thread; + struct nn_thread cli_threads [THREAD_COUNT]; + + nn_atomic_init (&active, THREAD_COUNT); + nn_thread_init (&srv_thread, server, NULL); + for (i = 0; i != THREAD_COUNT; ++i) + nn_thread_init (&cli_threads [i], client, NULL); + for (i = 0; i != THREAD_COUNT; ++i) + nn_thread_term (&cli_threads [i]); + + /* Every client has decremented the counter by now. Send one more + message so that a server blocked in nn_recv() wakes up and exits. */ + nn_assert (!clients_running ()); + cli_sock = nn_socket (AF_SP, NN_PUSH); + nn_assert (cli_sock >= 0); + nn_assert (nn_connect (cli_sock, SOCKET_ADDRESS) >= 0); + bytes = nn_send (cli_sock, &i, sizeof (i), 0); + nn_assert (bytes == (int) sizeof (i)); + nn_close (cli_sock); + nn_thread_term (&srv_thread); + + nn_atomic_term (&active); + return 0; +}