[nanomsg] Re: Combined patch...

  • From: Martin Sustrik <sustrik@xxxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Wed, 09 Jul 2014 09:53:27 +0200

-----BEGIN PGP SIGNED MESSAGE-----
Hash: SHA1

Hi Ark,

We'll have to split the patch into its constituent patches before
applying it to the mainline.

In the meantime, can you state that you are submitting the patch under
the MIT/X11 license?

Thanks!
Martin

On 09/07/14 09:50, Ark Degtiarov wrote:
> Hi, this is a combined patch which attempts to address a few issues I
> have encountered while evaluating nanomsg:
>
> 1. 'make diagrams':
>    a. Handle ValueError exception in Visitor::visit()
>    b. Slightly adjusted src/aio/usock_posix.inc to eliminate warnings
> 2. Race conditions while stressing IPC usock life cycle:
>    a. race conditions due to *->state modifications after
>       nn_worker_execute()
>    b. race conditions in nn_sock_send and nn_sock_recv
>    c. occasional unclean usock termination due to un-served task_recv
>    d. added tests/ipc_stress.c to validate fixes for 2.a-2.c.
> 
> All testing was done on FC20-latest machines only.
> 
> On tests/ipc_stress.c: This test runs longer than the others, perhaps
> longer than necessary; it could be trimmed down if desired.
> 
> Thanks for a very solid piece of architecture and design!
> 
> --A
> 
> 
> diff --git .gitignore .gitignore index e1575c1..cc4c2ef 100644 ---
> .gitignore +++ .gitignore @@ -64,6 +64,7 @@ tests/inproc_shutdown 
> tests/iovec tests/ipc tests/ipc_shutdown +tests/ipc_stress 
> tests/list tests/msg tests/pair diff --git Makefile.am Makefile.am 
> index e140e09..79f2be1 100644 --- Makefile.am +++ Makefile.am @@
> -436,6 +436,7 @@ TRANSPORT_TESTS = \ tests/inproc_shutdown \ 
> tests/ipc \ tests/ipc_shutdown \ +    tests/ipc_stress \ tests/tcp
> \ tests/tcp_shutdown
> 
> @@ -489,7 +490,7 @@ nanocat_SOURCES = \
> 
> if NANOCAT bin_PROGRAMS += nanocat -endif NANOCAT +endif #NANOCAT
> 
> ################################################################################
>
> 
> #
> RFCs
>  # @@ -542,8 +543,25 @@ install-exec-hook: $(LN_S) -f nanocat
> nn_bus$(EXEEXT) && \ $(LN_S) -f nanocat nn_pair$(EXEEXT)
> 
> -endif SYMLINKS -endif NANOCAT +uninstall-hook: + cd
> $(DESTDIR)$(bindir) && \ +          rm -f nn_push$(EXEEXT) ; \ +
> rm -f nn_pull$(EXEEXT) ; \ +          rm -f nn_pub$(EXEEXT) ; \ +
> rm -f nn_sub$(EXEEXT) ; \ +          rm -f nn_req$(EXEEXT) ; \ +
> rm -f nn_rep$(EXEEXT) ; \ +          rm -f nn_surveyor$(EXEEXT) ;
> \ +          rm -f nn_respondent$(EXEEXT) ; \ +          rm -f
> nn_bus$(EXEEXT) ; \ +          rm -f nn_pair$(EXEEXT) + cd
> $(DESTDIR)$(libdir) && \ +          rm -f libnanomsg.* + cd
> $(DESTDIR)$(includedir) && \ +          rm -rf nanomsg + +endif
> #SYMLINKS +endif #NANOCAT
> 
> 
> EXTRA_DIST += \ diff --git doc/diag.py doc/diag.py index
> e004431..3b01ba5 100755 --- doc/diag.py +++ doc/diag.py @@ -144,7
> +144,10 @@ class Visitor(object): self.visit(cursor)
> 
> def visit(self, cursor): -        name = cursor.kind.name +
> try: +            name = cursor.kind.name +        except
> ValueError: +            name = 'VERY_BAD_NAME' meth =
> getattr(self, 'enter_' + name, None) if meth is not None: res =
> meth(cursor) diff --git src/aio/timer.c src/aio/timer.c index
> 9a45880..0416c9c 100644 --- src/aio/timer.c +++ src/aio/timer.c @@
> -95,8 +95,8 @@ static void nn_timer_shutdown (struct nn_fsm *self, 
> int src, int type, timer = nn_cont (self, struct nn_timer, fsm);
> 
> if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { -
> nn_worker_execute (timer->worker, &timer->stop_task); timer->state
> = NN_TIMER_STATE_STOPPING; +        nn_worker_execute
> (timer->worker, &timer->stop_task); return; } if (nn_slow
> (timer->state == NN_TIMER_STATE_STOPPING)) { @@ -131,8 +131,8 @@
> static void nn_timer_handler (struct nn_fsm *self, int src, int
> type, case NN_FSM_START:
> 
> /*  Send start event to the worker thread. */ -
> nn_worker_execute (timer->worker, &timer->start_task); timer->state
> = NN_TIMER_STATE_ACTIVE; +                nn_worker_execute
> (timer->worker, &timer->start_task); return; default: 
> nn_fsm_bad_action (timer->state, src, type); diff --git
> src/aio/usock_posix.inc src/aio/usock_posix.inc index
> 893105f..baee3f7 100644 --- src/aio/usock_posix.inc +++
> src/aio/usock_posix.inc @@ -130,6 +130,9 @@ void nn_usock_term
> (struct nn_usock *self) nn_fsm_event_term (&self->event_received); 
> nn_fsm_event_term (&self->event_sent); nn_fsm_event_term
> (&self->event_established); + +    nn_worker_maybe_cancel
> (self->worker, &self->task_recv); + nn_worker_task_term
> (&self->task_stop); nn_worker_task_term (&self->task_recv); 
> nn_worker_task_term (&self->task_send); @@ -858,8 +861,8 @@ error: 
> usock->state = NN_USOCK_STATE_LISTENING; return; case
> NN_USOCK_ACTION_CANCEL: -                nn_worker_execute
> (usock->worker, &usock->task_stop); usock->state =
> NN_USOCK_STATE_CANCELLING; +                nn_worker_execute
> (usock->worker, &usock->task_stop); return; default: 
> nn_fsm_bad_action (usock->state, src, type); @@ -952,14 +955,18 @@
> error: case NN_USOCK_STATE_CANCELLING: switch (src) { case
> NN_USOCK_SRC_TASK_STOP: -            nn_assert (type ==
> NN_WORKER_TASK_EXECUTE); -            nn_worker_rm_fd
> (usock->worker, &usock->wfd); -            usock->state =
> NN_USOCK_STATE_LISTENING; +            switch (type) { +
> case NN_WORKER_TASK_EXECUTE: +                nn_worker_rm_fd
> (usock->worker, &usock->wfd); +                usock->state =
> NN_USOCK_STATE_LISTENING;
> 
> -            /*  Notify the accepted socket that it was stopped.
> */ -            nn_fsm_action (&usock->asock->fsm,
> NN_USOCK_ACTION_DONE); +                /*  Notify the accepted
> socket that it was stopped. */ +                nn_fsm_action
> (&usock->asock->fsm, NN_USOCK_ACTION_DONE);
> 
> -            return; +                return; +
> default: +                nn_fsm_bad_action (usock->state, src,
> type); +            } case NN_USOCK_SRC_FD: switch (type) { case
> NN_WORKER_FD_IN: diff --git src/aio/worker.h src/aio/worker.h index
> adceae3..1c05dd9 100644 --- src/aio/worker.h +++ src/aio/worker.h 
> @@ -58,6 +58,7 @@ struct nn_worker; int nn_worker_init (struct
> nn_worker *self); void nn_worker_term (struct nn_worker *self); 
> void nn_worker_execute (struct nn_worker *self, struct
> nn_worker_task *task); +void nn_worker_maybe_cancel (struct
> nn_worker *self, struct nn_worker_task *task);
> 
> void nn_worker_add_timer (struct nn_worker *self, int timeout, 
> struct nn_worker_timer *timer); diff --git src/aio/worker_posix.inc
> src/aio/worker_posix.inc index 36867d4..2caf70a 100644 ---
> src/aio/worker_posix.inc +++ src/aio/worker_posix.inc @@ -145,6
> +145,13 @@ void nn_worker_execute (struct nn_worker *self, struct
> nn_worker_task *task) nn_mutex_unlock (&self->sync); }
> 
> +void nn_worker_maybe_cancel (struct nn_worker *self, struct 
> nn_worker_task *task) +{ +    nn_mutex_lock (&self->sync); +
> nn_queue_maybe_remove (&self->tasks, &task->item); +
> nn_mutex_unlock (&self->sync); +} + static void nn_worker_routine
> (void *arg) { int rc; diff --git src/core/sock.c src/core/sock.c 
> index 100fa43..8466bfd 100644 --- src/core/sock.c +++
> src/core/sock.c @@ -597,7 +597,12 @@ int nn_sock_send (struct
> nn_sock *self, struct nn_msg *msg, int flags) return -EINTR; 
> errnum_assert (rc == 0, rc); nn_ctx_enter (&self->ctx); -
> self->flags |= NN_SOCK_FLAG_OUT; +        /* +         *  Double
> check if pipes are still available for sending +         */ +
> if (!nn_efd_wait (&self->rcvfd, 0)) { +            self->flags |=
> NN_SOCK_FLAG_OUT; +        }
> 
> /*  If needed, re-compute the timeout to reflect the time that
> have already elapsed. */ @@ -670,7 +675,12 @@ int nn_sock_recv
> (struct nn_sock *self, struct nn_msg *msg, int flags) return
> -EINTR; errnum_assert (rc == 0, rc); nn_ctx_enter (&self->ctx); -
> self->flags |= NN_SOCK_FLAG_IN; +        /* +         *  Double
> check if pipes are still available for receiving +         */ +
> if (!nn_efd_wait (&self->rcvfd, 0)) { +            self->flags |=
> NN_SOCK_FLAG_IN; +        }
> 
> /*  If needed, re-compute the timeout to reflect the time that
> have already elapsed. */ diff --git src/utils/queue.c
> src/utils/queue.c index 080a7b7..6a7cd99 100644 ---
> src/utils/queue.c +++ src/utils/queue.c @@ -54,6 +54,39 @@ void
> nn_queue_push (struct nn_queue *self, struct nn_queue_item *item) 
> self->tail = item; }
> 
> +void nn_queue_maybe_remove (struct nn_queue *self, struct
> nn_queue_item *item) +{ +   if (item->next == NN_QUEUE_NOTINQUEUE) 
> +       return; + +   if (self->head == item) { +       self->head
> = self->head->next; +       if (!self->head) +           self->tail
> = NULL; +       item->next = NN_QUEUE_NOTINQUEUE; +   } else { +
> struct nn_queue_item *prev = self->head; + +       while (1) { +
> if (!prev) break; + +           struct nn_queue_item *curr =
> prev->next; + +           if (curr == item) { +
> prev->next = curr->next; +               if (self->tail == curr) +
> self->tail = prev; + +               item->next =
> NN_QUEUE_NOTINQUEUE; +               break; +           } +
> prev = curr; +       } +   } + +   nn_assert (item->next ==
> NN_QUEUE_NOTINQUEUE); +} + struct nn_queue_item *nn_queue_pop
> (struct nn_queue *self) { struct nn_queue_item *result; diff --git
> tests/ipc_stress.c tests/ipc_stress.c new file mode 100644 index
> 0000000..4433564 --- /dev/null +++ tests/ipc_stress.c @@ -0,0 +1,99
> @@ +/* +    Copyright (c) 2012 250bpm s.r.o.  All rights reserved. 
> + +    Permission is hereby granted, free of charge, to any person 
> obtaining a copy +    of this software and associated documentation
> files (the "Software"), +    to deal in the Software without
> restriction, including without limitation +    the rights to use,
> copy, modify, merge, publish, distribute, sublicense, +    and/or
> sell copies of the Software, and to permit persons to whom +    the
> Software is furnished to do so, subject to the following 
> conditions: + +    The above copyright notice and this permission
> notice shall be included +    in all copies or substantial portions
> of the Software. + +    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT
> WARRANTY OF ANY KIND, EXPRESS OR +    IMPLIED, INCLUDING BUT NOT
> LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +    FITNESS FOR A
> PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +    THE
> AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
> OTHER +    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
> OTHERWISE, ARISING +    FROM, OUT OF OR IN CONNECTION WITH THE
> SOFTWARE OR THE USE OR OTHER DEALINGS +    IN THE SOFTWARE. +*/ + 
> +#include "../src/nn.h" +#include "../src/pair.h" +#include
> "../src/pubsub.h" +#include "../src/pipeline.h" +#include
> "../src/ipc.h" + +#include "testutil.h" +#include
> "../src/utils/thread.c" +#include "../src/utils/atomic.h" + +/*
> Stress test the IPC transport. */ + +#define THREAD_COUNT 100 
> +#define TEST_LOOPS 10000 +#define SOCKET_ADDRESS
> "ipc://test-stress.ipc" + +struct nn_atomic active; + +static void
> server(NN_UNUSED void *arg) +{ +    int sock = nn_socket(AF_SP,
> NN_PULL); +    nn_assert(sock >= 0); +    nn_assert(nn_bind(sock,
> SOCKET_ADDRESS) >= 0); +    while (1) +    { +        char *buf =
> NULL; +        if (!active.n) break; +        const int bytes =
> nn_recv(sock, &buf, NN_MSG, 0); +        nn_assert(bytes >= 0); +
> nn_freemsg(buf); +    } +    nn_close(sock); +} + +static void
> client(NN_UNUSED void *arg) +{ +    char msg[] = "0"; +    int
> sz_msg = strlen (msg) + 1; // '\0' too +    int i; + +    for (i =
> 0; i < TEST_LOOPS; i++) { +        int cli_sock = nn_socket(AF_SP,
> NN_PUSH); +        nn_assert(cli_sock >= 0); +
> nn_assert(nn_connect(cli_sock, SOCKET_ADDRESS) >= 0); +
> const int bytes = nn_send(cli_sock, msg, sz_msg, 0); +
> nn_assert(bytes == sz_msg); +        nn_close(cli_sock); +    } +
> nn_atomic_dec(&active, 1); +} + +int main() +{ +    int sb; +
> int i; +    struct nn_thread srv_thread; +    struct nn_thread
> cli_threads[THREAD_COUNT]; +    nn_atomic_init (&active,
> THREAD_COUNT); +    /*  Stress the shutdown algorithm. */ +
> nn_thread_init(&srv_thread, server, NULL); + +    for (i = 0; i !=
> THREAD_COUNT; ++i) +        nn_thread_init(&cli_threads[i], client,
> NULL); +    for (i = 0; i != THREAD_COUNT; ++i) +
> nn_thread_term(&cli_threads[i]); + +    active.n = 0; +    int
> cli_sock = nn_socket(AF_SP, NN_PUSH); +    nn_assert(cli_sock >=
> 0); +    nn_assert(nn_connect(cli_sock, SOCKET_ADDRESS) >= 0); +
> const int bytes = nn_send(cli_sock, &i, sizeof(i), 0); +
> nn_assert(bytes == sizeof(i)); +    nn_close(cli_sock); +
> nn_thread_term(&srv_thread); +    return 0; +} +
> 

-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.11 (GNU/Linux)
Comment: Using GnuPG with Thunderbird - http://www.enigmail.net/

iQEcBAEBAgAGBQJTvPT3AAoJENTpVjxCNN9YvHsH/Rs9L9a+zwVIv05Jco8e1wId
lPS07X0ubbkjpsbYaGR2EPjPi6VC8aoUcFv7vGQbA3KBvpct++aRP0kEQylUdKKD
Diw9sjd13G645xx70hiZxn0P+cQqODRouj5ahtze+/YKIOO6MqsGX60XlNESrTZM
ikWjk9x3oFgsRGW00vcsxjMZy5AEeZmq2m/X4kT3YvYXBDAFD55LNqyezH3z38H3
ktVUGWBFtkDo6nPu1GwsDsV79GMjaRFX9ET7YySPhudc9e6fsGxcJbDCguxLBlUF
WUTpmTwZ4zkFL5PC+LtAH5VkupH2UyI9RFgdi9foc9QZad7y2TNafQG7wFeyJWY=
=vTD4
-----END PGP SIGNATURE-----

Other related posts: