[nanomsg] Re: Combined patch...

  • From: ark degtiarov <adegtiarov@xxxxxxxxx>
  • To: nanomsg@xxxxxxxxxxxxx
  • Date: Wed, 9 Jul 2014 08:17:30 -0400

Hi Martin,

Yes, I am.

Best,

- A
On Jul 9, 2014 3:54 AM, "Martin Sustrik" <sustrik@xxxxxxxxxx> wrote:

> -----BEGIN PGP SIGNED MESSAGE-----
> Hash: SHA1
>
> Hi Ark,
>
> We'll have to split the patch into its constituent patches before
> applying it to the mainline.
>
> In the meantime, can you state that you are submitting the patch under
> the MIT/X11 license?
>
> Thanks!
> Martin
>
> On 09/07/14 09:50, Ark Degtiarov wrote:
> > Hi, this is a combined patch that attempts to address a few issues I
> > have encountered while evaluating nanomsg:
> >
> > 1. 'make diagrams':
> >    a. Handle the ValueError exception in Visitor::visit()
> >    b. Slightly adjusted src/aio/usock_posix.inc to eliminate warnings
> > 2. Race conditions while stressing the IPC usock life cycle:
> >    a. race conditions due to *->state modifications after
> >       nn_worker_execute()
> >    b. race conditions in nn_sock_send and nn_sock_recv
> >    c. occasional unclean usock termination due to an un-served
> >       task_recv
> >    d. added tests/ipc_stress.c to validate the fixes for 2.a-2.c.
> >
> > All testing was done on FC20-latest machines only.
> >
> > On tests/ipc_stress.c: this test runs longer than the others, perhaps
> > longer than necessary, and could be trimmed down if desired.
> >
> > Thanks for a very solid piece of architecture and design!
> >
> > --A
> >
> >
> > diff --git .gitignore .gitignore index e1575c1..cc4c2ef 100644 ---
> > .gitignore +++ .gitignore @@ -64,6 +64,7 @@ tests/inproc_shutdown
> > tests/iovec tests/ipc tests/ipc_shutdown +tests/ipc_stress
> > tests/list tests/msg tests/pair diff --git Makefile.am Makefile.am
> > index e140e09..79f2be1 100644 --- Makefile.am +++ Makefile.am @@
> > -436,6 +436,7 @@ TRANSPORT_TESTS = \ tests/inproc_shutdown \
> > tests/ipc \ tests/ipc_shutdown \ +    tests/ipc_stress \ tests/tcp
> > \ tests/tcp_shutdown
> >
> > @@ -489,7 +490,7 @@ nanocat_SOURCES = \
> >
> > if NANOCAT bin_PROGRAMS += nanocat -endif NANOCAT +endif #NANOCAT
> >
> >
> ################################################################################
> >
> >
> #
> > RFCs
> >  # @@ -542,8 +543,25 @@ install-exec-hook: $(LN_S) -f nanocat
> > nn_bus$(EXEEXT) && \ $(LN_S) -f nanocat nn_pair$(EXEEXT)
> >
> > -endif SYMLINKS -endif NANOCAT +uninstall-hook: + cd
> > $(DESTDIR)$(bindir) && \ +          rm -f nn_push$(EXEEXT) ; \ +
> > rm -f nn_pull$(EXEEXT) ; \ +          rm -f nn_pub$(EXEEXT) ; \ +
> > rm -f nn_sub$(EXEEXT) ; \ +          rm -f nn_req$(EXEEXT) ; \ +
> > rm -f nn_rep$(EXEEXT) ; \ +          rm -f nn_surveyor$(EXEEXT) ;
> > \ +          rm -f nn_respondent$(EXEEXT) ; \ +          rm -f
> > nn_bus$(EXEEXT) ; \ +          rm -f nn_pair$(EXEEXT) + cd
> > $(DESTDIR)$(libdir) && \ +          rm -f libnanomsg.* + cd
> > $(DESTDIR)$(includedir) && \ +          rm -rf nanomsg + +endif
> > #SYMLINKS +endif #NANOCAT
> >
> >
> > EXTRA_DIST += \ diff --git doc/diag.py doc/diag.py index
> > e004431..3b01ba5 100755 --- doc/diag.py +++ doc/diag.py @@ -144,7
> > +144,10 @@ class Visitor(object): self.visit(cursor)
> >
> > def visit(self, cursor): -        name = cursor.kind.name +
> > try: +            name = cursor.kind.name +        except
> > ValueError: +            name = 'VERY_BAD_NAME' meth =
> > getattr(self, 'enter_' + name, None) if meth is not None: res =
> > meth(cursor) diff --git src/aio/timer.c src/aio/timer.c index
> > 9a45880..0416c9c 100644 --- src/aio/timer.c +++ src/aio/timer.c @@
> > -95,8 +95,8 @@ static void nn_timer_shutdown (struct nn_fsm *self,
> > int src, int type, timer = nn_cont (self, struct nn_timer, fsm);
> >
> > if (nn_slow (src == NN_FSM_ACTION && type == NN_FSM_STOP)) { -
> > nn_worker_execute (timer->worker, &timer->stop_task); timer->state
> > = NN_TIMER_STATE_STOPPING; +        nn_worker_execute
> > (timer->worker, &timer->stop_task); return; } if (nn_slow
> > (timer->state == NN_TIMER_STATE_STOPPING)) { @@ -131,8 +131,8 @@
> > static void nn_timer_handler (struct nn_fsm *self, int src, int
> > type, case NN_FSM_START:
> >
> > /*  Send start event to the worker thread. */ -
> > nn_worker_execute (timer->worker, &timer->start_task); timer->state
> > = NN_TIMER_STATE_ACTIVE; +                nn_worker_execute
> > (timer->worker, &timer->start_task); return; default:
> > nn_fsm_bad_action (timer->state, src, type); diff --git
> > src/aio/usock_posix.inc src/aio/usock_posix.inc index
> > 893105f..baee3f7 100644 --- src/aio/usock_posix.inc +++
> > src/aio/usock_posix.inc @@ -130,6 +130,9 @@ void nn_usock_term
> > (struct nn_usock *self) nn_fsm_event_term (&self->event_received);
> > nn_fsm_event_term (&self->event_sent); nn_fsm_event_term
> > (&self->event_established); + +    nn_worker_maybe_cancel
> > (self->worker, &self->task_recv); + nn_worker_task_term
> > (&self->task_stop); nn_worker_task_term (&self->task_recv);
> > nn_worker_task_term (&self->task_send); @@ -858,8 +861,8 @@ error:
> > usock->state = NN_USOCK_STATE_LISTENING; return; case
> > NN_USOCK_ACTION_CANCEL: -                nn_worker_execute
> > (usock->worker, &usock->task_stop); usock->state =
> > NN_USOCK_STATE_CANCELLING; +                nn_worker_execute
> > (usock->worker, &usock->task_stop); return; default:
> > nn_fsm_bad_action (usock->state, src, type); @@ -952,14 +955,18 @@
> > error: case NN_USOCK_STATE_CANCELLING: switch (src) { case
> > NN_USOCK_SRC_TASK_STOP: -            nn_assert (type ==
> > NN_WORKER_TASK_EXECUTE); -            nn_worker_rm_fd
> > (usock->worker, &usock->wfd); -            usock->state =
> > NN_USOCK_STATE_LISTENING; +            switch (type) { +
> > case NN_WORKER_TASK_EXECUTE: +                nn_worker_rm_fd
> > (usock->worker, &usock->wfd); +                usock->state =
> > NN_USOCK_STATE_LISTENING;
> >
> > -            /*  Notify the accepted socket that it was stopped.
> > */ -            nn_fsm_action (&usock->asock->fsm,
> > NN_USOCK_ACTION_DONE); +                /*  Notify the accepted
> > socket that it was stopped. */ +                nn_fsm_action
> > (&usock->asock->fsm, NN_USOCK_ACTION_DONE);
> >
> > -            return; +                return; +
> > default: +                nn_fsm_bad_action (usock->state, src,
> > type); +            } case NN_USOCK_SRC_FD: switch (type) { case
> > NN_WORKER_FD_IN: diff --git src/aio/worker.h src/aio/worker.h index
> > adceae3..1c05dd9 100644 --- src/aio/worker.h +++ src/aio/worker.h
> > @@ -58,6 +58,7 @@ struct nn_worker; int nn_worker_init (struct
> > nn_worker *self); void nn_worker_term (struct nn_worker *self);
> > void nn_worker_execute (struct nn_worker *self, struct
> > nn_worker_task *task); +void nn_worker_maybe_cancel (struct
> > nn_worker *self, struct nn_worker_task *task);
> >
> > void nn_worker_add_timer (struct nn_worker *self, int timeout,
> > struct nn_worker_timer *timer); diff --git src/aio/worker_posix.inc
> > src/aio/worker_posix.inc index 36867d4..2caf70a 100644 ---
> > src/aio/worker_posix.inc +++ src/aio/worker_posix.inc @@ -145,6
> > +145,13 @@ void nn_worker_execute (struct nn_worker *self, struct
> > nn_worker_task *task) nn_mutex_unlock (&self->sync); }
> >
> > +void nn_worker_maybe_cancel (struct nn_worker *self, struct
> > nn_worker_task *task) +{ +    nn_mutex_lock (&self->sync); +
> > nn_queue_maybe_remove (&self->tasks, &task->item); +
> > nn_mutex_unlock (&self->sync); +} + static void nn_worker_routine
> > (void *arg) { int rc; diff --git src/core/sock.c src/core/sock.c
> > index 100fa43..8466bfd 100644 --- src/core/sock.c +++
> > src/core/sock.c @@ -597,7 +597,12 @@ int nn_sock_send (struct
> > nn_sock *self, struct nn_msg *msg, int flags) return -EINTR;
> > errnum_assert (rc == 0, rc); nn_ctx_enter (&self->ctx); -
> > self->flags |= NN_SOCK_FLAG_OUT; +        /* +         *  Double
> > check if pipes are still available for sending +         */ +
> > if (!nn_efd_wait (&self->rcvfd, 0)) { +            self->flags |=
> > NN_SOCK_FLAG_OUT; +        }
> >
> > /*  If needed, re-compute the timeout to reflect the time that
> > have already elapsed. */ @@ -670,7 +675,12 @@ int nn_sock_recv
> > (struct nn_sock *self, struct nn_msg *msg, int flags) return
> > -EINTR; errnum_assert (rc == 0, rc); nn_ctx_enter (&self->ctx); -
> > self->flags |= NN_SOCK_FLAG_IN; +        /* +         *  Double
> > check if pipes are still available for receiving +         */ +
> > if (!nn_efd_wait (&self->rcvfd, 0)) { +            self->flags |=
> > NN_SOCK_FLAG_IN; +        }
> >
> > /*  If needed, re-compute the timeout to reflect the time that
> > have already elapsed. */ diff --git src/utils/queue.c
> > src/utils/queue.c index 080a7b7..6a7cd99 100644 ---
> > src/utils/queue.c +++ src/utils/queue.c @@ -54,6 +54,39 @@ void
> > nn_queue_push (struct nn_queue *self, struct nn_queue_item *item)
> > self->tail = item; }
> >
> > +void nn_queue_maybe_remove (struct nn_queue *self, struct
> > nn_queue_item *item) +{ +   if (item->next == NN_QUEUE_NOTINQUEUE)
> > +       return; + +   if (self->head == item) { +       self->head
> > = self->head->next; +       if (!self->head) +           self->tail
> > = NULL; +       item->next = NN_QUEUE_NOTINQUEUE; +   } else { +
> > struct nn_queue_item *prev = self->head; + +       while (1) { +
> > if (!prev) break; + +           struct nn_queue_item *curr =
> > prev->next; + +           if (curr == item) { +
> > prev->next = curr->next; +               if (self->tail == curr) +
> > self->tail = prev; + +               item->next =
> > NN_QUEUE_NOTINQUEUE; +               break; +           } +
> > prev = curr; +       } +   } + +   nn_assert (item->next ==
> > NN_QUEUE_NOTINQUEUE); +} + struct nn_queue_item *nn_queue_pop
> > (struct nn_queue *self) { struct nn_queue_item *result; diff --git
> > tests/ipc_stress.c tests/ipc_stress.c new file mode 100644 index
> > 0000000..4433564 --- /dev/null +++ tests/ipc_stress.c @@ -0,0 +1,99
> > @@ +/* +    Copyright (c) 2012 250bpm s.r.o.  All rights reserved.
> > + +    Permission is hereby granted, free of charge, to any person
> > obtaining a copy +    of this software and associated documentation
> > files (the "Software"), +    to deal in the Software without
> > restriction, including without limitation +    the rights to use,
> > copy, modify, merge, publish, distribute, sublicense, +    and/or
> > sell copies of the Software, and to permit persons to whom +    the
> > Software is furnished to do so, subject to the following
> > conditions: + +    The above copyright notice and this permission
> > notice shall be included +    in all copies or substantial portions
> > of the Software. + +    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT
> > WARRANTY OF ANY KIND, EXPRESS OR +    IMPLIED, INCLUDING BUT NOT
> > LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +    FITNESS FOR A
> > PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL +    THE
> > AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
> > OTHER +    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
> > OTHERWISE, ARISING +    FROM, OUT OF OR IN CONNECTION WITH THE
> > SOFTWARE OR THE USE OR OTHER DEALINGS +    IN THE SOFTWARE. +*/ +
> > +#include "../src/nn.h" +#include "../src/pair.h" +#include
> > "../src/pubsub.h" +#include "../src/pipeline.h" +#include
> > "../src/ipc.h" + +#include "testutil.h" +#include
> > "../src/utils/thread.c" +#include "../src/utils/atomic.h" + +/*
> > Stress test the IPC transport. */ + +#define THREAD_COUNT 100
> > +#define TEST_LOOPS 10000 +#define SOCKET_ADDRESS
> > "ipc://test-stress.ipc" + +struct nn_atomic active; + +static void
> > server(NN_UNUSED void *arg) +{ +    int sock = nn_socket(AF_SP,
> > NN_PULL); +    nn_assert(sock >= 0); +    nn_assert(nn_bind(sock,
> > SOCKET_ADDRESS) >= 0); +    while (1) +    { +        char *buf =
> > NULL; +        if (!active.n) break; +        const int bytes =
> > nn_recv(sock, &buf, NN_MSG, 0); +        nn_assert(bytes >= 0); +
> > nn_freemsg(buf); +    } +    nn_close(sock); +} + +static void
> > client(NN_UNUSED void *arg) +{ +    char msg[] = "0"; +    int
> > sz_msg = strlen (msg) + 1; // '\0' too +    int i; + +    for (i =
> > 0; i < TEST_LOOPS; i++) { +        int cli_sock = nn_socket(AF_SP,
> > NN_PUSH); +        nn_assert(cli_sock >= 0); +
> > nn_assert(nn_connect(cli_sock, SOCKET_ADDRESS) >= 0); +
> > const int bytes = nn_send(cli_sock, msg, sz_msg, 0); +
> > nn_assert(bytes == sz_msg); +        nn_close(cli_sock); +    } +
> > nn_atomic_dec(&active, 1); +} + +int main() +{ +    int sb; +
> > int i; +    struct nn_thread srv_thread; +    struct nn_thread
> > cli_threads[THREAD_COUNT]; +    nn_atomic_init (&active,
> > THREAD_COUNT); +    /*  Stress the shutdown algorithm. */ +
> > nn_thread_init(&srv_thread, server, NULL); + +    for (i = 0; i !=
> > THREAD_COUNT; ++i) +        nn_thread_init(&cli_threads[i], client,
> > NULL); +    for (i = 0; i != THREAD_COUNT; ++i) +
> > nn_thread_term(&cli_threads[i]); + +    active.n = 0; +    int
> > cli_sock = nn_socket(AF_SP, NN_PUSH); +    nn_assert(cli_sock >=
> > 0); +    nn_assert(nn_connect(cli_sock, SOCKET_ADDRESS) >= 0); +
> > const int bytes = nn_send(cli_sock, &i, sizeof(i), 0); +
> > nn_assert(bytes == sizeof(i)); +    nn_close(cli_sock); +
> > nn_thread_term(&srv_thread); +    return 0; +} +
> >
>
> -----BEGIN PGP SIGNATURE-----
> Version: GnuPG v1.4.11 (GNU/Linux)
> Comment: Using GnuPG with Thunderbird - http://www.enigmail.net/
>
> iQEcBAEBAgAGBQJTvPT3AAoJENTpVjxCNN9YvHsH/Rs9L9a+zwVIv05Jco8e1wId
> lPS07X0ubbkjpsbYaGR2EPjPi6VC8aoUcFv7vGQbA3KBvpct++aRP0kEQylUdKKD
> Diw9sjd13G645xx70hiZxn0P+cQqODRouj5ahtze+/YKIOO6MqsGX60XlNESrTZM
> ikWjk9x3oFgsRGW00vcsxjMZy5AEeZmq2m/X4kT3YvYXBDAFD55LNqyezH3z38H3
> ktVUGWBFtkDo6nPu1GwsDsV79GMjaRFX9ET7YySPhudc9e6fsGxcJbDCguxLBlUF
> WUTpmTwZ4zkFL5PC+LtAH5VkupH2UyI9RFgdi9foc9QZad7y2TNafQG7wFeyJWY=
> =vTD4
> -----END PGP SIGNATURE-----
>
>

Other related posts: