[tarantool-patches] Re: [PATCH v9] sql: add index_def to struct Index

  • From: Ivan Koptelov <ivan.koptelov@xxxxxxxxxxxxx>
  • To: Vladislav Shpilevoy <v.shpilevoy@xxxxxxxxxxxxx>, tarantool-patches@xxxxxxxxxxxxx
  • Date: Tue, 3 Jul 2018 14:37:32 +0300

In previous patch versions I didn't fix one test, which prints all error codes.
At failed, because I added new error code. Now the test is fixed.

Now every sqlite struct Index is created with tnt struct
index_def inside. This allows us to use tnt index_def
in work with sqlite indexes in the same manner as with
tnt index and is a step to remove sqlite Index with
tnt index.
Fields coll_array, coll_id_array, aiColumn, sort_order
and zName are removed from Index. All usages of this
fields changed to usage of corresponding index_def
fields.
index_is_unique(), sql_index_collation() and
index_column_count() are removed with calls of
index_def corresponding fields.

Closes: #3369

---
Branch:
https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where
Issue:https://github.com/tarantool/tarantool/issues/3369

  src/box/sql.c                        |  54 ++-
  src/box/sql/analyze.c                |  85 ++---
  src/box/sql/build.c                  | 713 +++++++++++++++++------------------
  src/box/sql/delete.c                 |  10 +-
  src/box/sql/expr.c                   |  61 +--
  src/box/sql/fkey.c                   | 132 +++----
  src/box/sql/insert.c                 | 145 ++++---
  src/box/sql/pragma.c                 |  30 +-
  src/box/sql/select.c                 |   2 +-
  src/box/sql/sqliteInt.h              | 111 ++----
  src/box/sql/update.c                 |  39 +-
  src/box/sql/vdbeaux.c                |   2 +-
  src/box/sql/vdbemem.c                |  21 +-
  src/box/sql/where.c                  | 180 ++++-----
  src/box/sql/wherecode.c              | 102 ++---
  test/sql-tap/colname.test.lua        |   4 +-
  test/sql/message-func-indexes.result |   8 +-
  17 files changed, 821 insertions(+), 878 deletions(-)

diff --git a/src/box/sql.c b/src/box/sql.c
index 11353150e..24e37652e 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -1452,8 +1452,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void *buf)

      /* If table's PK is single column which is INTEGER, then
       * treat it as strict type, not affinity.  */
-    if (pk_idx && pk_idx->nColumn == 1) {
-        int pk = pk_idx->aiColumn[0];
+    if (pk_idx != NULL && pk_idx->def->key_def->part_count == 1) {
+        int pk = pk_idx->def->key_def->parts[0].fieldno;

1. You again sent the patch with spaces instead of tabs. Please, cope
with it. Looks like you copied 'git diff/show' output. Either use format-patch
or use 'git --no-pager diff/show'.

          if (def->fields[pk].type == FIELD_TYPE_INTEGER)
              pk_forced_int = pk;
      }
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 0da7d805b..662fc698e 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c> @@ -2646,18 +2535,154 @@ addIndexToTable(Index * pIndex, Table * pTab)> + struct Expr *column_expr = sqlite3ExprSkipCollate(expr);
+        if (column_expr->op != TK_COLUMN) {
+            sqlite3ErrorMsg(parse, tnt_errcode_desc(ER_UNSUPPORTED),
+                        "Tarantool", "functional indexes");

2. Patch to allow SQL_TARANTOOL_ERROR has been pushed today, so you can use
here diag_set again.

+            goto tnt_error;
+        }
+
@@ -2805,108 +2828,92 @@ sql_create_index(struct Parse *parse, struct Token *token,

3. Crash:

box.cfg{}
box.sql.execute('CREATE TABLE test (a int, b int, c int, PRIMARY KEY (a, a COLLATE kek, b, c))')
Process 15886 stopped
* thread #1, queue = 'com.apple.main-thread', stop reason = EXC_BAD_ACCESS (code=1, address=0x80)
    frame #0: 0x000000010035f77b tarantool`sql_create_index(parse=0x000000010481f8b0, token=0x0000000000000000, tbl_name=0x0000000000000000, col_list=0x00000001029039d8, on_error=ON_CONFLICT_ACTION_DEFAULT, start=0x0000000000000000, where=0x0000000000000000, sort_order=SORT_ORDER_ASC, if_not_exist=false, idx_type='\x02') at build.c:2895
   2892         * PRIMARY KEY contains no repeated columns.
   2893         */
   2894        if (IsPrimaryKeyIndex(index)) {
-> 2895            struct key_part *parts = index->def->key_def->parts;
   2896            uint32_t part_count = index->def->key_def->part_count;
   2897            uint32_t new_part_count = 1;
   2898
Target 0: (tarantool) stopped.

@@ -3070,54 +3080,17 @@ sql_create_index(struct Parse *parse, struct Token *token,
-/**
- * Return number of columns in given index.
- * If space is ephemeral, use internal
- * SQL structure to fetch the value.
- */
-uint32_t
-index_column_count(const Index *idx)
-{
-    assert(idx != NULL);
-    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-    struct space *space = space_by_id(space_id);
-    /* It is impossible to find an ephemeral space by id. */
-    if (space == NULL)
-        return idx->nColumn;
-
-    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-    struct index *index = space_index(space, index_id);
-    assert(index != NULL);
-    return index->def->key_def->part_count;
-}
-
-/** Return true if given index is unique and not nullable. */
-bool
-index_is_unique_not_null(const Index *idx)

4. Same as on the previous review. Still is used in a pair of places.

Are you sure? I searched through the whole project and didn't find it.
There is only a variable with the same name in one place
-{
-    assert(idx != NULL);
-    uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-    struct space *space = space_by_id(space_id);
-    assert(space != NULL);
-
-    uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-    struct index *index = space_index(space, index_id);
-    assert(index != NULL);
-    return (index->def->opts.is_unique &&
-        !index->def->key_def->is_nullable);
+    sqlite3DbFree(db, name);
  }
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index c0c26ce29..225fddc23 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -2913,11 +2898,32 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,    /* WHERE clause information */
           */
          Index *pFirst;    /* First of real indices on the table */
          memset(&sPk, 0, sizeof(Index));
-        sPk.nColumn = 1;
-        sPk.aiColumn = &aiColumnPk;
          sPk.aiRowLogEst = aiRowEstPk;
          sPk.onError = ON_CONFLICT_ACTION_REPLACE;
          sPk.pTable = pTab;
+
+        struct key_def *key_def = key_def_new(1);
+        if (key_def == NULL) {
+            pWInfo->pParse->nErr++;
+            pWInfo->pParse->rc = SQL_TARANTOOL_ERROR;
+            return SQL_TARANTOOL_ERROR;
+        }
+
+        key_def_set_part(key_def, 0, 0, pTab->def->fields[0].type,
+                 ON_CONFLICT_ACTION_ABORT,
+                 NULL, COLL_NONE, SORT_ORDER_ASC);
+
+        sPk.def = index_def_new(pTab->def->id, 0, "primary",
+                    sizeof("primary") - 1, TREE,
+                    &index_opts_default, key_def, NULL);

11. Where is sPk.def is deleted?
It is deleted in freeIndex() with sPk

5. I do not see any mention of freeIndex() in where.c. Where is it deleted?

sPk is declared on stack. If it was deleted with freeIndex, Tarantool
would crash.

+        key_def_delete(key_def);
+
+        if (sPk.def == NULL) {
+            pWInfo->pParse->nErr++;
+            pWInfo->pParse->rc = SQL_TARANTOOL_ERROR;
+            return SQL_TARANTOOL_ERROR;
+        }
+
          aiRowEstPk[0] = sql_space_tuple_log_count(pTab);
          aiRowEstPk[1] = 0;
          pFirst = pSrc->pTab->pIndex;

6. Where is the test, that I gave you on the previous review and that
lead to crash? Please, add it to the test suite. And the new test in
this review too.


Now every sqlite struct Index is created with tnt struct
index_def inside. This allows us to use tnt index_def
in work with sqlite indexes in the same manner as with
tnt index and is a step to remove sqlite Index with
tnt index.
Fields coll_array, coll_id_array, aiColumn, sort_order
and zName are removed from Index. All usages of this
fields changed to usage of corresponding index_def
fields.
index_is_unique(), sql_index_collation() and
index_column_count() are removed with calls of
index_def corresponding fields.

Closes: #3369
---
Branch:
https://github.com/tarantool/tarantool/tree/sb/gh-3369-use-index-def-in-select-and-where
Issue: https://github.com/tarantool/tarantool/issues/3369

 src/box/errcode.h                                  |   1 +
 src/box/sql.c                                      |  54 +-
 src/box/sql/analyze.c                              |  85 ++-
 src/box/sql/build.c                                | 727 ++++++++++-----------
 src/box/sql/delete.c                               |  10 +-
 src/box/sql/expr.c                                 |  61 +-
 src/box/sql/fkey.c                                 | 213 +++---
 src/box/sql/insert.c                               | 145 ++--
 src/box/sql/pragma.c                               |  30 +-
 src/box/sql/select.c                               |   2 +-
 src/box/sql/sqliteInt.h                            | 116 ++--
 src/box/sql/update.c                               |  39 +-
 src/box/sql/vdbeaux.c                              |   2 +-
 src/box/sql/vdbemem.c                              |  21 +-
 src/box/sql/where.c                                | 182 +++---
 src/box/sql/wherecode.c                            | 102 +--
 test/box/misc.result                               |   1 +
 .../{collation.test.lua => collation1.test.lua}    |   0
 test/sql-tap/collation2.test.lua                   |  20 +
 test/sql-tap/colname.test.lua                      |   4 +-
 test/sql-tap/identifier_case.test.lua              |   4 +-
 test/sql/message-func-indexes.result               |   8 +-
 22 files changed, 900 insertions(+), 927 deletions(-)
 rename test/sql-tap/{collation.test.lua => collation1.test.lua} (100%)
 create mode 100755 test/sql-tap/collation2.test.lua

diff --git a/src/box/errcode.h b/src/box/errcode.h
index c76018cbf..2229c5cbd 100644
--- a/src/box/errcode.h
+++ b/src/box/errcode.h
@@ -215,6 +215,7 @@ struct errcode_record {
        /*160 */_(ER_ACTION_MISMATCH,           "Field %d contains %s on conflict 
action, but %s in index parts") \
        /*161 */_(ER_VIEW_MISSING_SQL,          "Space declared as a view must have 
SQL statement") \
        /*162 */_(ER_FOREIGN_KEY_CONSTRAINT,    "Can not commit transaction: 
deferred foreign keys violations are not resolved") \
+       /*163 */_(ER_NO_SUCH_COLLATION,         "Collation '%s' does not 
exist") \
/*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/src/box/sql.c b/src/box/sql.c
index 03b4f156a..b00e8655d 100644
--- a/src/box/sql.c
+++ b/src/box/sql.c
@@ -1418,8 +1418,8 @@ int tarantoolSqlite3MakeTableFormat(Table *pTable, void 
*buf)
/* If table's PK is single column which is INTEGER, then
         * treat it as strict type, not affinity.  */
-       if (pk_idx && pk_idx->nColumn == 1) {
-               int pk = pk_idx->aiColumn[0];
+       if (pk_idx != NULL && pk_idx->def->key_def->part_count == 1) {
+               int pk = pk_idx->def->key_def->parts[0].fieldno;
                if (def->fields[pk].type == FIELD_TYPE_INTEGER)
                        pk_forced_int = pk;
        }
@@ -1530,20 +1530,19 @@ tarantoolSqlite3MakeTableOpts(Table *pTable, const char 
*zSql, char *buf)
  */
 int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
 {
-       struct space_def *def = pIndex->pTable->def;
-       assert(def != NULL);
+       struct field_def *fields = pIndex->pTable->def->fields;
+       struct key_def *key_def = pIndex->def->key_def;
        const struct Enc *enc = get_enc(buf);
-       struct SqliteIndex *primary_index;
-       char *base = buf, *p;
-       int pk_forced_int = -1;
-
-       primary_index = sqlite3PrimaryKeyIndex(pIndex->pTable);
+       char *base = buf;
+       uint32_t pk_forced_int = UINT32_MAX;
+       struct SqliteIndex *primary_index =
+               sqlite3PrimaryKeyIndex(pIndex->pTable);
/* If table's PK is single column which is INTEGER, then
         * treat it as strict type, not affinity.  */
-       if (primary_index->nColumn == 1) {
-               int pk = primary_index->aiColumn[0];
-               if (def->fields[pk].type == FIELD_TYPE_INTEGER)
+       if (primary_index->def->key_def->part_count == 1) {
+               int pk = primary_index->def->key_def->parts[0].fieldno;
+               if (fields[pk].type == FIELD_TYPE_INTEGER)
                        pk_forced_int = pk;
        }
@@ -1553,46 +1552,45 @@ int tarantoolSqlite3MakeIdxParts(SqliteIndex *pIndex, void *buf)
         * primary key columns. Query planner depends on this particular
         * data layout.
         */
-       int i, n = pIndex->nColumn;
-
-       p = enc->encode_array(base, n);
-       for (i = 0; i < n; i++) {
-               int col = pIndex->aiColumn[i];
-               assert(def->fields[col].is_nullable ==
-                      action_is_nullable(def->fields[col].nullable_action));
+       struct key_part *part = key_def->parts;
+       char *p = enc->encode_array(base, key_def->part_count);
+       for (uint32_t i = 0; i < key_def->part_count; ++i, ++part) {
+               uint32_t col = part->fieldno;
+               assert(fields[col].is_nullable ==
+                      action_is_nullable(fields[col].nullable_action));
                const char *t;
                if (pk_forced_int == col) {
                        t = "integer";
                } else {
-                       enum affinity_type affinity = def->fields[col].affinity;
-                       t = convertSqliteAffinity(affinity,
-                                                 def->fields[col].is_nullable);
+                       t = convertSqliteAffinity(fields[col].affinity,
+                                                 fields[col].is_nullable);
                }
                /* do not decode default collation */
-               uint32_t cid = pIndex->coll_id_array[i];
+               uint32_t cid = part->coll_id;
                p = enc->encode_map(p, cid == COLL_NONE ? 5 : 6);
                p = enc->encode_str(p, "type", sizeof("type")-1);
                p = enc->encode_str(p, t, strlen(t));
                p = enc->encode_str(p, "field", sizeof("field")-1);
                p = enc->encode_uint(p, col);
                if (cid != COLL_NONE) {
-                       p = enc->encode_str(p, "collation", 
sizeof("collation")-1);
+                       p = enc->encode_str(p, "collation",
+                                           sizeof("collation") - 1);
                        p = enc->encode_uint(p, cid);
                }
                p = enc->encode_str(p, "is_nullable", 11);
-               p = enc->encode_bool(p, def->fields[col].is_nullable);
+               p = enc->encode_bool(p, fields[col].is_nullable);
                p = enc->encode_str(p, "nullable_action", 15);
                const char *action_str =
-                       
on_conflict_action_strs[def->fields[col].nullable_action];
+                       on_conflict_action_strs[fields[col].nullable_action];
                p = enc->encode_str(p, action_str, strlen(action_str));
p = enc->encode_str(p, "sort_order", 10);
-               enum sort_order sort_order = pIndex->sort_order[i];
+               enum sort_order sort_order = part->sort_order;
                assert(sort_order < sort_order_MAX);
                const char *sort_order_str = sort_order_strs[sort_order];
                p = enc->encode_str(p, sort_order_str, strlen(sort_order_str));
        }
-       return (int)(p - base);
+       return p - base;
 }
/*
diff --git a/src/box/sql/analyze.c b/src/box/sql/analyze.c
index 5f73f026e..c5afff214 100644
--- a/src/box/sql/analyze.c
+++ b/src/box/sql/analyze.c
@@ -848,8 +848,7 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
        for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
                int addrRewind; /* Address of "OP_Rewind iIdxCur" */
                int addrNextRow;        /* Address of "next_row:" */
-               const char *zIdxName;   /* Name of the index */
-               int nColTest;   /* Number of columns to test for changes */
+               const char *idx_name;   /* Name of the index */
if (pOnlyIdx && pOnlyIdx != pIdx)
                        continue;
@@ -857,17 +856,16 @@ analyzeOneTable(Parse * pParse,   /* Parser context */
                 * names. Thus, for the sake of clarity, use
                 * instead more familiar table name.
                 */
-               if (IsPrimaryKeyIndex(pIdx)) {
-                       zIdxName = pTab->def->name;
-               } else {
-                       zIdxName = pIdx->zName;
-               }
-               nColTest = index_column_count(pIdx);
+               if (IsPrimaryKeyIndex(pIdx))
+                       idx_name = pTab->def->name;
+               else
+                       idx_name = pIdx->def->name;
+               int part_count = pIdx->def->key_def->part_count;
/* Populate the register containing the index name. */
-               sqlite3VdbeLoadString(v, regIdxname, zIdxName);
+               sqlite3VdbeLoadString(v, regIdxname, idx_name);
                VdbeComment((v, "Analysis for %s.%s", pTab->def->name,
-                       zIdxName));
+                           idx_name));
/*
                 * Pseudo-code for loop that calls stat_push():
@@ -906,7 +904,7 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
                 * when building a record to insert into the sample column of
                 * the _sql_stat4 table).
                 */
-               pParse->nMem = MAX(pParse->nMem, regPrev + nColTest);
+               pParse->nMem = MAX(pParse->nMem, regPrev + part_count);
/* Open a read-only cursor on the index being analyzed. */
                struct space *space =
@@ -917,7 +915,7 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
                sqlite3VdbeAddOp3(v, OP_OpenRead, iIdxCur, pIdx->tnum,
                                  space_ptr_reg);
                sql_vdbe_set_p4_key_def(pParse, pIdx);
-               VdbeComment((v, "%s", pIdx->zName));
+               VdbeComment((v, "%s", pIdx->def->name));
/* Invoke the stat_init() function. The arguments are:
                 *
@@ -930,8 +928,8 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
                 * The third argument is only used for STAT4
                 */
                sqlite3VdbeAddOp2(v, OP_Count, iIdxCur, regStat4 + 3);
-               sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 1);
-               sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regStat4 + 2);
+               sqlite3VdbeAddOp2(v, OP_Integer, part_count, regStat4 + 1);
+               sqlite3VdbeAddOp2(v, OP_Integer, part_count, regStat4 + 2);
                sqlite3VdbeAddOp4(v, OP_Function0, 0, regStat4 + 1, regStat4,
                                  (char *)&statInitFuncdef, P4_FUNCDEF);
                sqlite3VdbeChangeP5(v, 3);
@@ -949,11 +947,11 @@ analyzeOneTable(Parse * pParse,   /* Parser context */
                sqlite3VdbeAddOp2(v, OP_Integer, 0, regChng);
                addrNextRow = sqlite3VdbeCurrentAddr(v);
- if (nColTest > 0) {
+               if (part_count > 0) {
                        int endDistinctTest = sqlite3VdbeMakeLabel(v);
                        int *aGotoChng; /* Array of jump instruction addresses 
*/
                        aGotoChng =
-                           sqlite3DbMallocRawNN(db, sizeof(int) * nColTest);
+                           sqlite3DbMallocRawNN(db, sizeof(int) * part_count);
                        if (aGotoChng == 0)
                                continue;
@@ -969,7 +967,7 @@ analyzeOneTable(Parse * pParse, /* Parser context */
                         */
                        sqlite3VdbeAddOp0(v, OP_Goto);
                        addrNextRow = sqlite3VdbeCurrentAddr(v);
-                       if (nColTest == 1 && index_is_unique(pIdx)) {
+                       if (part_count == 1 && pIdx->def->opts.is_unique) {
                                /* For a single-column UNIQUE index, once we 
have found a non-NULL
                                 * row, we know that all the rest will be 
distinct, so skip
                                 * subsequent distinctness tests.
@@ -978,13 +976,12 @@ analyzeOneTable(Parse * pParse,   /* Parser context */
                                                  endDistinctTest);
                                VdbeCoverage(v);
                        }
-                       for (i = 0; i < nColTest; i++) {
-                               uint32_t id;
-                               struct coll *coll =
-                                       sql_index_collation(pIdx, i, &id);
+                       struct key_part *part = pIdx->def->key_def->parts;
+                       for (i = 0; i < part_count; ++i, ++part) {
+                               struct coll *coll = part->coll;
                                sqlite3VdbeAddOp2(v, OP_Integer, i, regChng);
                                sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-                                                 pIdx->aiColumn[i], regTemp);
+                                                 part->fieldno, regTemp);
                                aGotoChng[i] =
                                    sqlite3VdbeAddOp4(v, OP_Ne, regTemp, 0,
                                                      regPrev + i, (char *)coll,
@@ -992,7 +989,7 @@ analyzeOneTable(Parse * pParse,     /* Parser context */
                                sqlite3VdbeChangeP5(v, SQLITE_NULLEQ);
                                VdbeCoverage(v);
                        }
-                       sqlite3VdbeAddOp2(v, OP_Integer, nColTest, regChng);
+                       sqlite3VdbeAddOp2(v, OP_Integer, part_count, regChng);
                        sqlite3VdbeGoto(v, endDistinctTest);
/*
@@ -1003,11 +1000,11 @@ analyzeOneTable(Parse * pParse, /* Parser context */
                         *  ...
                         */
                        sqlite3VdbeJumpHere(v, addrNextRow - 1);
-                       for (i = 0; i < nColTest; i++) {
+                       part = pIdx->def->key_def->parts;
+                       for (i = 0; i < part_count; ++i, ++part) {
                                sqlite3VdbeJumpHere(v, aGotoChng[i]);
                                sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-                                                 pIdx->aiColumn[i],
-                                                 regPrev + i);
+                                                 part->fieldno, regPrev + i);
                        }
                        sqlite3VdbeResolveLabel(v, endDistinctTest);
                        sqlite3DbFree(db, aGotoChng);
@@ -1022,19 +1019,18 @@ analyzeOneTable(Parse * pParse, /* Parser context */
                 */
                assert(regKey == (regStat4 + 2));
                Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-               int j, k, regKeyStat;
-               int nPkColumn = (int)index_column_count(pPk);
-               regKeyStat = sqlite3GetTempRange(pParse, nPkColumn);
-               for (j = 0; j < nPkColumn; j++) {
-                       k = pPk->aiColumn[j];
-                       assert(k >= 0 && k < (int)pTab->def->field_count);
-                       sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k, regKeyStat 
+ j);
-                       VdbeComment((v, "%s",
-                               pTab->def->fields[pPk->aiColumn[j]].name));
+               int pk_part_count = (int) pPk->def->key_def->part_count;
+               int regKeyStat = sqlite3GetTempRange(pParse, pk_part_count);
+               for (int j = 0; j < pk_part_count; ++j) {
+                       int k = pPk->def->key_def->parts[j].fieldno;
+                       assert(k >= 0 && k < (int) pTab->def->field_count);
+                       sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
+                                         regKeyStat + j);
+                       VdbeComment((v, "%s", pTab->def->fields[k].name));
                }
                sqlite3VdbeAddOp3(v, OP_MakeRecord, regKeyStat,
-                                 nPkColumn, regKey);
-               sqlite3ReleaseTempRange(pParse, regKeyStat, nPkColumn);
+                                 pk_part_count, regKey);
+               sqlite3ReleaseTempRange(pParse, regKeyStat, pk_part_count);
assert(regChng == (regStat4 + 1));
                sqlite3VdbeAddOp4(v, OP_Function0, 1, regStat4, regTemp,
@@ -1057,11 +1053,11 @@ analyzeOneTable(Parse * pParse, /* Parser context */
                int regDLt = regStat1 + 2;
                int regSample = regStat1 + 3;
                int regCol = regStat1 + 4;
-               int regSampleKey = regCol + nColTest;
+               int regSampleKey = regCol + part_count;
                int addrNext;
                int addrIsNull;
- pParse->nMem = MAX(pParse->nMem, regCol + nColTest);
+               pParse->nMem = MAX(pParse->nMem, regCol + part_count);
addrNext = sqlite3VdbeCurrentAddr(v);
                callStatGet(v, regStat4, STAT_GET_KEY, regSampleKey);
@@ -1077,12 +1073,11 @@ analyzeOneTable(Parse * pParse, /* Parser context */
                 * be taken
                 */
                VdbeCoverageNeverTaken(v);
-               for (i = 0; i < nColTest; i++) {
-                       sqlite3ExprCodeLoadIndexColumn(pParse, pIdx,
-                                                                        
iTabCur, i,
-                                                                        regCol 
+ i);
+               for (i = 0; i < part_count; i++) {
+                       sqlite3ExprCodeLoadIndexColumn(pParse, pIdx, iTabCur, i,
+                                                      regCol + i);
                }
-               sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, nColTest,
+               sqlite3VdbeAddOp3(v, OP_MakeRecord, regCol, part_count,
                                  regSample);
                sqlite3VdbeAddOp3(v, OP_MakeRecord, regTabname, 6, regTemp);
                sqlite3VdbeAddOp2(v, OP_IdxReplace, iStatCur + 1, regTemp);
@@ -1146,7 +1141,7 @@ analyzeTable(Parse * pParse, Table * pTab, Index * 
pOnlyIdx)
        iStatCur = pParse->nTab;
        pParse->nTab += 3;
        if (pOnlyIdx) {
-               openStatTable(pParse, iStatCur, pOnlyIdx->zName, "idx");
+               openStatTable(pParse, iStatCur, pOnlyIdx->def->name, "idx");
        } else {
                openStatTable(pParse, iStatCur, pTab->def->name, "tbl");
        }
diff --git a/src/box/sql/build.c b/src/box/sql/build.c
index 3737a119f..48e0b8d5e 100644
--- a/src/box/sql/build.c
+++ b/src/box/sql/build.c
@@ -241,6 +241,8 @@ static void
 freeIndex(sqlite3 * db, Index * p)
 {
        sql_expr_delete(db, p->pPartIdxWhere, false);
+       if (p->def != NULL)
+               index_def_delete(p->def);
        sqlite3DbFree(db, p->zColAff);
        sqlite3DbFree(db, p);
 }
@@ -259,7 +261,8 @@ sqlite3UnlinkAndDeleteIndex(sqlite3 * db, Index * pIndex)
struct session *user_session = current_session();
- pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash, pIndex->zName, 0);
+       pIndex = sqlite3HashInsert(&pIndex->pTable->idxHash,
+                                  pIndex->def->name, 0);
        if (ALWAYS(pIndex)) {
                if (pIndex->pTable->pIndex == pIndex) {
                        pIndex->pTable->pIndex = pIndex->pNext;
@@ -364,7 +367,7 @@ deleteTable(sqlite3 * db, Table * pTable)
                pNext = pIndex->pNext;
                assert(pIndex->pSchema == pTable->pSchema);
                if ((db == 0 || db->pnBytesFreed == 0)) {
-                       char *zName = pIndex->zName;
+                       char *zName = pIndex->def->name;
                        TESTONLY(Index *
                                 pOld =) sqlite3HashInsert(&pTable->idxHash,
                                                           zName, 0);
@@ -1026,7 +1029,7 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
        Table *p = pParse->pNewTable;
        if (p == NULL)
                return;
-       int i = p->def->field_count - 1;
+       uint32_t i = p->def->field_count - 1;
        sqlite3 *db = pParse->db;
        char *zColl = sqlite3NameFromToken(db, pToken);
        if (!zColl)
@@ -1034,22 +1037,21 @@ sqlite3AddCollateType(Parse * pParse, Token * pToken)
        uint32_t *id = &p->def->fields[i].coll_id;
        p->aCol[i].coll = sql_get_coll_seq(pParse, zColl, id);
        if (p->aCol[i].coll != NULL) {
-               Index *pIdx;
                /* If the column is declared as "<name> PRIMARY KEY COLLATE 
<type>",
                 * then an index may have been created on this column before the
                 * collation type was added. Correct this if it is the case.
                 */
-               for (pIdx = p->pIndex; pIdx; pIdx = pIdx->pNext) {
-                       assert(pIdx->nColumn == 1);
-                       if (pIdx->aiColumn[0] == i) {
-                               id = &pIdx->coll_id_array[0];
-                               pIdx->coll_array[0] =
+               for (struct Index *pIdx = p->pIndex; pIdx != NULL;
+                    pIdx = pIdx->pNext) {
+                       assert(pIdx->def->key_def->part_count == 1);
+                       if (pIdx->def->key_def->parts[0].fieldno == i) {
+                               id = &pIdx->def->key_def->parts[0].coll_id;
+                               pIdx->def->key_def->parts[0].coll =
                                        sql_column_collation(p->def, i, id);
                        }
                }
-       } else {
-               sqlite3DbFree(db, zColl);
        }
+       sqlite3DbFree(db, zColl);
 }
struct coll *
@@ -1079,66 +1081,6 @@ sql_column_collation(struct space_def *def, uint32_t 
column, uint32_t *coll_id)
        return space->format->fields[column].coll;
 }
-struct key_def*
-sql_index_key_def(struct Index *idx)
-{
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-       uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-       struct space *space = space_by_id(space_id);
-       assert(space != NULL);
-       struct index *index = space_index(space, index_id);
-       assert(index != NULL && index->def != NULL);
-       return index->def->key_def;
-}
-
-struct coll *
-sql_index_collation(Index *idx, uint32_t column, uint32_t *coll_id)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
-       struct space *space = space_by_id(space_id);
-
-       assert(column < idx->nColumn);
-       /*
-        * If space is still under construction, or it is
-        * an ephemeral space, then fetch collation from
-        * SQL internal structure.
-        */
-       if (space == NULL) {
-               assert(column < idx->nColumn);
-               *coll_id = idx->coll_id_array[column];
-               return idx->coll_array[column];
-       }
-
-       struct key_def *key_def = sql_index_key_def(idx);
-       assert(key_def != NULL && key_def->part_count >= column);
-       *coll_id = key_def->parts[column].coll_id;
-       return key_def->parts[column].coll;
-}
-
-enum sort_order
-sql_index_column_sort_order(Index *idx, uint32_t column)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->pTable->tnum);
-       struct space *space = space_by_id(space_id);
-
-       assert(column < idx->nColumn);
-       /*
-        * If space is still under construction, or it is
-        * an ephemeral space, then fetch collation from
-        * SQL internal structure.
-        */
-       if (space == NULL) {
-               assert(column < idx->nColumn);
-               return idx->sort_order[column];
-       }
-
-       struct key_def *key_def = sql_index_key_def(idx);
-       assert(key_def != NULL && key_def->part_count >= column);
-       return key_def->parts[column].sort_order;
-}
-
 struct ExprList *
 space_checks_expr_list(uint32_t space_id)
 {
@@ -1322,17 +1264,6 @@ createTableStmt(sqlite3 * db, Table * p)
        return zStmt;
 }
-/* Return true if value x is found any of the first nCol entries of aiCol[]
- */
-static int
-hasColumn(const i16 * aiCol, int nCol, int x)
-{
-       while (nCol-- > 0)
-               if (x == *(aiCol++))
-                       return 1;
-       return 0;
-}
-
 /*
  * This routine runs at the end of parsing a CREATE TABLE statement.
  * The job of this routine is to convert both
@@ -1349,13 +1280,12 @@ static void
 convertToWithoutRowidTable(Parse * pParse, Table * pTab)
 {
        Index *pPk;
-       int i, j;
        sqlite3 *db = pParse->db;
/* Mark every PRIMARY KEY column as NOT NULL (except for imposter tables)
         */
        if (!db->init.imposterTable) {
-               for (i = 0; i < (int)pTab->def->field_count; i++) {
+               for (uint32_t i = 0; i < pTab->def->field_count; i++) {
                        if (pTab->aCol[i].is_primkey) {
                                pTab->def->fields[i].nullable_action
                                        = ON_CONFLICT_ACTION_ABORT;
@@ -1387,20 +1317,6 @@ convertToWithoutRowidTable(Parse * pParse, Table * pTab)
                pTab->iPKey = -1;
        } else {
                pPk = sqlite3PrimaryKeyIndex(pTab);
-
-               /*
-                * Remove all redundant columns from the PRIMARY KEY.  For 
example, change
-                * "PRIMARY KEY(a,b,a,b,c,b,c,d)" into just "PRIMARY 
KEY(a,b,c,d)".  Later
-                * code assumes the PRIMARY KEY contains no repeated columns.
-                */
-               for (i = j = 1; i < pPk->nColumn; i++) {
-                       if (hasColumn(pPk->aiColumn, j, pPk->aiColumn[i])) {
-                               pPk->nColumn--;
-                       } else {
-                               pPk->aiColumn[j++] = pPk->aiColumn[i];
-                       }
-               }
-               pPk->nColumn = j;
        }
        assert(pPk != 0);
 }
@@ -1482,7 +1398,7 @@ createIndex(Parse * pParse, Index * pIndex, int iSpaceId, 
int iIndexId,
        }
        sqlite3VdbeAddOp4(v,
                          OP_String8, 0, iFirstCol + 2, 0,
-                         sqlite3DbStrDup(pParse->db, pIndex->zName),
+                         sqlite3DbStrDup(pParse->db, pIndex->def->name),
                          P4_DYNAMIC);
        sqlite3VdbeAddOp4(v, OP_String8, 0, iFirstCol + 3, 0, "tree",
                          P4_STATIC);
@@ -1519,7 +1435,7 @@ makeIndexSchemaRecord(Parse * pParse,
sqlite3VdbeAddOp4(v,
                          OP_String8, 0, iFirstCol, 0,
-                         sqlite3DbStrDup(pParse->db, pIndex->zName),
+                         sqlite3DbStrDup(pParse->db, pIndex->def->name),
                          P4_DYNAMIC);
if (pParse->pNewTable) {
@@ -2448,15 +2364,16 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int 
memRootPage)
        } else {
                tnum = pIndex->tnum;
        }
-       struct key_def *def = key_def_dup(sql_index_key_def(pIndex));
+       struct key_def *def = key_def_dup(pIndex->def->key_def);
        if (def == NULL) {
                sqlite3OomFault(db);
                return;
        }
        /* Open the sorter cursor if we are to use one. */
        iSorter = pParse->nTab++;
-       sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0, pIndex->nColumn,
-                         (char *)def, P4_KEYDEF);
+       sqlite3VdbeAddOp4(v, OP_SorterOpen, iSorter, 0,
+                         pIndex->def->key_def->part_count, (char *)def,
+                         P4_KEYDEF);
/* Open the table. Loop through all rows of the table, inserting index
         * records into the sorter.
@@ -2487,7 +2404,8 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int 
memRootPage)
                sqlite3VdbeGoto(v, j2);
                addr2 = sqlite3VdbeCurrentAddr(v);
                sqlite3VdbeAddOp4Int(v, OP_SorterCompare, iSorter, j2,
-                                    regRecord, pIndex->nColumn);
+                                    regRecord,
+                                    pIndex->def->key_def->part_count);
                VdbeCoverage(v);
                parser_emit_unique_constraint(pParse, ON_CONFLICT_ACTION_ABORT,
                                              pIndex);
@@ -2507,44 +2425,15 @@ sqlite3RefillIndex(Parse * pParse, Index * pIndex, int 
memRootPage)
        sqlite3VdbeAddOp1(v, OP_Close, iSorter);
 }
-/*
- * Allocate heap space to hold an Index object with nCol columns.
- *
- * Increase the allocation size to provide an extra nExtra bytes
- * of 8-byte aligned space after the Index object and return a
- * pointer to this extra space in *ppExtra.
- */
-Index *
-sqlite3AllocateIndexObject(sqlite3 * db,       /* Database connection */
-                          i16 nCol,    /* Total number of columns in the index 
*/
-                          int nExtra,  /* Number of bytes of extra space to 
alloc */
-                          char **ppExtra       /* Pointer to the "extra" space 
*/
-    )
+struct Index *
+sql_index_alloc(struct sqlite3 *db, uint32_t part_count)
 {
-       Index *p;               /* Allocated index object */
-       int nByte;              /* Bytes of space for Index object + arrays */
-
-       nByte = ROUND8(sizeof(Index)) +             /* Index structure   */
-           ROUND8(sizeof(struct coll *) * nCol) +  /* Index.coll_array  */
-           ROUND8(sizeof(uint32_t) * nCol) +       /* Index.coll_id_array*/
-           ROUND8(sizeof(LogEst) * (nCol + 1) +    /* Index.aiRowLogEst */
-                  sizeof(i16) * nCol +             /* Index.aiColumn    */
-                  sizeof(enum sort_order) * nCol); /* Index.sort_order  */
-       p = sqlite3DbMallocZero(db, nByte + nExtra);
-       if (p) {
-               char *pExtra = ((char *)p) + ROUND8(sizeof(Index));
-               p->coll_array = (struct coll **)pExtra;
-               pExtra += ROUND8(sizeof(struct coll **) * nCol);
-               p->coll_id_array = (uint32_t *) pExtra;
-               pExtra += ROUND8(sizeof(uint32_t) * nCol);
-               p->aiRowLogEst = (LogEst *) pExtra;
-               pExtra += sizeof(LogEst) * (nCol + 1);
-               p->aiColumn = (i16 *) pExtra;
-               pExtra += sizeof(i16) * nCol;
-               p->sort_order = (enum sort_order *) pExtra;
-               p->nColumn = nCol;
-               *ppExtra = ((char *)p) + nByte;
-       }
+       /* Size of struct Index and aiRowLogEst. */
+       int nByte = ROUND8(sizeof(struct Index)) +
+                   ROUND8(sizeof(LogEst) * (part_count + 1));
+       struct Index *p = sqlite3DbMallocZero(db, nByte);
+       if (p != NULL)
+               p->aiRowLogEst = (LogEst *) ((char *)p + ROUND8(sizeof(*p)));
        return p;
 }
@@ -2631,46 +2520,187 @@ addIndexToTable(Index * pIndex, Table * pTab)
        }
 }
-bool
-index_is_unique(Index *idx)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-       uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-       struct space *space = space_by_id(space_id);
-       assert(space != NULL);
-       struct index *tnt_index = space_index(space, index_id);
-       assert(tnt_index != NULL);
+/**
+ * Allocate memory on parser region and copy given string (part of
+ * the sql statement) into the allocated memory.
+ * @param parse Parse context.
+ * @param str String (a part of sql statement) to be copied.
+ *
+ * @retval size Appended size.
+ */
+static int
+sql_append(struct Parse *parse, const char *str)
+{
+       const size_t str_len = strlen(str);
+       char *str_part = region_alloc(&parse->region, str_len);
+       if (str_part == NULL) {
+               diag_set(OutOfMemory, str_len, "region_alloc", "str_part");
+               parse->rc = SQL_TARANTOOL_ERROR;
+               parse->nErr++;
+               return 0;
+       }
+       memcpy(str_part, str, str_len);
+       return str_len;
+}
+
+/**
+ * Create and set index_def in the given Index.
+ *
+ * @param parse Parse context.
+ * @param index Index for which index_def should be created. It is
+ *              used only to set index_def at the end of the
+ *              function.
+ * @param table Table which is indexed by 'index' param.
+ * @param iid Index ID.
+ * @param name Index name.
+ * @param name_len Index name length.
+ * @param is_unique Is given 'index' unique or not.
+ * @param expr_list List of expressions, describe which columns
+ *                  of 'table' are used in index and also their
+ *                  collations, orders, etc.
+ * @param idx_type Index type, one of the following:
+ *                 SQLITE_IDXTYPE_APPDEF 0 (Index is created with
+ *                 CREATE INDEX statement)
+ *                 SQLITE_IDXTYPE_UNIQUE 1 (Index is created
+ *                 automatically to implement a UNIQUE constraint)
+ *                 SQLITE_IDXTYPE_PRIMARYKEY 2 (Index is a PRIMARY
+ *                 KEY).
+ */
+static void
+set_index_def(struct Parse *parse, struct Index *index, struct Table *table,
+             uint32_t iid, const char *name, uint32_t name_len, bool is_unique,
+             struct ExprList *expr_list, u8 idx_type)
+{
+       struct space_def *space_def = table->def;
+       size_t sql_size = 0;
+       struct index_opts opts;
+       index_opts_create(&opts);
+       index->def = NULL;
+       opts.is_unique = is_unique;
+
+       struct key_def *key_def = key_def_new(expr_list->nExpr);
+       if (key_def == NULL)
+               goto tnt_error;
+
+       /* Build initial parts of SQL statement.  */
+       if (idx_type == SQLITE_IDXTYPE_APPDEF) {
+               sql_size += sql_append(parse, "CREATE INDEX ");
+               sql_size += sql_append(parse, name);
+               sql_size += sql_append(parse, " ON ");
+               sql_size += sql_append(parse, space_def->name);
+               sql_size += sql_append(parse, " (");
+       }
+
+       for (int i = 0; i < expr_list->nExpr; i++) {
+               struct Expr *expr = expr_list->a[i].pExpr;
+               sql_resolve_self_reference(parse, table, NC_IdxExpr, expr, 0);
+               if (parse->nErr > 0)
+                       goto cleanup;
+
+               struct Expr *column_expr = sqlite3ExprSkipCollate(expr);
+               if (column_expr->op != TK_COLUMN) {
+                       diag_set(ClientError, ER_UNSUPPORTED, "Tarantool",
+                                "functional indexes");
+                       goto tnt_error;
+               }
+
+               uint32_t fieldno = column_expr->iColumn;
+               const char *column_name = column_expr->u.zToken;
+               uint32_t coll_id;
+               struct coll *coll;
+               if (expr->op == TK_COLLATE) {
+                       coll = sql_get_coll_seq(parse, expr->u.zToken,
+                                               &coll_id);
+                       if (coll == NULL &&
+                           strcasecmp(expr->u.zToken, "binary") != 0) {
+                               diag_set(ClientError, ER_NO_SUCH_COLLATION,
+                                        expr->u.zToken);
+                               goto tnt_error;
+                       }
+                       if (idx_type == SQLITE_IDXTYPE_APPDEF) {
+                               sql_size += sql_append(parse, column_name);
+                               sql_size += sql_append(parse, " COLLATE ");
+                               sql_size += sql_append(parse, expr->u.zToken);
+                               sql_size += sql_append(parse, ", ");
+                       }
+               } else {
+                       coll = sql_column_collation(space_def, fieldno,
+                                                   &coll_id);
+                       if (idx_type == SQLITE_IDXTYPE_APPDEF) {
+                               sql_size += sql_append(parse, column_name);
+                               sql_size += sql_append(parse, ", ");
+                       }
+               }
+               /*
+                * Tarantool: DESC indexes are not supported so
+                * far.
+                */
+               key_def_set_part(key_def, i, fieldno,
+                                space_def->fields[fieldno].type,
+                                space_def->fields[fieldno].nullable_action,
+                                coll, coll_id, SORT_ORDER_ASC);
+       }
+       if (parse->nErr > 0)
+               goto cleanup;
- return tnt_index->def->opts.is_unique;
+       if (idx_type == SQLITE_IDXTYPE_APPDEF) {
+               opts.sql = region_join(&parse->region, sql_size);
+               if (opts.sql == NULL) {
+                       diag_set(OutOfMemory, sql_size, "region_join",
+                                "opts.sql");
+                       goto tnt_error;
+               }
+               /*
+                * fix last ", " with ")\0" to finish statement.
+                */
+               memcpy(&opts.sql[sql_size - 2], ")\0", 2);
+       }
+
+       struct key_def *pk_key_def;
+       if (idx_type == SQLITE_IDXTYPE_APPDEF)
+               pk_key_def = table->pIndex->def->key_def;
+       else
+               pk_key_def = NULL;
+
+       index->def = index_def_new(space_def->id, iid, name, name_len, TREE,
+                                  &opts, key_def, pk_key_def);
+       if (index->def == NULL)
+               goto tnt_error;
+cleanup:
+       if (key_def != NULL)
+               key_def_delete(key_def);
+       return;
+tnt_error:
+       parse->rc = SQL_TARANTOOL_ERROR;
+       ++parse->nErr;
+       goto cleanup;
 }
void
 sql_create_index(struct Parse *parse, struct Token *token,
                 struct SrcList *tbl_name, struct ExprList *col_list,
-                int on_error, struct Token *start, struct Expr *where,
-                enum sort_order sort_order, bool if_not_exist, u8 idx_type)
-{
-       Table *pTab = 0;        /* Table to be indexed */
-       Index *pIndex = 0;      /* The index to be created */
-       char *zName = 0;        /* Name of the index */
-       int nName;              /* Number of characters in zName */
-       int i, j;
-       DbFixer sFix;           /* For assigning database names to pTable */
-       sqlite3 *db = parse->db;
-       struct ExprList_item *col_listItem;     /* For looping over col_list */
-       int nExtra = 0;         /* Space allocated for zExtra[] */
-       char *zExtra = 0;       /* Extra space after the Index object */
+                enum on_conflict_action on_error, struct Token *start,
+                struct Expr *where, enum sort_order sort_order,
+                bool if_not_exist, u8 idx_type)
+{
+       /* Table to be indexed.  */
+       struct Table *table = NULL;
+       /* The index to be created.  */
+       struct Index *index = NULL;
+       /* Name of the index.  */
+       char *name = NULL;
+       int name_len;
+       struct sqlite3 *db = parse->db;
        struct session *user_session = current_session();
- if (db->mallocFailed || parse->nErr > 0) {
+       if (db->mallocFailed || parse->nErr > 0)
                goto exit_create_index;
-       }
-       /* Do not account nested operations: the count of such
-        * operations depends on Tarantool data dictionary internals,
-        * such as data layout in system spaces. Also do not account
-        * PRIMARY KEY and UNIQUE constraint - they had been accounted
-        * in CREATE TABLE already.
+       /*
+        * Do not account nested operations: the count of such
+        * operations depends on Tarantool data dictionary
+        * internals, such as data layout in system spaces. Also
+        * do not account PRIMARY KEY and UNIQUE constraint - they
+        * had been accounted in CREATE TABLE already.
         */
        if (!parse->nested && idx_type == SQLITE_IDXTYPE_APPDEF) {
                Vdbe *v = sqlite3GetVdbe(parse);
@@ -2681,39 +2711,43 @@ sql_create_index(struct Parse *parse, struct Token 
*token,
        assert(db->pSchema != NULL);
/*
-        * Find the table that is to be indexed.  Return early if not found.
+        * Find the table that is to be indexed.
+        * Return early if not found.
         */
        if (tbl_name != NULL) {
-
-               /* Use the two-part index name to determine the database
-                * to search for the table. 'Fix' the table name to this db
-                * before looking up the table.
+               /*
+                * Use the two-part index name to determine the
+                * database to search for the table. 'Fix' the
+                * table name to this db before looking up the
+                * table.
                 */
                assert(token && token->z);
-
-               sqlite3FixInit(&sFix, parse, "index", token);
-               if (sqlite3FixSrcList(&sFix, tbl_name)) {
-                       /* Because the parser constructs tbl_name from a single 
identifier,
+               DbFixer db_fixer;
+               sqlite3FixInit(&db_fixer, parse, "index", token);
+               if (sqlite3FixSrcList(&db_fixer, tbl_name)) {
+                       /*
+                        * Because the parser constructs tbl_name
+                        * from a single identifier,
                         * sqlite3FixSrcList can never fail.
                         */
-                       assert(0);
+                       unreachable();
                }
-               pTab = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
-               assert(db->mallocFailed == 0 || pTab == 0);
-               if (pTab == 0)
+               table = sqlite3LocateTable(parse, 0, tbl_name->a[0].zName);
+               assert(db->mallocFailed == 0 || table == NULL);
+               if (table == NULL)
                        goto exit_create_index;
-               sqlite3PrimaryKeyIndex(pTab);
+               sqlite3PrimaryKeyIndex(table);
        } else {
                assert(token == NULL);
                assert(start == NULL);
-               pTab = parse->pNewTable;
-               if (!pTab)
+               table = parse->pNewTable;
+               if (table == NULL)
                        goto exit_create_index;
        }
- assert(pTab != 0);
+       assert(table != NULL);
        assert(parse->nErr == 0);
-       if (pTab->def->opts.is_view) {
+       if (table->def->opts.is_view) {
                sqlite3ErrorMsg(parse, "views may not be indexed");
                goto exit_create_index;
        }
@@ -2731,42 +2765,38 @@ sql_create_index(struct Parse *parse, struct Token 
*token,
         * primary key or UNIQUE constraint.  We have to invent
         * our own name.
         */
-       if (token) {
-               zName = sqlite3NameFromToken(db, token);
-               if (zName == 0)
+       if (token != NULL) {
+               name = sqlite3NameFromToken(db, token);
+               if (name == NULL)
                        goto exit_create_index;
-               assert(token->z != 0);
+               assert(token->z != NULL);
                if (!db->init.busy) {
-                       if (sqlite3HashFind(&db->pSchema->tblHash, zName) !=
+                       if (sqlite3HashFind(&db->pSchema->tblHash, name) !=
                            NULL) {
-                               sqlite3ErrorMsg(parse,
-                                               "there is already a table named 
%s",
-                                               zName);
+                               sqlite3ErrorMsg(parse, "there is already a "\
+                                               "table named %s", name);
                                goto exit_create_index;
                        }
                }
-               if (sqlite3HashFind(&pTab->idxHash, zName) != NULL) {
+               if (sqlite3HashFind(&table->idxHash, name) != NULL) {
                        if (!if_not_exist) {
                                sqlite3ErrorMsg(parse,
                                                "index %s.%s already exists",
-                                               pTab->def->name, zName);
+                                               table->def->name, name);
                        } else {
                                assert(!db->init.busy);
                        }
                        goto exit_create_index;
                }
        } else {
-               int n;
-               Index *pLoop;
-               for (pLoop = pTab->pIndex, n = 1; pLoop;
+               int n = 1;
+               for (struct Index *pLoop = table->pIndex; pLoop != NULL;
                     pLoop = pLoop->pNext, n++) {
                }
-               zName =
-                   sqlite3MPrintf(db, "sqlite_autoindex_%s_%d", 
pTab->def->name,
-                                  n);
-               if (zName == 0) {
+               name = sqlite3MPrintf(db, "sqlite_autoindex_%s_%d",
+                                     table->def->name, n);
+               if (name == NULL)
                        goto exit_create_index;
-               }
        }
/*
@@ -2776,9 +2806,9 @@ sql_create_index(struct Parse *parse, struct Token *token,
         * simulate this.
         */
        if (col_list == NULL) {
-               Token prevCol;
-               uint32_t last_field = pTab->def->field_count - 1;
-               sqlite3TokenInit(&prevCol, pTab->def->fields[last_field].name);
+               struct Token prevCol;
+               uint32_t last_field = table->def->field_count - 1;
+               sqlite3TokenInit(&prevCol, table->def->fields[last_field].name);
                col_list = sql_expr_list_append(parse->db, NULL,
                                                sqlite3ExprAlloc(db, TK_ID,
                                                                 &prevCol, 0));
@@ -2790,108 +2820,93 @@ sql_create_index(struct Parse *parse, struct Token 
*token,
                sqlite3ExprListCheckLength(parse, col_list, "index");
        }
- /* Figure out how many bytes of space are required to store explicitly
-        * specified collation sequence names.
-        */
-       for (i = 0; i < col_list->nExpr; i++) {
-               Expr *pExpr = col_list->a[i].pExpr;
-               assert(pExpr != 0);
-               if (pExpr->op == TK_COLLATE) {
-                       nExtra += (1 + sqlite3Strlen30(pExpr->u.zToken));
-               }
-       }
+       /* Allocate the index structure.  */
+       name_len = sqlite3Strlen30(name);
- /*
-        * Allocate the index structure.
-        */
-       nName = sqlite3Strlen30(zName);
-       pIndex = sqlite3AllocateIndexObject(db, col_list->nExpr,
-                                           nName + nExtra + 1, &zExtra);
-       if (db->mallocFailed) {
+       if (name_len > BOX_NAME_MAX) {
+               sqlite3ErrorMsg(parse, "%s.%s exceeds indexes' names length "\
+                               "limit", table->def->name, name);
                goto exit_create_index;
        }
-       assert(EIGHT_BYTE_ALIGNMENT(pIndex->aiRowLogEst));
-       assert(EIGHT_BYTE_ALIGNMENT(pIndex->coll_array));
-       pIndex->zName = zExtra;
-       zExtra += nName + 1;
-       memcpy(pIndex->zName, zName, nName + 1);
-       pIndex->pTable = pTab;
-       pIndex->onError = (u8) on_error;
+
+       if (sqlite3CheckIdentifierName(parse, name) != SQLITE_OK)
+               goto exit_create_index;
+
+       index = sql_index_alloc(db, col_list->nExpr);
+       if (db->mallocFailed)
+               goto exit_create_index;
+
+       assert(EIGHT_BYTE_ALIGNMENT(index->aiRowLogEst));
+       index->pTable = table;
+       index->onError = (u8) on_error;
        /*
         * Don't make difference between UNIQUE indexes made by user
         * using CREATE INDEX statement and those created during
         * CREATE TABLE processing.
         */
        if (idx_type == SQLITE_IDXTYPE_APPDEF &&
-           on_error != ON_CONFLICT_ACTION_NONE) {
-               pIndex->idxType = SQLITE_IDXTYPE_UNIQUE;
-       } else {
-               pIndex->idxType = idx_type;
-       }
-       pIndex->pSchema = db->pSchema;
-       pIndex->nColumn = col_list->nExpr;
+           on_error != ON_CONFLICT_ACTION_NONE)
+               index->idxType = SQLITE_IDXTYPE_UNIQUE;
+       else
+               index->idxType = idx_type;
+       index->pSchema = db->pSchema;
        /* Tarantool have access to each column by any index */
-       if (where) {
-               sql_resolve_self_reference(parse, pTab, NC_PartIdx, where,
+       if (where != NULL) {
+               sql_resolve_self_reference(parse, table, NC_PartIdx, where,
                                           NULL);
-               pIndex->pPartIdxWhere = where;
+               index->pPartIdxWhere = where;
                where = NULL;
        }
- /* Analyze the list of expressions that form the terms of the index and
-        * report any errors.  In the common case where the expression is 
exactly
-        * a table column, store that column in aiColumn[].
-        *
-        * TODO: Issue a warning if two or more columns of the index are 
identical.
-        * TODO: Issue a warning if the table primary key is used as part of the
-        * index key.
+       /*
+        * TODO: Issue a warning if two or more columns of the
+        * index are identical.
+        * TODO: Issue a warning if the table primary key is used
+        * as part of the index key.
         */
-       for (i = 0, col_listItem = col_list->a; i < col_list->nExpr;
-            i++, col_listItem++) {
-               Expr *pCExpr;   /* The i-th index expression */
-               sql_resolve_self_reference(parse, pTab, NC_IdxExpr,
-                                          col_listItem->pExpr, NULL);
-               if (parse->nErr > 0)
-                       goto exit_create_index;
-               pCExpr = sqlite3ExprSkipCollate(col_listItem->pExpr);
-               if (pCExpr->op != TK_COLUMN) {
-                       sqlite3ErrorMsg(parse,
-                                       "functional indexes aren't supported "
-                                       "in the current version");
-                       goto exit_create_index;
-               } else {
-                       j = pCExpr->iColumn;
-                       assert(j <= 0x7fff);
-                       if (j < 0) {
-                               j = pTab->iPKey;
-                       }
-                       pIndex->aiColumn[i] = (i16) j;
-               }
-               struct coll *coll;
-               uint32_t id;
-               if (col_listItem->pExpr->op == TK_COLLATE) {
-                       const char *coll_name = col_listItem->pExpr->u.zToken;
-                       coll = sql_get_coll_seq(parse, coll_name, &id);
- if (coll == NULL &&
-                           sqlite3StrICmp(coll_name, "binary") != 0) {
-                               goto exit_create_index;
+       uint32_t max_iid = 0;
+       for (struct Index *index = table->pIndex; index != NULL;
+            index = index->pNext) {
+               max_iid = max_iid > index->def->iid ?
+                         max_iid : index->def->iid + 1;
+       }
+
+       bool is_unique = on_error != ON_CONFLICT_ACTION_NONE;
+       set_index_def(parse,  index, table, max_iid, name, name_len,
+                     is_unique, col_list, idx_type);
+
+       if (index->def == NULL)
+               goto exit_create_index;
+       /*
+        * Remove all redundant columns from the PRIMARY KEY.
+        * For example, change "PRIMARY KEY(a,b,a,b,c,b,c,d)" into
+        * just "PRIMARY KEY(a,b,c,d)". Later code assumes the
+        * PRIMARY KEY contains no repeated columns.
+        */
+       if (IsPrimaryKeyIndex(index)) {
+               struct key_part *parts = index->def->key_def->parts;
+               uint32_t part_count = index->def->key_def->part_count;
+               uint32_t new_part_count = 1;
+
+               for(uint32_t i = 1; i < part_count; i++) {
+                       uint32_t j;
+                       for(j = 0; j < new_part_count; j++) {
+                               if(parts[i].fieldno == parts[j].fieldno)
+                                       break;
                        }
-               } else if (j >= 0) {
-                       coll = sql_column_collation(pTab->def, j, &id);
-               } else {
-                       id = COLL_NONE;
-                       coll = NULL;
+
+                       if (j == new_part_count)
+                               parts[new_part_count++] = parts[i];
                }
-               pIndex->coll_array[i] = coll;
-               pIndex->coll_id_array[i] = id;
- /* Tarantool: DESC indexes are not supported so far.
-                * See gh-3016.
-                */
-               pIndex->sort_order[i] = SORT_ORDER_ASC;
+               index->def->key_def->part_count = new_part_count;
        }
-       if (pTab == parse->pNewTable) {
+
+       if (!index_def_is_valid(index->def, table->def->name))
+               goto exit_create_index;
+
+       if (table == parse->pNewTable) {
                /* This routine has been called to create an automatic index as 
a
                 * result of a PRIMARY KEY or UNIQUE clause on a column 
definition, or
                 * a PRIMARY KEY or UNIQUE clause following the column 
definitions.
@@ -2913,28 +2928,28 @@ sql_create_index(struct Parse *parse, struct Token 
*token,
                 * the constraint occur in different orders, then the 
constraints are
                 * considered distinct and both result in separate indices.
                 */
-               Index *pIdx;
-               for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-                       int k;
+               for (struct Index *pIdx = table->pIndex; pIdx != NULL;
+                    pIdx = pIdx->pNext) {
+                       uint32_t k;
                        assert(IsUniqueIndex(pIdx));
                        assert(pIdx->idxType != SQLITE_IDXTYPE_APPDEF);
-                       assert(IsUniqueIndex(pIndex));
+                       assert(IsUniqueIndex(index));
- if (pIdx->nColumn != pIndex->nColumn)
+                       if (pIdx->def->key_def->part_count !=
+                           index->def->key_def->part_count)
                                continue;
-                       for (k = 0; k < pIdx->nColumn; k++) {
-                               assert(pIdx->aiColumn[k] >= 0);
-                               if (pIdx->aiColumn[k] != pIndex->aiColumn[k])
+                       for (k = 0; k < pIdx->def->key_def->part_count; k++) {
+                               if (pIdx->def->key_def->parts[k].fieldno !=
+                                   index->def->key_def->parts[k].fieldno)
                                        break;
                                struct coll *coll1, *coll2;
-                               uint32_t id;
-                               coll1 = sql_index_collation(pIdx, k, &id);
-                               coll2 = sql_index_collation(pIndex, k, &id);
+                               coll1 = pIdx->def->key_def->parts[k].coll;
+                               coll2 = index->def->key_def->parts[k].coll;
                                if (coll1 != coll2)
                                        break;
                        }
-                       if (k == pIdx->nColumn) {
-                               if (pIdx->onError != pIndex->onError) {
+                       if (k == pIdx->def->key_def->part_count) {
+                               if (pIdx->onError != index->onError) {
                                        /* This constraint creates the same 
index as a previous
                                         * constraint specified somewhere in 
the CREATE TABLE statement.
                                         * However the ON CONFLICT clauses are 
different. If both this
@@ -2942,17 +2957,19 @@ sql_create_index(struct Parse *parse, struct Token 
*token,
                                         * ON CONFLICT clauses this is an 
error. Otherwise, use the
                                         * explicitly specified behavior for 
the index.
                                         */
-                                       if (!
-                                           (pIdx->onError == 
ON_CONFLICT_ACTION_DEFAULT
-                                            || pIndex->onError ==
-                                            ON_CONFLICT_ACTION_DEFAULT)) {
+                                       if (pIdx->onError !=
+                                           ON_CONFLICT_ACTION_DEFAULT &&
+                                           index->onError !=
+                                           ON_CONFLICT_ACTION_DEFAULT) {
                                                sqlite3ErrorMsg(parse,
-                                                               "conflicting ON 
CONFLICT clauses specified",
-                                                               0);
-                                       }
-                                       if (pIdx->onError == 
ON_CONFLICT_ACTION_DEFAULT) {
-                                               pIdx->onError = pIndex->onError;
+                                                               "conflicting "\
+                                                               "ON CONFLICT "\
+                                                               "clauses "\
+                                                               "specified");
                                        }
+                                       if (pIdx->onError ==
+                                           ON_CONFLICT_ACTION_DEFAULT)
+                                               pIdx->onError = index->onError;
                                }
                                if (idx_type == SQLITE_IDXTYPE_PRIMARYKEY)
                                        pIdx->idxType = idx_type;
@@ -2966,15 +2983,16 @@ sql_create_index(struct Parse *parse, struct Token 
*token,
         */
        assert(parse->nErr == 0);
        if (db->init.busy) {
-               Index *p;
-               p = sqlite3HashInsert(&pTab->idxHash, pIndex->zName, pIndex);
-               if (p) {
-                       assert(p == pIndex);    /* Malloc must have failed */
+               struct Index *p = sqlite3HashInsert(&table->idxHash,
+                                                   index->def->name, index);
+               if (p != NULL) {
+                       /* Malloc must have failed. */
+                       assert(p == index);
                        sqlite3OomFault(db);
                        goto exit_create_index;
                }
                user_session->sql_flags |= SQLITE_InternChanges;
-               pIndex->tnum = db->init.newTnum;
+               index->tnum = db->init.newTnum;
        }
/*
@@ -3025,14 +3043,14 @@ sql_create_index(struct Parse *parse, struct Token 
*token,
                                       ON_CONFLICT_ACTION_NONE ? "" : " UNIQUE",
                                       n, token->z);
- iSpaceId = SQLITE_PAGENO_TO_SPACEID(pTab->tnum);
+               iSpaceId = SQLITE_PAGENO_TO_SPACEID(table->tnum);
                iIndexId = getNewIid(parse, iSpaceId, iCursor);
                sqlite3VdbeAddOp1(v, OP_Close, iCursor);
-               createIndex(parse, pIndex, iSpaceId, iIndexId, zStmt);
+               createIndex(parse, index, iSpaceId, iIndexId, zStmt);
/* consumes zStmt */
                iFirstSchemaCol =
-                   makeIndexSchemaRecord(parse, pIndex, iSpaceId, iIndexId,
+                   makeIndexSchemaRecord(parse, index, iSpaceId, iIndexId,
                                          zStmt);
/* Reparse the schema. Code an OP_Expire
@@ -3055,54 +3073,17 @@ sql_create_index(struct Parse *parse, struct Token 
*token,
if (!db->init.busy && tbl_name != NULL)
                goto exit_create_index;
-       addIndexToTable(pIndex, pTab);
-       pIndex = NULL;
+       addIndexToTable(index, table);
+       index = NULL;
/* Clean up before exiting */
  exit_create_index:
-       if (pIndex)
-               freeIndex(db, pIndex);
+       if (index != NULL)
+               freeIndex(db, index);
        sql_expr_delete(db, where, false);
        sql_expr_list_delete(db, col_list);
        sqlite3SrcListDelete(db, tbl_name);
-       sqlite3DbFree(db, zName);
-}
-
-/**
- * Return number of columns in given index.
- * If space is ephemeral, use internal
- * SQL structure to fetch the value.
- */
-uint32_t
-index_column_count(const Index *idx)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-       struct space *space = space_by_id(space_id);
-       /* It is impossible to find an ephemeral space by id. */
-       if (space == NULL)
-               return idx->nColumn;
-
-       uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-       struct index *index = space_index(space, index_id);
-       assert(index != NULL);
-       return index->def->key_def->part_count;
-}
-
-/** Return true if given index is unique and not nullable. */
-bool
-index_is_unique_not_null(const Index *idx)
-{
-       assert(idx != NULL);
-       uint32_t space_id = SQLITE_PAGENO_TO_SPACEID(idx->tnum);
-       struct space *space = space_by_id(space_id);
-       assert(space != NULL);
-
-       uint32_t index_id = SQLITE_PAGENO_TO_INDEXID(idx->tnum);
-       struct index *index = space_index(space, index_id);
-       assert(index != NULL);
-       return (index->def->opts.is_unique &&
-               !index->def->key_def->is_nullable);
+       sqlite3DbFree(db, name);
 }
void
@@ -3728,9 +3709,9 @@ parser_emit_unique_constraint(struct Parse *parser,
        const struct space_def *def = index->pTable->def;
        StrAccum err_accum;
        sqlite3StrAccumInit(&err_accum, parser->db, 0, 0, 200);
-       for (int j = 0; j < index->nColumn; ++j) {
-               assert(index->aiColumn[j] >= 0);
-               const char *col_name = def->fields[index->aiColumn[j]].name;
+       struct key_part *part = index->def->key_def->parts;
+       for (uint32_t j = 0; j < index->def->key_def->part_count; ++j, ++part) {
+               const char *col_name = def->fields[part->fieldno].name;
                if (j != 0)
                        sqlite3StrAccumAppend(&err_accum, ", ", 2);
                sqlite3XPrintf(&err_accum, "%s.%s", def->name, col_name);
@@ -3751,11 +3732,11 @@ static bool
 collationMatch(struct coll *coll, struct Index *index)
 {
        assert(coll != NULL);
-       for (int i = 0; i < index->nColumn; i++) {
-               uint32_t id;
-               struct coll *idx_coll = sql_index_collation(index, i, &id);
-               assert(idx_coll != 0 || index->aiColumn[i] < 0);
-               if (index->aiColumn[i] >= 0 && coll == idx_coll)
+       struct key_part *part = index->def->key_def->parts;
+       for (uint32_t i = 0; i < index->def->key_def->part_count; i++, part++) {
+               struct coll *idx_coll = part->coll;
+               assert(idx_coll != NULL);
+               if (coll == idx_coll)
                        return true;
        }
        return false;
diff --git a/src/box/sql/delete.c b/src/box/sql/delete.c
index 5a799714d..5a7cf7652 100644
--- a/src/box/sql/delete.c
+++ b/src/box/sql/delete.c
@@ -268,11 +268,12 @@ sql_table_delete_from(struct Parse *parse, struct SrcList 
*tab_list,
/* Extract the primary key for the current row */
                if (!is_view) {
-                       for (int i = 0; i < pk_len; i++) {
+                       struct key_part *part = pk_def->parts;
+                       for (int i = 0; i < pk_len; i++, part++) {
                                struct space_def *def = space->def;
                                sqlite3ExprCodeGetColumnOfTable(v, def,
                                                                tab_cursor,
-                                                               
pk_def->parts[i].fieldno,
+                                                               part->fieldno,
                                                                reg_pk + i);
                        }
                } else {
@@ -568,13 +569,14 @@ sql_generate_index_key(struct Parse *parse, struct Index 
*index, int cursor,
                        *part_idx_label = 0;
                }
        }
-       int col_cnt = index_column_count(index);
+       int col_cnt = index->def->key_def->part_count;
        int reg_base = sqlite3GetTempRange(parse, col_cnt);
        if (prev != NULL && (reg_base != reg_prev ||
                             prev->pPartIdxWhere != NULL))
                prev = NULL;
        for (int j = 0; j < col_cnt; j++) {
-               if (prev != NULL && prev->aiColumn[j] == index->aiColumn[j]) {
+               if (prev != NULL && prev->def->key_def->parts[j].fieldno ==
+                                   index->def->key_def->parts[j].fieldno) {
                        /*
                         * This column was already computed by the
                         * previous index.
diff --git a/src/box/sql/expr.c b/src/box/sql/expr.c
index 70e134f21..00d8dedd8 100644
--- a/src/box/sql/expr.c
+++ b/src/box/sql/expr.c
@@ -2405,21 +2405,28 @@ sqlite3FindInIndex(Parse * pParse,      /* Parsing 
context */
                             pIdx = pIdx->pNext) {
                                Bitmask colUsed; /* Columns of the index used */
                                Bitmask mCol;   /* Mask for the current column 
*/
-                               if (pIdx->nColumn < nExpr)
+                               uint32_t part_count =
+                                       pIdx->def->key_def->part_count;
+                               struct key_part *parts =
+                                       pIdx->def->key_def->parts;
+                               if ((int)part_count < nExpr)
                                        continue;
                                /* Maximum nColumn is BMS-2, not BMS-1, so that 
we can compute
                                 * BITMASK(nExpr) without overflowing
                                 */
-                               testcase(pIdx->nColumn == BMS - 2);
-                               testcase(pIdx->nColumn == BMS - 1);
-                               if (pIdx->nColumn >= BMS - 1)
+                               testcase(part_count == BMS - 2);
+                               testcase(part_count == BMS - 1);
+                               if (part_count >= BMS - 1)
+                                       continue;
+                               if (mustBeUnique &&
+                                   ((int)part_count > nExpr ||
+                                    !pIdx->def->opts.is_unique)) {
+                                       /*
+                                        * This index is not
+                                        * unique over the IN RHS
+                                        * columns.
+                                        */
                                        continue;
-                               if (mustBeUnique) {
-                                       if (pIdx->nColumn > nExpr
-                                           || (pIdx->nColumn > nExpr
-                                           && !index_is_unique(pIdx))) {
-                                                       continue;       /* This 
index is not unique over the IN RHS columns */
-                                       }
                                }
colUsed = 0; /* Columns of index used so far */
@@ -2432,16 +2439,15 @@ sqlite3FindInIndex(Parse * pParse,      /* Parsing 
context */
                                        int j;
for (j = 0; j < nExpr; j++) {
-                                               if (pIdx->aiColumn[j] !=
-                                                   pRhs->iColumn) {
+                                               if ((int) parts[j].fieldno !=
+                                                   pRhs->iColumn)
                                                        continue;
-                                               }
-                                               struct coll *idx_coll;
-                                               idx_coll = 
sql_index_collation(pIdx, j, &id);
+
+                                               struct coll *idx_coll =
+                                                            parts[j].coll;
                                                if (pReq != NULL &&
-                                                   pReq != idx_coll) {
+                                                   pReq != idx_coll)
                                                        continue;
-                                               }
                                                break;
                                        }
                                        if (j == nExpr)
@@ -2466,18 +2472,17 @@ sqlite3FindInIndex(Parse * pParse,      /* Parsing 
context */
                                                          0, 0, 0,
                                                          sqlite3MPrintf(db,
                                                          "USING INDEX %s FOR 
IN-OPERATOR",
-                                                         pIdx->zName),
+                                                         pIdx->def->name),
                                                          P4_DYNAMIC);
                                        struct space *space =
                                                
space_by_id(SQLITE_PAGENO_TO_SPACEID(pIdx->tnum));
                                        vdbe_emit_open_cursor(pParse, iTab,
                                                              pIdx->tnum, 
space);
-                                       VdbeComment((v, "%s", pIdx->zName));
+                                       VdbeComment((v, "%s", pIdx->def->name));
                                        assert(IN_INDEX_INDEX_DESC ==
                                               IN_INDEX_INDEX_ASC + 1);
                                        eType = IN_INDEX_INDEX_ASC +
-                                               
sql_index_column_sort_order(pIdx,
-                                                                           0);
+                                               parts[0].sort_order;
if (prRhsHasNull) {
 #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
@@ -2499,7 +2504,7 @@ sqlite3FindInIndex(Parse * pParse,        /* Parsing 
context */
                                                        /* Tarantool: Check for 
null is performed on first key of the index.  */
                                                        sqlite3SetHasNullFlag(v,
                                                                              
iTab,
-                                                                             
pIdx->aiColumn[0],
+                                                                             
parts[0].fieldno,
                                                                              
*prRhsHasNull);
                                                }
                                        }
@@ -3130,12 +3135,12 @@ sqlite3ExprCodeIN(Parse * pParse,       /* Parsing and 
code generating context */
                struct Index *pk = sqlite3PrimaryKeyIndex(tab);
                assert(pk);
+ uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
                enum affinity_type affinity =
-                       tab->def->fields[pk->aiColumn[0]].affinity;
-               if (pk->nColumn == 1
-                   && affinity == AFFINITY_INTEGER
-                   && pk->aiColumn[0] < nVector) {
-                       int reg_pk = rLhs + pk->aiColumn[0];
+                       tab->def->fields[fieldno].affinity;
+               if (pk->def->key_def->part_count == 1 &&
+                   affinity == AFFINITY_INTEGER && (int)fieldno < nVector) {
+                       int reg_pk = rLhs + (int)fieldno;
                        sqlite3VdbeAddOp2(v, OP_MustBeInt, reg_pk, destIfFalse);
                }
        }
@@ -3467,7 +3472,7 @@ sqlite3ExprCodeLoadIndexColumn(Parse * pParse,    /* The 
parsing context */
                               int regOut       /* Store the index column value 
in this register */
     )
 {
-       i16 iTabCol = pIdx->aiColumn[iIdxCol];
+       i16 iTabCol = pIdx->def->key_def->parts[iIdxCol].fieldno;
        sqlite3ExprCodeGetColumnOfTable(pParse->pVdbe, pIdx->pTable->def,
                                        iTabCur, iTabCol, regOut);
 }
diff --git a/src/box/sql/fkey.c b/src/box/sql/fkey.c
index 6c75c4772..2b96bde7e 100644
--- a/src/box/sql/fkey.c
+++ b/src/box/sql/fkey.c
@@ -213,7 +213,6 @@ sqlite3FkLocateIndex(Parse * pParse,        /* Parse 
context to store any error in */
                     int **paiCol       /* OUT: Map of index columns in pFKey */
     )
 {
-       Index *pIdx = 0;        /* Value to return via *ppIdx */
        int *aiCol = 0;         /* Value to return via *paiCol */
        int nCol = pFKey->nCol;      /* Number of columns in parent key */
        char *zKey = pFKey->aCol[0].zCol;    /* Name of left-most parent key 
column */
@@ -255,83 +254,86 @@ sqlite3FkLocateIndex(Parse * pParse,      /* Parse 
context to store any error in */
                *paiCol = aiCol;
        }
- for (pIdx = pParent->pIndex; pIdx; pIdx = pIdx->pNext) {
-               int nIdxCol = index_column_count(pIdx);
-               if (nIdxCol == nCol && index_is_unique(pIdx)
-                   && pIdx->pPartIdxWhere == 0) {
-                       /* pIdx is a UNIQUE index (or a PRIMARY KEY) and has 
the right number
-                        * of columns. If each indexed column corresponds to a 
foreign key
-                        * column of pFKey, then this index is a winner.
+       struct Index *index = NULL;
+       for (index = pParent->pIndex; index != NULL; index = index->pNext) {
+               int part_count = index->def->key_def->part_count;
+               if (part_count != nCol || !index->def->opts.is_unique ||
+                   index->pPartIdxWhere != NULL)
+                       continue;
+               /*
+                * Index is a UNIQUE index (or a PRIMARY KEY) and
+                * has the right number of columns. If each
+                * indexed column corresponds to a foreign key
+                * column of pFKey, then this index is a winner.
+                */
+               if (zKey == NULL) {
+                       /*
+                        * If zKey is NULL, then this foreign key
+                        * is implicitly mapped to the PRIMARY KEY
+                        * of table pParent. The PRIMARY KEY index
+                        * may be identified by the test.
                         */
-
-                       if (zKey == 0) {
-                               /* If zKey is NULL, then this foreign key is 
implicitly mapped to
-                                * the PRIMARY KEY of table pParent. The 
PRIMARY KEY index may be
-                                * identified by the test.
-                                */
-                               if (IsPrimaryKeyIndex(pIdx)) {
-                                       if (aiCol) {
-                                               int i;
-                                               for (i = 0; i < nCol; i++)
-                                                       aiCol[i] =
-                                                           pFKey->aCol[i].
-                                                           iFrom;
-                                       }
-                                       break;
+                       if (IsPrimaryKeyIndex(index)) {
+                               if (aiCol != NULL) {
+                                       for (int i = 0; i < nCol; i++)
+                                               aiCol[i] = pFKey->aCol[i].iFrom;
                                }
-                       } else {
-                               /* If zKey is non-NULL, then this foreign key 
was declared to
-                                * map to an explicit list of columns in table 
pParent. Check if this
-                                * index matches those columns. Also, check 
that the index uses
-                                * the default collation sequences for each 
column.
+                               break;
+                       }
+               } else {
+                       /*
+                        * If zKey is non-NULL, then this foreign
+                        * key was declared to map to an explicit
+                        * list of columns in table pParent. Check
+                        * if this index matches those columns.
+                        * Also, check that the index uses the
+                        * default collation sequences for each
+                        * column.
+                        */
+                       int i, j;
+                       struct key_part *part = index->def->key_def->parts;
+                       for (i = 0; i < nCol; i++, part++) {
+                               /*
+                                * Index of column in parent
+                                * table.
                                 */
-                               int i, j;
-                               for (i = 0; i < nCol; i++) {
-                                       i16 iCol = pIdx->aiColumn[i];        /* 
Index of column in parent tbl */
-                                       char *zIdxCol;  /* Name of indexed 
column */
-
-                                       if (iCol < 0)
-                                               break;  /* No foreign keys 
against expression indexes */
-
-                                       /* If the index uses a collation 
sequence that is different from
-                                        * the default collation sequence for 
the column, this index is
-                                        * unusable. Bail out early in this 
case.
-                                        */
-                                       struct coll *def_coll;
-                                       uint32_t id;
-                                       def_coll = 
sql_column_collation(pParent->def,
-                                                                       iCol,
-                                                                       &id);
-                                       struct coll *coll =
-                                               sql_index_collation(pIdx, i,
-                                                                   &id);
-                                       if (def_coll != coll)
-                                               break;
-
-                                       zIdxCol =
-                                               pParent->def->fields[iCol].name;
-                                       for (j = 0; j < nCol; j++) {
-                                               if (strcmp
-                                                   (pFKey->aCol[j].zCol,
-                                                    zIdxCol) == 0) {
-                                                       if (aiCol)
-                                                               aiCol[i] =
-                                                                   pFKey->
-                                                                   aCol[j].
-                                                                   iFrom;
-                                                       break;
-                                               }
-                                       }
-                                       if (j == nCol)
-                                               break;
+                               i16 iCol = (int) part->fieldno;
+                               /*
+                                * If the index uses a collation
+                                * sequence that is different from
+                                * the default collation sequence
+                                * for the column, this index is
+                                * unusable. Bail out early in
+                                * this case.
+                                */
+                               uint32_t id;
+                               struct coll *def_coll =
+                                       sql_column_collation(pParent->def,
+                                                            iCol, &id);
+                               struct coll *coll = part->coll;
+                               if (def_coll != coll)
+                                       break;
+
+                               char *zIdxCol = pParent->def->fields[iCol].name;
+                               for (j = 0; j < nCol; j++) {
+                                       if (strcmp(pFKey->aCol[j].zCol,
+                                                  zIdxCol) != 0)
+                                               continue;
+                                       if (aiCol)
+                                               aiCol[i] = pFKey->aCol[j].iFrom;
+                                       break;
                                }
-                               if (i == nCol)
-                                       break;  /* pIdx is usable */
+                               if (j == nCol)
+                                       break;
+                       }
+                       if (i == nCol) {
+                               /* Index is usable. */
+                               break;
                        }
                }
        }
- if (!pIdx) {
+       if (index == NULL) {
                if (!pParse->disableTriggers) {
                        sqlite3ErrorMsg(pParse,
                                        "foreign key mismatch - \"%w\" referencing 
\"%w\"",
@@ -341,7 +343,7 @@ sqlite3FkLocateIndex(Parse * pParse,        /* Parse 
context to store any error in */
                return 1;
        }
- *ppIdx = pIdx;
+       *ppIdx = index;
        return 0;
 }
@@ -460,17 +462,19 @@ fkLookupParent(Parse * pParse, /* Parse context */
                         */
                        if (pTab == pFKey->pFrom && nIncr == 1) {
                                int iJump =
-                                   sqlite3VdbeCurrentAddr(v) + nCol + 1;
-                               for (i = 0; i < nCol; i++) {
+                                       sqlite3VdbeCurrentAddr(v) + nCol + 1;
+                               struct key_part *part =
+                                       pIdx->def->key_def->parts;
+                               for (i = 0; i < nCol; ++i, ++part) {
                                        int iChild = aiCol[i] + 1 + regData;
-                                       int iParent =
-                                           pIdx->aiColumn[i] + 1 + regData;
-                                       assert(pIdx->aiColumn[i] >= 0);
+                                       int iParent = 1 + regData +
+                                                     (int)part->fieldno;
                                        assert(aiCol[i] != pTab->iPKey);
-                                       if (pIdx->aiColumn[i] == pTab->iPKey) {
+                                       if ((int)part->fieldno == pTab->iPKey) {
                                                /* The parent key is a 
composite key that includes the IPK column */
                                                iParent = regData;
                                        }
+
                                        sqlite3VdbeAddOp3(v, OP_Ne, iChild,
                                                          iJump, iParent);
                                        VdbeCoverage(v);
@@ -614,7 +618,6 @@ fkScanChildren(Parse * pParse,      /* Parse context */
     )
 {
        sqlite3 *db = pParse->db;    /* Database handle */
-       int i;                  /* Iterator variable */
        Expr *pWhere = 0;       /* WHERE clause to scan with */
        NameContext sNameContext;       /* Context used to resolve WHERE clause 
*/
        WhereInfo *pWInfo;      /* Context used by sqlite3WhereXXX() */
@@ -622,7 +625,7 @@ fkScanChildren(Parse * pParse,      /* Parse context */
        Vdbe *v = sqlite3GetVdbe(pParse);
assert(pIdx == 0 || pIdx->pTable == pTab);
-       assert(pIdx == 0 || (int)index_column_count(pIdx) == pFKey->nCol);
+       assert(pIdx == 0 || (int) pIdx->def->key_def->part_count == 
pFKey->nCol);
        assert(pIdx != 0);
if (nIncr < 0) {
@@ -639,19 +642,20 @@ fkScanChildren(Parse * pParse,    /* Parse context */
         * the parent key columns. The affinity of the parent key column should
         * be applied to each child key value before the comparison takes place.
         */
-       for (i = 0; i < pFKey->nCol; i++) {
+       for (int i = 0; i < pFKey->nCol; i++) {
                Expr *pLeft;    /* Value from parent table row */
                Expr *pRight;   /* Column ref to child table */
                Expr *pEq;      /* Expression (pLeft = pRight) */
                i16 iCol;       /* Index of column in child table */
-               const char *zCol;       /* Name of column in child table */
+               const char *column_name;
- iCol = pIdx ? pIdx->aiColumn[i] : -1;
+               iCol = pIdx != NULL ?
+                      (int) pIdx->def->key_def->parts[i].fieldno : -1;
                pLeft = exprTableRegister(pParse, pTab, regData, iCol);
                iCol = aiCol ? aiCol[i] : pFKey->aCol[0].iFrom;
                assert(iCol >= 0);
-               zCol = pFKey->pFrom->def->fields[iCol].name;
-               pRight = sqlite3Expr(db, TK_ID, zCol);
+               column_name = pFKey->pFrom->def->fields[iCol].name;
+               pRight = sqlite3Expr(db, TK_ID, column_name);
                pEq = sqlite3PExpr(pParse, TK_EQ, pLeft, pRight);
                pWhere = sqlite3ExprAnd(db, pWhere, pEq);
        }
@@ -670,15 +674,14 @@ fkScanChildren(Parse * pParse,    /* Parse context */
Expr *pEq, *pAll = 0;
                Index *pPk = sqlite3PrimaryKeyIndex(pTab);
-               assert(pIdx != 0);
-               int col_count = index_column_count(pPk);
-               for (i = 0; i < col_count; i++) {
-                       i16 iCol = pIdx->aiColumn[i];
-                       assert(iCol >= 0);
-                       pLeft = exprTableRegister(pParse, pTab, regData, iCol);
-                       pRight =
-                               exprTableColumn(db, pTab->def,
-                                               pSrc->a[0].iCursor, iCol);
+               assert(pIdx != NULL);
+               uint32_t part_count = pPk->def->key_def->part_count;
+               for (uint32_t i = 0; i < part_count; i++) {
+                       uint32_t fieldno = pIdx->def->key_def->parts[i].fieldno;
+                       pLeft = exprTableRegister(pParse, pTab, regData,
+                                                 fieldno);
+                       pRight = exprTableColumn(db, pTab->def,
+                                                pSrc->a[0].iCursor, fieldno);
                        pEq = sqlite3PExpr(pParse, TK_EQ, pLeft, pRight);
                        pAll = sqlite3ExprAnd(db, pAll, pEq);
                }
@@ -983,7 +986,6 @@ sqlite3FkCheck(Parse * pParse,      /* Parse context */
                        if (aiCol[i] == pTab->iPKey) {
                                aiCol[i] = -1;
                        }
-                       assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
                }
pParse->nTab++;
@@ -1108,19 +1110,19 @@ sqlite3FkOldmask(Parse * pParse,        /* Parse 
context */
if (user_session->sql_flags & SQLITE_ForeignKeys) {
                FKey *p;
-               int i;
                for (p = pTab->pFKey; p; p = p->pNextFrom) {
-                       for (i = 0; i < p->nCol; i++)
+                       for (int i = 0; i < p->nCol; i++)
                                mask |= COLUMN_MASK(p->aCol[i].iFrom);
                }
                for (p = sqlite3FkReferences(pTab); p; p = p->pNextTo) {
                        Index *pIdx = 0;
                        sqlite3FkLocateIndex(pParse, pTab, p, &pIdx, 0);
-                       if (pIdx) {
-                               int nIdxCol = index_column_count(pIdx);
-                               for (i = 0; i < nIdxCol; i++) {
-                                       assert(pIdx->aiColumn[i] >= 0);
-                                       mask |= COLUMN_MASK(pIdx->aiColumn[i]);
+                       if (pIdx != NULL) {
+                               uint32_t part_count =
+                                       pIdx->def->key_def->part_count;
+                               for (uint32_t i = 0; i < part_count; i++) {
+                                       mask |= COLUMN_MASK(pIdx->def->
+                                               key_def->parts[i].fieldno);
                                }
                        }
                }
@@ -1264,11 +1266,12 @@ fkActionTrigger(struct Parse *pParse, struct Table 
*pTab, struct FKey *pFKey,
                               || (pTab->iPKey >= 0
                                   && pTab->iPKey <
                                      (int)pTab->def->field_count));
-                       assert(pIdx == 0 || pIdx->aiColumn[i] >= 0);
+
+                       uint32_t fieldno = pIdx != NULL ?
+                                          pIdx->def->key_def->parts[i].fieldno 
:
+                                          (uint32_t)pTab->iPKey;
                        sqlite3TokenInit(&tToCol,
-                                        pTab->def->fields[pIdx ? pIdx->
-                                                   aiColumn[i] : pTab->iPKey].
-                                        name);
+                                        pTab->def->fields[fieldno].name);
                        sqlite3TokenInit(&tFromCol,
                                         pFKey->pFrom->def->fields[
                                                iFromCol].name);
diff --git a/src/box/sql/insert.c b/src/box/sql/insert.c
index c12043bde..762caaee5 100644
--- a/src/box/sql/insert.c
+++ b/src/box/sql/insert.c
@@ -89,14 +89,14 @@ sqlite3IndexAffinityStr(sqlite3 *db, Index *index)
         * sqliteDeleteIndex() when the Index structure itself is
         * cleaned up.
         */
-       int column_count = index_column_count(index);
+       int column_count = index->def->key_def->part_count;
        index->zColAff = (char *) sqlite3DbMallocRaw(0, column_count + 1);
        if (index->zColAff == NULL) {
                sqlite3OomFault(db);
                return NULL;
        }
        for (int n = 0; n < column_count; n++) {
-               uint16_t x = index->aiColumn[n];
+               uint16_t x = index->def->key_def->parts[n].fieldno;
                index->zColAff[n] = index->pTable->def->fields[x].affinity;
        }
        index->zColAff[column_count] = 0;
@@ -647,7 +647,7 @@ sqlite3Insert(Parse * pParse,       /* Parser context */
                     pIdx = pIdx->pNext, i++) {
                        assert(pIdx);
                        aRegIdx[i] = ++pParse->nMem;
-                       pParse->nMem += index_column_count(pIdx);
+                       pParse->nMem += pIdx->def->key_def->part_count;
                }
        }
@@ -1069,12 +1069,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse, /* The parser context */
        Index *pIdx;            /* Pointer to one of the indices */
        Index *pPk = 0;         /* The PRIMARY KEY index */
        sqlite3 *db;            /* Database connection */
-       int i;                  /* loop counter */
-       int ix;                 /* Index loop counter */
-       int nCol;               /* Number of columns */
        int addr1;              /* Address of jump instruction */
        int seenReplace = 0;    /* True if REPLACE is used to resolve INT PK 
conflict */
-       int nPkField;           /* Number of fields in PRIMARY KEY. */
        u8 isUpdate;            /* True if this is an UPDATE operation */
        u8 bAffinityDone = 0;   /* True if the OP_Affinity operation has been 
run */
        struct session *user_session = current_session();
@@ -1086,10 +1082,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,          
/* The parser context */
        struct space_def *def = pTab->def;
        /* This table is not a VIEW */
        assert(!def->opts.is_view);
-       nCol = def->field_count;
pPk = sqlite3PrimaryKeyIndex(pTab);
-       nPkField = index_column_count(pPk);
/* Record that this module has started */
        VdbeModuleComment((v, "BEGIN: GenCnstCks(%d,%d,%d,%d,%d)",
@@ -1099,17 +1093,16 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         
/* The parser context */
        enum on_conflict_action on_error;
        /* Test all NOT NULL constraints.
         */
-       for (i = 0; i < nCol; i++) {
-               if (i == pTab->iPKey) {
+       for (uint32_t i = 0; i < def->field_count; i++) {
+               if ((int) i == pTab->iPKey)
                        continue;
-               }
                if (aiChng && aiChng[i] < 0) {
                        /* Don't bother checking for NOT NULL on columns that 
do not change */
                        continue;
                }
                if (def->fields[i].is_nullable ||
                    (pTab->tabFlags & TF_Autoincrement &&
-                    pTab->iAutoIncPKey == i))
+                    pTab->iAutoIncPKey == (int) i))
                        continue;       /* This column is allowed to be NULL */
on_error = table_column_nullable_action(pTab, i);
@@ -1179,7 +1172,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,           
/* The parser context */
                else
                        on_error = ON_CONFLICT_ACTION_ABORT;
- for (i = 0; i < checks->nExpr; i++) {
+               for (int i = 0; i < checks->nExpr; i++) {
                        int allOk;
                        Expr *pExpr = checks->a[i].pExpr;
                        if (aiChng
@@ -1206,13 +1199,16 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         
/* The parser context */
                }
        }
- /* Test all UNIQUE constraints by creating entries for each UNIQUE
-        * index and making sure that duplicate entries do not already exist.
-        * Compute the revised record entries for indices as we go.
+       /*
+        * Test all UNIQUE constraints by creating entries for
+        * each UNIQUE index and making sure that duplicate entries
+        * do not already exist. Compute the revised record entries
+        * for indices as we go.
         *
         * This loop also handles the case of the PRIMARY KEY index.
         */
-       for (ix = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, ix++) {
+       pIdx = pTab->pIndex;
+       for (int ix = 0; pIdx != NULL; pIdx = pIdx->pNext, ix++) {
                int regIdx;     /* Range of registers hold conent for pIdx */
                int regR;       /* Range of registers holding conflicting PK */
                int iThisCur;   /* Cursor for this UNIQUE index */
@@ -1253,10 +1249,11 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         
/* The parser context */
                 * the insert or update.  Store that record in the aRegIdx[ix] 
register
                 */
                regIdx = aRegIdx[ix] + 1;
-               int nIdxCol = (int) index_column_count(pIdx);
+               uint32_t part_count = pIdx->def->key_def->part_count;
                if (uniqueByteCodeNeeded) {
-                       for (i = 0; i < nIdxCol; ++i) {
-                               int fieldno = pIdx->aiColumn[i];
+                       for (uint32_t i = 0; i < part_count; ++i) {
+                               uint32_t fieldno =
+                                       pIdx->def->key_def->parts[i].fieldno;
                                int reg;
                                /*
                                 * OP_SCopy copies value in
@@ -1267,11 +1264,10 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         
/* The parser context */
                                 * needed for proper UNIQUE
                                 * constraint handling.
                                 */
-                               if (fieldno == pTab->iPKey)
+                               if ((int) fieldno == pTab->iPKey)
                                        reg = regNewData;
                                else
                                        reg = fieldno + regNewData + 1;
-                               assert(fieldno >= 0);
                                sqlite3VdbeAddOp2(v, OP_SCopy, reg, regIdx + i);
                                VdbeComment((v, "%s",
                                            def->fields[fieldno].name));
@@ -1283,9 +1279,12 @@ sqlite3GenerateConstraintChecks(Parse * pParse,          
/* The parser context */
                if (IsPrimaryKeyIndex(pIdx)) {
                        /* If PK is marked as INTEGER, use it as strict type,
                         * not as affinity. Emit code for type checking */
-                       if (nIdxCol == 1) {
-                               reg_pk = regNewData + 1 + pIdx->aiColumn[0];
-                               if (pTab->zColAff[pIdx->aiColumn[0]] ==
+                       if (part_count == 1) {
+                               uint32_t fieldno =
+                                       pIdx->def->key_def->parts[0].fieldno;
+                               reg_pk = regNewData + 1 + fieldno;
+
+                               if (pTab->zColAff[fieldno] ==
                                    AFFINITY_INTEGER) {
                                        int skip_if_null = 
sqlite3VdbeMakeLabel(v);
                                        if ((pTab->tabFlags & TF_Autoincrement) 
!= 0) {
@@ -1303,7 +1302,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,           
/* The parser context */
sqlite3VdbeAddOp3(v, OP_MakeRecord, regNewData + 1,
                                          def->field_count, aRegIdx[ix]);
-                       VdbeComment((v, "for %s", pIdx->zName));
+                       VdbeComment((v, "for %s", pIdx->def->name));
                }
/* In an UPDATE operation, if this index is the PRIMARY KEY
@@ -1390,24 +1389,22 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         
/* The parser context */
                if (uniqueByteCodeNeeded) {
                        sqlite3VdbeAddOp4Int(v, OP_NoConflict, iThisCur,
                                             addrUniqueOk, regIdx,
-                                            index_column_count(pIdx));
+                                            pIdx->def->key_def->part_count);
                }
                VdbeCoverage(v);
+ const uint32_t pk_part_count = pPk->def->key_def->part_count;
                /* Generate code to handle collisions */
-               regR =
-                   (pIdx == pPk) ? regIdx : sqlite3GetTempRange(pParse,
-                                                                nPkField);
+               regR = pIdx == pPk ? regIdx :
+                      sqlite3GetTempRange(pParse, pk_part_count);
                if (isUpdate || on_error == ON_CONFLICT_ACTION_REPLACE) {
                        int x;
-                       int nPkCol = index_column_count(pPk);
                        /* Extract the PRIMARY KEY from the end of the index 
entry and
                         * store it in registers regR..regR+nPk-1
                         */
                        if (pIdx != pPk) {
-                               for (i = 0; i < nPkCol; i++) {
-                                       assert(pPk->aiColumn[i] >= 0);
-                                       x = pPk->aiColumn[i];
+                               for (uint32_t i = 0; i < pk_part_count; i++) {
+                                       x = pPk->def->key_def->parts[i].fieldno;
                                        sqlite3VdbeAddOp3(v, OP_Column,
                                                          iThisCur, x, regR + 
i);
                                        VdbeComment((v, "%s.%s", def->name,
@@ -1422,22 +1419,25 @@ sqlite3GenerateConstraintChecks(Parse * pParse,         
/* The parser context */
                                 * of the matched index row are different from 
the original PRIMARY
                                 * KEY values of this row before the update.
                                 */
-                               int addrJump =
-                                       sqlite3VdbeCurrentAddr(v) + nPkCol;
+                               int addrJump = sqlite3VdbeCurrentAddr(v) +
+                                              pk_part_count;
                                int op = OP_Ne;
-                               int regCmp = (IsPrimaryKeyIndex(pIdx) ?
-                                             regIdx : regR);
-
-                               for (i = 0; i < nPkCol; i++) {
-                                       uint32_t id;
-                                       char *p4 = (char *)sql_index_collation(pPk, 
i, &id);
-                                       x = pPk->aiColumn[i];
-                                       assert(x >= 0);
-                                       if (i == (nPkCol - 1)) {
+                               int regCmp = IsPrimaryKeyIndex(pIdx) ?
+                                            regIdx : regR;
+                               struct key_part *part =
+                                       pPk->def->key_def->parts;
+                               for (uint32_t i = 0; i < pk_part_count;
+                                    ++i, ++part) {
+                                       char *p4 = (char *) part->coll;
+                                       x = part->fieldno;
+                                       if (pPk->tnum==0)
+                                               x = -1;
+                                       if (i == (pk_part_count - 1)) {
                                                addrJump = addrUniqueOk;
                                                op = OP_Eq;
                                        }
-                                       sqlite3VdbeAddOp4(v, op, regOldData + 1 
+ x,
+                                       sqlite3VdbeAddOp4(v, op,
+                                                         regOldData + 1 + x,
                                                          addrJump, regCmp + i,
                                                          p4, P4_COLLSEQ);
                                        sqlite3VdbeChangeP5(v, SQLITE_NOTNULL);
@@ -1480,7 +1480,8 @@ sqlite3GenerateConstraintChecks(Parse * pParse,           
/* The parser context */
                                                              NULL, NULL);
                        }
                        sql_generate_row_delete(pParse, pTab, trigger,
-                                               iDataCur, regR, nPkField, false,
+                                               iDataCur, regR, pk_part_count,
+                                               false,
                                                ON_CONFLICT_ACTION_REPLACE,
                                                pIdx == pPk ? ONEPASS_SINGLE :
                                                ONEPASS_OFF, -1);
@@ -1490,7 +1491,7 @@ sqlite3GenerateConstraintChecks(Parse * pParse,           
/* The parser context */
                }
                sqlite3VdbeResolveLabel(v, addrUniqueOk);
                if (regR != regIdx)
-                       sqlite3ReleaseTempRange(pParse, regR, nPkField);
+                       sqlite3ReleaseTempRange(pParse, regR, pk_part_count);
        }
*pbMayReplace = seenReplace;
@@ -1608,8 +1609,8 @@ sqlite3OpenTableAndIndices(Parse * pParse,        /* 
Parsing context */
                    IsPrimaryKeyIndex(pIdx) ||          /* Condition 2 */
                    sqlite3FkReferences(pTab) ||        /* Condition 3 */
                    /* Condition 4 */
-                   (index_is_unique(pIdx) && pIdx->onError !=
-                    ON_CONFLICT_ACTION_DEFAULT &&
+                   (pIdx->def->opts.is_unique &&
+                    pIdx->onError != ON_CONFLICT_ACTION_DEFAULT &&
                     /* Condition 4.1 */
                     pIdx->onError != ON_CONFLICT_ACTION_ABORT) ||
                     /* Condition 4.2 */
@@ -1627,7 +1628,7 @@ sqlite3OpenTableAndIndices(Parse * pParse,        /* 
Parsing context */
                                                  space_ptr_reg);
                                sql_vdbe_set_p4_key_def(pParse, pIdx);
                                sqlite3VdbeChangeP5(v, p5);
-                               VdbeComment((v, "%s", pIdx->zName));
+                               VdbeComment((v, "%s", pIdx->def->name));
                        }
                }
        }
@@ -1661,30 +1662,25 @@ int sqlite3_xferopt_count;
 static int
 xferCompatibleIndex(Index * pDest, Index * pSrc)
 {
-       uint32_t i;
        assert(pDest && pSrc);
        assert(pDest->pTable != pSrc->pTable);
-       uint32_t nDestCol = index_column_count(pDest);
-       uint32_t nSrcCol = index_column_count(pSrc);
-       if (nDestCol != nSrcCol) {
-               return 0;       /* Different number of columns */
-       }
+       uint32_t dest_idx_part_count = pDest->def->key_def->part_count;
+       uint32_t src_idx_part_count = pSrc->def->key_def->part_count;
+       if (dest_idx_part_count != src_idx_part_count)
+               return 0;
        if (pDest->onError != pSrc->onError) {
                return 0;       /* Different conflict resolution strategies */
        }
-       for (i = 0; i < nSrcCol; i++) {
-               if (pSrc->aiColumn[i] != pDest->aiColumn[i]) {
+       struct key_part *src_part = pSrc->def->key_def->parts;
+       struct key_part *dest_part = pDest->def->key_def->parts;
+       for (uint32_t i = 0; i < src_idx_part_count;
+            ++i, ++src_part, ++dest_part) {
+               if (src_part->fieldno != dest_part->fieldno)
                        return 0;       /* Different columns indexed */
-               }
-               if (sql_index_column_sort_order(pSrc, i) !=
-                   sql_index_column_sort_order(pDest, i)) {
+               if (src_part->sort_order != dest_part->sort_order)
                        return 0;       /* Different sort orders */
-               }
-               uint32_t id;
-               if (sql_index_collation(pSrc, i, &id) !=
-                   sql_index_collation(pDest, i, &id)) {
+               if (src_part->coll != dest_part->coll)
                        return 0;       /* Different collating sequences */
-               }
        }
        if (sqlite3ExprCompare(pSrc->pPartIdxWhere, pDest->pPartIdxWhere, -1)) {
                return 0;       /* Different WHERE clauses */
@@ -1856,16 +1852,15 @@ xferOptimization(Parse * pParse,        /* Parser 
context */
                }
        }
        for (pDestIdx = pDest->pIndex; pDestIdx; pDestIdx = pDestIdx->pNext) {
-               if (index_is_unique(pDestIdx)) {
+               if (pDestIdx->def->opts.is_unique)
                        destHasUniqueIdx = 1;
-               }
                for (pSrcIdx = pSrc->pIndex; pSrcIdx; pSrcIdx = pSrcIdx->pNext) 
{
                        if (xferCompatibleIndex(pDestIdx, pSrcIdx))
                                break;
                }
-               if (pSrcIdx == 0) {
-                       return 0;       /* pDestIdx has no corresponding index 
in pSrc */
-               }
+               /* pDestIdx has no corresponding index in pSrc. */
+               if (pSrcIdx == NULL)
+                       return 0;
        }
        /* Get server checks. */
        ExprList *pCheck_src = space_checks_expr_list(
@@ -1941,11 +1936,11 @@ xferOptimization(Parse * pParse,        /* Parser 
context */
                struct space *src_space =
                        space_by_id(SQLITE_PAGENO_TO_SPACEID(pSrcIdx->tnum));
                vdbe_emit_open_cursor(pParse, iSrc, pSrcIdx->tnum, src_space);
-               VdbeComment((v, "%s", pSrcIdx->zName));
+               VdbeComment((v, "%s", pSrcIdx->def->name));
                struct space *dest_space =
                        space_by_id(SQLITE_PAGENO_TO_SPACEID(pDestIdx->tnum));
                vdbe_emit_open_cursor(pParse, iDest, pDestIdx->tnum, 
dest_space);
-               VdbeComment((v, "%s", pDestIdx->zName));
+               VdbeComment((v, "%s", pDestIdx->def->name));
                addr1 = sqlite3VdbeAddOp2(v, OP_Rewind, iSrc, 0);
                VdbeCoverage(v);
                sqlite3VdbeAddOp2(v, OP_RowData, iSrc, regData);
diff --git a/src/box/sql/pragma.c b/src/box/sql/pragma.c
index be6a01c23..db0b9a482 100644
--- a/src/box/sql/pragma.c
+++ b/src/box/sql/pragma.c
@@ -369,9 +369,8 @@ sqlite3Pragma(Parse * pParse, Token * pId,  /* First part 
of [schema.]id field */
                        } else if (pk == NULL) {
                                k = 1;
                        } else {
-                               for (k = 1; k <= def->field_count &&
-                                    pk->aiColumn[k - 1] != (int) i; ++k) {
-                               }
+                               struct key_def *kdef = pk->def->key_def;
+                               k = key_def_find(kdef, i) - kdef->parts + 1;
                        }
                        bool is_nullable = def->fields[i].is_nullable;
                        char *expr_str = def->fields[i].default_value;
@@ -414,7 +413,7 @@ sqlite3Pragma(Parse * pParse, Token * pId,  /* First part 
of [schema.]id field */
                                        size_t avg_tuple_size_idx =
                                                sql_index_tuple_size(space, 
idx);
                                        sqlite3VdbeMultiLoad(v, 2, "sii",
-                                                            pIdx->zName,
+                                                            pIdx->def->name,
                                                             avg_tuple_size_idx,
                                                             
index_field_tuple_est(pIdx, 0));
                                        sqlite3VdbeAddOp2(v, OP_ResultRow, 1,
@@ -443,11 +442,13 @@ sqlite3Pragma(Parse * pParse, Token * pId,        /* 
First part of [schema.]id field */
                                                 */
                                                pParse->nMem = 3;
                                        }
-                                       mx = index_column_count(pIdx);
+                                       mx = pIdx->def->key_def->part_count;
                                        assert(pParse->nMem <=
                                               pPragma->nPragCName);
-                                       for (i = 0; i < mx; i++) {
-                                               i16 cnum = pIdx->aiColumn[i];
+                                       struct key_part *part =
+                                               pIdx->def->key_def->parts;
+                                       for (i = 0; i < mx; i++, part++) {
+                                               i16 cnum = (int) part->fieldno;
                                                assert(pIdx->pTable);
                                                sqlite3VdbeMultiLoad(v, 1,
                                                                     "iis", i,
@@ -461,19 +462,18 @@ sqlite3Pragma(Parse * pParse, Token * pId,        /* 
First part of [schema.]id field */
                                                                     name);
                                                if (pPragma->iArg) {
                                                        const char *c_n;
-                                                       uint32_t id;
+                                                       uint32_t id =
+                                                               part->coll_id;
                                                        struct coll *coll =
-                                                               
sql_index_collation(pIdx, i, &id);
+                                                               part->coll;
                                                        if (coll != NULL)
                                                                c_n = 
coll_by_id(id)->name;
                                                        else
                                                                c_n = "BINARY";
-                                                       enum sort_order 
sort_order;
-                                                       sort_order = 
sql_index_column_sort_order(pIdx,
-                                                                               
                 i);
                                                        sqlite3VdbeMultiLoad(v,
                                                                             4,
                                                                             
"isi",
+                                                                            
part->
                                                                             
sort_order,
                                                                             
c_n,
                                                                             i <
@@ -503,10 +503,8 @@ sqlite3Pragma(Parse * pParse, Token * pId, /* First part 
of [schema.]id field */
                                                    { "c", "u", "pk" };
                                                sqlite3VdbeMultiLoad(v, 1,
                                                                     "isisi", i,
-                                                                    pIdx->
-                                                                    zName,
-                                                                    
index_is_unique
-                                                                    (pIdx),
+                                                                    
pIdx->def->name,
+                                                                    
pIdx->def->opts.is_unique,
                                                                     azOrigin
                                                                     [pIdx->
                                                                      idxType],
diff --git a/src/box/sql/select.c b/src/box/sql/select.c
index 54f78a9de..adf10feca 100644
--- a/src/box/sql/select.c
+++ b/src/box/sql/select.c
@@ -4366,7 +4366,7 @@ sqlite3IndexedByLookup(Parse * pParse, struct 
SrcList_item *pFrom)
                char *zIndexedBy = pFrom->u1.zIndexedBy;
                Index *pIdx;
                for (pIdx = pTab->pIndex;
-                    pIdx && strcmp(pIdx->zName, zIndexedBy);
+                    pIdx && strcmp(pIdx->def->name, zIndexedBy);
                     pIdx = pIdx->pNext) ;
                if (!pIdx) {
                        sqlite3ErrorMsg(pParse, "no such index: %s", zIndexedBy,
diff --git a/src/box/sql/sqliteInt.h b/src/box/sql/sqliteInt.h
index e939663b6..3a560f1db 100644
--- a/src/box/sql/sqliteInt.h
+++ b/src/box/sql/sqliteInt.h
@@ -2051,28 +2051,6 @@ struct UnpackedRecord {
  * Each SQL index is represented in memory by an
  * instance of the following structure.
  *
- * The columns of the table that are to be indexed are described
- * by the aiColumn[] field of this structure.  For example, suppose
- * we have the following table and index:
- *
- *     CREATE TABLE Ex1(c1 int, c2 int, c3 text);
- *     CREATE INDEX Ex2 ON Ex1(c3,c1);
- *
- * In the Table structure describing Ex1, nCol==3 because there are
- * three columns in the table.  In the Index structure describing
- * Ex2, nColumn==2 since 2 of the 3 columns of Ex1 are indexed.
- * The value of aiColumn is {2, 0}.  aiColumn[0]==2 because the
- * first column to be indexed (c3) has an index of 2 in Ex1.aCol[].
- * The second column to be indexed (c1) has an index of 0 in
- * Ex1.aCol[], hence Ex2.aiColumn[1]==0.
- *
- * The Index.onError field determines whether or not the indexed columns
- * must be unique and what to do if they are not.  When Index.onError=
- * ON_CONFLICT_ACTION_NONE, it means this is not a unique index.
- * Otherwise it is a unique index and the value of Index.onError indicate
- * the which conflict resolution algorithm to employ whenever an attempt
- * is made to insert a non-unique element.
- *
  * While parsing a CREATE TABLE or CREATE INDEX statement in order to
  * generate VDBE code (as opposed to reading from Tarantool's _space
  * space as part of parsing an existing database schema), transient instances
@@ -2082,26 +2060,30 @@ struct UnpackedRecord {
  * program is executed). See convertToWithoutRowidTable() for details.
  */
 struct Index {
-       char *zName;            /* Name of this index */
-       i16 *aiColumn;          /* Which columns are used by this index.  1st 
is 0 */
-       LogEst *aiRowLogEst;    /* From ANALYZE: Est. rows selected by each 
column */
-       Table *pTable;          /* The SQL table being indexed */
-       char *zColAff;          /* String defining the affinity of each column 
*/
-       Index *pNext;           /* The next index associated with the same 
table */
-       Schema *pSchema;        /* Schema containing this index */
-       /** Sorting order for each column. */
-       enum sort_order *sort_order;
-       /** Array of collation sequences for index. */
-       struct coll **coll_array;
-       /** Array of collation identifiers. */
-       uint32_t *coll_id_array;
-       Expr *pPartIdxWhere;    /* WHERE clause for partial indices */
-       int tnum;               /* DB Page containing root of this index */
-       u16 nColumn;            /* Number of columns stored in the index */
-       u8 onError;             /* ON_CONFLICT_ACTION_ABORT, _IGNORE, _REPLACE,
-                                * or _NONE
-                                */
-       unsigned idxType:2;     /* 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE INDEX */
+       /** From ANALYZE: Est. rows selected by each column. */
+       LogEst *aiRowLogEst;
+       /** The SQL table being indexed. */
+       Table *pTable;
+       /** String defining the affinity of each column. */
+       char *zColAff;
+       /** The next index associated with the same table. */
+       Index *pNext;
+       /** Schema containing this index. */
+       Schema *pSchema;
+       /** WHERE clause for partial indices. */
+       Expr *pPartIdxWhere;
+       /** DB Page containing root of this index. */
+       int tnum;
+       /**
+        * Conflict resolution algorithm to employ whenever an
+        * attempt is made to insert a non-unique element in
+        * unique index.
+        */
+       u8 onError;
+       /** 1==UNIQUE, 2==PRIMARY KEY, 0==CREATE INDEX. */
+       unsigned idxType:2;
+       /** Index definition. */
+       struct index_def *def;
 };
/**
@@ -3532,34 +3514,6 @@ void sqlite3AddCollateType(Parse *, Token *);
  */
 struct coll *
 sql_column_collation(struct space_def *def, uint32_t column, uint32_t 
*coll_id);
-/**
- * Return name of given column collation from index.
- *
- * @param idx Index which is used to fetch column.
- * @param column Number of column.
- * @param[out] coll_id Collation identifier.
- * @retval Pointer to collation.
- */
-struct coll *
-sql_index_collation(Index *idx, uint32_t column, uint32_t *id);
-
-/**
- * Return key_def of provided struct Index.
- * @param idx Pointer to `struct Index` object.
- * @retval Pointer to `struct key_def`.
- */
-struct key_def*
-sql_index_key_def(struct Index *idx);
-
-/**
- * Return sort order of given column from index.
- *
- * @param idx Index which is used to fetch column.
- * @param column Number of column.
- * @retval Sort order of requested column.
- */
-enum sort_order
-sql_index_column_sort_order(Index *idx, uint32_t column);
void sqlite3EndTable(Parse *, Token *, Token *, Select *);
@@ -3646,9 +3600,16 @@ void sqlite3SrcListShiftJoinType(SrcList *);
 void sqlite3SrcListAssignCursors(Parse *, SrcList *);
 void sqlite3IdListDelete(sqlite3 *, IdList *);
 void sqlite3SrcListDelete(sqlite3 *, SrcList *);
-Index *sqlite3AllocateIndexObject(sqlite3 *, i16, int, char **);
-bool
-index_is_unique(Index *);
+/**
+ * Allocate SQL index object with part count fields.
+ * @param db SQLite environment.
+ * @param part_count Index part_count.
+ *
+ * @retval NULL Memory error.
+ * @retval not NULL Index object.
+ */
+struct Index *
+sql_index_alloc(struct sqlite3 *db, uint32_t part_count);
/**
  * Create a new index for an SQL table.  name is the name of the
@@ -3678,8 +3639,9 @@ index_is_unique(Index *);
 void
 sql_create_index(struct Parse *parse, struct Token *token,
                 struct SrcList *tbl_name, struct ExprList *col_list,
-                int on_error, struct Token *start, struct Expr *pi_where,
-                enum sort_order sort_order, bool if_not_exist, u8 idx_type);
+                enum on_conflict_action on_error, struct Token *start,
+                struct Expr *pi_where, enum sort_order sort_order,
+                bool if_not_exist, u8 idx_type);
/**
  * This routine will drop an existing named index.  This routine
@@ -4531,10 +4493,6 @@ int sqlite3InvokeBusyHandler(BusyHandler *);
 int
 sql_analysis_load(struct sqlite3 *db);
-uint32_t
-index_column_count(const Index *);
-bool
-index_is_unique_not_null(const Index *);
 void sqlite3RegisterLikeFunctions(sqlite3 *, int);
 int sqlite3IsLikeFunction(sqlite3 *, Expr *, int *, char *);
 void sqlite3SchemaClear(sqlite3 *);
diff --git a/src/box/sql/update.c b/src/box/sql/update.c
index 212adbcb3..113e3ba0e 100644
--- a/src/box/sql/update.c
+++ b/src/box/sql/update.c
@@ -239,17 +239,18 @@ sqlite3Update(Parse * pParse,             /* The parser 
context */
         */
        for (j = 0, pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext, j++) {
                int reg;
-               int nIdxCol = index_column_count(pIdx);
+               uint32_t part_count = pIdx->def->key_def->part_count;
                if (chngPk || hasFK || pIdx->pPartIdxWhere || pIdx == pPk) {
                        reg = ++pParse->nMem;
-                       pParse->nMem += nIdxCol;
+                       pParse->nMem += part_count;
                } else {
                        reg = 0;
-                       for (i = 0; i < nIdxCol; i++) {
-                               i16 iIdxCol = pIdx->aiColumn[i];
-                               if (iIdxCol < 0 || aXRef[iIdxCol] >= 0) {
+                       for (uint32_t i = 0; i < part_count; i++) {
+                               uint32_t fieldno =
+                                       pIdx->def->key_def->parts[i].fieldno;
+                               if (aXRef[fieldno] >= 0) {
                                        reg = ++pParse->nMem;
-                                       pParse->nMem += nIdxCol;
+                                       pParse->nMem += part_count;
                                        break;
                                }
                        }
@@ -299,17 +300,18 @@ sqlite3Update(Parse * pParse,             /* The parser 
context */
         * In this case we have to manually load columns in order to make tuple.
         */
        int iPk;        /* First of nPk memory cells holding PRIMARY KEY value 
*/
-       i16 nPk;        /* Number of components of the PRIMARY KEY */
+       /* Number of components of the PRIMARY KEY.  */
+       uint32_t pk_part_count;
        int addrOpen;   /* Address of the OpenEphemeral instruction */
if (is_view) {
-               nPk = nKey;
+               pk_part_count = nKey;
        } else {
                assert(pPk != 0);
-               nPk = index_column_count(pPk);
+               pk_part_count = pPk->def->key_def->part_count;
        }
        iPk = pParse->nMem + 1;
-       pParse->nMem += nPk;
+       pParse->nMem += pk_part_count;
        regKey = ++pParse->nMem;
        iEph = pParse->nTab++;
        sqlite3VdbeAddOp2(v, OP_Null, 0, iPk);
@@ -318,7 +320,8 @@ sqlite3Update(Parse * pParse,               /* The parser 
context */
                addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph,
                                             nKey);
        } else {
-               addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph, nPk);
+               addrOpen = sqlite3VdbeAddOp2(v, OP_OpenTEphemeral, iEph,
+                                            pk_part_count);
                sql_vdbe_set_p4_key_def(pParse, pPk);
        }
@@ -328,27 +331,27 @@ sqlite3Update(Parse * pParse, /* The parser context */
                goto update_cleanup;
        okOnePass = sqlite3WhereOkOnePass(pWInfo, aiCurOnePass);
        if (is_view) {
-               for (i = 0; i < nPk; i++) {
+               for (i = 0; i < (int) pk_part_count; i++) {
                        sqlite3VdbeAddOp3(v, OP_Column, iDataCur, i, iPk + i);
                }
        } else {
-               for (i = 0; i < nPk; i++) {
-                       assert(pPk->aiColumn[i] >= 0);
+               for (i = 0; i < (int) pk_part_count; i++) {
                        sqlite3ExprCodeGetColumnOfTable(v, def, iDataCur,
-                                                       pPk->aiColumn[i],
+                                                       pPk->def->key_def->
+                                                               
parts[i].fieldno,
                                                        iPk + i);
                }
        }
if (okOnePass) {
                sqlite3VdbeChangeToNoop(v, addrOpen);
-               nKey = nPk;
+               nKey = pk_part_count;
                regKey = iPk;
        } else {
                const char *zAff = is_view ? 0 :
                                   sqlite3IndexAffinityStr(pParse->db, pPk);
-               sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, nPk, regKey,
-                                         zAff, nPk);
+               sqlite3VdbeAddOp4(v, OP_MakeRecord, iPk, pk_part_count,
+                                 regKey, zAff, pk_part_count);
                sqlite3VdbeAddOp2(v, OP_IdxInsert, iEph, regKey);
                /* Set flag to save memory allocating one by malloc. */
                sqlite3VdbeChangeP5(v, 1);
diff --git a/src/box/sql/vdbeaux.c b/src/box/sql/vdbeaux.c
index a8bd1e582..e50a23b3d 100644
--- a/src/box/sql/vdbeaux.c
+++ b/src/box/sql/vdbeaux.c
@@ -1150,7 +1150,7 @@ sql_vdbe_set_p4_key_def(struct Parse *parse, struct Index 
*idx)
        struct Vdbe *v = parse->pVdbe;
        assert(v != NULL);
        assert(idx != NULL);
-       struct key_def *def = key_def_dup(sql_index_key_def(idx));
+       struct key_def *def = key_def_dup(idx->def->key_def);
        if (def == NULL)
                sqlite3OomFault(parse->db);
        else
diff --git a/src/box/sql/vdbemem.c b/src/box/sql/vdbemem.c
index 2ce90747d..d0e16bafb 100644
--- a/src/box/sql/vdbemem.c
+++ b/src/box/sql/vdbemem.c
@@ -1087,15 +1087,15 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
                        Index *pIdx = p->pIdx;       /* Index being probed */
                        int nByte;      /* Bytes of space to allocate */
                        int i;  /* Counter variable */
-                       int nCol = index_column_count(pIdx);
+                       int part_count = pIdx->def->key_def->part_count;
- nByte = sizeof(Mem) * nCol +
+                       nByte = sizeof(Mem) * part_count +
                                ROUND8(sizeof(UnpackedRecord));
                        pRec =
                            (UnpackedRecord *) sqlite3DbMallocZero(db, nByte);
                        if (pRec == NULL)
                                return NULL;
-                       pRec->key_def = key_def_dup(sql_index_key_def(pIdx));
+                       pRec->key_def = key_def_dup(pIdx->def->key_def);
                        if (pRec->key_def == NULL) {
                                sqlite3DbFree(db, pRec);
                                sqlite3OomFault(db);
@@ -1103,7 +1103,7 @@ valueNew(sqlite3 * db, struct ValueNewStat4Ctx *p)
                        }
                        pRec->aMem = (Mem *)((char *) pRec +
                                             ROUND8(sizeof(UnpackedRecord)));
-                       for (i = 0; i < nCol; i++) {
+                       for (i = 0; i < (int) part_count; i++) {
                                pRec->aMem[i].flags = MEM_Null;
                                pRec->aMem[i].db = db;
                        }
@@ -1621,15 +1621,12 @@ sql_stat4_column(struct sqlite3 *db, const char 
*record, uint32_t col_num,
 void
 sqlite3Stat4ProbeFree(UnpackedRecord * pRec)
 {
-       if (pRec) {
-               int i;
-               int nCol = pRec->key_def->part_count;
-               Mem *aMem = pRec->aMem;
-               sqlite3 *db = aMem[0].db;
-               for (i = 0; i < nCol; i++) {
+       if (pRec != NULL) {
+               int part_count = pRec->key_def->part_count;
+               struct Mem *aMem = pRec->aMem;
+               for (int i = 0; i < part_count; i++)
                        sqlite3VdbeMemRelease(&aMem[i]);
-               }
-               sqlite3DbFree(db, pRec);
+               sqlite3DbFree(aMem[0].db, pRec);
        }
 }
diff --git a/src/box/sql/where.c b/src/box/sql/where.c
index 85143ed20..7ca02095f 100644
--- a/src/box/sql/where.c
+++ b/src/box/sql/where.c
@@ -372,13 +372,19 @@ whereScanInit(WhereScan * pScan,  /* The WhereScan object 
being initialized */
        pScan->is_column_seen = false;
        if (pIdx) {
                int j = iColumn;
-               iColumn = pIdx->aiColumn[j];
+               iColumn = pIdx->def->key_def->parts[j].fieldno;
+               /*
+                * pIdx->tnum == 0 means that pIdx is a fake
+                * integer primary key index.
+                */
+               if (pIdx->tnum == 0)
+                       iColumn = -1;
+
                if (iColumn >= 0) {
                        char affinity =
                                pIdx->pTable->def->fields[iColumn].affinity;
                        pScan->idxaff = affinity;
-                       uint32_t id;
-                       pScan->coll = sql_index_collation(pIdx, j, &id);
+                       pScan->coll = pIdx->def->key_def->parts[j].coll;
                        pScan->is_column_seen = true;
                }
        }
@@ -541,47 +547,24 @@ findIndexCol(Parse * pParse,      /* Parse context */
             Index * pIdx,      /* Index to match column of */
             int iCol)          /* Column of index to match */
 {
+       struct key_part *part_to_match = &pIdx->def->key_def->parts[iCol];
        for (int i = 0; i < pList->nExpr; i++) {
                Expr *p = sqlite3ExprSkipCollate(pList->a[i].pExpr);
-               if (p->op == TK_COLUMN &&
-                   p->iColumn == pIdx->aiColumn[iCol] &&
-                   p->iTable == iBase) {
+               if (p->op == TK_COLUMN && p->iTable == iBase &&
+                   p->iColumn == (int) part_to_match->fieldno) {
                        bool is_found;
                        uint32_t id;
                        struct coll *coll = sql_expr_coll(pParse,
                                                          pList->a[i].pExpr,
                                                          &is_found, &id);
-                       if (is_found &&
-                           coll == sql_index_collation(pIdx, iCol, &id)) {
+                       if (is_found && coll == part_to_match->coll)
                                return i;
-                       }
                }
        }
return -1;
 }
-/*
- * Return TRUE if the iCol-th column of index pIdx is NOT NULL
- */
-static int
-indexColumnNotNull(Index * pIdx, int iCol)
-{
-       int j;
-       assert(pIdx != 0);
-       assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
-       j = pIdx->aiColumn[iCol];
-       if (j >= 0) {
-               return !pIdx->pTable->def->fields[j].is_nullable;
-       } else if (j == (-1)) {
-               return 1;
-       } else {
-               assert(j == (-2));
-               return 0;       /* Assume an indexed expression can always 
yield a NULL */
-
-       }
-}
-
 /*
  * Return true if the DISTINCT expression-list passed as the third argument
  * is redundant.
@@ -633,9 +616,9 @@ isDistinctRedundant(Parse * pParse,         /* Parsing 
context */
         *      contain a "col=X" term are subject to a NOT NULL constraint.
         */
        for (pIdx = pTab->pIndex; pIdx; pIdx = pIdx->pNext) {
-               if (!index_is_unique(pIdx))
+               if (!pIdx->def->opts.is_unique)
                        continue;
-               int col_count = index_column_count(pIdx);
+               int col_count = pIdx->def->key_def->part_count;
                for (i = 0; i < col_count; i++) {
                        if (0 ==
                            sqlite3WhereFindTerm(pWC, iBase, i, ~(Bitmask) 0,
@@ -643,11 +626,12 @@ isDistinctRedundant(Parse * pParse,               /* 
Parsing context */
                                if (findIndexCol
                                    (pParse, pDistinct, iBase, pIdx, i) < 0)
                                        break;
-                               if (indexColumnNotNull(pIdx, i) == 0)
+                               uint32_t j = 
pIdx->def->key_def->parts[i].fieldno;
+                               if (pIdx->pTable->def->fields[j].is_nullable)
                                        break;
                        }
                }
-               if (i == (int)index_column_count(pIdx)) {
+               if (i == (int) pIdx->def->key_def->part_count) {
                        /* This index implies that the DISTINCT qualifier is 
redundant. */
                        return 1;
                }
@@ -835,8 +819,7 @@ constructAutomaticIndex(Parse * pParse,                     
/* The parsing context */
        }
/* Construct the Index object to describe this index */
-       pIdx =
-           sqlite3AllocateIndexObject(pParse->db, nKeyCol + 1, 0, &zNotUsed);
+       pIdx = sql_index_alloc(pParse->db, nKeyCol + 1);
        if (pIdx == 0)
                goto end_auto_index_create;
        pLoop->pIndex = pIdx;
@@ -1184,7 +1167,7 @@ whereRangeAdjust(WhereTerm * pTerm, LogEst nNew)
 char
 sqlite3IndexColumnAffinity(sqlite3 * db, Index * pIdx, int iCol)
 {
-       assert(iCol >= 0 && iCol < (int)index_column_count(pIdx));
+       assert(iCol >= 0 && iCol < (int) pIdx->def->key_def->part_count);
        if (!pIdx->zColAff) {
                if (sqlite3IndexAffinityStr(db, pIdx) == 0)
                        return AFFINITY_BLOB;
@@ -1246,13 +1229,12 @@ whereRangeSkipScanEst(Parse * pParse,           /* Parsing 
& code generating context */
        int nUpper = index->def->opts.stat->sample_count + 1;
        int rc = SQLITE_OK;
        u8 aff = sqlite3IndexColumnAffinity(db, p, nEq);
-       uint32_t id;
sqlite3_value *p1 = 0; /* Value extracted from pLower */
        sqlite3_value *p2 = 0;  /* Value extracted from pUpper */
        sqlite3_value *pVal = 0;        /* Value extracted from record */
- struct coll *pColl = sql_index_collation(p, nEq, &id);
+       struct coll *coll = p->def->key_def->parts[nEq].coll;
        if (pLower) {
                rc = sqlite3Stat4ValueFromExpr(pParse, pLower->pExpr->pRight,
                                               aff, &p1);
@@ -1273,12 +1255,12 @@ whereRangeSkipScanEst(Parse * pParse,           /* Parsing 
& code generating context */
                        rc = sql_stat4_column(db, samples[i].sample_key, nEq,
                                              &pVal);
                        if (rc == SQLITE_OK && p1) {
-                               int res = sqlite3MemCompare(p1, pVal, pColl);
+                               int res = sqlite3MemCompare(p1, pVal, coll);
                                if (res >= 0)
                                        nLower++;
                        }
                        if (rc == SQLITE_OK && p2) {
-                               int res = sqlite3MemCompare(p2, pVal, pColl);
+                               int res = sqlite3MemCompare(p2, pVal, coll);
                                if (res >= 0)
                                        nUpper++;
                        }
@@ -1448,7 +1430,7 @@ whereRangeScanEst(Parse * pParse, /* Parsing & code 
generating context */
                               || (pLower->eOperator & (WO_GT | WO_GE)) != 0);
                        assert(pUpper == 0
                               || (pUpper->eOperator & (WO_LT | WO_LE)) != 0);
-                       if (sql_index_column_sort_order(p, nEq) !=
+                       if (p->def->key_def->parts[nEq].sort_order !=
                            SORT_ORDER_ASC) {
                                /* The roles of pLower and pUpper are swapped 
for a DESC index */
                                SWAP(pLower, pUpper);
@@ -1598,7 +1580,7 @@ whereEqualScanEst(Parse * pParse, /* Parsing & code 
generating context */
        int bOk;
assert(nEq >= 1);
-       assert(nEq <= (int)index_column_count(p));
+       assert(nEq <= (int) p->def->key_def->part_count);
        assert(pBuilder->nRecValid < nEq);
/* If values are not available for all fields of the index to the left
@@ -1619,7 +1601,7 @@ whereEqualScanEst(Parse * pParse, /* Parsing & code 
generating context */
whereKeyStats(pParse, p, pRec, 0, a);
        WHERETRACE(0x10, ("equality scan regions %s(%d): %d\n",
-                         p->zName, nEq - 1, (int)a[1]));
+                         p->def->name, nEq - 1, (int)a[1]));
        *pnRow = a[1];
return rc;
@@ -1751,7 +1733,7 @@ whereLoopPrint(WhereLoop * p, WhereClause * pWC)
                           pItem->zAlias ? pItem->zAlias : pTab->def->name);
 #endif
        const char *zName;
-       if (p->pIndex && (zName = p->pIndex->zName) != 0) {
+       if (p->pIndex != NULL && (zName = p->pIndex->def->name) != NULL) {
                if (strncmp(zName, "sqlite_autoindex_", 17) == 0) {
                        int i = sqlite3Strlen30(zName) - 1;
                        while (zName[i] != '_')
@@ -2314,7 +2296,7 @@ whereRangeVectorLen(Parse * pParse,       /* Parsing 
context */
        int nCmp = sqlite3ExprVectorSize(pTerm->pExpr->pLeft);
        int i;
- nCmp = MIN(nCmp, (int)(index_column_count(pIdx) - nEq));
+       nCmp = MIN(nCmp, (int)(pIdx->def->key_def->part_count - nEq));
        for (i = 1; i < nCmp; i++) {
                /* Test if comparison i of pTerm is compatible with column 
(i+nEq)
                 * of the index. If not, exit the loop.
@@ -2335,13 +2317,11 @@ whereRangeVectorLen(Parse * pParse,     /* Parsing 
context */
                 * order of the index column is the same as the sort order of 
the
                 * leftmost index column.
                 */
-               if (pLhs->op != TK_COLUMN
-                   || pLhs->iTable != iCur
-                   || pLhs->iColumn != pIdx->aiColumn[i + nEq]
-                   || sql_index_column_sort_order(pIdx, i + nEq) !=
-                      sql_index_column_sort_order(pIdx, nEq)) {
+               struct key_part *parts = pIdx->def->key_def->parts;
+               if (pLhs->op != TK_COLUMN || pLhs->iTable != iCur ||
+                   pLhs->iColumn != (int)parts[i + nEq].fieldno ||
+                   parts[i + nEq].sort_order != parts[nEq].sort_order)
                        break;
-               }
aff = sqlite3CompareAffinity(pRhs, sqlite3ExprAffinity(pLhs));
                idxaff =
@@ -2353,7 +2333,7 @@ whereRangeVectorLen(Parse * pParse,       /* Parsing 
context */
                pColl = sql_binary_compare_coll_seq(pParse, pLhs, pRhs, &id);
                if (pColl == 0)
                        break;
-               if (sql_index_collation(pIdx, i + nEq, &id) != pColl)
+               if (pIdx->def->key_def->parts[i + nEq].coll != pColl)
                        break;
        }
        return i;
@@ -2396,13 +2376,13 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,     
/* The WhereLoop factory */
        LogEst rSize;           /* Number of rows in the table */
        LogEst rLogSize;        /* Logarithm of table size */
        WhereTerm *pTop = 0, *pBtm = 0; /* Top and bottom range constraints */
-       uint32_t nProbeCol = index_column_count(pProbe);
+       uint32_t probe_part_count = pProbe->def->key_def->part_count;
pNew = pBuilder->pNew;
        if (db->mallocFailed)
                return SQLITE_NOMEM_BKPT;
        WHERETRACE(0x800, ("BEGIN addBtreeIdx(%s), nEq=%d\n",
-                          pProbe->zName, pNew->nEq));
+                          pProbe->def->name, pNew->nEq));
assert((pNew->wsFlags & WHERE_TOP_LIMIT) == 0);
        if (pNew->wsFlags & WHERE_BTM_LIMIT) {
@@ -2431,7 +2411,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       
/* The WhereLoop factory */
                stat = &surrogate_stat;
        if (stat->is_unordered)
                opMask &= ~(WO_GT | WO_GE | WO_LT | WO_LE);
-       assert(pNew->nEq < nProbeCol);
+       assert(pNew->nEq < probe_part_count);
saved_nEq = pNew->nEq;
        saved_nBtm = pNew->nBtm;
@@ -2452,10 +2432,14 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,     
/* The WhereLoop factory */
                LogEst nOutUnadjusted;  /* nOut before IN() and WHERE 
adjustments */
                int nIn = 0;
                int nRecValid = pBuilder->nRecValid;
-               if ((eOp == WO_ISNULL || (pTerm->wtFlags & TERM_VNULL) != 0)
-                   && indexColumnNotNull(pProbe, saved_nEq)
-                   ) {
-                       continue;       /* ignore IS [NOT] NULL constraints on 
NOT NULL columns */
+               uint32_t j = pProbe->def->key_def->parts[saved_nEq].fieldno;
+               if ((eOp == WO_ISNULL || (pTerm->wtFlags & TERM_VNULL) != 0) &&
+                   !pProbe->pTable->def->fields[j].is_nullable) {
+                       /*
+                        * Ignore IS [NOT] NULL constraints on NOT
+                        * NULL columns.
+                        */
+                       continue;
                }
                if (pTerm->prereqRight & pNew->maskSelf)
                        continue;
@@ -2523,14 +2507,16 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,     
/* The WhereLoop factory */
                                                         */
                        }
                } else if (eOp & WO_EQ) {
-                       int iCol = pProbe->aiColumn[saved_nEq];
+                       int iCol = 
pProbe->def->key_def->parts[saved_nEq].fieldno;
                        pNew->wsFlags |= WHERE_COLUMN_EQ;
                        assert(saved_nEq == pNew->nEq);
-                       if ((iCol > 0 && nInMul == 0
-                               && saved_nEq == nProbeCol - 1)
-                           ) {
-                               if (iCol >= 0 &&
-                                   !index_is_unique_not_null(pProbe)) {
+                       if (iCol > 0 && nInMul == 0 &&
+                           saved_nEq == probe_part_count - 1) {
+                               bool index_is_unique_not_null =
+                                       pProbe->def->key_def->is_nullable &&
+                                       pProbe->def->opts.is_unique;
+                               if (pProbe->tnum != 0 &&
+                                   !index_is_unique_not_null) {
                                        pNew->wsFlags |= WHERE_UNQ_WANTED;
                                } else {
                                        pNew->wsFlags |= WHERE_ONEROW;
@@ -2592,8 +2578,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       
/* The WhereLoop factory */
                        assert(eOp & (WO_ISNULL | WO_EQ | WO_IN));
assert(pNew->nOut == saved_nOut);
-                       if (pTerm->truthProb <= 0
-                           && pProbe->aiColumn[saved_nEq] >= 0) {
+                       if (pTerm->truthProb <= 0 && pProbe->tnum != 0 ) {
                                assert((eOp & WO_IN) || nIn == 0);
                                testcase(eOp & WO_IN);
                                pNew->nOut += pTerm->truthProb;
@@ -2695,8 +2680,8 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       
/* The WhereLoop factory */
                        pNew->nOut = nOutUnadjusted;
                }
- if ((pNew->wsFlags & WHERE_TOP_LIMIT) == 0
-                   && pNew->nEq < nProbeCol) {
+               if ((pNew->wsFlags & WHERE_TOP_LIMIT) == 0 &&
+                   pNew->nEq < probe_part_count) {
                        whereLoopAddBtreeIndex(pBuilder, pSrc, pProbe,
                                               nInMul + nIn);
                }
@@ -2724,7 +2709,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       
/* The WhereLoop factory */
         * more expensive.
         */
        assert(42 == sqlite3LogEst(18));
-       if (saved_nEq == saved_nSkip && saved_nEq + 1U < nProbeCol &&
+       if (saved_nEq == saved_nSkip && saved_nEq + 1U < probe_part_count &&
            stat->skip_scan_enabled == true &&
            /* TUNING: Minimum for skip-scan */
            index_field_tuple_est(pProbe, saved_nEq + 1) >= 42 &&
@@ -2749,7 +2734,7 @@ whereLoopAddBtreeIndex(WhereLoopBuilder * pBuilder,       
/* The WhereLoop factory */
        }
WHERETRACE(0x800, ("END addBtreeIdx(%s), nEq=%d, rc=%d\n",
-                          pProbe->zName, saved_nEq, rc));
+                          pProbe->def->name, saved_nEq, rc));
        return rc;
 }
@@ -2792,7 +2777,7 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
 {
        ExprList *pOB;
        int ii, jj;
-       int nIdxCol = index_column_count(pIndex);
+       int part_count = pIndex->def->key_def->part_count;
        if (index_is_unordered(pIndex))
                return 0;
        if ((pOB = pBuilder->pWInfo->pOrderBy) == 0)
@@ -2802,8 +2787,9 @@ indexMightHelpWithOrderBy(WhereLoopBuilder * pBuilder,
                if (pExpr->op == TK_COLUMN && pExpr->iTable == iCursor) {
                        if (pExpr->iColumn < 0)
                                return 1;
-                       for (jj = 0; jj < nIdxCol; jj++) {
-                               if (pExpr->iColumn == pIndex->aiColumn[jj])
+                       for (jj = 0; jj < part_count; jj++) {
+                               if (pExpr->iColumn == (int)
+                                   pIndex->def->key_def->parts[jj].fieldno)
                                        return 1;
                        }
                }
@@ -2882,7 +2868,6 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,    /* 
WHERE clause information */
        Index *pProbe;          /* An index we are evaluating */
        Index sPk;              /* A fake index object for the primary key */
        LogEst aiRowEstPk[2];   /* The aiRowLogEst[] value for the sPk index */
-       i16 aiColumnPk = -1;    /* The aColumn[] value for the sPk index */
        SrcList *pTabList;      /* The FROM clause */
        struct SrcList_item *pSrc;      /* The FROM clause btree term to add */
        WhereLoop *pNew;        /* Template WhereLoop object */
@@ -2913,11 +2898,32 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,  /* 
WHERE clause information */
                 */
                Index *pFirst;  /* First of real indices on the table */
                memset(&sPk, 0, sizeof(Index));
-               sPk.nColumn = 1;
-               sPk.aiColumn = &aiColumnPk;
                sPk.aiRowLogEst = aiRowEstPk;
                sPk.onError = ON_CONFLICT_ACTION_REPLACE;
                sPk.pTable = pTab;
+
+               struct key_def *key_def = key_def_new(1);
+               if (key_def == NULL) {
+                       pWInfo->pParse->nErr++;
+                       pWInfo->pParse->rc = SQL_TARANTOOL_ERROR;
+                       return SQL_TARANTOOL_ERROR;
+               }
+
+               key_def_set_part(key_def, 0, 0, pTab->def->fields[0].type,
+                                ON_CONFLICT_ACTION_ABORT,
+                                NULL, COLL_NONE, SORT_ORDER_ASC);
+
+               sPk.def = index_def_new(pTab->def->id, 0, "primary",
+                                       sizeof("primary") - 1, TREE,
+                                       &index_opts_default, key_def, NULL);
+               key_def_delete(key_def);
+
+               if (sPk.def == NULL) {
+                       pWInfo->pParse->nErr++;
+                       pWInfo->pParse->rc = SQL_TARANTOOL_ERROR;
+                       return SQL_TARANTOOL_ERROR;
+               }
+
                aiRowEstPk[0] = sql_space_tuple_log_count(pTab);
                aiRowEstPk[1] = 0;
                pFirst = pSrc->pTab->pIndex;
@@ -3058,6 +3064,8 @@ whereLoopAddBtree(WhereLoopBuilder * pBuilder,    /* 
WHERE clause information */
                if (pSrc->pIBIndex)
                        break;
        }
+       if (&sPk == pProbe && sPk.def != NULL)
+               index_def_delete(sPk.def);
        return rc;
 }
@@ -3392,8 +3400,8 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo, /* The WHERE clause */
                                   index_is_unordered(pIndex)) {
                                return 0;
                        } else {
-                               nColumn = index_column_count(pIndex);
-                               isOrderDistinct = index_is_unique(pIndex);
+                               nColumn = pIndex->def->key_def->part_count;
+                               isOrderDistinct = pIndex->def->opts.is_unique;
                        }
/* Loop through all columns of the index and deal with the ones
@@ -3454,9 +3462,10 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,    /* The 
WHERE clause */
                                 * (revIdx) for the j-th column of the index.
                                 */
                                if (pIndex != NULL) {
-                                       iColumn = pIndex->aiColumn[j];
-                                       revIdx = 
sql_index_column_sort_order(pIndex,
-                                                                            j);
+                                       struct key_def *def =
+                                               pIndex->def->key_def;
+                                       iColumn = def->parts[j].fieldno;
+                                       revIdx = def->parts[j].sort_order;
                                        if (iColumn == pIndex->pTable->iPKey)
                                                iColumn = -1;
                                } else {
@@ -3506,8 +3515,7 @@ wherePathSatisfiesOrderBy(WhereInfo * pWInfo,     /* The 
WHERE clause */
                                                                      
pOrderBy->a[i].pExpr,
                                                                      &is_found, 
&id);
                                                struct coll *idx_coll =
-                                                       
sql_index_collation(pIndex,
-                                                                           j, 
&id);
+                                                       
pIndex->def->key_def->parts[j].coll;
                                                if (is_found &&
                                                    coll != idx_coll)
                                                        continue;
@@ -4777,7 +4785,7 @@ sqlite3WhereBegin(Parse * pParse, /* The parser context */
                                        sqlite3VdbeChangeP5(v, OPFLAG_SEEKEQ);  
/* Hint to COMDB2 */
                                }
                                if (pIx != NULL)
-                                       VdbeComment((v, "%s", pIx->zName));
+                                       VdbeComment((v, "%s", pIx->def->name));
                                else
                                        VdbeComment((v, "%s", idx_def->name));
 #ifdef SQLITE_ENABLE_COLUMN_USED_MASK
@@ -4910,7 +4918,7 @@ sqlite3WhereEnd(WhereInfo * pWInfo)
                if (pLevel->addrSkip) {
                        sqlite3VdbeGoto(v, pLevel->addrSkip);
                        VdbeComment((v, "next skip-scan on %s",
-                                    pLoop->pIndex->zName));
+                                    pLoop->pIndex->def->name));
                        sqlite3VdbeJumpHere(v, pLevel->addrSkip);
                        sqlite3VdbeJumpHere(v, pLevel->addrSkip - 2);
                }
diff --git a/src/box/sql/wherecode.c b/src/box/sql/wherecode.c
index c35c25ac4..1976583fa 100644
--- a/src/box/sql/wherecode.c
+++ b/src/box/sql/wherecode.c
@@ -48,7 +48,7 @@
 static const char *
 explainIndexColumnName(Index * pIdx, int i)
 {
-       i = pIdx->aiColumn[i];
+       i = pIdx->def->key_def->parts[i].fieldno;
        return pIdx->pTable->def->fields[i].name;
 }
@@ -243,7 +243,7 @@ sqlite3WhereExplainOneScan(Parse * pParse, /* Parse context */
                        if (zFmt) {
                                sqlite3StrAccumAppend(&str, " USING ", 7);
                                if (pIdx != NULL)
-                                       sqlite3XPrintf(&str, zFmt, pIdx->zName);
+                                       sqlite3XPrintf(&str, zFmt, 
pIdx->def->name);
                                else if (idx_def != NULL)
                                        sqlite3XPrintf(&str, zFmt, 
idx_def->name);
                                else
@@ -488,7 +488,7 @@ codeEqualityTerm(Parse * pParse,    /* The parsing context 
*/
                int *aiMap = 0;
if (pLoop->pIndex != 0 &&
-                   sql_index_column_sort_order(pLoop->pIndex, iEq)) {
+                   pLoop->pIndex->def->key_def->parts[iEq].sort_order) {
                        testcase(iEq == 0);
                        testcase(bRev);
                        bRev = !bRev;
@@ -736,7 +736,7 @@ codeAllEqualityTerms(Parse * pParse,        /* Parsing 
context */
                sqlite3VdbeAddOp1(v, (bRev ? OP_Last : OP_Rewind), iIdxCur);
                VdbeCoverageIf(v, bRev == 0);
                VdbeCoverageIf(v, bRev != 0);
-               VdbeComment((v, "begin skip-scan on %s", pIdx->zName));
+               VdbeComment((v, "begin skip-scan on %s", pIdx->def->name));
                j = sqlite3VdbeAddOp0(v, OP_Goto);
                pLevel->addrSkip =
                    sqlite3VdbeAddOp4Int(v, (bRev ? OP_SeekLT : OP_SeekGT),
@@ -746,7 +746,8 @@ codeAllEqualityTerms(Parse * pParse,        /* Parsing 
context */
                sqlite3VdbeJumpHere(v, j);
                for (j = 0; j < nSkip; j++) {
                        sqlite3VdbeAddOp3(v, OP_Column, iIdxCur,
-                                         pIdx->aiColumn[j], regBase + j);
+                                         pIdx->def->key_def->parts[j].fieldno,
+                                         regBase + j);
                        VdbeComment((v, "%s", explainIndexColumnName(pIdx, j)));
                }
        }
@@ -1037,14 +1038,14 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        
/* Complete information about t
                assert(pWInfo->pOrderBy == 0
                       || pWInfo->pOrderBy->nExpr == 1
                       || (pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) == 0);
-               int nIdxCol;
+               uint32_t part_count;
                if (pIdx != NULL)
-                       nIdxCol = index_column_count(pIdx);
+                       part_count = pIdx->def->key_def->part_count;
                else
-                       nIdxCol = idx_def->key_def->part_count;
-               if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0
-                   && pWInfo->nOBSat > 0 && (nIdxCol > nEq)) {
-                       j = pIdx->aiColumn[nEq];
+                       part_count = idx_def->key_def->part_count;
+               if ((pWInfo->wctrlFlags & WHERE_ORDERBY_MIN) != 0 &&
+                   pWInfo->nOBSat > 0 && part_count > nEq) {
+                       j = pIdx->def->key_def->parts[nEq].fieldno;
                        /* Allow seek for column with `NOT NULL` == false 
attribute.
                         * If a column may contain NULL-s, the comparator 
installed
                         * by Tarantool is prepared to seek using a NULL value.
@@ -1055,8 +1056,7 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,  /* 
Complete information about t
                         * FYI: entries in an index are ordered as follows:
                         *      NULL, ... NULL, min_value, ...
                         */
-                       if (j >= 0 &&
-                           pIdx->pTable->def->fields[j].is_nullable) {
+                       if (pIdx->pTable->def->fields[j].is_nullable) {
                                assert(pLoop->nSkip == 0);
                                bSeekPastNull = 1;
                                nExtraReg = 1;
@@ -1093,16 +1093,16 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        
/* Complete information about t
                                testcase(pIdx->aSortOrder[nEq] ==
                                         SORT_ORDER_DESC);
                                assert((bRev & ~1) == 0);
+                               struct key_def *def = pIdx->def->key_def;
                                pLevel->iLikeRepCntr <<= 1;
                                pLevel->iLikeRepCntr |=
-                                       bRev ^ 
(sql_index_column_sort_order(pIdx, nEq) ==
+                                       bRev ^ (def->parts[nEq].sort_order ==
                                                SORT_ORDER_DESC);
                        }
 #endif
                        if (pRangeStart == 0) {
-                               j = pIdx->aiColumn[nEq];
-                               if (j >= 0 &&
-                                   pIdx->pTable->def->fields[j].is_nullable)
+                               j = pIdx->def->key_def->parts[nEq].fieldno;
+                               if (pIdx->pTable->def->fields[j].is_nullable)
                                        bSeekPastNull = 1;
                        }
                }
@@ -1113,10 +1113,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo, /* 
Complete information about t
                 * a forward order scan on a descending index, interchange the
                 * start and end terms (pRangeStart and pRangeEnd).
                 */
-               if ((nEq < nIdxCol &&
-                    bRev == (sql_index_column_sort_order(pIdx, nEq) ==
-                             SORT_ORDER_ASC)) ||
-                   (bRev && nIdxCol == nEq)) {
+               if ((nEq < part_count &&
+                    bRev == (pIdx->def->key_def->parts[nEq].sort_order ==
+                             SORT_ORDER_ASC)) || (bRev && part_count == nEq)) {
                        SWAP(pRangeEnd, pRangeStart);
                        SWAP(bSeekPastNull, bStopAtNull);
                        SWAP(nBtm, nTop);
@@ -1196,16 +1195,16 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        
/* Complete information about t
                        }
                } else {
                        pk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-                       affinity =
-                               
pIdx->pTable->def->fields[pk->aiColumn[0]].affinity;
+                       uint32_t fieldno = pk->def->key_def->parts[0].fieldno;
+                       affinity = pIdx->pTable->def->fields[fieldno].affinity;
                }
- int nPkCol;
+               uint32_t pk_part_count;
                if (pk != NULL)
-                       nPkCol = index_column_count(pk);
+                       pk_part_count = pk->def->key_def->part_count;
                else
-                       nPkCol = idx_pk->key_def->part_count;
-               if (nPkCol == 1 && affinity == AFFINITY_INTEGER) {
+                       pk_part_count = idx_pk->key_def->part_count;
+               if (pk_part_count == 1 && affinity == AFFINITY_INTEGER) {
                        /* Right now INTEGER PRIMARY KEY is the only option to
                         * get Tarantool's INTEGER column type. Need special 
handling
                         * here: try to loosely convert FLOAT to INT. If RHS 
type
@@ -1213,8 +1212,9 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,  /* 
Complete information about t
                         */
                        int limit = pRangeStart == NULL ? nEq : nEq + 1;
                        for (int i = 0; i < limit; i++) {
-                               if ((pIdx != NULL && pIdx->aiColumn[i] ==
-                                    pk->aiColumn[0]) ||
+                               if ((pIdx != NULL &&
+                                    pIdx->def->key_def->parts[i].fieldno ==
+                                    pk->def->key_def->parts[0].fieldno) ||
                                    (idx_pk != NULL &&
                                     idx_def->key_def->parts[i].fieldno ==
                                     idx_pk->key_def->parts[0].fieldno)) {
@@ -1326,17 +1326,17 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        
/* Complete information about t
                        /* pIdx is a covering index.  No need to access the 
main table. */
                }  else if (iCur != iIdxCur) {
                        Index *pPk = sqlite3PrimaryKeyIndex(pIdx->pTable);
-                       int nPkCol = index_column_count(pPk);
-                       int iKeyReg = sqlite3GetTempRange(pParse, nPkCol);
-                       for (j = 0; j < nPkCol; j++) {
-                               k = pPk->aiColumn[j];
+                       int pk_part_count = pPk->def->key_def->part_count;
+                       int iKeyReg = sqlite3GetTempRange(pParse, 
pk_part_count);
+                       for (j = 0; j < pk_part_count; j++) {
+                               k = pPk->def->key_def->parts[j].fieldno;
                                sqlite3VdbeAddOp3(v, OP_Column, iIdxCur, k,
                                                  iKeyReg + j);
                        }
                        sqlite3VdbeAddOp4Int(v, OP_NotFound, iCur, addrCont,
-                                            iKeyReg, nPkCol);
+                                            iKeyReg, pk_part_count);
                        VdbeCoverage(v);
-                       sqlite3ReleaseTempRange(pParse, iKeyReg, nPkCol);
+                       sqlite3ReleaseTempRange(pParse, iKeyReg, pk_part_count);
                }
/* Record the instruction used to terminate the loop. */
@@ -1434,10 +1434,10 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        
/* Complete information about t
                 */
                if ((pWInfo->wctrlFlags & WHERE_DUPLICATES_OK) == 0) {
                        Index *pPk = sqlite3PrimaryKeyIndex(pTab);
-                       int nPkCol = index_column_count(pPk);
+                       int pk_part_count = pPk->def->key_def->part_count;
                        regRowset = pParse->nTab++;
                        sqlite3VdbeAddOp2(v, OP_OpenTEphemeral,
-                                         regRowset, nPkCol);
+                                         regRowset, pk_part_count);
                        sql_vdbe_set_p4_key_def(pParse, pPk);
                        regPk = ++pParse->nMem;
                }
@@ -1538,16 +1538,23 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo,        
/* Complete information about t
                                                int iSet =
                                                    ((ii == pOrWc->nTerm - 1) ? 
-1 : ii);
                                                Index *pPk = 
sqlite3PrimaryKeyIndex (pTab);
-                                               int nPk = 
index_column_count(pPk);
-                                               int iPk;
+                                               struct key_def *def =
+                                                       pPk->def->key_def;
/* Read the PK into an array of temp registers. */
-                                               r = sqlite3GetTempRange(pParse, 
nPk);
-                                               for (iPk = 0; iPk < nPk; iPk++) 
{
-                                                       int iCol = 
pPk->aiColumn[iPk];
+                                               r = sqlite3GetTempRange(pParse,
+                                                                       
def->part_count);
+                                               for (uint32_t iPk = 0;
+                                                    iPk < def->part_count;
+                                                    iPk++) {
+                                                       uint32_t fieldno =
+                                                               def->parts[iPk].
+                                                               fieldno;
                                                        
sqlite3ExprCodeGetColumnToReg
-                                                               (pParse, 
pTab->def,
-                                                                iCol, iCur,
+                                                               (pParse,
+                                                                pTab->def,
+                                                                fieldno,
+                                                                iCur,
                                                                 r + iPk);
                                                }
@@ -1567,20 +1574,21 @@ sqlite3WhereCodeOneLoopStart(WhereInfo * pWInfo, /* Complete information about t
                                                        jmp1 = 
sqlite3VdbeAddOp4Int
                                                                (v, OP_Found,
                                                                 regRowset, 0,
-                                                                r, nPk);
+                                                                r,
+                                                                
def->part_count);
                                                        VdbeCoverage(v);
                                                }
                                                if (iSet >= 0) {
                                                        sqlite3VdbeAddOp3
                                                                (v, 
OP_MakeRecord,
-                                                                r, nPk, regPk);
+                                                                r, 
def->part_count, regPk);
                                                        sqlite3VdbeAddOp2
                                                                (v, 
OP_IdxInsert,
                                                                 regRowset, 
regPk);
                                                }
/* Release the array of temp registers */
-                                               sqlite3ReleaseTempRange(pParse, 
r, nPk);
+                                               sqlite3ReleaseTempRange(pParse, r, 
def->part_count);
                                        }
/* Invoke the main loop body as a subroutine */
diff --git a/test/box/misc.result b/test/box/misc.result
index a00d03365..a0b35ecc2 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -487,6 +487,7 @@ t;
   160: box.error.ACTION_MISMATCH
   161: box.error.VIEW_MISSING_SQL
   162: box.error.FOREIGN_KEY_CONSTRAINT
+  163: box.error.NO_SUCH_COLLATION
 ...
 test_run:cmd("setopt delimiter ''");
 ---
diff --git a/test/sql-tap/collation.test.lua b/test/sql-tap/collation1.test.lua
similarity index 100%
rename from test/sql-tap/collation.test.lua
rename to test/sql-tap/collation1.test.lua
diff --git a/test/sql-tap/collation2.test.lua b/test/sql-tap/collation2.test.lua
new file mode 100755
index 000000000..64296b0be
--- /dev/null
+++ b/test/sql-tap/collation2.test.lua
@@ -0,0 +1,20 @@
+#!/usr/bin/env tarantool
+test = require("sqltester")
+test:plan(3)
+
+test:do_catchsql_test(
+        "collation-2.1",
+        'CREATE TABLE test1 (a int, b int, c int, PRIMARY KEY (a, a, a, b, 
c))',
+        nil)
+
+test:do_catchsql_test(
+        "collation-2.2",
+        'CREATE TABLE test2 (a int, b int, c int, PRIMARY KEY (a, a, a, b, b, 
a, c))',
+        nil)
+
+test:do_catchsql_test(
+        "collation-2.3",
+        'CREATE TABLE test3 (a int, b int, c int, PRIMARY KEY (a, a COLLATE 
foo, b, c))',
+        {1, "Collation 'FOO' does not exist"})
+
+test:finish_test()
diff --git a/test/sql-tap/colname.test.lua b/test/sql-tap/colname.test.lua
index c53a1e885..ddc06eea7 100755
--- a/test/sql-tap/colname.test.lua
+++ b/test/sql-tap/colname.test.lua
@@ -643,13 +643,13 @@ test:do_catchsql_test(
     "colname-11.2",
     [[CREATE TABLE t1(a, b, c, d, e,
       PRIMARY KEY(a), UNIQUE('b' COLLATE "unicode_ci" DESC));]],
-    {1, "/functional indexes aren't supported in the current version/"})
+    {1, "/Tarantool does not support functional indexes/"})
test:execsql("create table table1(a primary key, b, c)")
test:do_catchsql_test(
     "colname-11.3",
     [[ CREATE INDEX t1c ON table1('c'); ]],
-    {1, "/functional indexes aren't supported in the current version/"})
+    {1, "/Tarantool does not support functional indexes/"})
test:finish_test()
diff --git a/test/sql-tap/identifier_case.test.lua 
b/test/sql-tap/identifier_case.test.lua
index 5e7573ac4..ed9553c6b 100755
--- a/test/sql-tap/identifier_case.test.lua
+++ b/test/sql-tap/identifier_case.test.lua
@@ -206,8 +206,8 @@ data = {
     { 3,  [["binary"]], {0}},
     { 4,  [["bInaRy"]], {0}},
     { 5,  [["unicode"]], {0}},
-    { 6,  [[ unicode ]], {1,"no such collation sequence: UNICODE"}},
-    { 7,  [["UNICODE"]], {1,"no such collation sequence: UNICODE"}}
+    { 6,  [[ unicode ]], {1,"Collation 'UNICODE' does not exist"}},
+    { 7,  [["UNICODE"]], {1,"Collation 'UNICODE' does not exist"}}
 }
test:do_catchsql_test(
diff --git a/test/sql/message-func-indexes.result 
b/test/sql/message-func-indexes.result
index 4bf1cab49..dbcb12f03 100644
--- a/test/sql/message-func-indexes.result
+++ b/test/sql/message-func-indexes.result
@@ -12,25 +12,25 @@ box.sql.execute("CREATE TABLE t2(object INTEGER PRIMARY 
KEY, price INTEGER, coun
 -- should return certain message.
 box.sql.execute("CREATE INDEX i1 ON t1(a+1)")
 ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
 ...
 box.sql.execute("CREATE INDEX i2 ON t1(a)")
 ---
 ...
 box.sql.execute("CREATE INDEX i3 ON t2(price + 100)")
 ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
 ...
 box.sql.execute("CREATE INDEX i4 ON t2(price)")
 ---
 ...
 box.sql.execute("CREATE INDEX i5 ON t2(count + 1)")
 ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
 ...
 box.sql.execute("CREATE INDEX i6 ON t2(count * price)")
 ---
-- error: functional indexes aren't supported in the current version
+- error: Tarantool does not support functional indexes
 ...
 -- Cleaning up.
 box.sql.execute("DROP TABLE t1")
--


Other related posts: