There's no point in splitting the REPLACE path into one case for spaces
that have secondary indexes and another for spaces that only have the
primary index, because the two are quite similar. Let's fold vy_replace_one
and vy_replace_impl into vy_replace to remove code duplication.
---
src/box/vinyl.c | 219 +++++++++++++++++---------------------------------------
1 file changed, 67 insertions(+), 152 deletions(-)
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 64004226..b93232a3 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1539,152 +1539,6 @@ vy_insert_secondary(struct vy_env *env, struct vy_tx *tx, struct space *space,
}
/**
- * Execute REPLACE in a space with a single index, possibly with
- * lookup for an old tuple if the space has at least one
- * on_replace trigger.
- * @param env Vinyl environment.
- * @param tx Current transaction.
- * @param space Space in which replace.
- * @param request Request with the tuple data.
- * @param stmt Statement for triggers is filled with old
- * statement.
- *
- * @retval 0 Success.
- * @retval -1 Memory error OR duplicate key error OR the primary
- * index is not found OR a tuple reference increment
- * error.
- */
-static inline int
-vy_replace_one(struct vy_env *env, struct vy_tx *tx, struct space *space,
- struct request *request, struct txn_stmt *stmt)
-{
- (void)env;
- assert(tx != NULL && tx->state == VINYL_TX_READY);
- struct vy_lsm *pk = vy_lsm(space->index[0]);
- assert(pk->index_id == 0);
- if (tuple_validate_raw(pk->mem_format, request->tuple))
- return -1;
- struct tuple *new_tuple =
- vy_stmt_new_replace(pk->mem_format, request->tuple,
- request->tuple_end);
- if (new_tuple == NULL)
- return -1;
- /**
- * If the space has triggers, then we need to fetch the
- * old tuple to pass it to the trigger.
- */
- if (stmt != NULL && !rlist_empty(&space->on_replace)) {
- if (vy_get(pk, tx, vy_tx_read_view(tx),
- new_tuple, &stmt->old_tuple) != 0)
- goto error_unref;
- }
- if (vy_tx_set(tx, pk, new_tuple))
- goto error_unref;
-
- if (stmt != NULL)
- stmt->new_tuple = new_tuple;
- else
- tuple_unref(new_tuple);
- return 0;
-
-error_unref:
- tuple_unref(new_tuple);
- return -1;
-}
-
-/**
- * Execute REPLACE in a space with multiple indexes and lookup for
- * an old tuple, that should has been set in \p stmt->old_tuple if
- * the space has at least one on_replace trigger.
- * @param env Vinyl environment.
- * @param tx Current transaction.
- * @param space Vinyl space.
- * @param request Request with the tuple data.
- * @param stmt Statement for triggers filled with old
- * statement.
- *
- * @retval 0 Success
- * @retval -1 Memory error OR duplicate key error OR the primary
- * index is not found OR a tuple reference increment
- * error.
- */
-static inline int
-vy_replace_impl(struct vy_env *env, struct vy_tx *tx, struct space *space,
- struct request *request, struct txn_stmt *stmt)
-{
- assert(tx != NULL && tx->state == VINYL_TX_READY);
- struct tuple *old_stmt = NULL;
- struct tuple *new_stmt = NULL;
- struct tuple *delete = NULL;
- struct vy_lsm *pk = vy_lsm_find(space, 0);
- if (pk == NULL) /* space has no primary key */
- return -1;
- /* Primary key is dumped last. */
- assert(!vy_is_committed_one(env, pk));
- assert(pk->index_id == 0);
- if (tuple_validate_raw(pk->mem_format, request->tuple))
- return -1;
- new_stmt = vy_stmt_new_replace(pk->mem_format, request->tuple,
- request->tuple_end);
- if (new_stmt == NULL)
- return -1;
-
- /* Get full tuple from the primary index. */
- if (vy_get(pk, tx, vy_tx_read_view(tx), new_stmt, &old_stmt) != 0)
- goto error;
-
- /*
- * Replace in the primary index without explicit deletion
- * of the old tuple.
- */
- if (vy_tx_set(tx, pk, new_stmt) != 0)
- goto error;
-
- if (space->index_count > 1 && old_stmt != NULL) {
- delete = vy_stmt_new_surrogate_delete(pk->mem_format, old_stmt);
- if (delete == NULL)
- goto error;
- }
-
- /* Update secondary keys, avoid duplicates. */
- for (uint32_t iid = 1; iid < space->index_count; ++iid) {
- struct vy_lsm *lsm = vy_lsm(space->index[iid]);
- if (vy_is_committed_one(env, lsm))
- continue;
- /*
- * Delete goes first, so if old and new keys
- * fully match, there is no look up beyond the
- * transaction index.
- */
- if (old_stmt != NULL) {
- if (vy_tx_set(tx, lsm, delete) != 0)
- goto error;
- }
- if (vy_insert_secondary(env, tx, space, lsm, new_stmt) != 0)
- goto error;
- }
- if (delete != NULL)
- tuple_unref(delete);
- /*
- * The old tuple is used if there is an on_replace
- * trigger.
- */
- if (stmt != NULL) {
- stmt->new_tuple = new_stmt;
- stmt->old_tuple = old_stmt;
- }
- return 0;
-error:
- if (delete != NULL)
- tuple_unref(delete);
- if (old_stmt != NULL)
- tuple_unref(old_stmt);
- if (new_stmt != NULL)
- tuple_unref(new_stmt);
- return -1;
-}
-
-/**
* Check that the key can be used for search in a unique index
* LSM tree.
* @param lsm LSM tree for checking.
@@ -2307,18 +2161,79 @@ static int
vy_replace(struct vy_env *env, struct vy_tx *tx, struct txn_stmt *stmt,
struct space *space, struct request *request)
{
+ assert(tx != NULL && tx->state == VINYL_TX_READY);
if (vy_is_committed(env, space))
return 0;
if (request->type == IPROTO_INSERT)
return vy_insert(env, tx, stmt, space, request);
- if (space->index_count == 1) {
- /* Replace in a space with a single index. */
- return vy_replace_one(env, tx, space, request, stmt);
- } else {
- /* Replace in a space with secondary indexes. */
- return vy_replace_impl(env, tx, space, request, stmt);
+ struct vy_lsm *pk = vy_lsm_find(space, 0);
+ if (pk == NULL)
+ return -1;
+ /* Primary key is dumped last. */
+ assert(!vy_is_committed_one(env, pk));
+
+ /*
+ * Validate and create a statement for the new tuple.
+ * The new statement is stored directly in stmt->new_tuple.
+ *
+ * NOTE(review): stmt is dereferenced unconditionally from
+ * here on, while the folded-in helpers guarded every access
+ * with stmt != NULL - confirm that no caller passes NULL.
+ */
+ if (tuple_validate_raw(pk->mem_format, request->tuple))
+ return -1;
+ stmt->new_tuple = vy_stmt_new_replace(pk->mem_format, request->tuple,
+ request->tuple_end);
+ if (stmt->new_tuple == NULL)
+ return -1;
+ /*
+ * Get the overwritten tuple from the primary index if
+ * the space has on_replace triggers, in which case we
+ * need to pass the old tuple to trigger callbacks, or
+ * if the space has secondary indexes and so we need
+ * the old tuple to delete it from them.
+ *
+ * The result is written to stmt->old_tuple; when neither
+ * condition holds the lookup is skipped and stmt->old_tuple
+ * is left untouched.
+ *
+ * NOTE(review): every failure return below leaves
+ * stmt->new_tuple (and possibly stmt->old_tuple) set without
+ * unreferencing them, unlike the folded-in helpers, which
+ * released them on error. Presumably the owner of stmt
+ * releases them on rollback - confirm.
+ */
+ if (space->index_count > 1 || !rlist_empty(&space->on_replace)) {
+ if (vy_get(pk, tx, vy_tx_read_view(tx),
+ stmt->new_tuple, &stmt->old_tuple) != 0)
+ return -1;
}
+ /*
+ * Replace in the primary index without explicit deletion
+ * of the old tuple. If the space has no secondary indexes,
+ * there is nothing else to do.
+ */
+ if (vy_tx_set(tx, pk, stmt->new_tuple) != 0)
+ return -1;
+ if (space->index_count == 1)
+ return 0;
+ /*
+ * Replace in secondary indexes with explicit deletion
+ * of the old tuple, if any. The surrogate DELETE is built
+ * once from the old tuple and reused for every secondary
+ * index. A failure inside the loop breaks out instead of
+ * returning, so that the surrogate DELETE is unreferenced
+ * before rc is returned.
+ */
+ int rc = 0;
+ struct tuple *delete = NULL;
+ if (stmt->old_tuple != NULL) {
+ delete = vy_stmt_new_surrogate_delete(pk->mem_format,
+ stmt->old_tuple);
+ if (delete == NULL)
+ return -1;
+ }
+ for (uint32_t i = 1; i < space->index_count; i++) {
+ struct vy_lsm *lsm = vy_lsm(space->index[i]);
+ if (vy_is_committed_one(env, lsm))
+ continue;
+ /*
+ * DELETE goes first, so if old and new keys
+ * fully match, there is no look up beyond the
+ * transaction write set.
+ */
+ if (delete != NULL) {
+ rc = vy_tx_set(tx, lsm, delete);
+ if (rc != 0)
+ break;
+ }
+ rc = vy_insert_secondary(env, tx, space, lsm,
+ stmt->new_tuple);
+ if (rc != 0)
+ break;
+ }
+ if (delete != NULL)
+ tuple_unref(delete);
+ return rc;
}
static int
--
2.11.0