[tarantool-patches] Re: [tarantool-patches] [box.ctl 2/3] box.ctl: Add on_ctl_event trigger calls

  • From: Konstantin Belyavskiy <k.belyavskiy@xxxxxxxxxxxxx>
  • To: georgy <georgy@xxxxxxxxxxxxx>, Илья Марков <markovilya197@xxxxxxxxx>
  • Date: Tue, 31 Jul 2018 14:40:38 +0300

Hi,
Overall, the patch looks good, with a few minor complaints (summary):
- unused headers
- is it worth creating a separate fiber just to check whether the read-only
state has changed?
- a test for the SHUTDOWN event should be implemented



On Thursday, 14 June 2018, 18:04 +03:00, Ilya Markov <imarkov@xxxxxxxxxxxxx> wrote:

Add the following trigger events:
* System space recovery. Called when bootstrap, join, or snapshot recovery
finishes.
* Local recovery. Called when bootstrap or recovery finishes.
* Read_only/read_write. Called when the read_only state of the instance
changes.
* Shutdown. Called on controlled shutdown.
* Replicaset_add/replicaset_remove. Called on changes in the _cluster space.

Errors inside triggers are logged and do not affect the instance's
behaviour.

Continue #3159
---
 src/box/alter.cc                   |   1 +
 src/box/box.cc                     |  44 ++++++-
 src/box/engine.c                   |  14 +++
 src/box/lua/ctl.c                  |   4 +-
 src/box/memtx_engine.c             |   2 +-
 src/box/replication.cc             |  14 +++
 test/replication/master_onctl.lua  |  34 +++++
 test/replication/onctl.result      | 250 +++++++++++++++++++++++++++++++++++++
 test/replication/onctl.test.lua    | 105 ++++++++++++++++
 test/replication/replica_onctl.lua |  34 +++++
 test/replication/suite.cfg         |   1 +
 11 files changed, 498 insertions(+), 5 deletions(-)
 create mode 100644 test/replication/master_onctl.lua
 create mode 100644 test/replication/onctl.result
 create mode 100644 test/replication/onctl.test.lua
 create mode 100644 test/replication/replica_onctl.lua

diff --git a/src/box/alter.cc b/src/box/alter.cc
index 6f6fcb0..7ec548b 100644
--- a/src/box/alter.cc
+++ b/src/box/alter.cc
@@ -52,6 +52,7 @@
 #include "identifier.h"
 #include "version.h"
 #include "sequence.h"

+#include "ctl.h" 
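Not used: this patch adds only the include to alter.cc, nothing from ctl.h is referenced there. Please drop it.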


 /**
  * chap-sha1 of empty string, i.e.
diff --git a/src/box/box.cc b/src/box/box.cc
index 26277e7..1de37d2 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -113,6 +113,11 @@ static fiber_cond ro_cond;
  */
 static bool is_orphan = true;

+/**
+ * Fiber used for on_ctl_event trigger.
+ */
+static fiber *ro_checker;
+
 /* Use the shared instance of xstream for all appliers */
 static struct xstream join_stream;
 static struct xstream subscribe_stream;
@@ -1573,6 +1578,9 @@ box_set_replicaset_uuid(const struct tt_uuid 
*replicaset_uuid)
 void
 box_free(void)
 {
+      if (run_on_ctl_event_trigger_type(CTL_EVENT_SHUTDOWN) < 0)
+              say_error("ctl_trigger error in shutdown: %s",
+                        diag_last_error(diag_get())->errmsg);
       /*
        * See gh-584 "box_free() is called even if box is not
        * initialized
@@ -1592,7 +1600,8 @@ box_free(void)
               engine_shutdown();
               wal_thread_stop();
       }
-
+      if (!fiber_is_dead(ro_checker))
+              fiber_cancel(ro_checker);
       fiber_cond_destroy(&ro_cond);
 }

@@ -1693,6 +1702,10 @@ bootstrap_from_master(struct replica *master)
       engine_begin_initial_recovery_xc(NULL);
       applier_resume_to_state(applier, APPLIER_FINAL_JOIN, TIMEOUT_INFINITY);

+      if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+              say_error("ctl_trigger error in system space recovery: %s",
+                        diag_last_error(diag_get())->errmsg);
+
       /*
        * Process final data (WALs).
        */
@@ -1755,11 +1768,35 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, 
int events)
       cbus_process(endpoint);
 }

+static int
+check_ro_f(MAYBE_UNUSED va_list ap)
+{
+      double timeout = TIMEOUT_INFINITY;
+      struct on_ctl_event_ctx ctx;
+      memset(&ctx, 0, sizeof(ctx));
+      while (true) {
+              if (box_wait_ro(!box_is_ro(), timeout) != 0) {
+                      if (fiber_is_cancelled())
+                              break;
+                      else
+                              return -1;
+              }
+              if (run_on_ctl_event_trigger_type(
+                      box_is_ro() ? CTL_EVENT_READ_ONLY:
+                                            CTL_EVENT_READ_WRITE) < 0)
+                      say_error("ctl_trigger error in %s: %s",
+                                box_is_ro() ? "read_only" :"read_write",
+                                diag_last_error(diag_get())->errmsg);
+      }
+      return 0;
+}

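Is it worth creating a separate fiber for this? Maybe fire the triggers right
next to (or inside) the fiber_cond_broadcast(&ro_cond) calls instead, which is
presumably where the read-only state changes. Then no dedicated fiber would be
needed, and box_free() would not have to cancel it. Also, the local `ctx` in
check_ro_f() is zeroed with memset() but never used.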


 void
 box_init(void)
 {
       fiber_cond_create(&ro_cond);
-
+      ro_checker = fiber_new_xc("check_ro", check_ro_f);
+      fiber_start(ro_checker, NULL);
       user_cache_init();
       /*
        * The order is important: to initialize sessions,
@@ -1885,6 +1922,9 @@ box_cfg_xc(void)
                */
               memtx_engine_recover_snapshot_xc(memtx,
                               &last_checkpoint_vclock);
+              if 
(run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+                      say_error("ctl_trigger error in system space recovery: 
%s",
+                                diag_last_error(diag_get())->errmsg);

               engine_begin_final_recovery_xc();
               recovery_follow_local(recovery, &wal_stream.base, "hot_standby",
diff --git a/src/box/engine.c b/src/box/engine.c
index 82293fd..fa78753 100644
--- a/src/box/engine.c
+++ b/src/box/engine.c
@@ -29,10 +29,12 @@
  * SUCH DAMAGE.
  */
 #include "engine.h"
+#include "ctl.h"

 #include <stdint.h>
 #include <string.h>
 #include <small/rlist.h>

+#include <fiber.h>
Not used; please drop this include.

 RLIST_HEAD(engines);

@@ -73,6 +75,14 @@ engine_bootstrap(void)
               if (engine->vtab->bootstrap(engine) != 0)
                       return -1;
       }
+      if (run_on_ctl_event_trigger_type(CTL_EVENT_SYSTEM_SPACE_RECOVERY) < 0)
+              say_error("ctl_trigger error in system space recovery: %s",
+                        diag_last_error(diag_get())->errmsg);
+
+      if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0)
+              say_error("ctl_trigger error in local recovery: %s",
+                        diag_last_error(diag_get())->errmsg);
+
       return 0;
 }

@@ -111,6 +121,10 @@ engine_end_recovery(void)
               if (engine->vtab->end_recovery(engine) != 0)
                       return -1;
       }
+      if (run_on_ctl_event_trigger_type(CTL_EVENT_LOCAL_RECOVERY) < 0)
+              say_error("ctl_trigger error in local recovery: %s",
+                        diag_last_error(diag_get())->errmsg);
+
       return 0;
 }

diff --git a/src/box/lua/ctl.c b/src/box/lua/ctl.c
index 52f320a..5bd9be3 100644
--- a/src/box/lua/ctl.c
+++ b/src/box/lua/ctl.c
@@ -122,8 +122,8 @@ box_lua_ctl_init(struct lua_State *L)
       lua_pushnumber(L, CTL_EVENT_SHUTDOWN);
       lua_setfield(L, -2, "SHUTDOWN");
       lua_pushnumber(L, CTL_EVENT_REPLICASET_ADD);
-      lua_setfield(L, -2, "CTL_EVENT_REPLICASET_ADD");
+      lua_setfield(L, -2, "REPLICASET_ADD");
       lua_pushnumber(L, CTL_EVENT_REPLICASET_REMOVE);
-      lua_setfield(L, -2, "CTL_EVENT_REPLICASET_REMOVE");
+      lua_setfield(L, -2, "REPLICASET_REMOVE");
       lua_pop(L, 2); /* box, ctl */
 }
diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c
index fac84ce..e737ea3 100644
--- a/src/box/memtx_engine.c
+++ b/src/box/memtx_engine.c
@@ -48,6 +48,7 @@
 #include "replication.h"
 #include "schema.h"
 #include "gc.h"

+#include "ctl.h"
Also not used: memtx_engine.c gets no ctl trigger calls in this patch, so please drop this include too.

 static void
 txn_on_yield_or_stop(struct trigger *trigger, void *event)
@@ -197,7 +198,6 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
        */
       if (!xlog_cursor_is_eof(&cursor))
               panic("snapshot `%s' has no EOF marker", filename);
-
       return 0;
 }

diff --git a/src/box/replication.cc b/src/box/replication.cc
index c1e1769..75aecd0 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -41,6 +41,7 @@
 #include "error.h"
 #include "relay.h"
 #include "vclock.h" /* VCLOCK_MAX */
+#include "ctl.h"

 uint32_t instance_id = REPLICA_ID_NIL;
 struct tt_uuid INSTANCE_UUID;
@@ -172,6 +173,12 @@ replicaset_add(uint32_t replica_id, const struct tt_uuid 
*replica_uuid)
       replica->uuid = *replica_uuid;
       replica_hash_insert(&replicaset.hash, replica);
       replica_set_id(replica, replica_id);
+      struct on_ctl_event_ctx on_ctl_ctx;
+      on_ctl_ctx.type = CTL_EVENT_REPLICASET_ADD;
+      on_ctl_ctx.replica_id = replica_id;
+      if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+              say_error("ctl_trigger error in replica add: %s",
+                        diag_last_error(diag_get())->errmsg);
       return replica;
 }

@@ -203,12 +210,19 @@ replica_clear_id(struct replica *replica)
        * Some records may arrive later on due to asynchronous nature of
        * replication.
        */
+      struct on_ctl_event_ctx on_ctl_ctx;
+      on_ctl_ctx.type = CTL_EVENT_REPLICASET_REMOVE;
+      on_ctl_ctx.replica_id = replica->id;
+
       replicaset.replica_by_id[replica->id] = NULL;
       replica->id = REPLICA_ID_NIL;
       if (replica_is_orphan(replica)) {
               replica_hash_remove(&replicaset.hash, replica);
               replica_delete(replica);
       }
+      if (run_on_ctl_event_triggers(&on_ctl_ctx) < 0)
+              say_error("ctl_trigger error in replica remove: %s",
+                        diag_last_error(diag_get())->errmsg);
 }

 static void
diff --git a/test/replication/master_onctl.lua 
b/test/replication/master_onctl.lua
new file mode 100644
index 0000000..e0eb39a
--- /dev/null
+++ b/test/replication/master_onctl.lua
@@ -0,0 +1,34 @@
+#!/usr/bin/env tarantool
+os = require('os')
+
+SYSTEM_SPACE_RECOVERY = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+local function onctl(ctx)
+    if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then
+        SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1
+    elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
+        LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+    elseif ctx.type == box.ctl.event.READ_ONLY then
+        READ_ONLY = READ_ONLY + 1
+    elseif ctx.type == box.ctl.event.READ_WRITE then
+        READ_WRITE = READ_WRITE + 1
+    elseif ctx.type == box.ctl.event.REPLICASET_ADD then
+        table.insert(REPLICASET_ADD, ctx.replica_id)
+    elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+        table.insert(REPLICASET_REMOVE, ctx.replica_id)
+    end
+end
+
+box.cfg({
+    listen              = os.getenv("LISTEN"),
+    memtx_memory        = 107374182,
+    replication_connect_timeout = 0.5,
+    on_ctl_event        = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/onctl.result b/test/replication/onctl.result
new file mode 100644
index 0000000..19b3e67
--- /dev/null
+++ b/test/replication/onctl.result
@@ -0,0 +1,250 @@
+env = require('test_run')
+---
+...
+test_run = env.new()
+---
+...
+test_run:cmd("create server master with 
script='replication/master_onctl.lua'")
+---
+- true
+...
+test_run:cmd("create server replica with rpl_master=master, 
script='replication/replica_onctl.lua'")
+---
+- true
+...
+test_run:cmd("start server master")
+---
+- true
+...
+test_run:cmd("switch master")
+---
+- true
+...
+box.schema.user.grant('guest', 'replication')
+---
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 0
+...
+READ_WRITE
+---
+- 1
+...
+-- must be two entries. First from bootstrap.snap, second for current 
instance.
+REPLICASET_ADD
+---
+- - 1
+  - 1
+...
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+---
+- - 1
+...
+REPLICASET_ADD = {}
+---
+...
+REPLICASET_REMOVE = {}
+---
+...
+new_replica_id = 0
+---
+...
+deleted_replica_id = 0
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_new(ctx)
+    if ctx.type == box.ctl.event.REPLICASET_ADD then
+        new_replica_id = ctx.replica_id
+    elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+        deleted_replica_id = ctx.replica_id
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+_ = box.ctl.on_ctl_event(on_ctl_new)
+---
+...
+test_run:cmd("start server replica")
+---
+- true
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 0
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_ctl_shutdown(ctx)
+    if ctx.type == box.ctl.event.SHUTDOWN then
+        require("log").info("test replica shutdown")
+    end
+end;
+---
+...
+function on_ctl_error(ctx)
+    error("trigger error")
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 0
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- []
+...
+box.cfg{read_only = true}
+---
+...
+fiber = require("fiber")
+---
+...
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+---
+...
+READ_ONLY
+---
+- 1
+...
+box.cfg{on_ctl_event = on_ctl_error}
+---
+...
+box.cfg{read_only = false}
+---
+...
+test_run:grep_log('replica', 'ctl_trigger error')
+---
+- ctl_trigger error
+...
+box.cfg{on_ctl_event = {box.NULL, on_ctl_error}}
+---
+...
+box.cfg{on_ctl_event = on_ctl_shutdown}
+---
+...
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica.
+--test_run:grep_log('replica', 'test replica shutdown')
+test_run:cmd("switch master")
+---
+- true
+...
+box.schema.user.revoke('guest', 'replication')
+---
+...
+_ = box.space._cluster:delete{2}
+---
+...
+SYSTEM_SPACE_RECOVERY
+---
+- 1
+...
+LOCAL_RECOVERY
+---
+- 1
+...
+READ_ONLY
+---
+- 0
+...
+READ_WRITE
+---
+- 1
+...
+REPLICASET_ADD
+---
+- - 2
+...
+REPLICASET_REMOVE
+---
+- - 2
+...
+new_replica_id
+---
+- 2
+...
+deleted_replica_id
+---
+- 2
+...
+box.ctl.on_ctl_event(nil, on_ctl_new)
+---
+...
+-- cleanup
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("stop server master")
+---
+- true
+...
+test_run:cmd("cleanup server master")
+---
+- true
+...
+test_run:cmd("stop server replica")
+---
+- true
+...
+test_run:cmd("cleanup server replica")
+---
+- true
+...
diff --git a/test/replication/onctl.test.lua b/test/replication/onctl.test.lua
new file mode 100644
index 0000000..ff6a898
--- /dev/null
+++ b/test/replication/onctl.test.lua
@@ -0,0 +1,105 @@
+env = require('test_run')
+test_run = env.new()
+
+test_run:cmd("create server master with 
script='replication/master_onctl.lua'")
+test_run:cmd("create server replica with rpl_master=master, 
script='replication/replica_onctl.lua'")
+
+test_run:cmd("start server master")
+test_run:cmd("switch master")
+box.schema.user.grant('guest', 'replication')
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+-- must be two entries. First from bootstrap.snap, second for current 
instance.
+REPLICASET_ADD
+-- must be one entry. Deletion of initial tuple in _cluster space.
+REPLICASET_REMOVE
+
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+new_replica_id = 0
+deleted_replica_id = 0
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_new(ctx)
+    if ctx.type == box.ctl.event.REPLICASET_ADD then
+        new_replica_id = ctx.replica_id
+    elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+        deleted_replica_id = ctx.replica_id
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+_ = box.ctl.on_ctl_event(on_ctl_new)
+
+test_run:cmd("start server replica")
+
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+new_replica_id
+deleted_replica_id
+
+test_run:cmd("switch replica")
+
+test_run:cmd("setopt delimiter ';'")
+function on_ctl_shutdown(ctx)
+    if ctx.type == box.ctl.event.SHUTDOWN then
+        require("log").info("test replica shutdown")
+    end
+end;
+
+function on_ctl_error(ctx)
+    error("trigger error")
+end;
+
+test_run:cmd("setopt delimiter ''");
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+box.cfg{read_only = true}
+fiber = require("fiber")
+while READ_ONLY == 0 do fiber.sleep(0.001) end
+READ_ONLY
+
+box.cfg{on_ctl_event = on_ctl_error}
+box.cfg{read_only = false}
+test_run:grep_log('replica', 'ctl_trigger error')
+box.cfg{on_ctl_event = {box.NULL, on_ctl_error}}
+box.cfg{on_ctl_event = on_ctl_shutdown}
+
+test_run:cmd("restart server replica")
+-- TODO: test SHUTDOWN, when it is possible to grep logs of killed replica.
+--test_run:grep_log('replica', 'test replica shutdown') 
This should be done: please add a test for the SHUTDOWN event.

+
+
+test_run:cmd("switch master")
+box.schema.user.revoke('guest', 'replication')
+_ = box.space._cluster:delete{2}
+
+SYSTEM_SPACE_RECOVERY
+LOCAL_RECOVERY
+READ_ONLY
+READ_WRITE
+REPLICASET_ADD
+REPLICASET_REMOVE
+
+new_replica_id
+deleted_replica_id
+
+box.ctl.on_ctl_event(nil, on_ctl_new)
+
+-- cleanup
+test_run:cmd("switch default")
+test_run:cmd("stop server master")
+test_run:cmd("cleanup server master")
+test_run:cmd("stop server replica")
+test_run:cmd("cleanup server replica")
diff --git a/test/replication/replica_onctl.lua 
b/test/replication/replica_onctl.lua
new file mode 100644
index 0000000..d6ce73c
--- /dev/null
+++ b/test/replication/replica_onctl.lua
@@ -0,0 +1,34 @@
+#!/usr/bin/env tarantool
+
+SYSTEM_SPACE_RECOVERY = 0
+LOCAL_RECOVERY = 0
+READ_ONLY = 0
+READ_WRITE = 0
+REPLICASET_ADD = {}
+REPLICASET_REMOVE = {}
+
+local function onctl(ctx)
+    if ctx.type == box.ctl.event.SYSTEM_SPACE_RECOVERY then
+        SYSTEM_SPACE_RECOVERY = SYSTEM_SPACE_RECOVERY + 1
+    elseif ctx.type == box.ctl.event.LOCAL_RECOVERY then
+        LOCAL_RECOVERY = LOCAL_RECOVERY + 1
+    elseif ctx.type == box.ctl.event.READ_ONLY then
+        READ_ONLY = READ_ONLY + 1
+    elseif ctx.type == box.ctl.event.READ_WRITE then
+        READ_WRITE = READ_WRITE + 1
+    elseif ctx.type == box.ctl.event.REPLICASET_ADD then
+        table.insert(REPLICASET_ADD, ctx.replica_id)
+    elseif ctx.type == box.ctl.event.REPLICASET_REMOVE then
+        table.insert(REPLICASET_REMOVE, ctx.replica_id)
+    end
+end
+
+box.cfg({
+    listen              = os.getenv("LISTEN"),
+    replication         = os.getenv("MASTER"),
+    memtx_memory        = 107374182,
+    replication_connect_timeout = 0.5,
+    on_ctl_event        = onctl,
+})
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/replication/suite.cfg b/test/replication/suite.cfg
index 95e94e5..365a825 100644
--- a/test/replication/suite.cfg
+++ b/test/replication/suite.cfg
@@ -6,6 +6,7 @@
     "wal_off.test.lua": {},
     "hot_standby.test.lua": {},
     "rebootstrap.test.lua": {},
+    "onctl.test.lua": {},
     "*": {
         "memtx": {"engine": "memtx"},
         "vinyl": {"engine": "vinyl"}
-- 
2.7.4




Best regards,
Konstantin Belyavskiy
k.belyavskiy@xxxxxxxxxxxxx

Other related posts:

  • » [tarantool-patches] Re: [tarantool-patches] [box.ctl 2/3] box.ctl: Add on_ctl_event trigger calls - Konstantin Belyavskiy