Thanks for the patch! See 32 comments below. Also see other
fixes on the branch in a separate commit.
1. Typos below: paren, refrencing.
On 13/07/2018 05:04, Nikita Pettik wrote:
After introducing separate space for persisting foreign key
constraints, nothing prevents us from adding ALTER TABLE statement to
add or drop named constraints. According to ANSI, the syntax is as follows:
ALTER TABLE <referencing table> ADD CONSTRAINT
<referential constraint name> FOREIGN KEY
<left paren> <referencing columns> <right paren> REFERENCES
<referenced table> [ <referenced columns> ] [ MATCH <match type> ]
[ <referential triggered action> ] [ <constraint check time> ]
ALTER TABLE <referencing table> DROP CONSTRAINT <constraint name>
In our terms it looks like:
ALTER TABLE t1 ADD CONSTRAINT f1 FOREIGN KEY(id, a)
REFERENCES t2 (id, b) MATCH FULL;
ALTER TABLE t1 DROP CONSTRAINT f1;
FK constraints which come with a CREATE TABLE statement are also
persisted, with an auto-generated name. They are coded after the space
and its indexes.
Moreover, we don't use original SQLite foreign keys anymore: those
obsolete structs have been removed alongside FK hash. Now FK constraints
are stored only in space.
Since types of referencing and referenced fields must match, and now in
SQL only PK is allowed to feature INT (other fields are always SCALAR),
some tests have been corrected to obey this rule.
Part of #3271
---
extra/mkkeywordhash.c | 3 +
src/box/fkey.h | 6 +
src/box/sql.c | 16 +
src/box/sql/alter.c | 7 -
src/box/sql/build.c | 581 +++++++++++++++------
src/box/sql/callback.c | 2 -
src/box/sql/delete.c | 6 +-
src/box/sql/expr.c | 10 +-
src/box/sql/fkey.c | 974 +++++++++++------------------------
src/box/sql/insert.c | 19 +-
src/box/sql/main.c | 5 -
src/box/sql/parse.y | 37 +-
src/box/sql/pragma.c | 240 +--------
src/box/sql/pragma.h | 7 -
src/box/sql/prepare.c | 5 +
src/box/sql/sqliteInt.h | 167 +++---
src/box/sql/status.c | 5 +-
src/box/sql/tarantoolInt.h | 12 +
src/box/sql/update.c | 4 +-
src/box/sql/vdbe.c | 16 +-
test/sql-tap/alter.test.lua | 4 +-
test/sql-tap/alter2.test.lua | 196 +++++++
test/sql-tap/fkey1.test.lua | 51 +-
test/sql-tap/fkey2.test.lua | 131 ++---
test/sql-tap/fkey3.test.lua | 19 +-
test/sql-tap/fkey4.test.lua | 2 +-
test/sql-tap/orderby1.test.lua | 6 +-
test/sql-tap/table.test.lua | 18 +-
test/sql-tap/tkt-b1d3a2e531.test.lua | 6 +-
test/sql-tap/triggerC.test.lua | 2 +-
test/sql-tap/whereG.test.lua | 4 +-
test/sql-tap/with1.test.lua | 2 +-
32 files changed, 1226 insertions(+), 1337 deletions(-)
create mode 100755 test/sql-tap/alter2.test.lua
diff --git a/src/box/fkey.h b/src/box/fkey.h
index 1b6ea71d9..939773ef2 100644
--- a/src/box/fkey.h
+++ b/src/box/fkey.h
@@ -141,6 +141,12 @@ fkey_is_self_referenced(const struct fkey_def *fkey)
return fkey->child_id == fkey->parent_id;
}
+/**
+ * Tell whether @a space is referenced by at least one foreign
+ * key constraint, i.e. whether its parent_fkey pointer is set.
+ *
+ * @param space Space to be examined.
+ * @retval true if space->parent_fkey is not NULL.
+ */
+static inline bool
+space_fkey_check_references(const struct space *space)
+{
+	const struct fkey *first_parent_fk = space->parent_fkey;
+	return first_parent_fk != NULL;
+}
+
/**
* The second argument is a Trigger structure allocated by the
* fkActionTrigger() routine.This function deletes the Trigger
diff --git a/src/box/sql/alter.c b/src/box/sql/alter.c
index fe54e5531..e81113f58 100644
--- a/src/box/sql/alter.c
+++ b/src/box/sql/alter.c
@@ -189,12 +188,6 @@ sqlite3AlterFinishAddColumn(Parse * pParse, Token *
pColDef)
sqlite3ErrorMsg(pParse, "Cannot add a UNIQUE column");
return;
}
- if ((user_session->sql_flags & SQLITE_ForeignKeys) && pNew->pFKey
- && pDflt) {
- sqlite3ErrorMsg(pParse,
- "Cannot add a REFERENCES column with non-NULL
default value");
- return;
- }
assert(pNew->def->fields[pNew->def->field_count - 1].is_nullable ==
action_is_nullable(pNew->def->fields[
pNew->def->field_count - 1].nullable_action));
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 0c762fac9..c2d3cd035 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -373,9 +374,6 @@ deleteTable(sqlite3 * db, Table * pTable)
freeIndex(db, pIndex);
}
- /* Delete any foreign keys attached to this table. */
- sqlite3FkDelete(db, pTable);
-
/* Delete the Table structure itself.
*/
sqlite3HashClear(&pTable->idxHash);
@@ -1743,6 +1741,95 @@ emitNewSysSpaceSequenceRecord(Parse *pParse, int
space_id, const char reg_seq_id
return first_col;
}
+/**
+ * Generate opcodes to serialize foreign key into MsgPack and
+ * insert produced tuple into _fk_constraint space.
+ *
+ * Eight consecutive registers are filled with the fields of the
+ * record (name, child id, parent id, is_deferred, match,
+ * on_delete, on_update, links); the ninth one receives the tuple
+ * built by OP_MakeRecord.
+ *
+ * @param parse_context Parsing context.
+ * @param fk Foreign key to be created.
+ */
+static void
+vdbe_fkey_code_creation(struct Parse *parse_context, const struct fkey_def *fk)
+{
+	assert(parse_context != NULL);
+	assert(fk != NULL);
+	struct Vdbe *vdbe = sqlite3GetVdbe(parse_context);
+	assert(vdbe != NULL);
+	struct sqlite3 *db = parse_context->db;
+	/*
+	 * Allocate both dynamic operands before any register is
+	 * occupied or any opcode is emitted: on failure nothing
+	 * has to be rolled back except the memory itself.
+	 */
+	char *name_copy = sqlite3DbStrDup(db, fk->name);
+	if (name_copy == NULL)
+		return;
+	size_t encoded_links_sz = fkey_encode_links(fk, NULL) + 1;
+	char *encoded_links = sqlite3DbMallocRaw(db, encoded_links_sz);
+	if (encoded_links == NULL) {
+		/*
+		 * The name is not owned by VDBE yet, so it is
+		 * released here, with the allocator it came from
+		 * (not with libc free()).
+		 */
+		sqlite3DbFree(db, name_copy);
+		return;
+	}
+	size_t real_links_sz = fkey_encode_links(fk, encoded_links);
+	encoded_links[real_links_sz] = '\0';
+	/*
+	 * Occupy registers for 8 fields: each member in
+	 * _fk_constraint space plus one for final msgpack tuple.
+	 */
+	int constr_tuple_reg = sqlite3GetTempRange(parse_context, 9);
+	/* From now on name_copy is owned (and freed) by VDBE. */
+	sqlite3VdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg, 0, name_copy,
+			  P4_DYNAMIC);
+	/*
+	 * In case we are adding FK constraints during execution
+	 * of <CREATE TABLE ...> statement, we don't have child
+	 * id, but we know register where it will be stored.
+	 */
+	if (parse_context->pNewTable != NULL) {
+		sqlite3VdbeAddOp2(vdbe, OP_SCopy, fk->child_id,
+				  constr_tuple_reg + 1);
+	} else {
+		sqlite3VdbeAddOp2(vdbe, OP_Integer, fk->child_id,
+				  constr_tuple_reg + 1);
+	}
+	/* The same holds for parent id of a self-referenced FK. */
+	if (parse_context->pNewTable != NULL && fkey_is_self_referenced(fk)) {
+		sqlite3VdbeAddOp2(vdbe, OP_SCopy, fk->parent_id,
+				  constr_tuple_reg + 2);
+	} else {
+		sqlite3VdbeAddOp2(vdbe, OP_Integer, fk->parent_id,
+				  constr_tuple_reg + 2);
+	}
+	sqlite3VdbeAddOp2(vdbe, OP_Bool, 0, constr_tuple_reg + 3);
+	sqlite3VdbeChangeP4(vdbe, -1, (char*)&fk->is_deferred, P4_BOOL);
+	sqlite3VdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg + 4, 0,
+			  fkey_match_strs[fk->match], P4_STATIC);
+	sqlite3VdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg + 5, 0,
+			  fkey_action_strs[fk->on_delete], P4_STATIC);
+	sqlite3VdbeAddOp4(vdbe, OP_String8, 0, constr_tuple_reg + 6, 0,
+			  fkey_action_strs[fk->on_update], P4_STATIC);
+	/* encoded_links is handed over to VDBE as well. */
+	sqlite3VdbeAddOp4(vdbe, OP_Blob, real_links_sz, constr_tuple_reg + 7,
+			  SQL_SUBTYPE_MSGPACK, encoded_links, P4_DYNAMIC);
+	sqlite3VdbeAddOp3(vdbe, OP_MakeRecord, constr_tuple_reg, 8,
+			  constr_tuple_reg + 8);
+	sqlite3VdbeAddOp2(vdbe, OP_SInsert, BOX_FK_CONSTRAINT_ID,
+			  constr_tuple_reg + 8);
+	sqlite3VdbeChangeP5(vdbe, OPFLAG_NCHANGE);
+	sqlite3ReleaseTempRange(parse_context, constr_tuple_reg, 9);
+}
+
+/**
+ * Find the ordinal number of the field called @a field_name in
+ * space definition @a def. Names are compared with strcmp().
+ *
+ * @param parse_context Parsing context; gets the error message
+ *        if there is no such field.
+ * @param def Space definition whose fields are searched.
+ * @param field_name Name of the field to look up.
+ * @param[out] link Set to the field number on success.
+ *
+ * @retval 0 on success, -1 if no field has this name.
+ */
+static int
+resolve_link(struct Parse *parse_context, const struct space_def *def,
+	     const char *field_name, uint32_t *link)
+{
+	assert(link != NULL);
+	for (uint32_t fieldno = 0; fieldno < def->field_count; ++fieldno) {
+		if (strcmp(field_name, def->fields[fieldno].name) != 0)
+			continue;
+		*link = fieldno;
+		return 0;
+	}
+	sqlite3ErrorMsg(parse_context, "no such column %s", field_name);
+	return -1;
+}
+
/*
* This routine is called to report the final ")" that terminates
* a CREATE TABLE statement.
@@ -1913,6 +2000,39 @@ sqlite3EndTable(Parse * pParse, /* Parse context */
/* Reparse everything to update our internal data structures */
parseTableSchemaRecord(pParse, iSpaceId, zStmt); /*
consumes zStmt */
+
+ /* Code creation of FK constraints, if any. */
+ struct fkey_parse *fk_parse = pParse->new_fkey;
+ while (fk_parse != NULL) {
+ struct fkey_def *fk = fk_parse->fkey;
+ if (fk_parse->selfref_cols != NULL) {
+ struct ExprList *cols = fk_parse->selfref_cols;
+ for (uint32_t i = 0; i < fk->field_count; ++i) {
+ if (resolve_link(pParse, p->def,
+ cols->a[i].zName,
+
&fk->links[i].parent_field) != 0)
+ return;
+ }
+ fk->parent_id = iSpaceId;
+ } else if (fk_parse->is_self_referenced) {
+ struct Index *pk = sqlite3PrimaryKeyIndex(p);
+ if (pk->nColumn != fk->field_count) {
+ sqlite3ErrorMsg(pParse,
+ "number of columns in
foreign key does "
+ "not match the number of
columns in "
+ "the referenced table");
+ return;
+ }
+ for (uint32_t i = 0; i < fk->field_count; ++i) {
+ fk->links[i].parent_field =
+ pk->aiColumn[i];
+ }
+ fk->parent_id = iSpaceId;
+ }
+ fk->child_id = iSpaceId;
+ vdbe_fkey_code_creation(pParse, fk);
+ fk_parse = fk_parse->next;
+ }
}
/* Add the table to the in-memory representation of the database.
@@ -2085,6 +2205,32 @@ sql_clear_stat_spaces(Parse *parse, const char
*table_name,
}
}
+/**
+ * Emit VDBE opcodes which delete an entry from _fk_constraint
+ * space. The deletion key is the record made of the constraint
+ * name and the id of the child space.
+ *
+ * @param parse_context Parsing context.
+ * @param constraint_name Name of FK constraint to be dropped.
+ *        Must be allocated on heap by sqlite3DbMalloc().
+ *        It will be freed in VDBE.
+ * @param child_id Id of table which constraint belongs to.
+ */
+static void
+vdbe_fkey_code_drop(struct Parse *parse_context, const char *constraint_name,
+		    uint32_t child_id)
+{
+	struct Vdbe *v = sqlite3GetVdbe(parse_context);
+	assert(v != NULL);
+	/* Two registers for the key parts, one for the record. */
+	const int reg_count = 3;
+	int name_reg = sqlite3GetTempRange(parse_context, reg_count);
+	int child_id_reg = name_reg + 1;
+	int record_reg = name_reg + 2;
+	sqlite3VdbeAddOp4(v, OP_String8, 0, name_reg, 0, constraint_name,
+			  P4_DYNAMIC);
+	sqlite3VdbeAddOp2(v, OP_Integer, child_id, child_id_reg);
+	sqlite3VdbeAddOp3(v, OP_MakeRecord, name_reg, 2, record_reg);
+	sqlite3VdbeAddOp2(v, OP_SDelete, BOX_FK_CONSTRAINT_ID, record_reg);
+	sqlite3VdbeChangeP5(v, OPFLAG_NCHANGE);
+	VdbeComment((v, "Delete FK constraint %s", constraint_name));
+	sqlite3ReleaseTempRange(parse_context, name_reg, reg_count);
+}
+
/**
* Generate code to drop a table.
* This routine includes dropping triggers, sequences,
@@ -2270,176 +2429,276 @@ sql_drop_table(struct Parse *parse_context, struct
SrcList *table_name_list,
sqlite3SrcListDelete(db, table_name_list);
}
-/*
- * This routine is called to create a new foreign key on the table
- * currently under construction. pFromCol determines which columns
- * in the current table point to the foreign key. If pFromCol==0 then
- * connect the key to the last column inserted. pTo is the name of
- * the table referred to (a.k.a the "parent" table). pToCol is a list
- * of tables in the parent pTo table. flags contains all
- * information about the conflict resolution algorithms specified
- * in the ON DELETE, ON UPDATE and ON INSERT clauses.
+/**
+ * Return ordinal number of column by name. In case of error,
+ * set error message.
*
- * An FKey structure is created and added to the table currently
- * under construction in the pParse->pNewTable field.
+ * @param parse_context Parsing context.
+ * @param space Space which column belongs to.
+ * @param column_name Name of column to investigate.
+ * @param[out] colno Found ordinal number of column.
*
- * The foreign key is set for IMMEDIATE processing. A subsequent call
- * to sqlite3DeferForeignKey() might change this to DEFERRED.
+ * @retval 0 on success, -1 on fault.
*/
-void
-sqlite3CreateForeignKey(Parse * pParse, /* Parsing context */
- ExprList * pFromCol, /* Columns in this table that
point to other table */
- Token * pTo, /* Name of the other table */
- ExprList * pToCol, /* Columns in the other table */
- int flags /* Conflict resolution algorithms. */
- )
+static int
+columnno_by_name(struct Parse *parse_context, const struct space *space,
+ const char *column_name, uint32_t *colno)
{
- sqlite3 *db = pParse->db;
-#ifndef SQLITE_OMIT_FOREIGN_KEY
- FKey *pFKey = 0;
- FKey *pNextTo;
- Table *p = pParse->pNewTable;
- int nByte;
- int i;
- int nCol;
- char *z;
-
- assert(pTo != 0);
- char *normilized_name = strndup(pTo->z, pTo->n);
- if (normilized_name == NULL) {
- diag_set(OutOfMemory, pTo->n, "strndup", "normalized name");
- goto fk_end;
- }
- sqlite3NormalizeName(normilized_name);
- uint32_t parent_id = box_space_id_by_name(normilized_name,
- strlen(normilized_name));
- if (parent_id == BOX_ID_NIL &&
- strcmp(normilized_name, p->def->name) != 0) {
- sqlite3ErrorMsg(pParse, "foreign key constraint references "\
- "nonexistent table: %s", normilized_name);
- goto fk_end;
- }
- struct space *parent_space = space_by_id(parent_id);
- if (parent_space != NULL && parent_space->def->opts.is_view) {
- sqlite3ErrorMsg(pParse, "can't create foreign key constraint "\
- "referencing view: %s", normilized_name);
- goto fk_end;
+ assert(colno != NULL);
+ uint32_t column_len = strlen(column_name);
+ if (tuple_fieldno_by_name(space->def->dict, column_name, column_len,
+ field_name_hash(column_name, column_len),
+ colno) != 0) {
+ sqlite3ErrorMsg(parse_context,
+ "table \"%s\" doesn't feature column %s",
+ space->def->name, column_name);
+ return -1;
}
- if (p == 0)
- goto fk_end;
- if (pFromCol == 0) {
- int iCol = p->def->field_count - 1;
- if (NEVER(iCol < 0))
- goto fk_end;
- if (pToCol && pToCol->nExpr != 1) {
- sqlite3ErrorMsg(pParse, "foreign key on %s"
- " should reference only one column of table
%T",
- p->def->fields[iCol].name, pTo);
- goto fk_end;
- }
- nCol = 1;
- } else if (pToCol && pToCol->nExpr != pFromCol->nExpr) {
- sqlite3ErrorMsg(pParse,
- "number of columns in foreign key does not match the
number of "
- "columns in the referenced table");
- goto fk_end;
+ return 0;
+}
+
+void
+sql_create_foreign_key(struct Parse *parse_context, struct SrcList *child,
+ struct Token *constraint, struct ExprList *child_cols,
+ struct Token *parent, struct ExprList *parent_cols,
+ bool is_deferred, int actions)
+{
+ struct sqlite3 *db = parse_context->db;
+ /*
+ * When this function is called second time during
+ * <CREATE TABLE ...> statement (i.e. at VDBE runtime),
+ * don't even try to do something.
+ */
+ if (db->init.busy)
+ return;
+ /*
+ * Beforehand initialization for correct clean-up
+ * while emergency exiting in case of error.
+ */
+ const char *parent_name = NULL;
+ const char *constraint_name = NULL;
+ bool is_self_referenced = false;
+ /*
+ * Table under construction during CREATE TABLE
+ * processing. NULL for ALTER TABLE statement handling.
+ */
+ struct Table *new_tab = parse_context->pNewTable;
+ /* Whether we are processing ALTER TABLE or CREATE TABLE. */
+ bool is_alter = new_tab == NULL;
+ uint32_t child_cols_count;
+ if (child_cols == NULL) {
+ if (is_alter) {
+ sqlite3ErrorMsg(parse_context,
+ "referencing columns are not
specified");
+ goto exit_create_fk;
+ }
+ child_cols_count = 1;
} else {
- nCol = pFromCol->nExpr;
- }
- nByte = sizeof(*pFKey) + (nCol - 1) * sizeof(pFKey->aCol[0]) +
- strlen(normilized_name) + 1;
- if (pToCol) {
- for (i = 0; i < pToCol->nExpr; i++) {
- nByte += sqlite3Strlen30(pToCol->a[i].zName) + 1;
- }
- }
- pFKey = sqlite3DbMallocZero(db, nByte);
- if (pFKey == 0) {
- goto fk_end;
- }
- pFKey->pFrom = p;
- pFKey->pNextFrom = p->pFKey;
- z = (char *)&pFKey->aCol[nCol];
- pFKey->zTo = z;
- memcpy(z, normilized_name, strlen(normilized_name) + 1);
- z += strlen(normilized_name) + 1;
- pFKey->nCol = nCol;
- if (pFromCol == 0) {
- pFKey->aCol[0].iFrom = p->def->field_count - 1;
+ child_cols_count = child_cols->nExpr;
+ }
+ assert(!is_alter || (child != NULL && child->nSrc == 1));
+ struct space *child_space = NULL;
+ uint32_t child_id = 0;
+ if (is_alter) {
+ const char *child_name = child->a[0].zName;
+ child_id = box_space_id_by_name(child_name,
+ strlen(child_name));
+ if (child_id == BOX_ID_NIL) {
+ diag_set(ClientError, ER_NO_SUCH_SPACE, child_name);
+ goto tnt_error;
+ }
+ child_space = space_by_id(child_id);
+ assert(child_space != NULL);
} else {
- for (i = 0; i < nCol; i++) {
- int j;
- for (j = 0; j < (int)p->def->field_count; j++) {
- if (strcmp(p->def->fields[j].name,
- pFromCol->a[i].zName) == 0) {
- pFKey->aCol[i].iFrom = j;
- break;
- }
- }
- if (j >= (int)p->def->field_count) {
- sqlite3ErrorMsg(pParse,
- "unknown column \"%s\" in foreign
key definition",
- pFromCol->a[i].zName);
- goto fk_end;
- }
+ struct fkey_parse *fk = region_alloc(&parse_context->region,
+ sizeof(*fk));
+ if (fk == NULL) {
+ diag_set(OutOfMemory, sizeof(*fk), "region",
+ "struct fkey_parse");
+ parse_context->rc = SQL_TARANTOOL_ERROR;
+ parse_context->nErr++;
+ goto exit_create_fk;
}
- }
- if (pToCol) {
- for (i = 0; i < nCol; i++) {
- int n = sqlite3Strlen30(pToCol->a[i].zName);
- pFKey->aCol[i].zCol = z;
- memcpy(z, pToCol->a[i].zName, n);
- z[n] = 0;
- z += n + 1;
+ memset(fk, 0, sizeof(*fk));
+ struct fkey_parse *last_fk = parse_context->new_fkey;
+ parse_context->new_fkey = fk;
+ fk->next = last_fk;
+ }
+ assert(parent != NULL);
+ parent_name = sqlite3NameFromToken(db, parent);
+ if (parent_name == NULL)
+ goto exit_create_fk;
+ uint32_t parent_id = box_space_id_by_name(parent_name,
+ strlen(parent_name));
+ /*
+ * Within ALTER TABLE ADD CONSTRAINT FK also can be
+ * self-referenced, but in this case parent (which is
+ * also child) table will definitely exist.
+ */
+ is_self_referenced = is_alter ? false :
+ !strcmp(parent_name, new_tab->def->name);
+ if (parent_id == BOX_ID_NIL) {
+ if (is_self_referenced) {
+ parse_context->new_fkey->selfref_cols = parent_cols;
+ parse_context->new_fkey->is_self_referenced = true;
+ } else {
+ diag_set(ClientError, ER_NO_SUCH_SPACE, parent_name);;
+ goto tnt_error;
}
}
- pFKey->isDeferred = 0;
- pFKey->aAction[0] = (u8) (flags & 0xff); /* ON DELETE action */
- pFKey->aAction[1] = (u8) ((flags >> 8) & 0xff); /* ON UPDATE action
*/
-
- pNextTo = (FKey *) sqlite3HashInsert(&p->pSchema->fkeyHash,
- pFKey->zTo, (void *)pFKey);
- if (pNextTo == pFKey) {
- sqlite3OomFault(db);
- goto fk_end;
- }
- if (pNextTo) {
- assert(pNextTo->pPrevTo == 0);
- pFKey->pNextTo = pNextTo;
- pNextTo->pPrevTo = pFKey;
+ struct space *parent_space = space_by_id(parent_id);
+ if (parent_space != NULL && parent_space->def->opts.is_view) {
+ sqlite3ErrorMsg(parse_context,
+ "referenced table can't be view");
+ goto exit_create_fk;
+ }
+ if (parent_cols != NULL) {
+ if (parent_cols->nExpr != (int) child_cols_count) {
+ sqlite3ErrorMsg(parse_context,
+ "number of columns in foreign key does "
+ "not match the number of columns in "
+ "the referenced table");
+ goto exit_create_fk;
+ }
+ } else if (!is_self_referenced) {
+ /*
+ * If parent columns are not specified, then PK columns
+ * of parent table are used as referenced.
+ */
+ struct index *parent_pk = space_index(parent_space, 0);
+ assert(parent_pk != NULL);
+ if (parent_pk->def->key_def->part_count != child_cols_count) {
+ sqlite3ErrorMsg(parse_context,
+ "number of columns in foreign key does "
+ "not match the number of columns in "
+ "the referenced table");
+ goto exit_create_fk;
+ }
}
-
- /* Link the foreign key to the table as the last step.
+ if (constraint == NULL && !is_alter) {
+ if (parse_context->constraintName.n == 0) {
+ uint32_t fk_count = 0;
+ for (struct fkey_parse *fk = parse_context->new_fkey;
+ fk != NULL; fk = fk->next, fk_count++);
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index 3183e3dc7..ad19759e2 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -4835,12 +4835,12 @@ sqlite3ExprIfFalse(Parse * pParse, Expr * pExpr, int
dest, int jumpIfNull)
* Assert()s verify that the computation is correct.
*/
- op = ((pExpr->op + (TK_ISNULL & 1)) ^ 1) - (TK_ISNULL & 1);
+ if (pExpr->op >= TK_NE && pExpr->op <= TK_GE)
+ op = ((pExpr->op + (TK_NE & 1)) ^ 1) - (TK_NE & 1);
+ if (pExpr->op == TK_ISNULL || pExpr->op == TK_NOTNULL)
+ op = ((pExpr->op + (TK_ISNULL & 1)) ^ 1) - (TK_ISNULL & 1);
- /*
- * Verify correct alignment of TK_ and OP_ constants.
- * Tokens TK_ISNULL and TK_NE shoud have the same parity.
- */
+ /* Verify correct alignment of TK_ and OP_ constants. */
assert(pExpr->op != TK_NE || op == OP_Eq);
assert(pExpr->op != TK_EQ || op == OP_Ne);
assert(pExpr->op != TK_LT || op == OP_Ge);
diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
index 016ded8d0..1eebf6b10 100644
--- a/src/box/sql/fkey.c
+++ b/src/box/sql/fkey.c
@@ -39,8 +39,9 @@
#include "box/schema.h"
#include "box/session.h"
#include "tarantoolInt.h"
+#include "vdbeInt.h"
-#ifndef SQLITE_OMIT_FOREIGN_KEY
+#ifndef SQLITE_OMIT_TRIGGER
@@ -366,150 +187,116 @@ sqlite3FkLocateIndex(Parse * pParse, /* Parse
context to store any error in */
*
* DELETE deferred Decrement the "deferred constraint counter".
*
- * These operations are identified in the comment at the top of this file
- * (fkey.c) as "I.1" and "D.1".
+ * These operations are identified in the comment at the top of
+ * this file (fkey.c) as "I.1" and "D.1".
+ *
+ * @param parse_context Current parsing context.
+ * @param parent Parent table of FK constraint.
+ * @param fk_def FK constraint definition.
+ * @param referenced_idx Id of referenced index.
+ * @param reg_data Address of array containing child table row.
+ * @param incr_count Increment constraint counter by this value.
+ * @param is_ignore If true, pretend parent contains all NULLs.
*/
static void
-fkLookupParent(Parse * pParse, /* Parse context */
- Table * pTab, /* Parent table of FK pFKey */
- Index * pIdx, /* Unique index on parent key columns in pTab */
- FKey * pFKey, /* Foreign key constraint */
- int *aiCol, /* Map from parent key columns to child table
columns */
- int regData, /* Address of array containing child table row
*/
- int nIncr, /* Increment constraint counter by this */
- int isIgnore /* If true, pretend pTab contains all NULL
values */
- )
+fkey_lookup_parent(struct Parse *parse_context, struct space *parent,
+ struct fkey_def *fk_def, uint32_t referenced_idx,
+ int reg_data, int incr_count, bool is_ignore)
{
- int i; /* Iterator variable */
- Vdbe *v = sqlite3GetVdbe(pParse); /* Vdbe to add code to */
- int iCur = pParse->nTab - 1; /* Cursor number to use */
- int iOk = sqlite3VdbeMakeLabel(v); /* jump here if parent key
found */
- struct session *user_session = current_session();
-
- /* If nIncr is less than zero, then check at runtime if there are any
- * outstanding constraints to resolve. If there are not, there is no
need
- * to check if deleting this row resolves any outstanding violations.
+ struct Vdbe *v = sqlite3GetVdbe(parse_context);
+ int cursor = parse_context->nTab - 1;
+ int ok_label = sqlite3VdbeMakeLabel(v);
+ /*
+ * If incr_count is less than zero, then check at runtime
+ * if there are any outstanding constraints to resolve.
+ * If there are not, there is no need to check if deleting
+ * this row resolves any outstanding violations.
*
- * Check if any of the key columns in the child table row are NULL. If
- * any are, then the constraint is considered satisfied. No need to
- * search for a matching row in the parent table.
+ * Check if any of the key columns in the child table row
+ * are NULL. If any are, then the constraint is considered
+ * satisfied. No need to search for a matching row in the
+ * parent table.
*/
- if (nIncr < 0) {
- sqlite3VdbeAddOp2(v, OP_FkIfZero, pFKey->isDeferred, iOk);
- VdbeCoverage(v);
- }
- for (i = 0; i < pFKey->nCol; i++) {
- int iReg = aiCol[i] + regData + 1;
- sqlite3VdbeAddOp2(v, OP_IsNull, iReg, iOk);
- VdbeCoverage(v);
- }
-
- if (isIgnore == 0) {
- if (pIdx == 0) {
- /* If pIdx is NULL, then the parent key is the INTEGER
PRIMARY KEY
- * column of the parent table (table pTab).
- */
- int regTemp = sqlite3GetTempReg(pParse);
-
- /* Invoke MustBeInt to coerce the child key value to an
integer (i.e.
- * apply the affinity of the parent key). If this
fails, then there
- * is no matching parent key. Before using MustBeInt,
make a copy of
- * the value. Otherwise, the value inserted into the
child key column
- * will have INTEGER affinity applied to it, which may
not be correct.
- */
- sqlite3VdbeAddOp2(v, OP_SCopy, aiCol[0] + 1 + regData,
- regTemp);
- VdbeCoverage(v);
-
- /* If the parent table is the same as the child table,
and we are about
- * to increment the constraint-counter (i.e. this is an
INSERT operation),
- * then check if the row being inserted matches itself.
If so, do not
- * increment the constraint-counter.
- */
- if (pTab == pFKey->pFrom && nIncr == 1) {
- sqlite3VdbeAddOp3(v, OP_Eq, regData, iOk,
- regTemp);
- VdbeCoverage(v);
- sqlite3VdbeChangeP5(v, SQLITE_NOTNULL);
- }
+ if (incr_count < 0)
+ sqlite3VdbeAddOp2(v, OP_FkIfZero, fk_def->is_deferred,
+ ok_label);
- } else {
- int nCol = pFKey->nCol;
- int regTemp = sqlite3GetTempRange(pParse, nCol);
- int regRec = sqlite3GetTempReg(pParse);
- struct space *space =
-
space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
- vdbe_emit_open_cursor(pParse, iCur, pIdx->tnum, space);
- for (i = 0; i < nCol; i++) {
- sqlite3VdbeAddOp2(v, OP_Copy,
- aiCol[i] + 1 + regData,
- regTemp + i);
- }
-
- /* If the parent table is the same as the child table,
and we are about
- * to increment the constraint-counter (i.e. this is an
INSERT operation),
- * then check if the row being inserted matches itself.
If so, do not
- * increment the constraint-counter.
- *
- * If any of the parent-key values are NULL, then the
row cannot match
- * itself. So set JUMPIFNULL to make sure we do the
OP_Found if any
- * of the parent-key values are NULL (at this point it
is known that
- * none of the child key values are).
- */
- if (pTab == pFKey->pFrom && nIncr == 1) {
- int iJump =
- sqlite3VdbeCurrentAddr(v) + nCol + 1;
- for (i = 0; i < nCol; i++) {
- int iChild = aiCol[i] + 1 + regData;
- int iParent =
- pIdx->aiColumn[i] + 1 + regData;
- assert(pIdx->aiColumn[i] >= 0);
- assert(aiCol[i] != pTab->iPKey);
- if (pIdx->aiColumn[i] == pTab->iPKey) {
- /* The parent key is a
composite key that includes the IPK column */
- iParent = regData;
- }
- sqlite3VdbeAddOp3(v, OP_Ne, iChild,
- iJump, iParent);
- VdbeCoverage(v);
- sqlite3VdbeChangeP5(v,
- SQLITE_JUMPIFNULL);
- }
- sqlite3VdbeGoto(v, iOk);
+ for (uint32_t i = 0; i < fk_def->field_count; i++) {
+ int iReg = fk_def->links[i].child_field + reg_data + 1;
+ sqlite3VdbeAddOp2(v, OP_IsNull, iReg, ok_label);
+ }
+ if (is_ignore == 0) {
+ uint32_t field_count = fk_def->field_count;
+ int temp_regs = sqlite3GetTempRange(parse_context, field_count);
+ int rec_reg = sqlite3GetTempReg(parse_context);
+ uint32_t id =
+
SQLITE_PAGENO_FROM_SPACEID_AND_INDEXID(fk_def->parent_id,
+ referenced_idx);
+ vdbe_emit_open_cursor(parse_context, cursor, id, parent);
+ for (uint32_t i = 0; i < field_count; ++i) {
+ sqlite3VdbeAddOp2(v, OP_Copy,
+ fk_def->links[i].child_field + 1 +
+ reg_data, temp_regs + i);
+ }
+ /*
+ * If the parent table is the same as the child
+ * table, and we are about to increment the
+ * constraint-counter (i.e. this is an INSERT operation),
+ * then check if the row being inserted matches itself.
+ * If so, do not increment the constraint-counter.
+ *
+ * If any of the parent-key values are NULL, then
+ * the row cannot match itself. So set JUMPIFNULL
+ * to make sure we do the OP_Found if any of the
+ * parent-key values are NULL (at this point it
+ * is known that none of the child key values are).
+ */
+ if (parent->def->id == fk_def->child_id && incr_count == 1) {
+ int jump = sqlite3VdbeCurrentAddr(v) + field_count + 1;
+ for (uint32_t i = 0; i < field_count; i++) {
+ int child_col = fk_def->links[i].child_field +
+ 1 + reg_data;
+ int parent_col = fk_def->links[i].parent_field +
+ 1 + reg_data;
+ sqlite3VdbeAddOp3(v, OP_Ne, child_col, jump,
+ parent_col);
+ sqlite3VdbeChangeP5(v, SQLITE_JUMPIFNULL);
}
-
- sqlite3VdbeAddOp4(v, OP_MakeRecord, regTemp, nCol,
- regRec,
- sqlite3IndexAffinityStr(pParse->db,
- pIdx), nCol);
- sqlite3VdbeAddOp4Int(v, OP_Found, iCur, iOk, regRec, 0);
- VdbeCoverage(v);
-
- sqlite3ReleaseTempReg(pParse, regRec);
- sqlite3ReleaseTempRange(pParse, regTemp, nCol);
+ sqlite3VdbeGoto(v, ok_label);
}
+ struct index *idx = space_index(parent, referenced_idx);
+ assert(idx != NULL);
+ sqlite3VdbeAddOp4(v, OP_MakeRecord, temp_regs, field_count,
+ rec_reg, sql_index_affinity_str(v->db,
+ idx->def),
+ P4_DYNAMIC);
+ sqlite3VdbeAddOp4Int(v, OP_Found, cursor, ok_label, rec_reg, 0);
+ sqlite3ReleaseTempReg(parse_context, rec_reg);
+ sqlite3ReleaseTempRange(parse_context, temp_regs, field_count);
}
-
- if (!pFKey->isDeferred && !(user_session->sql_flags & SQLITE_DeferFKs)
- && !pParse->pToplevel && !pParse->isMultiWrite) {
- /* Special case: If this is an INSERT statement that will
insert exactly
- * one row into the table, raise a constraint immediately
instead of
- * incrementing a counter. This is necessary as the VM code is
being
+ struct session *user_session = current_session();
+ if (!fk_def->is_deferred &&
+ !(user_session->sql_flags & SQLITE_DeferFKs) &&
@@ -844,59 +586,31 @@ sqlite3FkCheck(Parse * pParse, /* Parse context */
if ((user_session->sql_flags & SQLITE_ForeignKeys) == 0)
return;
- /* Loop through all the foreign key constraints for which pTab is the
- * child table (the table that the foreign key definition is part of).
+ /*
+ * Loop through all the foreign key constraints for which
+ * pTab is the child table.
*/
- for (pFKey = pTab->pFKey; pFKey; pFKey = pFKey->pNextFrom) {
- Table *pTo; /* Parent table of foreign key pFKey */
- Index *pIdx = 0; /* Index on key columns in pTo */
- int *aiFree = 0;
- int *aiCol;
- int iCol;
- int i;
+ struct space *space = space_by_id(pTab->def->id);
+ assert(space != NULL);
+ for (struct fkey *fk = space->child_fkey; fk != NULL;
+ fk = fk->fkey_child_next) {
+ struct fkey_def *fk_def = fk->def;
int bIgnore = 0;
-
- if (aChange
- && sqlite3_stricmp(pTab->def->name, pFKey->zTo) != 0
- && fkChildIsModified(pFKey, aChange) == 0) {
+ if (aChange != NULL && space->def->id != fk_def->parent_id &&
+ !fkey_child_is_modified(fk_def, aChange))
continue;
- }
-
@@ -977,100 +686,74 @@ sqlite3FkCheck(Parse * pParse, /* Parse context */
* might be set incorrectly if any OP_FkCounter
related scans are
* omitted.
*/
- if (!pFKey->isDeferred && eAction != OE_Cascade
- && eAction != OE_SetNull) {
+ if (!fk_def->is_deferred &&
+ action != FKEY_ACTION_CASCADE &&
+ action != FKEY_ACTION_SET_NULL) {
sqlite3MayAbort(pParse);
}
}
- pItem->zName = 0;
sqlite3SrcListDelete(db, pSrc);
}
- sqlite3DbFree(db, aiCol);
}
}
#define COLUMN_MASK(x) (((x)>31) ? 0xffffffff : ((u32)1<<(x)))
-/*
- * This function is called before generating code to update or delete a
- * row contained in table pTab.
- */
-u32
-sqlite3FkOldmask(Parse * pParse, /* Parse context */
- Table * pTab /* Table being modified */
- )
+uint32_t
+fkey_old_mask(uint32_t space_id)
{
- u32 mask = 0;
+ uint32_t mask = 0;
struct session *user_session = current_session();
-
if (user_session->sql_flags & SQLITE_ForeignKeys) {
- FKey *p;
- int i;
- for (p = pTab->pFKey; p; p = p->pNextFrom) {
- for (i = 0; i < p->nCol; i++)
- mask |= COLUMN_MASK(p->aCol[i].iFrom);
+ struct space *space = space_by_id(space_id);
+ for (struct fkey *fk = space->child_fkey; fk != NULL;
+ fk = fk->fkey_child_next) {
+ struct fkey_def *def = fk->def;
+ for (uint32_t i = 0; i < def->field_count; ++i)
+ mask |=COLUMN_MASK(def->links[i].child_field);
}
- for (p = sqlite3FkReferences(pTab); p; p = p->pNextTo) {
- Index *pIdx = 0;
- sqlite3FkLocateIndex(pParse, pTab, p, &pIdx, 0);
- if (pIdx) {
- int nIdxCol = index_column_count(pIdx);
- for (i = 0; i < nIdxCol; i++) {
- assert(pIdx->aiColumn[i] >= 0);
- mask |= COLUMN_MASK(pIdx->aiColumn[i]);
- }
- }
+ for (struct fkey *fk = space->parent_fkey; fk != NULL;
+ fk = fk->fkey_parent_next) {
+ struct fkey_def *def = fk->def;
+ for (uint32_t i = 0; i < def->field_count; ++i)
+ mask |= COLUMN_MASK(def->links[i].parent_field);
}
}
return mask;
}
diff --git a/src/box/sql/main.c b/src/box/sql/main.c
index 00dc7a631..618cdc420 100644
--- a/src/box/sql/main.c
+++ b/src/box/sql/main.c
@@ -730,11 +730,6 @@ sqlite3RollbackAll(Vdbe * pVdbe, int tripCode)
{
sqlite3 *db = pVdbe->db;
(void)tripCode;
- struct session *user_session = current_session();
-
- /* DDL is impossible inside a transaction. */
- assert((user_session->sql_flags & SQLITE_InternChanges) == 0
- || db->init.busy == 1);
/* If one has been configured, invoke the rollback-hook callback */
if (db->xRollbackCallback && (!pVdbe->auto_commit)) {
diff --git a/src/box/sql/parse.y b/src/box/sql/parse.y
index b2940b7c4..1b84dbcaa 100644
--- a/src/box/sql/parse.y
+++ b/src/box/sql/parse.y
@@ -300,19 +301,23 @@ autoinc(X) ::= AUTOINCR. {X = 1;}
// check fails.
//
%type refargs {int}
-refargs(A) ::= . { A = ON_CONFLICT_ACTION_NONE*0x0101; /* EV:
R-19803-45884 */}
+refargs(A) ::= . { A = FKEY_NO_ACTION; }
refargs(A) ::= refargs(A) refarg(Y). { A = (A & ~Y.mask) | Y.value; }
%type refarg {struct {int value; int mask;}}
-refarg(A) ::= MATCH nm. { A.value = 0; A.mask = 0x000000; }
+refarg(A) ::= MATCH matcharg(X). { A.value = X<<16; A.mask = 0xff0000; }
refarg(A) ::= ON INSERT refact. { A.value = 0; A.mask = 0x000000; }
refarg(A) ::= ON DELETE refact(X). { A.value = X; A.mask = 0x0000ff; }
refarg(A) ::= ON UPDATE refact(X). { A.value = X<<8; A.mask = 0x00ff00; }
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index 5c5369aeb..2489b31b2 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -4280,8 +4271,57 @@ sql_trigger_colmask(Parse
*parser, struct sql_trigger *trigger,
#define sqlite3IsToplevel(p) ((p)->pToplevel==0)
int sqlite3JoinType(Parse *, Token *, Token *, Token *);
-void sqlite3CreateForeignKey(Parse *, ExprList *, Token *, ExprList *, int);
-void sqlite3DeferForeignKey(Parse *, int);
+
+/**
+ * Change defer mode of last FK constraint processed during
+ * <CREATE TABLE> statement.
+ *
+ * @param parse_context Current parsing context.
+ * @param is_deferred Change defer mode to this value.
+ */
+void
+fkey_change_defer_mode(struct Parse *parse_context, bool is_deferred);
+
diff --git a/test/sql-tap/fkey1.test.lua b/test/sql-tap/fkey1.test.lua
index 494af4b4a..3c29b097d 100755
--- a/test/sql-tap/fkey1.test.lua
+++ b/test/sql-tap/fkey1.test.lua
@@ -17,10 +17,10 @@ test:do_execsql_test(
"fkey1-1.2",
[[
CREATE TABLE t1(
- a INTEGER PRIMARY KEY,
+ a PRIMARY KEY,
b INTEGER
REFERENCES t1 ON DELETE CASCADE
- REFERENCES t2,
+ REFERENCES t2 (x),
c TEXT,
FOREIGN KEY (b, c) REFERENCES t2(x, y) ON UPDATE CASCADE);
]], {
diff --git a/test/sql-tap/fkey4.test.lua b/test/sql-tap/fkey4.test.lua
index 9415b62cb..9810ce22f 100755
--- a/test/sql-tap/fkey4.test.lua
+++ b/test/sql-tap/fkey4.test.lua
@@ -186,7 +186,7 @@ test:do_execsql_test(
DROP TABLE IF EXISTS c1;
DROP TABLE IF EXISTS p1;
CREATE TABLE p1(a PRIMARY KEY, b);
- CREATE TABLE c1(x PRIMARY KEY REFERENCES p1 DEFERRABLE INITIALLY
DEFERRED);
+ CREATE TABLE c1(x PRIMARY KEY REFERENCES p1);
INSERT INTO p1 VALUES (1, 'one');
INSERT INTO p1 VALUES (2, 'two');
INSERT INTO c1 VALUES (1);
diff --git a/test/sql-tap/table.test.lua b/test/sql-tap/table.test.lua
index 6aa290742..24f494852 100755
--- a/test/sql-tap/table.test.lua
+++ b/test/sql-tap/table.test.lua
@@ -791,14 +791,16 @@ test:do_catchsql_test(
);
]], {
-- <table-10.7>
- 0
+ 1, "table \"T4\" doesn't feature column B"
-- </table-10.7>
})
test:do_catchsql_test(
"table-10.8",
[[
- DROP TABLE t6;
+ DROP TABLE IF EXISTS t6;
+ DROP TABLE IF EXISTS t4;
+ CREATE TABLE t4(x UNIQUE, y, PRIMARY KEY (x, y));
CREATE TABLE t6(a primary key,b,c,
FOREIGN KEY (b,c) REFERENCES t4(x,y) MATCH PARTIAL
ON UPDATE SET NULL ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED
@@ -861,7 +863,7 @@ test:do_test(
]]
end, {
-- <table-10.12>
- 1, [[unknown column "X" in foreign key definition]]
+ 1, [[no such column X]]
-- </table-10.12>
})