[tarantool-patches] [PATCH] replication: implement replication_shutdown()

  • From: Serge Petrenko <sergepetrenko@xxxxxxxxxxxxx>
  • To: tarantool-patches@xxxxxxxxxxxxx
  • Date: Tue, 31 Jul 2018 21:32:42 +0300

Relay threads keep using tx upon shutdown, which leads to occasional
segmentation faults and assertion failures (e.g. in the replication
test suite).

Fix this by implementing the replication_shutdown() and relay_halt()
functions. replication_shutdown() calls relay_halt() to stop every
relay thread that is using tx.

Closes #3485
---
FYI, I couldn't come up with a test case for this patch. My best effort
was to run the replication test suite multiple times and make sure that
there were fewer crashes.

https://github.com/tarantool/tarantool/issues/3485
https://github.com/tarantool/tarantool/compare/sergepetrenko/gh-3485-replication-shutdown

 src/box/box.cc         |  2 +-
 src/box/relay.cc       | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++
 src/box/relay.h        | 10 +++++++++
 src/box/replication.cc | 30 +++++++++++++++++++++++++
 src/box/replication.h  |  6 +++++
 src/main.cc            |  1 +
 6 files changed, 108 insertions(+), 1 deletion(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ae4959d6f..f212c0fa8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1597,9 +1597,9 @@ box_free(void)
         * initialized
         */
        if (is_box_configured) {
+               replication_shutdown();
 #if 0
                session_free();
-               replication_free();
                user_cache_free();
                schema_free();
                module_free();
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 4cacbc840..0a81ac5db 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -82,10 +82,22 @@ struct relay_gc_msg {
        struct vclock vclock;
 };
 
+/**
+ * Cbus message sent by tx thread to stop relay on shutdown.
+ */
+struct relay_halt_msg {
+       /** Parent. */
+       struct cmsg msg;
+       /** Relay instance. */
+       struct relay *relay;
+};
+
 /** State of a replication relay. */
 struct relay {
        /** The thread in which we relay data to the replica. */
        struct cord cord;
+       /** The main fiber in cord to be canceled upon relay halt. */
+       struct fiber *main_fiber;
        /** Replica connection */
        struct ev_io io;
        /** Request sync */
@@ -120,6 +132,11 @@ struct relay {
        struct cpipe tx_pipe;
        /** A pipe from 'tx' thread to 'relay' */
        struct cpipe relay_pipe;
+       /**
+        * A flag indicating that we executed relay_subscribe_f and
+        * have tx_pipe and relay_pipe ready.
+        */
+       bool tx_in_use;
        /** Status message */
        struct relay_status_msg status_msg;
        /**
@@ -152,6 +169,12 @@ relay_get_state(const struct relay *relay)
        return relay->state;
 }
 
+bool
+relay_uses_tx(const struct relay *relay)
+{
+       return relay->tx_in_use;
+}
+
 const struct vclock *
 relay_vclock(const struct relay *relay)
 {
@@ -198,6 +221,40 @@ relay_start(struct relay *relay, int fd, uint64_t sync,
        relay->state = RELAY_FOLLOW;
 }
 
+static void
+relay_main_fiber_halt(struct cmsg *msg)
+{
+       struct relay_halt_msg *m = (struct relay_halt_msg *)msg;
+       struct relay *relay = m->relay;
+
+       assert(relay->main_fiber != NULL);
+       fiber_cancel(relay->main_fiber);
+       relay->main_fiber = NULL;
+
+       free(m);
+}
+
+void
+relay_halt(struct relay *relay)
+{
+       assert(relay->state == RELAY_FOLLOW);
+
+       static const struct cmsg_hop route[] ={
+               {relay_main_fiber_halt, NULL}
+       };
+       struct relay_halt_msg *m = (struct relay_halt_msg *)malloc(sizeof(*m));
+       if (m == NULL) {
+               /*
+                * Out of memory during shutdown. Do nothing.
+                */
+               say_warn("failed to allocate relay halt message");
+               return;
+       }
+       cmsg_init(&m->msg, route);
+       m->relay = relay;
+       cpipe_push(&relay->relay_pipe, &m->msg);
+}
+
 static void
 relay_stop(struct relay *relay)
 {
@@ -468,6 +525,8 @@ relay_subscribe_f(va_list ap)
                             fiber_schedule_cb, fiber());
        cbus_pair("tx", cord_name(cord()), &relay->tx_pipe, &relay->relay_pipe,
                  NULL, NULL, cbus_process);
+       relay->main_fiber = fiber();
+       relay->tx_in_use =  true;
        /* Setup garbage collection trigger. */
        struct trigger on_close_log = {
                RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL
@@ -545,6 +604,7 @@ relay_subscribe_f(va_list ap)
        cbus_unpair(&relay->tx_pipe, &relay->relay_pipe,
                    NULL, NULL, cbus_process);
        cbus_endpoint_destroy(&relay->endpoint, cbus_process);
+       relay->tx_in_use = false;
        if (!diag_is_empty(&relay->diag)) {
                /* An error has occurred while reading ACKs of xlog. */
                diag_move(&relay->diag, diag_get());
diff --git a/src/box/relay.h b/src/box/relay.h
index 2988e6b0d..deaba34d4 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -61,6 +61,10 @@ enum relay_state {
 struct relay *
 relay_new(struct replica *replica);
 
+/** Stop a running relay. Called on shutdown. */
+void
+relay_halt(struct relay *relay);
+
 /** Destroy and delete the relay */
 void
 relay_delete(struct relay *relay);
@@ -73,6 +77,12 @@ relay_get_diag(struct relay *relay);
 enum relay_state
 relay_get_state(const struct relay *relay);
 
+/**
+ * Return whether relay_subscribe_f was already started
+ * and pipes between tx and relay were created.
+ */
+bool
+relay_uses_tx(const struct relay *relay);
 /**
  * Returns relay's vclock
  * @param relay relay
diff --git a/src/box/replication.cc b/src/box/replication.cc
index 48956d2ed..9b4968777 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -398,6 +398,36 @@ replica_on_applier_state_f(struct trigger *trigger, void *event)
        fiber_cond_signal(&replicaset.applier.cond);
 }
 
+void
+replication_shutdown()
+{
+       struct replica *replica, *next;
+
+       replica_hash_foreach_safe(&replicaset.hash, replica, next) {
+               if (replica->id == instance_id)
+                       continue;
+               if (replica->applier != NULL) {
+                       replica_clear_applier(replica);
+                       /*
+                        * We're exiting, so control won't be passed
+                        * to appliers and we don't need to stop them.
+                        */
+               }
+               if (replica->id != REPLICA_ID_NIL) {
+                       if (relay_get_state(replica->relay) == RELAY_FOLLOW &&
+                           relay_uses_tx(replica->relay)) {
+                               replica->id = REPLICA_ID_NIL;
+                               relay_halt(replica->relay);
+                       }
+               } else {
+                       replica_hash_remove(&replicaset.hash, replica);
+                       replica_delete(replica);
+               }
+       }
+
+       replication_free();
+}
+
 /**
  * Update the replica set with new "applier" objects
  * upon reconfiguration of box.cfg.replication.
diff --git a/src/box/replication.h b/src/box/replication.h
index e8b391af2..08f9df258 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -333,6 +333,12 @@ replica_on_relay_stop(struct replica *replica);
 void
 replica_check_id(uint32_t replica_id);
 
+/*
+ * Stop replication and delete all replicas and the replicaset.
+ */
+void
+replication_shutdown();
+
 /**
  * Register the universally unique identifier of a remote replica and
  * a matching replica-set-local identifier in the  _cluster registry.
diff --git a/src/main.cc b/src/main.cc
index a36a2b0d0..9cb3243b6 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -76,6 +76,7 @@
 #include "box/lua/init.h" /* box_lua_init() */
 #include "box/session.h"
 #include "systemd.h"
+#include "box/replication.h" /* replication_shutdown() */
 
 static pid_t master_pid = getpid();
 static struct pidfh *pid_file_handle;
-- 
2.15.2 (Apple Git-101.1)


Other related posts:

  • » [tarantool-patches] [PATCH] replication: implement replication_shutdown() - Serge Petrenko