[tarantool-patches] [PATCH v3 03/11] Introduce IPROTO_REQUEST_STATUS command

  • From: Vladimir Davydov <vdavydov.dev@xxxxxxxxx>
  • To: kostja@xxxxxxxxxxxxx
  • Date: Sat, 14 Jul 2018 23:49:18 +0300

The new command is supposed to supersede IPROTO_REQUEST_VOTE, which is
difficult to extend, because it uses the global iproto key namespace.
The new command returns a map (IPROTO_STATUS), to which we can add
various information without polluting the global namespace. Currently,
the map contains IPROTO_STATUS_IS_RO and IPROTO_STATUS_VCLOCK keys,
but soon it will be added info needed for replica rebootstrap feature.

Needed for #461
---
 src/box/applier.cc         |  6 +--
 src/box/applier.h          |  8 ++--
 src/box/box.cc             |  7 ++++
 src/box/box.h              |  3 ++
 src/box/iproto.cc          |  7 ++++
 src/box/iproto_constants.c |  3 +-
 src/box/iproto_constants.h | 13 +++++-
 src/box/replication.cc     |  9 +++--
 src/box/xrow.c             | 98 +++++++++++++++++++++++++++++++++++++++++++++-
 src/box/xrow.h             | 80 +++++++++++++++++++++++--------------
 10 files changed, 188 insertions(+), 46 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 556502bf..ad2710a3 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -218,14 +218,12 @@ applier_connect(struct applier *applier)
         * It will be used for leader election on bootstrap.
         */
        if (applier->version_id >= version_id(1, 7, 7)) {
-               xrow_encode_request_vote(&row);
+               xrow_encode_status_request(&row);
                coio_write_xrow(coio, &row);
                coio_read_xrow(coio, ibuf, &row);
                if (row.type != IPROTO_OK)
                        xrow_decode_error_xc(&row);
-               vclock_create(&applier->vclock);
-               xrow_decode_request_vote_xc(&row, &applier->vclock,
-                                           &applier->remote_is_ro);
+               xrow_decode_status_xc(&row, &applier->remote_status);
        }
 
        applier_set_state(applier, APPLIER_CONNECTED);
diff --git a/src/box/applier.h b/src/box/applier.h
index c33562cc..29b4e5af 100644
--- a/src/box/applier.h
+++ b/src/box/applier.h
@@ -43,7 +43,7 @@
 #include "tt_uuid.h"
 #include "uri.h"
 
-#include "vclock.h"
+#include "xrow.h"
 
 struct xstream;
 
@@ -94,10 +94,8 @@ struct applier {
        struct uri uri;
        /** Remote version encoded as a number, see version_id() macro */
        uint32_t version_id;
-       /** Remote vclock at time of connect. */
-       struct vclock vclock;
-       /** Remote peer mode, true if read-only, default: false */
-       bool remote_is_ro;
+       /** Remote status at time of connect. */
+       struct status remote_status;
        /** Remote address */
        union {
                struct sockaddr addr;
diff --git a/src/box/box.cc b/src/box/box.cc
index 7fc15f33..200e49a1 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -1564,6 +1564,13 @@ box_process_subscribe(struct ev_io *io, struct 
xrow_header *header)
                        replica_version_id);
 }
 
+void
+box_process_status_request(struct status *status)
+{
+       status->is_ro = cfg_geti("read_only") != 0;
+       vclock_copy(&status->vclock, &replicaset.vclock);
+}
+
 /** Insert a new cluster into _schema */
 static void
 box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid)
diff --git a/src/box/box.h b/src/box/box.h
index 182e1b72..8c38b416 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -163,6 +163,9 @@ box_process_join(struct ev_io *io, struct xrow_header 
*header);
 void
 box_process_subscribe(struct ev_io *io, struct xrow_header *header);
 
+void
+box_process_status_request(struct status *status);
+
 /**
  * Check Lua configuration before initialization or
  * in case of a configuration change.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index cba81a22..17f161a3 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1159,6 +1159,7 @@ iproto_msg_decode(struct iproto_msg *msg, const char 
**pos, const char *reqend,
                *stop_input = true;
                break;
        case IPROTO_REQUEST_VOTE:
+       case IPROTO_REQUEST_STATUS:
                cmsg_init(&msg->base, misc_route);
                break;
        case IPROTO_AUTH:
@@ -1526,6 +1527,7 @@ tx_process_misc(struct cmsg *m)
                goto error;
 
        try {
+               struct status status;
                switch (msg->header.type) {
                case IPROTO_AUTH:
                        box_process_auth(&msg->auth, con->salt);
@@ -1542,6 +1544,11 @@ tx_process_misc(struct cmsg *m)
                                                     &replicaset.vclock,
                                                     cfg_geti("read_only"));
                        break;
+               case IPROTO_REQUEST_STATUS:
+                       box_process_status_request(&status);
+                       iproto_reply_status_xc(out, &status, msg->header.sync,
+                                              ::schema_version);
+                       break;
                default:
                        unreachable();
                }
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 3bc965bd..bc7dfd7d 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -87,6 +87,7 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
        /* 0x27 */      MP_STR, /* IPROTO_EXPR */
        /* 0x28 */      MP_ARRAY, /* IPROTO_OPS */
        /* 0x29 */      MP_BOOL, /* IPROTO_SERVER_IS_RO */
+       /* 0x2a */      MP_MAP, /* IPROTO_STATUS */
        /* }}} */
 };
 
@@ -168,7 +169,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
        "expression",       /* 0x27 */
        "operations",       /* 0x28 */
        "server is ro",     /* 0x29 */
-       NULL,               /* 0x2a */
+       "status",           /* 0x2a */
        NULL,               /* 0x2b */
        NULL,               /* 0x2c */
        NULL,               /* 0x2d */
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index ccbf2da5..ce4366ec 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -78,12 +78,18 @@ enum iproto_key {
        IPROTO_EXPR = 0x27, /* EVAL */
        IPROTO_OPS = 0x28, /* UPSERT but not UPDATE ops, because of legacy */
        IPROTO_SERVER_IS_RO = 0x29,
+       IPROTO_STATUS = 0x2a,
        /* Leave a gap between request keys and response keys */
        IPROTO_DATA = 0x30,
        IPROTO_ERROR = 0x31,
        IPROTO_KEY_MAX
 };
 
+enum iproto_status_key {
+       IPROTO_STATUS_IS_RO = 0x01,
+       IPROTO_STATUS_VCLOCK = 0x02,
+};
+
 #define bit(c) (1ULL<<IPROTO_##c)
 
 #define IPROTO_HEAD_BMAP (bit(REQUEST_TYPE) | bit(SYNC) | bit(REPLICA_ID) |\
@@ -155,8 +161,13 @@ enum iproto_type {
        IPROTO_JOIN = 65,
        /** Replication SUBSCRIBE command */
        IPROTO_SUBSCRIBE = 66,
-       /** Vote request command for master election */
+       /**
+        * Vote request command for master election
+        * DEPRECATED: use IPROTO_REQUEST_STATUS instead
+        */
        IPROTO_REQUEST_VOTE = 67,
+       /** Instance status request command */
+       IPROTO_REQUEST_STATUS = 68,
 
        /** Vinyl run info stored in .index file */
        VY_INDEX_RUN_INFO = 100,
diff --git a/src/box/replication.cc b/src/box/replication.cc
index c1e17698..f12244c9 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -732,7 +732,8 @@ replicaset_round(bool skip_ro)
 {
        struct replica *leader = NULL;
        replicaset_foreach(replica) {
-               if (replica->applier == NULL)
+               struct applier *applier = replica->applier;
+               if (applier == NULL)
                        continue;
                /**
                 * While bootstrapping a new cluster, read-only
@@ -741,7 +742,7 @@ replicaset_round(bool skip_ro)
                 * replicas since there is still a possibility
                 * that all replicas exist in cluster table.
                 */
-               if (skip_ro && replica->applier->remote_is_ro)
+               if (skip_ro && applier->remote_status.is_ro)
                        continue;
                if (leader == NULL) {
                        leader = replica;
@@ -753,8 +754,8 @@ replicaset_round(bool skip_ro)
                 * with the same vclock, prefer the one with
                 * the lowest uuid.
                 */
-               int cmp = vclock_compare(&replica->applier->vclock,
-                                        &leader->applier->vclock);
+               int cmp = vclock_compare(&applier->remote_status.vclock,
+                               &leader->applier->remote_status.vclock);
                if (cmp < 0)
                        continue;
                if (cmp == 0 && tt_uuid_compare(&replica->uuid,
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 56197d0e..4bc1f81e 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -344,6 +344,41 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync,
 }
 
 int
+iproto_reply_status(struct obuf *out, const struct status *status,
+                   uint64_t sync, uint32_t schema_version)
+{
+       size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) +
+               mp_sizeof_uint(UINT32_MAX) + mp_sizeof_map(2) +
+               mp_sizeof_uint(UINT32_MAX) + mp_sizeof_bool(status->is_ro) +
+               mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock(&status->vclock);
+
+       char *buf = obuf_reserve(out, max_size);
+       if (buf == NULL) {
+               diag_set(OutOfMemory, max_size,
+                        "obuf_alloc", "buf");
+               return -1;
+       }
+
+       char *data = buf + IPROTO_HEADER_LEN;
+       data = mp_encode_map(data, 1);
+       data = mp_encode_uint(data, IPROTO_STATUS);
+       data = mp_encode_map(data, 2);
+       data = mp_encode_uint(data, IPROTO_STATUS_IS_RO);
+       data = mp_encode_bool(data, status->is_ro);
+       data = mp_encode_uint(data, IPROTO_STATUS_VCLOCK);
+       data = mp_encode_vclock(data, &status->vclock);
+       size_t size = data - buf;
+       assert(size <= max_size);
+
+       iproto_header_encode(buf, IPROTO_OK, sync, schema_version,
+                            size - IPROTO_HEADER_LEN);
+
+       char *ptr = obuf_alloc(out, size);
+       assert(ptr == buf);
+       return 0;
+}
+
+int
 iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
                   uint32_t schema_version)
 {
@@ -847,10 +882,69 @@ error:
 }
 
 void
-xrow_encode_request_vote(struct xrow_header *row)
+xrow_encode_status_request(struct xrow_header *row)
 {
        memset(row, 0, sizeof(*row));
-       row->type = IPROTO_REQUEST_VOTE;
+       row->type = IPROTO_REQUEST_STATUS;
+}
+
+int
+xrow_decode_status(struct xrow_header *row, struct status *status)
+{
+       status->is_ro = false;
+       vclock_create(&status->vclock);
+
+       if (row->bodycnt == 0)
+               goto err;
+       assert(row->bodycnt == 1);
+
+       const char *data = (const char *) row->body[0].iov_base;
+       const char *end = data + row->body[0].iov_len;
+       const char *tmp = data;
+       if (mp_check(&tmp, end) != 0 || mp_typeof(*data) != MP_MAP)
+               goto err;
+
+       /* Find STATUS key. */
+       uint32_t map_size = mp_decode_map(&data);
+       for (uint32_t i = 0; i < map_size; i++) {
+               if (mp_typeof(*data) != MP_UINT) {
+                       mp_next(&data); /* key */
+                       mp_next(&data); /* value */
+                       continue;
+               }
+               if (mp_decode_uint(&data) == IPROTO_STATUS)
+                       break;
+       }
+       if (data == end)
+               return 0;
+
+       /* Decode STATUS map. */
+       map_size = mp_decode_map(&data);
+       for (uint32_t i = 0; i < map_size; i++) {
+               if (mp_typeof(*data) != MP_UINT) {
+                       mp_next(&data); /* key */
+                       mp_next(&data); /* value */
+                       continue;
+               }
+               uint32_t key = mp_decode_uint(&data);
+               switch (key) {
+               case IPROTO_STATUS_IS_RO:
+                       if (mp_typeof(*data) != MP_BOOL)
+                               goto err;
+                       status->is_ro = mp_decode_bool(&data);
+                       break;
+               case IPROTO_STATUS_VCLOCK:
+                       if (mp_decode_vclock(&data, &status->vclock) != 0)
+                               goto err;
+                       break;
+               default:
+                       mp_next(&data);
+               }
+       }
+       return 0;
+err:
+       diag_set(ClientError, ER_INVALID_MSGPACK, "packet body");
+       return -1;
 }
 
 int
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 92ea3c97..67910d52 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -30,19 +30,19 @@
  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+#include <stdbool.h>
 #include <stdint.h>
 #include <stddef.h>
 #include <sys/uio.h> /* struct iovec */
 
 #include "tt_uuid.h"
 #include "diag.h"
+#include "vclock.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif
 
-struct vclock;
-
 enum {
        XROW_HEADER_IOVMAX = 1,
        XROW_BODY_IOVMAX = 2,
@@ -223,12 +223,28 @@ xrow_encode_auth(struct xrow_header *row, const char 
*salt, size_t salt_len,
                 const char *login, size_t login_len, const char *password,
                 size_t password_len);
 
+/** Instance status. */
+struct status {
+       /** Set if the instance is running in read-only mode. */
+       bool is_ro;
+       /** Current instance vclock. */
+       struct vclock vclock;
+};
+
 /**
- * Encode a vote request for master election.
+ * Decode STATUS from MessagePack.
+ * @param row Row to decode.
+ * @param[out] status
+ */
+int
+xrow_decode_status(struct xrow_header *row, struct status *status);
+
+/**
+ * Encode an instance status request.
  * @param row[out] Row to encode into.
  */
 void
-xrow_encode_request_vote(struct xrow_header *row);
+xrow_encode_status_request(struct xrow_header *row);
 
 /**
  * Encode SUBSCRIBE command.
@@ -315,22 +331,6 @@ xrow_decode_vclock(struct xrow_header *row, struct vclock 
*vclock)
 }
 
 /**
- * Decode peer vclock and access rights (a response to VOTE command).
- * @param row Row to decode.
- * @param[out] vclock.
- * @param[out] read_only.
- *
- * @retval  0 Success.
- * @retval -1 Memory or format error.
- */
-static inline int
-xrow_decode_request_vote(struct xrow_header *row, struct vclock *vclock,
-                        bool *read_only)
-{
-       return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, read_only);
-}
-
-/**
  * Encode a heartbeat message.
  * @param row[out] Row to encode into.
  * @param replica_id Instance id.
@@ -405,6 +405,20 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync,
                         bool read_only);
 
 /**
+ * Encode a reply to an instance status request.
+ * @param out Buffer to write to.
+ * @param status Instance status to encode.
+ * @param sync Request sync.
+ * @param schema_version Actual schema version.
+ *
+ * @retval  0 Success.
+ * @retval -1 Memory error.
+ */
+int
+iproto_reply_status(struct obuf *out, const struct status *status,
+                   uint64_t sync, uint32_t schema_version);
+
+/**
  * Write an error packet int output buffer. Doesn't throw if out
  * of memory
  */
@@ -585,6 +599,14 @@ xrow_encode_auth_xc(struct xrow_header *row, const char 
*salt, size_t salt_len,
                diag_raise();
 }
 
+/** @copydoc xrow_decode_status. */
+static inline void
+xrow_decode_status_xc(struct xrow_header *row, struct status *status)
+{
+       if (xrow_decode_status(row, status) != 0)
+               diag_raise();
+}
+
 /** @copydoc xrow_encode_subscribe. */
 static inline void
 xrow_encode_subscribe_xc(struct xrow_header *row,
@@ -642,15 +664,6 @@ xrow_decode_vclock_xc(struct xrow_header *row, struct 
vclock *vclock)
                diag_raise();
 }
 
-/** @copydoc xrow_decode_request_vote. */
-static inline void
-xrow_decode_request_vote_xc(struct xrow_header *row, struct vclock *vclock,
-                           bool *read_only)
-{
-       if (xrow_decode_request_vote(row, vclock, read_only) != 0)
-               diag_raise();
-}
-
 /** @copydoc iproto_reply_ok. */
 static inline void
 iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version)
@@ -670,6 +683,15 @@ iproto_reply_request_vote_xc(struct obuf *out, uint64_t 
sync,
                diag_raise();
 }
 
+/** @copydoc iproto_reply_status. */
+static inline void
+iproto_reply_status_xc(struct obuf *out, const struct status *status,
+                      uint64_t sync, uint32_t schema_version)
+{
+       if (iproto_reply_status(out, status, sync, schema_version) != 0)
+               diag_raise();
+}
+
 #endif
 
 #endif /* TARANTOOL_XROW_H_INCLUDED */
-- 
2.11.0


Other related posts: