[tarantool-patches] [PATCH v3 05/11] gc: keep track of vclocks instead of signatures

  • From: Vladimir Davydov <vdavydov.dev@xxxxxxxxx>
  • To: kostja@xxxxxxxxxxxxx
  • Date: Sat, 14 Jul 2018 23:49:20 +0300

In order to check if a replica needs to be rebootstrapped, we need to
know the vclock of the oldest WAL stored on the master, but the garbage
collector works with signatures and hence can't report the vclock it was
last called for. Actually, all gc users have a vclock and can pass it
instead of signature so it's pretty easy to switch garbage collection
infrastructure to vclock.

Needed for #461
---
 src/box/box.cc     |  5 ++--
 src/box/gc.c       | 83 +++++++++++++++++++++++++++++-------------------------
 src/box/gc.h       | 17 ++++++-----
 src/box/lua/info.c |  4 ++-
 src/box/relay.cc   | 18 ++++++------
 5 files changed, 66 insertions(+), 61 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index 200e49a1..7aac0a13 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1422,7 +1422,7 @@ box_process_join(struct ev_io *io, struct xrow_header 
*header)
        /* Register the replica with the garbage collector. */
        struct gc_consumer *gc = gc_consumer_register(
                tt_sprintf("replica %s", tt_uuid_str(&instance_uuid)),
-               vclock_sum(&start_vclock), GC_CONSUMER_WAL);
+               &start_vclock, GC_CONSUMER_WAL);
        if (gc == NULL)
                diag_raise();
        auto gc_guard = make_scoped_guard([=]{
@@ -2124,8 +2124,7 @@ box_backup_start(int checkpoint_idx, box_backup_cb cb, 
void *cb_arg)
                        return -1;
                }
        } while (checkpoint_idx-- > 0);
-       backup_gc = gc_consumer_register("backup", vclock_sum(vclock),
-                                        GC_CONSUMER_ALL);
+       backup_gc = gc_consumer_register("backup", vclock, GC_CONSUMER_ALL);
        if (backup_gc == NULL)
                return -1;
        int rc = engine_backup(vclock, cb, cb_arg);
diff --git a/src/box/gc.c b/src/box/gc.c
index 6a05b298..6c324220 100644
--- a/src/box/gc.c
+++ b/src/box/gc.c
@@ -59,8 +59,8 @@ struct gc_consumer {
        gc_node_t node;
        /** Human-readable name. */
        char *name;
-       /** The vclock signature tracked by this consumer. */
-       int64_t signature;
+       /** The vclock tracked by this consumer. */
+       struct vclock vclock;
        /** Consumer type, indicating that consumer only consumes
         * WAL files, or both - SNAP and WAL.
         */
@@ -73,10 +73,10 @@ typedef rb_tree(struct gc_consumer) gc_tree_t;
 struct gc_state {
        /** Number of checkpoints to maintain. */
        int checkpoint_count;
-       /** Max signature WAL garbage collection has been called for. */
-       int64_t wal_signature;
-       /** Max signature checkpoint garbage collection has been called for. */
-       int64_t checkpoint_signature;
+       /** Max vclock WAL garbage collection has been called for. */
+       struct vclock wal_vclock;
+       /** Max vclock checkpoint garbage collection has been called for. */
+       struct vclock checkpoint_vclock;
        /** Registered consumers, linked by gc_consumer::node. */
        gc_tree_t consumers;
        /**
@@ -94,9 +94,9 @@ static struct gc_state gc;
 static inline int
 gc_consumer_cmp(const struct gc_consumer *a, const struct gc_consumer *b)
 {
-       if (a->signature < b->signature)
+       if (vclock_sum(&a->vclock) < vclock_sum(&b->vclock))
                return -1;
-       if (a->signature > b->signature)
+       if (vclock_sum(&a->vclock) > vclock_sum(&b->vclock))
                return 1;
        if ((intptr_t)a < (intptr_t)b)
                return -1;
@@ -110,7 +110,7 @@ rb_gen(MAYBE_UNUSED static inline, gc_tree_, gc_tree_t,
 
 /** Allocate a consumer object. */
 static struct gc_consumer *
-gc_consumer_new(const char *name, int64_t signature,
+gc_consumer_new(const char *name, const struct vclock *vclock,
                enum gc_consumer_type type)
 {
        struct gc_consumer *consumer = calloc(1, sizeof(*consumer));
@@ -126,7 +126,7 @@ gc_consumer_new(const char *name, int64_t signature,
                free(consumer);
                return NULL;
        }
-       consumer->signature = signature;
+       vclock_copy(&consumer->vclock, vclock);
        consumer->type = type;
        return consumer;
 }
@@ -143,8 +143,8 @@ gc_consumer_delete(struct gc_consumer *consumer)
 void
 gc_init(void)
 {
-       gc.wal_signature = -1;
-       gc.checkpoint_signature = -1;
+       vclock_create(&gc.wal_vclock);
+       vclock_create(&gc.checkpoint_vclock);
        gc_tree_new(&gc.consumers);
        latch_create(&gc.latch);
 }
@@ -191,7 +191,8 @@ gc_run(void)
         * We have to maintain @checkpoint_count oldest checkpoints,
         * plus we can't remove checkpoints that are still in use.
         */
-       int64_t gc_checkpoint_signature = -1;
+       struct vclock gc_checkpoint_vclock;
+       vclock_create(&gc_checkpoint_vclock);
 
        struct checkpoint_iterator checkpoints;
        checkpoint_iterator_init(&checkpoints);
@@ -201,18 +202,21 @@ gc_run(void)
                if (--checkpoint_count > 0)
                        continue;
                if (leftmost_checkpoint != NULL &&
-                   leftmost_checkpoint->signature < vclock_sum(vclock))
+                   vclock_sum(&leftmost_checkpoint->vclock) < 
vclock_sum(vclock))
                        continue;
-               gc_checkpoint_signature = vclock_sum(vclock);
+               vclock_copy(&gc_checkpoint_vclock, vclock);
                break;
        }
 
-       int64_t gc_wal_signature = MIN(gc_checkpoint_signature,
-                                      leftmost != NULL ?
-                                      leftmost->signature : INT64_MAX);
+       struct vclock gc_wal_vclock;
+       if (leftmost != NULL &&
+           vclock_sum(&leftmost->vclock) < vclock_sum(&gc_checkpoint_vclock))
+               vclock_copy(&gc_wal_vclock, &leftmost->vclock);
+       else
+               vclock_copy(&gc_wal_vclock, &gc_checkpoint_vclock);
 
-       if (gc_wal_signature <= gc.wal_signature &&
-           gc_checkpoint_signature <= gc.checkpoint_signature)
+       if (vclock_sum(&gc_wal_vclock) <= vclock_sum(&gc.wal_vclock) &&
+           vclock_sum(&gc_checkpoint_vclock) <= 
vclock_sum(&gc.checkpoint_vclock))
                return; /* nothing to do */
 
        /*
@@ -231,14 +235,14 @@ gc_run(void)
         */
        int rc = 0;
 
-       if (gc_checkpoint_signature > gc.checkpoint_signature) {
-               gc.checkpoint_signature = gc_checkpoint_signature;
-               rc = engine_collect_garbage(gc_checkpoint_signature);
+       if (vclock_sum(&gc_checkpoint_vclock) > 
vclock_sum(&gc.checkpoint_vclock)) {
+               vclock_copy(&gc.checkpoint_vclock, &gc_checkpoint_vclock);
+               rc = engine_collect_garbage(vclock_sum(&gc_checkpoint_vclock));
        }
-       if (gc_wal_signature > gc.wal_signature) {
-               gc.wal_signature = gc_wal_signature;
+       if (vclock_sum(&gc_wal_vclock) > vclock_sum(&gc.wal_vclock)) {
+               vclock_copy(&gc.wal_vclock, &gc_wal_vclock);
                if (rc == 0)
-                       wal_collect_garbage(gc_wal_signature);
+                       wal_collect_garbage(vclock_sum(&gc_wal_vclock));
        }
 
        latch_unlock(&gc.latch);
@@ -251,11 +255,10 @@ gc_set_checkpoint_count(int checkpoint_count)
 }
 
 struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature,
+gc_consumer_register(const char *name, const struct vclock *vclock,
                     enum gc_consumer_type type)
 {
-       struct gc_consumer *consumer = gc_consumer_new(name, signature,
-                                                      type);
+       struct gc_consumer *consumer = gc_consumer_new(name, vclock, type);
        if (consumer != NULL)
                gc_tree_insert(&gc.consumers, consumer);
        return consumer;
@@ -264,7 +267,7 @@ gc_consumer_register(const char *name, int64_t signature,
 void
 gc_consumer_unregister(struct gc_consumer *consumer)
 {
-       int64_t signature = consumer->signature;
+       int64_t signature = vclock_sum(&consumer->vclock);
 
        gc_tree_remove(&gc.consumers, consumer);
        gc_consumer_delete(consumer);
@@ -274,14 +277,15 @@ gc_consumer_unregister(struct gc_consumer *consumer)
         * if it referenced the oldest vclock.
         */
        struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
-       if (leftmost == NULL || leftmost->signature > signature)
+       if (leftmost == NULL || vclock_sum(&leftmost->vclock) > signature)
                gc_run();
 }
 
 void
-gc_consumer_advance(struct gc_consumer *consumer, int64_t signature)
+gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock)
 {
-       int64_t prev_signature = consumer->signature;
+       int64_t signature = vclock_sum(vclock);
+       int64_t prev_signature = vclock_sum(&consumer->vclock);
 
        assert(signature >= prev_signature);
        if (signature == prev_signature)
@@ -292,12 +296,13 @@ gc_consumer_advance(struct gc_consumer *consumer, int64_t 
signature)
         * is violated.
         */
        struct gc_consumer *next = gc_tree_next(&gc.consumers, consumer);
-       bool update_tree = (next != NULL && signature >= next->signature);
+       bool update_tree = (next != NULL &&
+                           signature >= vclock_sum(&next->vclock));
 
        if (update_tree)
                gc_tree_remove(&gc.consumers, consumer);
 
-       consumer->signature = signature;
+       vclock_copy(&consumer->vclock, vclock);
 
        if (update_tree)
                gc_tree_insert(&gc.consumers, consumer);
@@ -307,7 +312,7 @@ gc_consumer_advance(struct gc_consumer *consumer, int64_t 
signature)
         * if it referenced the oldest vclock.
         */
        struct gc_consumer *leftmost = gc_tree_first(&gc.consumers);
-       if (leftmost == NULL || leftmost->signature > prev_signature)
+       if (leftmost == NULL || vclock_sum(&leftmost->vclock) > prev_signature)
                gc_run();
 }
 
@@ -317,10 +322,10 @@ gc_consumer_name(const struct gc_consumer *consumer)
        return consumer->name;
 }
 
-int64_t
-gc_consumer_signature(const struct gc_consumer *consumer)
+void
+gc_consumer_vclock(const struct gc_consumer *consumer, struct vclock *vclock)
 {
-       return consumer->signature;
+       vclock_copy(vclock, &consumer->vclock);
 }
 
 struct gc_consumer *
diff --git a/src/box/gc.h b/src/box/gc.h
index 6a890b7b..7e061768 100644
--- a/src/box/gc.h
+++ b/src/box/gc.h
@@ -32,13 +32,12 @@
  */
 
 #include <stddef.h>
-#include <stdint.h>
-#include <stdbool.h>
 
 #if defined(__cplusplus)
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+struct vclock;
 struct gc_consumer;
 
 /** Consumer type: WAL consumer, or SNAP */
@@ -79,7 +78,7 @@ gc_set_checkpoint_count(int checkpoint_count);
  * Register a consumer.
  *
  * This will stop garbage collection of objects newer than
- * @signature until the consumer is unregistered or advanced.
+ * @vclock until the consumer is unregistered or advanced.
  * @name is a human-readable name of the consumer, it will
  * be used for reporting the consumer to the user.
  * @type consumer type, reporting whether consumer only depends
@@ -89,7 +88,7 @@ gc_set_checkpoint_count(int checkpoint_count);
  * memory allocation failure.
  */
 struct gc_consumer *
-gc_consumer_register(const char *name, int64_t signature,
+gc_consumer_register(const char *name, const struct vclock *vclock,
                     enum gc_consumer_type type);
 
 /**
@@ -100,19 +99,19 @@ void
 gc_consumer_unregister(struct gc_consumer *consumer);
 
 /**
- * Advance the vclock signature tracked by a consumer and
+ * Advance the vclock tracked by a consumer and
  * invoke garbage collection if needed.
  */
 void
-gc_consumer_advance(struct gc_consumer *consumer, int64_t signature);
+gc_consumer_advance(struct gc_consumer *consumer, const struct vclock *vclock);
 
 /** Return the name of a consumer. */
 const char *
 gc_consumer_name(const struct gc_consumer *consumer);
 
-/** Return the signature a consumer tracks. */
-int64_t
-gc_consumer_signature(const struct gc_consumer *consumer);
+/** Return the vclock a consumer tracks. */
+void
+gc_consumer_vclock(const struct gc_consumer *consumer, struct vclock *vclock);
 
 /**
  * Iterator over registered consumers. The iterator is valid
diff --git a/src/box/lua/info.c b/src/box/lua/info.c
index d6697df9..4544d8b6 100644
--- a/src/box/lua/info.c
+++ b/src/box/lua/info.c
@@ -396,7 +396,9 @@ lbox_info_gc_call(struct lua_State *L)
                lua_settable(L, -3);
 
                lua_pushstring(L, "signature");
-               luaL_pushint64(L, gc_consumer_signature(consumer));
+               struct vclock vclock;
+               gc_consumer_vclock(consumer, &vclock);
+               luaL_pushint64(L, vclock_sum(&vclock));
                lua_settable(L, -3);
 
                lua_rawseti(L, -2, ++count);
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 75c3d56a..4cacbc84 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -78,8 +78,8 @@ struct relay_gc_msg {
        struct stailq_entry in_pending;
        /** Relay instance */
        struct relay *relay;
-       /** Vclock signature to advance to */
-       int64_t signature;
+       /** Vclock to advance to */
+       struct vclock vclock;
 };
 
 /** State of a replication relay. */
@@ -325,7 +325,7 @@ static void
 tx_gc_advance(struct cmsg *msg)
 {
        struct relay_gc_msg *m = (struct relay_gc_msg *)msg;
-       gc_consumer_advance(m->relay->replica->gc, m->signature);
+       gc_consumer_advance(m->relay->replica->gc, &m->vclock);
        free(m);
 }
 
@@ -343,7 +343,7 @@ relay_on_close_log_f(struct trigger *trigger, void * /* 
event */)
        }
        cmsg_init(&m->msg, route);
        m->relay = relay;
-       m->signature = vclock_sum(&relay->r->vclock);
+       vclock_copy(&m->vclock, &relay->r->vclock);
        /*
         * Do not invoke garbage collection until the replica
         * confirms that it has received data stored in the
@@ -356,16 +356,16 @@ relay_on_close_log_f(struct trigger *trigger, void * /* 
event */)
  * Invoke pending garbage collection requests.
  *
  * This function schedules the most recent gc message whose
- * signature is less than or equal to the given one. Older
+ * vclock is less than or equal to the given one. Older
  * messages are discarded as their job will be done by the
  * scheduled message anyway.
  */
 static inline void
-relay_schedule_pending_gc(struct relay *relay, int64_t signature)
+relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock)
 {
        struct relay_gc_msg *curr, *next, *gc_msg = NULL;
        stailq_foreach_entry_safe(curr, next, &relay->pending_gc, in_pending) {
-               if (curr->signature > signature)
+               if (vclock_sum(&curr->vclock) > vclock_sum(vclock))
                        break;
                stailq_shift(&relay->pending_gc);
                free(gc_msg);
@@ -533,7 +533,7 @@ relay_subscribe_f(va_list ap)
                relay->status_msg.relay = relay;
                cpipe_push(&relay->tx_pipe, &relay->status_msg.msg);
                /* Collect xlog files received by the replica. */
-               relay_schedule_pending_gc(relay, vclock_sum(send_vclock));
+               relay_schedule_pending_gc(relay, send_vclock);
        }
 
        say_crit("exiting the relay loop");
@@ -578,7 +578,7 @@ relay_subscribe(struct replica *replica, int fd, uint64_t 
sync,
        if (replica->gc == NULL) {
                replica->gc = gc_consumer_register(
                        tt_sprintf("replica %s", tt_uuid_str(&replica->uuid)),
-                       vclock_sum(replica_clock), GC_CONSUMER_WAL);
+                       replica_clock, GC_CONSUMER_WAL);
                if (replica->gc == NULL)
                        diag_raise();
        }
-- 
2.11.0


Other related posts: