[nanomsg] Combined patch...

  • From: Ark Degtiarov <a@xxxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Wed, 09 Jul 2014 03:50:22 -0400

Hi,
this is a combined patch that attempts to address a few issues I
encountered while evaluating nanomsg:
  1. 'make diagrams':
        a. Handle ValueError exception in Visitor::visit()
        b. Slightly adjusted  src/aio/usock_posix.inc to eliminate
warnings
  2. Race conditions while stressing IPC usock life cycle
        a. race conditions due to *->state modifications after
nn_worker_execute()
        b. race conditions in nn_sock_send and nn_sock_recv
        c. occasional unclean usock termination due to un-served
task_recv
        d. added tests/ipc_stress.c to validate fixes for 2.a-2.c.

All testing was done on FC20-latest machines only.

On tests/ipc_stress.c: this test runs longer than the others, perhaps longer
than necessary, and could be trimmed down if desired.

Thanks for a very solid piece of architecture and design!

--A


diff --git .gitignore .gitignore
index e1575c1..cc4c2ef 100644
--- .gitignore
+++ .gitignore
@@ -64,6 +64,7 @@ tests/inproc_shutdown
 tests/iovec
 tests/ipc
 tests/ipc_shutdown
+tests/ipc_stress
 tests/list
 tests/msg
 tests/pair
diff --git Makefile.am Makefile.am
index e140e09..79f2be1 100644
--- Makefile.am
+++ Makefile.am
@@ -436,6 +436,7 @@ TRANSPORT_TESTS = \
     tests/inproc_shutdown \
     tests/ipc \
     tests/ipc_shutdown \
+    tests/ipc_stress \
     tests/tcp \
     tests/tcp_shutdown
 
@@ -489,7 +490,7 @@ nanocat_SOURCES = \
 
 if NANOCAT
 bin_PROGRAMS += nanocat
-endif NANOCAT
+endif #NANOCAT
 

################################################################################
 #  RFCs
#
@@ -542,8 +543,25 @@ install-exec-hook:
           $(LN_S) -f nanocat nn_bus$(EXEEXT) && \
           $(LN_S) -f nanocat nn_pair$(EXEEXT)
 
-endif SYMLINKS
-endif NANOCAT
+# Remove the nn_* aliases of nanocat from bindir, then the installed
+# library files and the header directory. Every removal is chained
+# after a successful `cd`, so nothing is deleted from the current
+# directory when an install directory does not exist.
+uninstall-hook:
+       cd $(DESTDIR)$(bindir) && \
+          rm -f nn_push$(EXEEXT) \
+                nn_pull$(EXEEXT) \
+                nn_pub$(EXEEXT) \
+                nn_sub$(EXEEXT) \
+                nn_req$(EXEEXT) \
+                nn_rep$(EXEEXT) \
+                nn_surveyor$(EXEEXT) \
+                nn_respondent$(EXEEXT) \
+                nn_bus$(EXEEXT) \
+                nn_pair$(EXEEXT)
+       cd $(DESTDIR)$(libdir) && \
+          rm -f libnanomsg.*
+       cd $(DESTDIR)$(includedir) && \
+          rm -rf nanomsg
+
+endif #SYMLINKS
+endif #NANOCAT
 
 
 EXTRA_DIST += \
diff --git doc/diag.py doc/diag.py
index e004431..3b01ba5 100755
--- doc/diag.py
+++ doc/diag.py
@@ -144,7 +144,10 @@ class Visitor(object):
         self.visit(cursor)
 
     def visit(self, cursor):
-        name = cursor.kind.name
+        try:
+            name = cursor.kind.name
+        except ValueError:
+            name = 'VERY_BAD_NAME'
         meth = getattr(self, 'enter_' + name, None)
         if meth is not None:
             res = meth(cursor)
diff --git src/aio/timer.c src/aio/timer.c
index 9a45880..0416c9c 100644
--- src/aio/timer.c
+++ src/aio/timer.c
@@ -95,8 +95,8 @@ static void nn_timer_shutdown (struct nn_fsm *self,
int src, int type,
     timer = nn_cont (self, struct nn_timer, fsm);
 
     if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) {
-        nn_worker_execute (timer->worker, &timer->stop_task);
         timer->state = NN_TIMER_STATE_STOPPING;
+        nn_worker_execute (timer->worker, &timer->stop_task);
         return;
     }
     if (nn_slow (timer->state == NN_TIMER_STATE_STOPPING)) {
@@ -131,8 +131,8 @@ static void nn_timer_handler (struct nn_fsm *self,
int src, int type,
             case NN_FSM_START:
 
                 /*  Send start event to the worker thread. */
-                nn_worker_execute (timer->worker, &timer->start_task);
                 timer->state = NN_TIMER_STATE_ACTIVE;
+                nn_worker_execute (timer->worker, &timer->start_task);
                 return;
             default:
                 nn_fsm_bad_action (timer->state, src, type);
diff --git src/aio/usock_posix.inc src/aio/usock_posix.inc
index 893105f..baee3f7 100644
--- src/aio/usock_posix.inc
+++ src/aio/usock_posix.inc
@@ -130,6 +130,9 @@ void nn_usock_term (struct nn_usock *self)
     nn_fsm_event_term (&self->event_received);
     nn_fsm_event_term (&self->event_sent);
     nn_fsm_event_term (&self->event_established);
+
+    nn_worker_maybe_cancel (self->worker, &self->task_recv);
+
     nn_worker_task_term (&self->task_stop);
     nn_worker_task_term (&self->task_recv);
     nn_worker_task_term (&self->task_send);
@@ -858,8 +861,8 @@ error:
                 usock->state = NN_USOCK_STATE_LISTENING;
                 return;
             case NN_USOCK_ACTION_CANCEL:
-                nn_worker_execute (usock->worker, &usock->task_stop);
                 usock->state = NN_USOCK_STATE_CANCELLING;
+                nn_worker_execute (usock->worker, &usock->task_stop);
                 return;
             default:
                 nn_fsm_bad_action (usock->state, src, type);
@@ -952,14 +955,18 @@ error:
     case NN_USOCK_STATE_CANCELLING:
         switch (src) {
         case NN_USOCK_SRC_TASK_STOP:
-            nn_assert (type == NN_WORKER_TASK_EXECUTE);
-            nn_worker_rm_fd (usock->worker, &usock->wfd);
-            usock->state = NN_USOCK_STATE_LISTENING;
+            switch (type) {
+            case NN_WORKER_TASK_EXECUTE:
+                nn_worker_rm_fd (usock->worker, &usock->wfd);
+                usock->state = NN_USOCK_STATE_LISTENING;
 
-            /*  Notify the accepted socket that it was stopped. */
-            nn_fsm_action (&usock->asock->fsm, NN_USOCK_ACTION_DONE);
+                /*  Notify the accepted socket that it was stopped. */
+                nn_fsm_action (&usock->asock->fsm,
NN_USOCK_ACTION_DONE);
 
-            return;
+                return;
+            default:
+                nn_fsm_bad_action (usock->state, src, type);
+            }
         case NN_USOCK_SRC_FD:
             switch (type) {
             case NN_WORKER_FD_IN:
diff --git src/aio/worker.h src/aio/worker.h
index adceae3..1c05dd9 100644
--- src/aio/worker.h
+++ src/aio/worker.h
@@ -58,6 +58,7 @@ struct nn_worker;
 int nn_worker_init (struct nn_worker *self);
 void nn_worker_term (struct nn_worker *self);
 void nn_worker_execute (struct nn_worker *self, struct nn_worker_task
*task);
+void nn_worker_maybe_cancel (struct nn_worker *self, struct
nn_worker_task *task);
 
 void nn_worker_add_timer (struct nn_worker *self, int timeout,
     struct nn_worker_timer *timer);
diff --git src/aio/worker_posix.inc src/aio/worker_posix.inc
index 36867d4..2caf70a 100644
--- src/aio/worker_posix.inc
+++ src/aio/worker_posix.inc
@@ -145,6 +145,13 @@ void nn_worker_execute (struct nn_worker *self,
struct nn_worker_task *task)
     nn_mutex_unlock (&self->sync);
 }
 
+/*  Removes 'task' from the worker's task queue if it is still queued there;
+    a task that is not queued is left untouched. The queue is modified while
+    holding the worker's 'sync' mutex. */
+void nn_worker_maybe_cancel (struct nn_worker *self,
+    struct nn_worker_task *task)
+{
+    nn_mutex_lock (&self->sync);
+    nn_queue_maybe_remove (&self->tasks, &task->item);
+    nn_mutex_unlock (&self->sync);
+}
+}
+
 static void nn_worker_routine (void *arg)
 {
     int rc;
diff --git src/core/sock.c src/core/sock.c
index 100fa43..8466bfd 100644
--- src/core/sock.c
+++ src/core/sock.c
@@ -597,7 +597,12 @@ int nn_sock_send (struct nn_sock *self, struct
nn_msg *msg, int flags)
             return -EINTR;
         errnum_assert (rc == 0, rc);
         nn_ctx_enter (&self->ctx);
-        self->flags |= NN_SOCK_FLAG_OUT;
+        /*  Double check, now that the context is entered, that the send
+            side (sndfd, not rcvfd) is still signalled before marking the
+            socket as writable. */
+        if (!nn_efd_wait (&self->sndfd, 0)) {
+            self->flags |= NN_SOCK_FLAG_OUT;
+        }
 
         /*  If needed, re-compute the timeout to reflect the time that
have
             already elapsed. */
@@ -670,7 +675,12 @@ int nn_sock_recv (struct nn_sock *self, struct
nn_msg *msg, int flags)
             return -EINTR;
         errnum_assert (rc == 0, rc);
         nn_ctx_enter (&self->ctx);
-        self->flags |= NN_SOCK_FLAG_IN;
+        /*
+         *  Double check if pipes are still available for receiving
+         */
+        if (!nn_efd_wait (&self->rcvfd, 0)) {
+            self->flags |= NN_SOCK_FLAG_IN;
+        }
 
         /*  If needed, re-compute the timeout to reflect the time that
have
             already elapsed. */
diff --git src/utils/queue.c src/utils/queue.c
index 080a7b7..6a7cd99 100644
--- src/utils/queue.c
+++ src/utils/queue.c
@@ -54,6 +54,39 @@ void nn_queue_push (struct nn_queue *self, struct
nn_queue_item *item)
     self->tail = item;
 }
 
+/*  Unlinks 'item' from the queue if it is currently linked into it.
+    An item whose 'next' is NN_QUEUE_NOTINQUEUE is left untouched. On
+    successful return the item is marked as not being in any queue. The
+    function asserts if the item claims to be queued but cannot be found
+    in 'self'. */
+void nn_queue_maybe_remove (struct nn_queue *self,
+    struct nn_queue_item *item)
+{
+    struct nn_queue_item *prev;
+
+    /*  Nothing to do if the item is not queued at all. */
+    if (item->next == NN_QUEUE_NOTINQUEUE)
+        return;
+
+    if (self->head == item) {
+
+        /*  Unlink from the front; an emptied queue has no tail either. */
+        self->head = item->next;
+        if (!self->head)
+            self->tail = NULL;
+        item->next = NN_QUEUE_NOTINQUEUE;
+    }
+    else {
+
+        /*  Walk the list looking for the predecessor of the item. */
+        for (prev = self->head; prev != NULL; prev = prev->next) {
+            if (prev->next == item) {
+                prev->next = item->next;
+                if (self->tail == item)
+                    self->tail = prev;
+                item->next = NN_QUEUE_NOTINQUEUE;
+                break;
+            }
+        }
+    }
+
+    /*  Fails if the item was marked as queued but is not in this queue. */
+    nn_assert (item->next == NN_QUEUE_NOTINQUEUE);
+}
+
 struct nn_queue_item *nn_queue_pop (struct nn_queue *self)
 {
     struct nn_queue_item *result;
diff --git tests/ipc_stress.c tests/ipc_stress.c
new file mode 100644
index 0000000..4433564
--- /dev/null
+++ tests/ipc_stress.c
@@ -0,0 +1,99 @@
+/*
+    Copyright (c) 2012 250bpm s.r.o.  All rights reserved.
+
+    Permission is hereby granted, free of charge, to any person
obtaining a copy
+    of this software and associated documentation files (the
"Software"),
+    to deal in the Software without restriction, including without
limitation
+    the rights to use, copy, modify, merge, publish, distribute,
sublicense,
+    and/or sell copies of the Software, and to permit persons to whom
+    the Software is furnished to do so, subject to the following
conditions:
+
+    The above copyright notice and this permission notice shall be
included
+    in all copies or substantial portions of the Software.
+
+    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR
+    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY,
+    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL
+    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES
OR OTHER
+    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING
+    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS
+    IN THE SOFTWARE.
+*/
+
+#include "../src/nn.h"
+#include "../src/pair.h"
+#include "../src/pubsub.h"
+#include "../src/pipeline.h"
+#include "../src/ipc.h"
+
+#include "testutil.h"
+#include "../src/utils/thread.c"
+#include "../src/utils/atomic.h"
+
+/*  Stress test the IPC transport. */
+
+#define THREAD_COUNT 100
+#define TEST_LOOPS 10000
+#define SOCKET_ADDRESS "ipc://test-stress.ipc"
+
+struct nn_atomic active;
+
+/*  Server thread: binds a PULL socket to SOCKET_ADDRESS and keeps receiving
+    and freeing messages until the 'active' counter reads zero at the top of
+    an iteration. */
+static void server (NN_UNUSED void *arg)
+{
+    int pull;
+    int rc;
+    int nbytes;
+    char *payload;
+
+    pull = nn_socket (AF_SP, NN_PULL);
+    nn_assert (pull >= 0);
+    rc = nn_bind (pull, SOCKET_ADDRESS);
+    nn_assert (rc >= 0);
+
+    /*  The counter is only looked at between two receives. */
+    while (active.n) {
+        payload = NULL;
+        nbytes = nn_recv (pull, &payload, NN_MSG, 0);
+        nn_assert (nbytes >= 0);
+        nn_freemsg (payload);
+    }
+
+    nn_close (pull);
+}
+
+/*  Client thread: TEST_LOOPS times over, opens a PUSH socket, connects it
+    to SOCKET_ADDRESS, sends one small message and closes the socket again.
+    Once done, decrements the 'active' counter by one. */
+static void client (NN_UNUSED void *arg)
+{
+    char msg [] = "0";
+    int len;
+    int push;
+    int rc;
+    int remaining;
+
+    /*  The terminating zero is sent along with the text. */
+    len = strlen (msg) + 1;
+
+    for (remaining = TEST_LOOPS; remaining > 0; --remaining) {
+        push = nn_socket (AF_SP, NN_PUSH);
+        nn_assert (push >= 0);
+        rc = nn_connect (push, SOCKET_ADDRESS);
+        nn_assert (rc >= 0);
+        rc = nn_send (push, msg, len, 0);
+        nn_assert (rc == len);
+        nn_close (push);
+    }
+
+    nn_atomic_dec (&active, 1);
+}
+
+/*  Starts one server thread and THREAD_COUNT client threads, waits for all
+    the clients to finish and then shuts the server down. Returns 0 on
+    success; any failure trips an assertion. */
+int main (void)
+{
+    int i;
+    int rc;
+    int timeout;
+    int wake_sock;
+    struct nn_thread srv_thread;
+    struct nn_thread cli_threads [THREAD_COUNT];
+
+    nn_atomic_init (&active, THREAD_COUNT);
+
+    /*  Stress the IPC connection life cycle. */
+    nn_thread_init (&srv_thread, server, NULL);
+    for (i = 0; i != THREAD_COUNT; ++i)
+        nn_thread_init (&cli_threads [i], client, NULL);
+    for (i = 0; i != THREAD_COUNT; ++i)
+        nn_thread_term (&cli_threads [i]);
+
+    /*  All clients are done. The server may still be waiting in nn_recv,
+        so send it one more message to make it re-check the counter. */
+    active.n = 0;
+    wake_sock = nn_socket (AF_SP, NN_PUSH);
+    nn_assert (wake_sock >= 0);
+
+    /*  The server may just as well have noticed the zero counter already
+        and closed its socket. Bound the send so that the test cannot hang
+        waiting for a peer that will never appear. */
+    timeout = 1000;
+    rc = nn_setsockopt (wake_sock, NN_SOL_SOCKET, NN_SNDTIMEO,
+        &timeout, sizeof (timeout));
+    nn_assert (rc == 0);
+    rc = nn_connect (wake_sock, SOCKET_ADDRESS);
+    nn_assert (rc >= 0);
+
+    /*  Either the whole message went out or the send failed (timed out)
+        because the server is gone; both outcomes are fine here. */
+    rc = nn_send (wake_sock, &i, sizeof (i), 0);
+    nn_assert (rc < 0 || rc == (int) sizeof (i));
+    nn_close (wake_sock);
+
+    nn_thread_term (&srv_thread);
+    return 0;
+}
+

Other related posts: