[tarantool-patches] [PATCH v9 5/7] iproto: create port_sql

  • From: imeevma@xxxxxxxxxxxxx
  • To: v.shpilevoy@xxxxxxxxxxxxx
  • Date: Fri, 22 Mar 2019 13:50:40 +0300

This patch creates port_sql implementation for the port. This will
allow us to dump sql responses to obuf or to Lua. Also this patch
defines methods dump_msgpack() and destroy() of port_sql.

Part of #3505
---
 src/box/execute.c | 263 +++++++++++++++++++++++++++++++++++-------------------
 src/box/execute.h |  49 +---------
 src/box/iproto.cc |  12 ++-
 src/box/port.h    |   1 -
 4 files changed, 182 insertions(+), 143 deletions(-)

diff --git a/src/box/execute.c b/src/box/execute.c
index 5810086..460eebc 100644
--- a/src/box/execute.c
+++ b/src/box/execute.c
@@ -83,6 +83,92 @@ struct sql_bind {
 };
 
 /**
+ * Port implementation that is used to store SQL responses and
+ * output them to obuf or Lua. This port implementation is
+ * inherited from the port_tuple structure. This allows us to use
+ * this structure in the port_tuple methods instead of port_tuple
+ * itself.
+ *
+ * The methods of port_tuple are called via explicit access to
+ * port_tuple_vtab just like C++ does with BaseClass::method, when
+ * it is called in a child method.
+ */
+struct port_sql {
+       /* port_tuple to inherit from. */
+       struct port_tuple port_tuple;
+       /* Prepared SQL statement. */
+       struct sql_stmt *stmt;
+};
+
+static_assert(sizeof(struct port_sql) <= sizeof(struct port),
+             "sizeof(struct port_sql) must be <= sizeof(struct port)");
+
+/**
+ * Dump data from port to buffer. Data in port contains tuples,
+ * metadata, or information obtained from an executed SQL query.
+ *
+ * Dumped msgpack structure:
+ * +----------------------------------------------+
+ * | IPROTO_BODY: {                               |
+ * |     IPROTO_METADATA: [                       |
+ * |         {IPROTO_FIELD_NAME: column name1},   |
+ * |         {IPROTO_FIELD_NAME: column name2},   |
+ * |         ...                                  |
+ * |     ],                                       |
+ * |                                              |
+ * |     IPROTO_DATA: [                           |
+ * |         tuple, tuple, tuple, ...             |
+ * |     ]                                        |
+ * | }                                            |
+ * +-------------------- OR ----------------------+
+ * | IPROTO_BODY: {                               |
+ * |     IPROTO_SQL_INFO: {                       |
+ * |         SQL_INFO_ROW_COUNT: number           |
+ * |         SQL_INFO_AUTOINCREMENT_IDS: [        |
+ * |             id, id, id, ...                  |
+ * |         ]                                    |
+ * |     }                                        |
+ * | }                                            |
+ * +-------------------- OR ----------------------+
+ * | IPROTO_BODY: {                               |
+ * |     IPROTO_SQL_INFO: {                       |
+ * |         SQL_INFO_ROW_COUNT: number           |
+ * |     }                                        |
+ * | }                                            |
+ * +----------------------------------------------+
+ * @param port Port that contains SQL response.
+ * @param[out] out Output buffer.
+ *
+ * @retval  0 Success.
+ * @retval -1 Memory error.
+ */
+static int
+port_sql_dump_msgpack(struct port *port, struct obuf *out);
+
+static void
+port_sql_destroy(struct port *base)
+{
+       port_tuple_vtab.destroy(base);
+       sql_finalize(((struct port_sql *)base)->stmt);
+}
+
+static const struct port_vtab port_sql_vtab = {
+       /* .dump_msgpack = */ port_sql_dump_msgpack,
+       /* .dump_msgpack_16 = */ NULL,
+       /* .dump_lua = */ NULL,
+       /* .dump_plain = */ NULL,
+       /* .destroy = */ port_sql_destroy,
+};
+
+static void
+port_sql_create(struct port *port, struct sql_stmt *stmt)
+{
+       port_tuple_create(port);
+       ((struct port_sql *)port)->stmt = stmt;
+       port->vtab = &port_sql_vtab;
+}
+
+/**
  * Return a string name of a parameter marker.
  * @param Bind to get name.
  * @retval Zero terminated name.
@@ -504,108 +590,36 @@ sql_get_description(struct sql_stmt *stmt, struct obuf 
*out,
        return 0;
 }
 
-static inline int
-sql_execute(sql *db, struct sql_stmt *stmt, struct port *port,
-           struct region *region)
-{
-       int rc, column_count = sql_column_count(stmt);
-       if (column_count > 0) {
-               /* Either ROW or DONE or ERROR. */
-               while ((rc = sql_step(stmt)) == SQL_ROW) {
-                       if (sql_row_to_port(stmt, column_count, region,
-                                           port) != 0)
-                               return -1;
-               }
-               assert(rc == SQL_DONE || rc != SQL_OK);
-       } else {
-               /* No rows. Either DONE or ERROR. */
-               rc = sql_step(stmt);
-               assert(rc != SQL_ROW && rc != SQL_OK);
-       }
-       if (rc != SQL_DONE) {
-               if (db->errCode != SQL_TARANTOOL_ERROR) {
-                       const char *err = (char *)sql_value_text(db->pErr);
-                       if (err == NULL)
-                               err = sqlErrStr(db->errCode);
-                       diag_set(ClientError, ER_VDBE_EXECUTE, err);
-               }
-               return -1;
-       }
-       return 0;
-}
-
-int
-sql_prepare_and_execute(const char *sql, int len, const struct sql_bind *bind,
-                       uint32_t bind_count, struct sql_response *response,
-                       struct region *region)
-{
-       struct sql_stmt *stmt;
-       struct sql *db = sql_get();
-       if (db == NULL) {
-               diag_set(ClientError, ER_LOADING);
-               return -1;
-       }
-       if (sql_prepare_v2(db, sql, len, &stmt, NULL) != SQL_OK) {
-               if (db->errCode != SQL_TARANTOOL_ERROR) {
-                       const char *err = (char *)sql_value_text(db->pErr);
-                       if (err == NULL)
-                               err = sqlErrStr(db->errCode);
-                       diag_set(ClientError, ER_VDBE_EXECUTE, err);
-               }
-               return -1;
-       }
-       assert(stmt != NULL);
-       port_tuple_create(&response->port);
-       response->prep_stmt = stmt;
-       if (sql_bind(stmt, bind, bind_count) == 0 &&
-           sql_execute(db, stmt, &response->port, region) == 0)
-               return 0;
-       port_destroy(&response->port);
-       sql_finalize(stmt);
-       return -1;
-}
-
-int
-sql_response_dump(struct sql_response *response, struct obuf *out)
+static int
+port_sql_dump_msgpack(struct port *port, struct obuf *out)
 {
+       assert(port->vtab == &port_sql_vtab);
        sql *db = sql_get();
-       struct sql_stmt *stmt = (struct sql_stmt *) response->prep_stmt;
-       struct port_tuple *port_tuple = (struct port_tuple *) &response->port;
-       int rc = 0, column_count = sql_column_count(stmt);
+       struct sql_stmt *stmt = ((struct port_sql *)port)->stmt;
+       int column_count = sql_column_count(stmt);
        if (column_count > 0) {
                int keys = 2;
                int size = mp_sizeof_map(keys);
                char *pos = (char *) obuf_alloc(out, size);
                if (pos == NULL) {
                        diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-                       goto err;
+                       return -1;
                }
                pos = mp_encode_map(pos, keys);
-               if (sql_get_description(stmt, out, column_count) != 0) {
-err:
-                       rc = -1;
-                       goto finish;
-               }
-               size = mp_sizeof_uint(IPROTO_DATA) +
-                      mp_sizeof_array(port_tuple->size);
+               if (sql_get_description(stmt, out, column_count) != 0)
+                       return -1;
+               size = mp_sizeof_uint(IPROTO_DATA);
                pos = (char *) obuf_alloc(out, size);
                if (pos == NULL) {
                        diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-                       goto err;
+                       return -1;
                }
                pos = mp_encode_uint(pos, IPROTO_DATA);
-               pos = mp_encode_array(pos, port_tuple->size);
-               /*
-                * Just like SELECT, SQL uses output format compatible
-                * with Tarantool 1.6
-                */
-               if (port_dump_msgpack_16(&response->port, out) < 0) {
-                       /* Failed port dump destroyes the port. */
-                       goto err;
-               }
+               if (port_tuple_vtab.dump_msgpack(port, out) < 0)
+                       return -1;
        } else {
                int keys = 1;
-               assert(port_tuple->size == 0);
+               assert(((struct port_tuple *)port)->size == 0);
                struct stailq *autoinc_id_list =
                        vdbe_autoinc_id_list((struct Vdbe *)stmt);
                uint32_t map_size = stailq_empty(autoinc_id_list) ? 1 : 2;
@@ -615,7 +629,7 @@ err:
                char *pos = (char *) obuf_alloc(out, size);
                if (pos == NULL) {
                        diag_set(OutOfMemory, size, "obuf_alloc", "pos");
-                       goto err;
+                       return -1;
                }
                pos = mp_encode_map(pos, keys);
                pos = mp_encode_uint(pos, IPROTO_SQL_INFO);
@@ -638,7 +652,7 @@ err:
                char *buf = obuf_alloc(out, size);
                if (buf == NULL) {
                        diag_set(OutOfMemory, size, "obuf_alloc", "buf");
-                       goto err;
+                       return -1;
                }
                buf = mp_encode_uint(buf, SQL_INFO_ROW_COUNT);
                buf = mp_encode_uint(buf, changes);
@@ -653,8 +667,75 @@ err:
                        }
                }
        }
-finish:
-       port_destroy(&response->port);
-       sql_finalize(stmt);
-       return rc;
+       return 0;
+}
+
+/**
+ * Execute prepared SQL statement.
+ *
+ * This function uses region to allocate memory for temporary
+ * objects. After this function, region will be in the same state
+ * in which it was before this function.
+ *
+ * @param db SQL handle.
+ * @param stmt Prepared statement.
+ * @param port Port to store SQL response.
+ * @param region Region to allocate temporary objects.
+ *
+ * @retval  0 Success.
+ * @retval -1 Error.
+ */
+static inline int
+sql_execute(sql *db, struct sql_stmt *stmt, struct port *port,
+           struct region *region)
+{
+       int rc, column_count = sql_column_count(stmt);
+       if (column_count > 0) {
+               /* Either ROW or DONE or ERROR. */
+               while ((rc = sql_step(stmt)) == SQL_ROW) {
+                       if (sql_row_to_port(stmt, column_count, region,
+                                           port) != 0)
+                               return -1;
+               }
+               assert(rc == SQL_DONE || rc != SQL_OK);
+       } else {
+               /* No rows. Either DONE or ERROR. */
+               rc = sql_step(stmt);
+               assert(rc != SQL_ROW && rc != SQL_OK);
+       }
+       if (rc != SQL_DONE) {
+               if (db->errCode != SQL_TARANTOOL_ERROR) {
+                       const char *err = (char *)sql_value_text(db->pErr);
+                       if (err == NULL)
+                               err = sqlErrStr(db->errCode);
+                       diag_set(ClientError, ER_VDBE_EXECUTE, err);
+               }
+               return -1;
+       }
+       return 0;
+}
+
+int
+sql_prepare_and_execute(const char *sql, int len, const struct sql_bind *bind,
+                       uint32_t bind_count, struct port *port,
+                       struct region *region)
+{
+       struct sql_stmt *stmt;
+       struct sql *db = sql_get();
+       if (sql_prepare_v2(db, sql, len, &stmt, NULL) != SQL_OK) {
+               if (db->errCode != SQL_TARANTOOL_ERROR) {
+                       const char *err = (char *)sql_value_text(db->pErr);
+                       if (err == NULL)
+                               err = sqlErrStr(db->errCode);
+                       diag_set(ClientError, ER_VDBE_EXECUTE, err);
+               }
+               return -1;
+       }
+       assert(stmt != NULL);
+       port_sql_create(port, stmt);
+       if (sql_bind(stmt, bind, bind_count) == 0 &&
+           sql_execute(db, stmt, port, region) == 0)
+               return 0;
+       port_destroy(port);
+       return -1;
 }
diff --git a/src/box/execute.h b/src/box/execute.h
index 12d893a..52563cd 100644
--- a/src/box/execute.h
+++ b/src/box/execute.h
@@ -48,18 +48,9 @@ enum sql_info_key {
 
 extern const char *sql_info_key_strs[];
 
-struct obuf;
 struct region;
 struct sql_bind;
 
-/** Response on EXECUTE request. */
-struct sql_response {
-       /** Port with response data if any. */
-       struct port port;
-       /** Prepared SQL statement with metadata. */
-       void *prep_stmt;
-};
-
 /**
  * Parse MessagePack array of SQL parameters.
  * @param data MessagePack array of parameters. Each parameter
@@ -76,48 +67,12 @@ int
 sql_bind_list_decode(const char *data, struct sql_bind **out_bind);
 
 /**
- * Dump a built response into @an out buffer. The response is
- * destroyed.
- * Response structure:
- * +----------------------------------------------+
- * | IPROTO_OK, sync, schema_version   ...        | iproto_header
- * +----------------------------------------------+---------------
- * | Body - a map with one or two keys.           |
- * |                                              |
- * | IPROTO_BODY: {                               |
- * |     IPROTO_METADATA: [                       |
- * |         {IPROTO_FIELD_NAME: column name1},   |
- * |         {IPROTO_FIELD_NAME: column name2},   | iproto_body
- * |         ...                                  |
- * |     ],                                       |
- * |                                              |
- * |     IPROTO_DATA: [                           |
- * |         tuple, tuple, tuple, ...             |
- * |     ]                                        |
- * | }                                            |
- * +-------------------- OR ----------------------+
- * | IPROTO_BODY: {                               |
- * |     IPROTO_SQL_INFO: {                       |
- * |         SQL_INFO_ROW_COUNT: number           |
- * |     }                                        |
- * | }                                            |
- * +----------------------------------------------+
- * @param response EXECUTE response.
- * @param out Output buffer.
- *
- * @retval  0 Success.
- * @retval -1 Memory error.
- */
-int
-sql_response_dump(struct sql_response *response, struct obuf *out);
-
-/**
  * Prepare and execute an SQL statement.
  * @param sql SQL statement.
  * @param len Length of @a sql.
  * @param bind Array of parameters.
  * @param bind_count Length of @a bind.
- * @param[out] response Response to store result.
+ * @param[out] port Port to store SQL response.
  * @param region Runtime allocator for temporary objects
  *        (columns, tuples ...).
  *
@@ -126,7 +81,7 @@ sql_response_dump(struct sql_response *response, struct obuf 
*out);
  */
 int
 sql_prepare_and_execute(const char *sql, int len, const struct sql_bind *bind,
-                       uint32_t bind_count, struct sql_response *response,
+                       uint32_t bind_count, struct port *port,
                        struct region *region);
 
 #if defined(__cplusplus)
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 3b0ba62..1e88bf0 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1621,7 +1621,7 @@ tx_process_sql(struct cmsg *m)
 {
        struct iproto_msg *msg = tx_accept_msg(m);
        struct obuf *out;
-       struct sql_response response;
+       struct port port;
        struct sql_bind *bind;
        int bind_count;
        const char *sql;
@@ -1638,7 +1638,7 @@ tx_process_sql(struct cmsg *m)
                goto error;
        sql = msg->sql.sql_text;
        sql = mp_decode_str(&sql, &len);
-       if (sql_prepare_and_execute(sql, len, bind, bind_count, &response,
+       if (sql_prepare_and_execute(sql, len, bind, bind_count, &port,
                                    &fiber()->gc) != 0)
                goto error;
        /*
@@ -1648,12 +1648,16 @@ tx_process_sql(struct cmsg *m)
        out = msg->connection->tx.p_obuf;
        struct obuf_svp header_svp;
        /* Prepare memory for the iproto header. */
-       if (iproto_prepare_header(out, &header_svp, IPROTO_HEADER_LEN) != 0)
+       if (iproto_prepare_header(out, &header_svp, IPROTO_HEADER_LEN) != 0) {
+               port_destroy(&port);
                goto error;
-       if (sql_response_dump(&response, out) != 0) {
+       }
+       if (port_dump_msgpack(&port, out) != 0) {
+               port_destroy(&port);
                obuf_rollback_to_svp(out, &header_svp);
                goto error;
        }
+       port_destroy(&port);
        iproto_reply_sql(out, &header_svp, msg->header.sync, schema_version);
        iproto_wpos_create(&msg->wpos, out);
        return;
diff --git a/src/box/port.h b/src/box/port.h
index ad1b349..f188036 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -65,7 +65,6 @@ extern const struct port_vtab port_tuple_vtab;
 static inline struct port_tuple *
 port_tuple(struct port *port)
 {
-       assert(port->vtab == &port_tuple_vtab);
        return (struct port_tuple *)port;
 }
 
-- 
2.7.4


Other related posts: