[tarantool-patches] Re: [PATCH v2 1/1] sql: cleanup on failed creation operation

  • From: Imeev Mergen <imeevma@xxxxxxxxxxxxx>
  • To: Vladislav Shpilevoy <v.shpilevoy@xxxxxxxxxxxxx>, tarantool-patches@xxxxxxxxxxxxx
  • Date: Sat, 22 Sep 2018 12:21:59 +0300

Hello! Thank you for review!


On 09/20/2018 10:14 PM, Vladislav Shpilevoy wrote:

Hi! See 4 comments below, my review fixes at the end of
the email and on the branch.

commit 1f18fa692305f28891f0dbb87e74e863e0c002c3
Author: Mergen Imeev <imeevma@xxxxxxxxx>
Date:   Fri Aug 31 15:50:17 2018 +0300

     sql: cleanup on failed creation operation

     Some creation operations create objects even on fail. This is
     wrong and should be fixed. This patch adds destructors for such
     objects.

     Closes #3592

diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 60b49df..028a1c5 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -62,6 +110,22 @@ sql_finish_coding(struct Parse *parse_context)
      struct sqlite3 *db = parse_context->db;
      struct Vdbe *v = sqlite3GetVdbe(parse_context);
      sqlite3VdbeAddOp0(v, OP_Halt);
+    /**
+     * Destructors for all created in current statement
+     * spaces, indexes, sequences etc.
+     */
+    while (rlist_empty(&parse_context->record_list) == 0) {

1. Why not just rlist_foreach_entry/rlist_foreach_entry_reverse
instead of while + shift? You do not need to clean this list -
it is on a region anyway.
Fixed.

+        int record_reg = ++parse_context->nMem;
+        struct saved_record *record =
+ rlist_shift_entry(&parse_context->record_list,
+                      struct saved_record, link);
+        sqlite3VdbeAddOp3(v, OP_MakeRecord, record->reg_key,
+                  record->reg_key_count, record_reg);
+        sqlite3VdbeAddOp2(v, OP_SDelete, record->space_id, record_reg);
+        /* Set P2 of SInsert. */
+        sqlite3VdbeChangeP2(v, record->nOp, v->nOp);
+    }
+    sqlite3VdbeAddOp3(v, OP_Halt, SQL_TARANTOOL_ERROR, 0, 1);

2. The same as for the previous review: it is not okay to add a new
opcode to Vdbe always without calling attention to does it have
something to rollback or not. Please, add it only when you have
rollback section.
Fixed.

      if (db->mallocFailed || parse_context->nErr != 0) {
          if (parse_context->rc == SQLITE_OK)
              parse_context->rc = SQLITE_ERROR;
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 0efc4dd..c90cf1c 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1005,6 +1007,10 @@ case OP_Halt: {
          pOp = &aOp[pcx];
          break;
      }
+    if (pOp->p1 == SQL_TARANTOOL_ERROR && pOp->p3 != 0) {
+        rc = SQL_TARANTOOL_ERROR;
+        goto abort_due_to_error;
+    }

3. You do not need this check here on the hot path and p3
neither. See my review fixes.
Squashed.

      p->rc = pOp->p1;
      p->errorAction = (u8)pOp->p2;
      p->pc = pcx;

4. I tried this test:

    box.sql.execute('CREATE TABLE test1 (id INT PRIMARY KEY)')
    box.sql.execute('EXPLAIN CREATE TABLE test2 (id INT PRIMARY KEY REFERENCES test1, a INT UNIQUE, b INT UNIQUE)')

And this is what I got in result (shortened pseudo vdbe here and
full at the end of the letter):

    insert into _space or goto error1
    insert into _index or goto error2
    insert into _fk_constaints or goto error3
    return ok

::error4::
    drop fk
::error3::
    drop index
::error2::
    drop space
::error1::
    return error

As you can see, the last ::error4:: is never
used, so I guess you can omit generation of last
error processing. And if the saved_records list
has only one record, it is the same as empty one.

It is okay that error4 is not called and should
not exist because there is nothing after fkey
creation that can fail and lead to rollback of
the DDL request.
Fixed.

My review fixes:

===============================================

commit 92c396d9e6fe5397e0acd1f3c45675208c6a04f4
Author: Vladislav Shpilevoy <v.shpilevoy@xxxxxxxxxxxxx>
Date:   Thu Sep 20 21:09:48 2018 +0200

    Review fixes

diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 028a1c5b1..ebeeb1f20 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -70,8 +70,8 @@ struct saved_record
     int reg_key;
     /** Number of registers the key consists of. */
     int reg_key_count;
-    /** The number of SInsert operation. */
-    int nOp;
+    /** The number of the OP_SInsert operation. */
+    int insertion_opcode;
 };

 /**
@@ -81,11 +81,11 @@ struct saved_record
  * @param space_id Id of table in which record is inserted.
  * @param reg_key Register that contains first field of the key.
  * @param reg_key_count Exact number of fields of the key.
- * @param nOp_after The number of operation that follows SInsert.
+ * @param insertion_opcode Number of OP_SInsert opcode.
  */
 static inline void
 save_record(struct Parse *parser, uint32_t space_id, int reg_key,
-        int reg_key_count, int nOp_after)
+        int reg_key_count, int insertion_opcode)
 {
     struct saved_record *record =
         region_alloc(&fiber()->gc, sizeof(*record));
@@ -99,7 +99,7 @@ save_record(struct Parse *parser, uint32_t space_id, int reg_key,
     record->space_id = space_id;
     record->reg_key = reg_key;
     record->reg_key_count = reg_key_count;
-    record->nOp = nOp_after - 1;
+    record->insertion_opcode = insertion_opcode;
     rlist_add_entry(&parser->record_list, record, link);
 }

@@ -123,9 +123,9 @@ sql_finish_coding(struct Parse *parse_context)
                   record->reg_key_count, record_reg);
         sqlite3VdbeAddOp2(v, OP_SDelete, record->space_id, record_reg);
         /* Set P2 of SInsert. */
-        sqlite3VdbeChangeP2(v, record->nOp, v->nOp);
+        sqlite3VdbeChangeP2(v, record->insertion_opcode, v->nOp);
     }
-    sqlite3VdbeAddOp3(v, OP_Halt, SQL_TARANTOOL_ERROR, 0, 1);
+    sqlite3VdbeAddOp2(v, OP_Halt, SQL_TARANTOOL_ERROR, 0);
     if (db->mallocFailed || parse_context->nErr != 0) {
         if (parse_context->rc == SQLITE_OK)
             parse_context->rc = SQLITE_ERROR;
@@ -1172,7 +1172,7 @@ vdbe_emit_create_index(struct Parse *parse, struct space_def *def,
      */
     if (idx_def->opts.sql != NULL)
         sqlite3VdbeChangeP5(v, OPFLAG_NCHANGE);
-    save_record(parse, BOX_INDEX_ID, entry_reg, 2, v->nOp);
+    save_record(parse, BOX_INDEX_ID, entry_reg, 2, v->nOp - 1);
     return;
 error:
     parse->rc = SQL_TARANTOOL_ERROR;
@@ -1232,7 +1232,7 @@ createSpace(Parse * pParse, int iSpaceId, char *zStmt)
     sqlite3VdbeAddOp3(v, OP_MakeRecord, iFirstCol, 7, iRecord);
     sqlite3VdbeAddOp3(v, OP_SInsert, BOX_SPACE_ID, 0, iRecord);
     sqlite3VdbeChangeP5(v, OPFLAG_NCHANGE);
-    save_record(pParse, BOX_SPACE_ID, iFirstCol, 1, v->nOp);
+    save_record(pParse, BOX_SPACE_ID, iFirstCol, 1, v->nOp - 1);
     return;
 error:
     pParse->nErr++;
@@ -1410,7 +1410,7 @@ vdbe_emit_fkey_create(struct Parse *parse_context, const struct fkey_def *fk)
               constr_tuple_reg + 9);
     sqlite3VdbeChangeP5(vdbe, OPFLAG_NCHANGE);
     save_record(parse_context, BOX_FK_CONSTRAINT_ID, constr_tuple_reg, 2,
-            vdbe->nOp);
+            vdbe->nOp - 1);
     sqlite3ReleaseTempRange(parse_context, constr_tuple_reg, 10);
     return;
 error:
@@ -1558,7 +1558,7 @@ sqlite3EndTable(Parse * pParse,    /* Parse context */
         sqlite3VdbeAddOp3(v, OP_SInsert, BOX_SEQUENCE_ID, 0,
                   reg_seq_record);
         save_record(pParse, BOX_SEQUENCE_ID, reg_seq_record + 1, 1,
-                v->nOp);
+                v->nOp - 1);
         /* Do an insertion into _space_sequence. */
         int reg_space_seq_record =
             emitNewSysSpaceSequenceRecord(pParse, reg_space_id,
@@ -1566,7 +1566,7 @@ sqlite3EndTable(Parse * pParse,    /* Parse context */
         sqlite3VdbeAddOp3(v, OP_SInsert, BOX_SPACE_SEQUENCE_ID, 0,
                   reg_space_seq_record);
         save_record(pParse, BOX_SPACE_SEQUENCE_ID,
-                reg_space_seq_record + 1, 1, v->nOp);
+                reg_space_seq_record + 1, 1, v->nOp - 1);
     }
     /* Code creation of FK constraints, if any. */
     struct fkey_parse *fk_parse;
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index c90cf1cf4..fc959bdaf 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -943,7 +943,7 @@ case OP_HaltIfNull: {      /* in3 */
     FALLTHROUGH;
 }

-/* Opcode:  Halt P1 P2 P3 P4 P5
+/* Opcode:  Halt P1 P2 * P4 P5
  *
  * Exit immediately.  All open cursors, etc are closed
  * automatically.
@@ -951,9 +951,7 @@ case OP_HaltIfNull: {      /* in3 */
  * P1 is the result code returned by sqlite3_exec(),
  * sqlite3_reset(), or sqlite3_finalize().  For a normal halt,
  * this should be SQLITE_OK (0).
- * For errors, it can be some other value.
- * If P1 == SQL_TARANTOOL_ERROR and P3 != 0 then just go to
- * abort_due_to_error. If P1!=0 and P3 == 0 then P2 will
+ * For errors, it can be some other value.  If P1!=0 then P2 will
  * determine whether or not to rollback the current transaction.
  * Do not rollback if P2==ON_CONFLICT_ACTION_FAIL. Do the rollback
  * if P2==ON_CONFLICT_ACTION_ROLLBACK.  If
@@ -1007,17 +1005,17 @@ case OP_Halt: {
         pOp = &aOp[pcx];
         break;
     }
-    if (pOp->p1 == SQL_TARANTOOL_ERROR && pOp->p3 != 0) {
-        rc = SQL_TARANTOOL_ERROR;
-        goto abort_due_to_error;
-    }
     p->rc = pOp->p1;
     p->errorAction = (u8)pOp->p2;
     p->pc = pcx;
     if (p->rc) {
         if (p->rc == SQL_TARANTOOL_ERROR) {
-            assert(pOp->p4.z != NULL);
-            box_error_set(__FILE__, __LINE__, pOp->p5, pOp->p4.z);
+            if (pOp->p4.z == NULL) {
+                assert(! diag_is_empty(diag_get()));
+            } else {
+                box_error_set(__FILE__, __LINE__, pOp->p5,
+                          pOp->p4.z);
+            }
         } else if (pOp->p5 != 0) {
             static const char * const azType[] = { "NOT NULL", "UNIQUE", "CHECK",
                                    "FOREIGN KEY" };

Full Vdbe from comment 4.

===============================================

---
- - [0, 'Init', 0, 1, 0, '', '00', 'Start at 1']
  - [1, 'IncMaxid', 1, 0, 0, '', '00', '']
  - [2, 'SCopy', 1, 2, 0, '', '00', 'r[2]=r[1]']
  - [3, 'Integer', 1, 3, 0, '', '00', 'r[3]=1']
  - [4, 'String8', 0, 4, 0, 'TEST2', '00', 'r[4]=''TEST2''']
  - [5, 'String8', 0, 5, 0, 'memtx', '00', 'r[5]=''memtx''']
  - [6, 'Integer', 3, 6, 0, '', '00', 'r[6]=3']
  - [7, 'Blob', 91, 7, 77, !!binary gaNzcWzZVENSRUFURSBUQUJMRSB0ZXN0MiAoaWQgSU5UIFBSSU1BUlkgS0VZIFJFRkVSRU5DRVMgdGVzdDEsIGEgSU5UIFVOSVFVRSwgYiBJTlQgVU5JUVVFKZOFpG5hbWWiSUSkdHlwZadpbnRlZ2VyqGFmZmluaXR5RKtpc19udWxsYWJsZcKvbnVsbGFibGVfYWN0aW9upWFib3J0haRuYW1loUGkdHlwZaZzY2FsYXKoYWZmaW5pdHlEq2lzX251bGxhYmxlw69udWxsYWJsZV9hY3Rpb26kbm9uZYWkbmFtZaFCpHR5cGWmc2NhbGFyqGFmZmluaXR5RKtpc19udWxsYWJsZcOvbnVsbGFibGVfYWN0aW9upG5vbmU=,
    '00', !!binary cls3XT2Bo3NxbNlUQ1JFQVRFIFRBQkxFIHRlc3QyIChpZCBJTlQgUFJJTUFSWSBLRVkgUkVGRVJFTkNFUyB0ZXN0MSwgYSBJTlQgVU5JUVVFLCBiIElOVCBVTklRVUUpk4WkbmFtZaJJRKR0eXBlp2ludGVnZXKoYWZmaW5pdHlEq2lzX251bGxhYmxlwq9udWxsYWJsZV9hY3Rpb26lYWJvcnSFpG5hbWWhQaR0eXBlpnNjYWxhcqhhZmZpbml0eUSraXNfbnVsbGFibGXDr251bGxhYmxlX2FjdGlvbqRub25lhaRuYW1loUKkdHlwZaZzY2FsYXKoYWZmaW5pdHlEq2lzX251bGxhYmxlw69udWxsYWJsZV9hY3Rpb26kbm9uZSAobGVuPTkxLCBzdWJ0eXBlPTc3KQ==]
  - [8, 'Blob', 196, 8, 77, !!binary k4WkbmFtZaJJRKR0eXBlp2ludGVnZXKoYWZmaW5pdHlEq2lzX251bGxhYmxlwq9udWxsYWJsZV9hY3Rpb26lYWJvcnSFpG5hbWWhQaR0eXBlpnNjYWxhcqhhZmZpbml0eUSraXNfbnVsbGFibGXDr251bGxhYmxlX2FjdGlvbqRub25lhaRuYW1loUKkdHlwZaZzY2FsYXKoYWZmaW5pdHlEq2lzX251bGxhYmxlw69udWxsYWJsZV9hY3Rpb26kbm9uZQ==,
    '00', !!binary cls4XT2ThaRuYW1loklEpHR5cGWnaW50ZWdlcqhhZmZpbml0eUSraXNfbnVsbGFibGXCr251bGxhYmxlX2FjdGlvbqVhYm9ydIWkbmFtZaFBpHR5cGWmc2NhbGFyqGFmZmluaXR5RKtpc19udWxsYWJsZcOvbnVsbGFibGVfYWN0aW9upG5vbmWFpG5hbWWhQqR0eXBlpnNjYWxhcqhhZmZpbml0eUSraXNfbnVsbGFibGXDr251bGxhYmxlX2FjdGlvbqRub25lIChsZW49MTk2LCBzdWJ0eXBlPTc3KQ==]
  - [9, 'MakeRecord', 2, 7, 9, '', '00', 'r[9]=mkrec(r[2..8])']
  - [10, 'SInsert', 280, 61, 9, '', '01', 'space id = 280, key = r[9], on error goto
      61']
  - [11, 'SCopy', 1, 10, 0, '', '00', 'r[10]=r[1]']
  - [12, 'Integer', 0, 11, 0, '', '00', 'r[11]=0']
  - [13, 'String8', 0, 12, 0, 'pk_unnamed_TEST2_1', '00', 'r[12]=''pk_unnamed_TEST2_1''']
  - [14, 'String8', 0, 13, 0, 'tree', '00', 'r[13]=''tree''']
  - [15, 'Blob', 14, 14, 77, !!binary gqZ1bmlxdWXDo3NxbKCRhaR0eXBlp2ludGVnZXKlZmllbGQ=,
    '00', !!binary clsxNF09gqZ1bmlxdWXDo3NxbKCRhaR0eXBlp2ludGVnZXKlZmllbGQgKGxlbj0xNCwgc3VidHlwZT03Nyk=]
  - [16, 'Blob', 72, 15, 77, !!binary kYWkdHlwZadpbnRlZ2VypWZpZWxk, '00', !!binary clsxNV09kYWkdHlwZadpbnRlZ2VypWZpZWxkIChsZW49NzIsIHN1YnR5cGU9Nzcp]
  - [17, 'MakeRecord', 10, 6, 16, '', '00', 'r[16]=mkrec(r[10..15])']
  - [18, 'SInsert', 288, 59, 16, '', '00', 'space id = 288, key = r[16], on error
      goto 59']
  - [19, 'SCopy', 1, 17, 0, '', '00', 'r[17]=r[1]']
  - [20, 'Integer', 1, 18, 0, '', '00', 'r[18]=1']
  - [21, 'String8', 0, 19, 0, 'unique_unnamed_TEST2_2', '00', 'r[19]=''unique_unnamed_TEST2_2''']
  - [22, 'String8', 0, 20, 0, 'tree', '00', 'r[20]=''tree''']
  - [23, 'Blob', 14, 21, 77, !!binary gqZ1bmlxdWXDo3NxbKCRhaR0eXBlpnNjYWxhcqVmaWVsZAGraXNfbnVsbGFibGXDr251bGxhYmxlX2FjdGlvbqRub25lqnNvcnRfb3JkZXKjYXNj,
    '00', !!binary clsyMV09gqZ1bmlxdWXDo3NxbKCRhaR0eXBlpnNjYWxhcqVmaWVsZAGraXNfbnVsbGFibGXDr251bGxhYmxlX2FjdGlvbqRub25lqnNvcnRfb3JkZXKjYXNjIChsZW49MTQsIHN1YnR5cGU9Nzcp]
  - [24, 'Blob', 70, 22, 77, !!binary kYWkdHlwZaZzY2FsYXKlZmllbGQBq2lzX251bGxhYmxlw69udWxsYWJsZV9hY3Rpb26kbm9uZapzb3J0X29yZGVyo2FzYw==,
    '00', !!binary clsyMl09kYWkdHlwZaZzY2FsYXKlZmllbGQBq2lzX251bGxhYmxlw69udWxsYWJsZV9hY3Rpb26kbm9uZapzb3J0X29yZGVyo2FzYyAobGVuPTcwLCBzdWJ0eXBlPTc3KQ==]
  - [25, 'MakeRecord', 17, 6, 23, '', '00', 'r[23]=mkrec(r[17..22])']
  - [26, 'SInsert', 288, 57, 23, '', '00', 'space id = 288, key = r[23], on error
      goto 57']
  - [27, 'SCopy', 1, 24, 0, '', '00', 'r[24]=r[1]']
  - [28, 'Integer', 2, 25, 0, '', '00', 'r[25]=2']
  - [29, 'String8', 0, 26, 0, 'unique_unnamed_TEST2_3', '00', 'r[26]=''unique_unnamed_TEST2_3''']
  - [30, 'String8', 0, 27, 0, 'tree', '00', 'r[27]=''tree''']
  - [31, 'Blob', 14, 28, 77, !!binary gqZ1bmlxdWXDo3NxbKCRhaR0eXBlpnNjYWxhcqVmaWVsZAKraXNfbnVsbGFibGXDr251bGxhYmxlX2FjdGlvbqRub25lqnNvcnRfb3JkZXKjYXNj,
    '00', !!binary clsyOF09gqZ1bmlxdWXDo3NxbKCRhaR0eXBlpnNjYWxhcqVmaWVsZAKraXNfbnVsbGFibGXDr251bGxhYmxlX2FjdGlvbqRub25lqnNvcnRfb3JkZXKjYXNjIChsZW49MTQsIHN1YnR5cGU9Nzcp]
  - [32, 'Blob', 70, 29, 77, !!binary kYWkdHlwZaZzY2FsYXKlZmllbGQCq2lzX251bGxhYmxlw69udWxsYWJsZV9hY3Rpb26kbm9uZapzb3J0X29yZGVyo2FzYw==,
    '00', !!binary clsyOV09kYWkdHlwZaZzY2FsYXKlZmllbGQCq2lzX251bGxhYmxlw69udWxsYWJsZV9hY3Rpb26kbm9uZapzb3J0X29yZGVyo2FzYyAobGVuPTcwLCBzdWJ0eXBlPTc3KQ==]
  - [33, 'MakeRecord', 24, 6, 30, '', '00', 'r[30]=mkrec(r[24..29])']
  - [34, 'SInsert', 288, 55, 30, '', '00', 'space id = 288, key = r[30], on error
      goto 55']
  - [35, 'String8', 0, 31, 0, 'FK_CONSTRAINT_1_TEST2', '00', 'r[31]=''FK_CONSTRAINT_1_TEST2''']
  - [36, 'SCopy', 1, 32, 0, '', '00', 'r[32]=r[1]']
  - [37, 'Integer', 512, 33, 0, '', '00', 'r[33]=512']
  - [38, 'OpenWrite', 0, 0, 0, 'space<name=_fk_constraint>', '20', 'index id = 0,
      space ptr = space<name=_fk_constraint>']
  - [39, 'NoConflict', 0, 42, 31, '2', '00', 'key=r[31..32]']
  - [40, 'Halt', 21, 0, 0, 'Constraint FK_CONSTRAINT_1_TEST2 already exists', 'aa',
    '']
  - [41, 'Close', 0, 0, 0, '', '00', '']
  - [42, 'Bool', 0, 34, 0, '0', '00', 'r[34]=0']
  - [43, 'String8', 0, 35, 0, 'simple', '00', 'r[35]=''simple''']
  - [44, 'String8', 0, 36, 0, 'no_action', '00', 'r[36]=''no_action''']
  - [45, 'String8', 0, 37, 0, 'no_action', '00', 'r[37]=''no_action''']
  - [46, 'Blob', 2, 38, 77, !!binary kQ==, '00', !!binary clszOF09kSAobGVuPTIsIHN1YnR5cGU9Nzcp]
  - [47, 'Blob', 2, 39, 77, !!binary kQ==, '00', !!binary clszOV09kSAobGVuPTIsIHN1YnR5cGU9Nzcp]
  - [48, 'MakeRecord', 31, 9, 40, '', '00', 'r[40]=mkrec(r[31..39])']
  - [49, 'SInsert', 356, 53, 40, '', '01', 'space id = 356, key = r[40], on error
      goto 53']
  - [50, 'Halt', 0, 0, 0, '', '00', '']
  - [51, 'MakeRecord', 31, 2, 41, '', '00', 'r[41]=mkrec(r[31..32])']
  - [52, 'SDelete', 356, 41, 0, '', '00', 'space id = 356, key = r[41]']
  - [53, 'MakeRecord', 24, 2, 42, '', '00', 'r[42]=mkrec(r[24..25])']
  - [54, 'SDelete', 288, 42, 0, '', '00', 'space id = 288, key = r[42]']
  - [55, 'MakeRecord', 17, 2, 43, '', '00', 'r[43]=mkrec(r[17..18])']
  - [56, 'SDelete', 288, 43, 0, '', '00', 'space id = 288, key = r[43]']
  - [57, 'MakeRecord', 10, 2, 44, '', '00', 'r[44]=mkrec(r[10..11])']
  - [58, 'SDelete', 288, 44, 0, '', '00', 'space id = 288, key = r[44]']
  - [59, 'MakeRecord', 2, 1, 45, '', '00', 'r[45]=mkrec(r[2])']
  - [60, 'SDelete', 280, 45, 0, '', '00', 'space id = 280, key = r[45]']
  - [61, 'Halt', 21, 0, 1, '', '00', '']


Patch with new fixes:

commit fe8415a79d401b741dcb565d34eb56495223f8b6
Author: Mergen Imeev <imeevma@xxxxxxxxx>
Date:   Fri Aug 31 15:50:17 2018 +0300

    sql: cleanup on failed creation operation

    Some creation operations create objects even on fail. This is
    wrong and should be fixed. This patch adds destructors for such
    objects.

    Closes #3592

diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 60b49df..2ac86ab 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -54,6 +54,54 @@
 #include "box/schema.h"
 #include "box/tuple_format.h"
 #include "box/coll_id_cache.h"
+#include "fiber.h"
+
+/**
+ * Structure that contains some information about record that was
+ * inserted into system space.
+ */
+struct saved_record
+{
+    /** A link in a record list. */
+    struct rlist link;
+    /** Id of space in which the record was inserted. */
+    uint32_t space_id;
+    /** First register of the key of the record. */
+    int reg_key;
+    /** Number of registers the key consists of. */
+    int reg_key_count;
+    /** The number of the OP_SInsert operation. */
+    int insertion_opcode;
+};
+
+/**
+ * Save inserted in system space record in list.
+ *
+ * @param parser SQL Parser object.
+ * @param space_id Id of table in which record is inserted.
+ * @param reg_key Register that contains first field of the key.
+ * @param reg_key_count Exact number of fields of the key.
+ * @param insertion_opcode Number of OP_SInsert opcode.
+ */
+static inline void
+save_record(struct Parse *parser, uint32_t space_id, int reg_key,
+        int reg_key_count, int insertion_opcode)
+{
+    struct saved_record *record =
+        region_alloc(&fiber()->gc, sizeof(*record));
+    if (record == NULL) {
+        diag_set(OutOfMemory, sizeof(*record), "region_alloc",
+             "record");
+        parser->rc = SQL_TARANTOOL_ERROR;
+        parser->nErr++;
+        return;
+    }
+    record->space_id = space_id;
+    record->reg_key = reg_key;
+    record->reg_key_count = reg_key_count;
+    record->insertion_opcode = insertion_opcode;
+    rlist_add_entry(&parser->record_list, record, link);
+}

 void
 sql_finish_coding(struct Parse *parse_context)
@@ -62,6 +110,29 @@ sql_finish_coding(struct Parse *parse_context)
     struct sqlite3 *db = parse_context->db;
     struct Vdbe *v = sqlite3GetVdbe(parse_context);
     sqlite3VdbeAddOp0(v, OP_Halt);
+    /*
+     * Destructors for all created in current statement
+     * spaces, indexes, sequences etc. There is no need to
+     * create destructor for last SInsert.
+     */
+    if (rlist_empty(&parse_context->record_list) == 0) {
+        struct saved_record *record =
+            rlist_shift_entry(&parse_context->record_list,
+                      struct saved_record, link);
+        /* Set P2 of SInsert. */
+        sqlite3VdbeChangeP2(v, record->insertion_opcode, v->nOp);
+        rlist_foreach_entry(record, &parse_context->record_list, link) {
+            int record_reg = ++parse_context->nMem;
+            sqlite3VdbeAddOp3(v, OP_MakeRecord, record->reg_key,
+                      record->reg_key_count, record_reg);
+            sqlite3VdbeAddOp2(v, OP_SDelete, record->space_id,
+                      record_reg);
+            /* Set P2 of SInsert. */
+            sqlite3VdbeChangeP2(v, record->insertion_opcode,
+                        v->nOp);
+        }
+        sqlite3VdbeAddOp2(v, OP_Halt, SQL_TARANTOOL_ERROR, 0);
+    }
     if (db->mallocFailed || parse_context->nErr != 0) {
         if (parse_context->rc == SQLITE_OK)
             parse_context->rc = SQLITE_ERROR;
@@ -1101,13 +1172,14 @@ vdbe_emit_create_index(struct Parse *parse, struct space_def *def,
     sqlite3VdbeAddOp4(v, OP_Blob, index_parts_sz, entry_reg + 5,
               SQL_SUBTYPE_MSGPACK, index_parts, P4_STATIC);
     sqlite3VdbeAddOp3(v, OP_MakeRecord, entry_reg, 6, tuple_reg);
-    sqlite3VdbeAddOp2(v, OP_SInsert, BOX_INDEX_ID, tuple_reg);
+    sqlite3VdbeAddOp3(v, OP_SInsert, BOX_INDEX_ID, 0, tuple_reg);
     /*
      * Non-NULL value means that index has been created via
      * separate CREATE INDEX statement.
      */
     if (idx_def->opts.sql != NULL)
         sqlite3VdbeChangeP5(v, OPFLAG_NCHANGE);
+    save_record(parse, BOX_INDEX_ID, entry_reg, 2, v->nOp - 1);
     return;
 error:
     parse->rc = SQL_TARANTOOL_ERROR;
@@ -1165,8 +1237,9 @@ createSpace(Parse * pParse, int iSpaceId, char *zStmt)
     sqlite3VdbeAddOp4(v, OP_Blob, table_stmt_sz, iFirstCol + 6,
               SQL_SUBTYPE_MSGPACK, table_stmt, P4_STATIC);
     sqlite3VdbeAddOp3(v, OP_MakeRecord, iFirstCol, 7, iRecord);
-    sqlite3VdbeAddOp2(v, OP_SInsert, BOX_SPACE_ID, iRecord);
+    sqlite3VdbeAddOp3(v, OP_SInsert, BOX_SPACE_ID, 0, iRecord);
     sqlite3VdbeChangeP5(v, OPFLAG_NCHANGE);
+    save_record(pParse, BOX_SPACE_ID, iFirstCol, 1, v->nOp - 1);
     return;
 error:
     pParse->nErr++;
@@ -1340,9 +1413,11 @@ vdbe_emit_fkey_create(struct Parse *parse_context, const struct fkey_def *fk)
               parent_links, P4_DYNAMIC);
     sqlite3VdbeAddOp3(vdbe, OP_MakeRecord, constr_tuple_reg, 9,
               constr_tuple_reg + 9);
-    sqlite3VdbeAddOp2(vdbe, OP_SInsert, BOX_FK_CONSTRAINT_ID,
+    sqlite3VdbeAddOp3(vdbe, OP_SInsert, BOX_FK_CONSTRAINT_ID, 0,
               constr_tuple_reg + 9);
     sqlite3VdbeChangeP5(vdbe, OPFLAG_NCHANGE);
+    save_record(parse_context, BOX_FK_CONSTRAINT_ID, constr_tuple_reg, 2,
+            vdbe->nOp - 1);
     sqlite3ReleaseTempRange(parse_context, constr_tuple_reg, 10);
     return;
 error:
@@ -1487,14 +1562,18 @@ sqlite3EndTable(Parse * pParse,    /* Parse context */
         int reg_seq_record =
             emitNewSysSequenceRecord(pParse, reg_seq_id,
                          p->def->name);
-        sqlite3VdbeAddOp2(v, OP_SInsert, BOX_SEQUENCE_ID,
+        sqlite3VdbeAddOp3(v, OP_SInsert, BOX_SEQUENCE_ID, 0,
                   reg_seq_record);
+        save_record(pParse, BOX_SEQUENCE_ID, reg_seq_record + 1, 1,
+                v->nOp - 1);
         /* Do an insertion into _space_sequence. */
         int reg_space_seq_record =
             emitNewSysSpaceSequenceRecord(pParse, reg_space_id,
                               reg_seq_id);
-        sqlite3VdbeAddOp2(v, OP_SInsert, BOX_SPACE_SEQUENCE_ID,
+        sqlite3VdbeAddOp3(v, OP_SInsert, BOX_SPACE_SEQUENCE_ID, 0,
                   reg_space_seq_record);
+        save_record(pParse, BOX_SPACE_SEQUENCE_ID,
+                reg_space_seq_record + 1, 1, v->nOp - 1);
     }
     /* Code creation of FK constraints, if any. */
     struct fkey_parse *fk_parse;
diff --git a/src/box/sql/prepare.c b/src/box/sql/prepare.c
index e98e845..a4b65eb 100644
--- a/src/box/sql/prepare.c
+++ b/src/box/sql/prepare.c
@@ -274,6 +274,7 @@ sql_parser_create(struct Parse *parser, sqlite3 *db)
     memset(parser, 0, sizeof(struct Parse));
     parser->db = db;
     rlist_create(&parser->new_fkey);
+    rlist_create(&parser->record_list);
     region_create(&parser->region, &cord()->slabc);
 }

diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index f56090d..c5becbd 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -2764,6 +2764,11 @@ struct Parse {
      * Foreign key constraint appeared in CREATE TABLE stmt.
      */
     struct rlist new_fkey;
+    /**
+     * List of all records that were inserted in system spaces
+     * in current statement.
+     */
+    struct rlist record_list;
     bool initiateTTrans;    /* Initiate Tarantool transaction */
     /** True, if table to be created has AUTOINCREMENT PK. */
     bool is_new_table_autoinc;
diff --git a/src/box/sql/vdbe.c b/src/box/sql/vdbe.c
index 0efc4dd..fc959bd 100644
--- a/src/box/sql/vdbe.c
+++ b/src/box/sql/vdbe.c
@@ -1010,8 +1010,12 @@ case OP_Halt: {
     p->pc = pcx;
     if (p->rc) {
         if (p->rc == SQL_TARANTOOL_ERROR) {
-            assert(pOp->p4.z != NULL);
-            box_error_set(__FILE__, __LINE__, pOp->p5, pOp->p4.z);
+            if (pOp->p4.z == NULL) {
+                assert(! diag_is_empty(diag_get()));
+            } else {
+                box_error_set(__FILE__, __LINE__, pOp->p5,
+                          pOp->p4.z);
+            }
         } else if (pOp->p5 != 0) {
             static const char * const azType[] = { "NOT NULL", "UNIQUE", "CHECK",
                                    "FOREIGN KEY" };
@@ -4318,8 +4322,8 @@ case OP_IdxInsert: {        /* in2 */
     break;
 }

-/* Opcode: SInsert P1 P2 * * P5
- * Synopsis: space id = P1, key = r[P2]
+/* Opcode: SInsert P1 P2 P3 * P5
+ * Synopsis: space id = P1, key = r[P3], on error goto P2
  *
  * This opcode is used only during DDL routine.
  * In contrast to ordinary insertion, insertion to system spaces
@@ -4332,15 +4336,15 @@ case OP_IdxInsert: {        /* in2 */
  */
 case OP_SInsert: {
     assert(pOp->p1 > 0);
-    assert(pOp->p2 >= 0);
+    assert(pOp->p2 > 0);
+    assert(pOp->p3 >= 0);

-    pIn2 = &aMem[pOp->p2];
+    pIn3 = &aMem[pOp->p3];
     struct space *space = space_by_id(pOp->p1);
     assert(space != NULL);
     assert(space_is_system(space));
-    rc = tarantoolSqlite3Insert(space, pIn2->z, pIn2->z + pIn2->n);
-    if (rc)
-        goto abort_due_to_error;
+    if (tarantoolSqlite3Insert(space, pIn3->z, pIn3->z + pIn3->n) != 0)
+        goto jump_to_p2;
     if (pOp->p5 & OPFLAG_NCHANGE)
         p->nChange++;
     break;
diff --git a/test/sql/drop-table.result b/test/sql/drop-table.result
index 08f2496..636f3e0 100644
--- a/test/sql/drop-table.result
+++ b/test/sql/drop-table.result
@@ -33,3 +33,43 @@ box.sql.execute("INSERT INTO zzzoobar VALUES (111, 222, 'c3', 444)")
 -- DROP TABLE should do the job
 -- Debug
 -- require("console").start()
+--
+-- gh-3592: segmentation fault when table with error during
+-- creation is dropped.
+-- We should grant user enough rights to create space, but not
+-- enough to create index.
+--
+box.schema.user.create('tmp')
+---
+...
+box.schema.user.grant('tmp', 'create', 'universe')
+---
+...
+box.schema.user.grant('tmp', 'write', 'space', '_space')
+---
+...
+box.schema.user.grant('tmp', 'write', 'space', '_schema')
+---
+...
+box.session.su('tmp')
+---
+...
+--
+-- Error: user do not have rights to write in box.space._index.
+-- Space that was already created should be automatically dropped.
+--
+box.sql.execute('create table t1 (id int primary key, a int)')
+---
+- error: Write access to space '_index' is denied for user 'tmp'
+...
+-- Error: no such table.
+box.sql.execute('drop table t1')
+---
+- error: 'no such table: T1'
+...
+box.session.su('admin')
+---
+...
+box.schema.user.drop('tmp')
+---
+...
diff --git a/test/sql/drop-table.test.lua b/test/sql/drop-table.test.lua
index 9663074..ba47716 100644
--- a/test/sql/drop-table.test.lua
+++ b/test/sql/drop-table.test.lua
@@ -25,3 +25,24 @@ box.sql.execute("INSERT INTO zzzoobar VALUES (111, 222, 'c3', 444)")

 -- Debug
 -- require("console").start()
+
+--
+-- gh-3592: segmentation fault when table with error during
+-- creation is dropped.
+-- We should grant user enough rights to create space, but not
+-- enough to create index.
+--
+box.schema.user.create('tmp')
+box.schema.user.grant('tmp', 'create', 'universe')
+box.schema.user.grant('tmp', 'write', 'space', '_space')
+box.schema.user.grant('tmp', 'write', 'space', '_schema')
+box.session.su('tmp')
+--
+-- Error: user do not have rights to write in box.space._index.
+-- Space that was already created should be automatically dropped.
+--
+box.sql.execute('create table t1 (id int primary key, a int)')
+-- Error: no such table.
+box.sql.execute('drop table t1')
+box.session.su('admin')
+box.schema.user.drop('tmp')


Other related posts: