[tarantool-patches] [RFC PATCH 18/23] vinyl: store pointer to scheduler in struct vy_task

  • From: Vladimir Davydov <vdavydov.dev@xxxxxxxxx>
  • To: kostja@xxxxxxxxxxxxx
  • Date: Sun, 8 Jul 2018 19:48:49 +0300

Currently, we don't really need it, but once we switch communication
channel between the scheduler and workers from pthread mutex/cond to
cbus (needed for #2129), tasks won't be completed on behalf of the
scheduler fiber and hence we will need a back pointer from vy_task to
vy_scheduler.

Needed for #2129
---
 src/box/vy_scheduler.c | 74 ++++++++++++++++++++++++++------------------------
 1 file changed, 39 insertions(+), 35 deletions(-)

diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index c175bea8..5684f4d4 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -72,24 +72,27 @@ struct vy_task_ops {
         * which is too heavy for the tx thread (like IO or compression).
         * Returns 0 on success. On failure returns -1 and sets diag.
         */
-       int (*execute)(struct vy_scheduler *scheduler, struct vy_task *task);
+       int (*execute)(struct vy_task *task);
        /**
         * This function is called by the scheduler upon task completion.
         * It may be used to finish the task from the tx thread context.
         *
         * Returns 0 on success. On failure returns -1 and sets diag.
         */
-       int (*complete)(struct vy_scheduler *scheduler, struct vy_task *task);
+       int (*complete)(struct vy_task *task);
        /**
         * This function is called by the scheduler if either ->execute
         * or ->complete failed. It may be used to undo changes done to
         * the LSM tree when preparing the task.
         */
-       void (*abort)(struct vy_scheduler *scheduler, struct vy_task *task);
+       void (*abort)(struct vy_task *task);
 };
 
 struct vy_task {
+       /** Virtual method table. */
        const struct vy_task_ops *ops;
+       /** Pointer to the scheduler. */
+       struct vy_scheduler *scheduler;
        /** Return code of ->execute. */
        int status;
        /** If ->execute fails, the error is stored here. */
@@ -138,10 +141,10 @@ struct vy_task {
  * does not free it from under us.
  */
 static struct vy_task *
-vy_task_new(struct mempool *pool, struct vy_lsm *lsm,
+vy_task_new(struct vy_scheduler *scheduler, struct vy_lsm *lsm,
            const struct vy_task_ops *ops)
 {
-       struct vy_task *task = mempool_alloc(pool);
+       struct vy_task *task = mempool_alloc(&scheduler->task_pool);
        if (task == NULL) {
                diag_set(OutOfMemory, sizeof(*task),
                         "mempool", "struct vy_task");
@@ -149,16 +152,17 @@ vy_task_new(struct mempool *pool, struct vy_lsm *lsm,
        }
        memset(task, 0, sizeof(*task));
        task->ops = ops;
+       task->scheduler = scheduler;
        task->lsm = lsm;
        task->cmp_def = key_def_dup(lsm->cmp_def);
        if (task->cmp_def == NULL) {
-               mempool_free(pool, task);
+               mempool_free(&scheduler->task_pool, task);
                return NULL;
        }
        task->key_def = key_def_dup(lsm->key_def);
        if (task->key_def == NULL) {
                key_def_delete(task->cmp_def);
-               mempool_free(pool, task);
+               mempool_free(&scheduler->task_pool, task);
                return NULL;
        }
        vy_lsm_ref(lsm);
@@ -168,14 +172,13 @@ vy_task_new(struct mempool *pool, struct vy_lsm *lsm,
 
 /** Free a task allocated with vy_task_new(). */
 static void
-vy_task_delete(struct mempool *pool, struct vy_task *task)
+vy_task_delete(struct vy_task *task)
 {
        key_def_delete(task->cmp_def);
        key_def_delete(task->key_def);
        vy_lsm_unref(task->lsm);
        diag_destroy(&task->diag);
-       TRASH(task);
-       mempool_free(pool, task);
+       mempool_free(&task->scheduler->task_pool, task);
 }
 
 static bool
@@ -643,7 +646,7 @@ vy_run_discard(struct vy_run *run)
 }
 
 static int
-vy_task_write_run(struct vy_scheduler *scheduler, struct vy_task *task)
+vy_task_write_run(struct vy_task *task)
 {
        struct vy_lsm *lsm = task->lsm;
        struct vy_stmt_stream *wi = task->wi;
@@ -676,7 +679,7 @@ vy_task_write_run(struct vy_scheduler *scheduler, struct 
vy_task *task)
                if (rc != 0)
                        break;
 
-               if (!scheduler->is_worker_pool_running) {
+               if (!task->scheduler->is_worker_pool_running) {
                        diag_set(FiberIsCancelled);
                        rc = -1;
                        break;
@@ -698,14 +701,15 @@ fail:
 }
 
 static int
-vy_task_dump_execute(struct vy_scheduler *scheduler, struct vy_task *task)
+vy_task_dump_execute(struct vy_task *task)
 {
-       return vy_task_write_run(scheduler, task);
+       return vy_task_write_run(task);
 }
 
 static int
-vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task)
+vy_task_dump_complete(struct vy_task *task)
 {
+       struct vy_scheduler *scheduler = task->scheduler;
        struct vy_lsm *lsm = task->lsm;
        struct vy_run *new_run = task->new_run;
        int64_t dump_lsn = new_run->dump_lsn;
@@ -871,8 +875,9 @@ fail:
 }
 
 static void
-vy_task_dump_abort(struct vy_scheduler *scheduler, struct vy_task *task)
+vy_task_dump_abort(struct vy_task *task)
 {
+       struct vy_scheduler *scheduler = task->scheduler;
        struct vy_lsm *lsm = task->lsm;
 
        assert(lsm->is_dumping);
@@ -975,8 +980,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct 
vy_lsm *lsm,
                return 0;
        }
 
-       struct vy_task *task = vy_task_new(&scheduler->task_pool,
-                                          lsm, &dump_ops);
+       struct vy_task *task = vy_task_new(scheduler, lsm, &dump_ops);
        if (task == NULL)
                goto err;
 
@@ -1031,7 +1035,7 @@ err_wi_sub:
 err_wi:
        vy_run_discard(new_run);
 err_run:
-       vy_task_delete(&scheduler->task_pool, task);
+       vy_task_delete(task);
 err:
        diag_log();
        say_error("%s: could not start dump", vy_lsm_name(lsm));
@@ -1039,14 +1043,15 @@ err:
 }
 
 static int
-vy_task_compact_execute(struct vy_scheduler *scheduler, struct vy_task *task)
+vy_task_compact_execute(struct vy_task *task)
 {
-       return vy_task_write_run(scheduler, task);
+       return vy_task_write_run(task);
 }
 
 static int
-vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task)
+vy_task_compact_complete(struct vy_task *task)
 {
+       struct vy_scheduler *scheduler = task->scheduler;
        struct vy_lsm *lsm = task->lsm;
        struct vy_range *range = task->range;
        struct vy_run *new_run = task->new_run;
@@ -1191,8 +1196,9 @@ vy_task_compact_complete(struct vy_scheduler *scheduler, 
struct vy_task *task)
 }
 
 static void
-vy_task_compact_abort(struct vy_scheduler *scheduler, struct vy_task *task)
+vy_task_compact_abort(struct vy_task *task)
 {
+       struct vy_scheduler *scheduler = task->scheduler;
        struct vy_lsm *lsm = task->lsm;
        struct vy_range *range = task->range;
 
@@ -1243,8 +1249,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, 
struct vy_lsm *lsm,
                return 0;
        }
 
-       struct vy_task *task = vy_task_new(&scheduler->task_pool,
-                                          lsm, &compact_ops);
+       struct vy_task *task = vy_task_new(scheduler, lsm, &compact_ops);
        if (task == NULL)
                goto err_task;
 
@@ -1303,7 +1308,7 @@ err_wi_sub:
 err_wi:
        vy_run_discard(new_run);
 err_run:
-       vy_task_delete(&scheduler->task_pool, task);
+       vy_task_delete(task);
 err_task:
        diag_log();
        say_error("%s: could not start compacting range %s: %s",
@@ -1444,12 +1449,11 @@ fail:
 }
 
 static int
-vy_scheduler_complete_task(struct vy_scheduler *scheduler,
-                          struct vy_task *task)
+vy_task_complete(struct vy_task *task)
 {
        if (task->lsm->is_dropped) {
                if (task->ops->abort)
-                       task->ops->abort(scheduler, task);
+                       task->ops->abort(task);
                return 0;
        }
 
@@ -1464,7 +1468,7 @@ vy_scheduler_complete_task(struct vy_scheduler *scheduler,
                        diag_move(diag_get(), diag);
                        goto fail; });
        if (task->ops->complete &&
-           task->ops->complete(scheduler, task) != 0) {
+           task->ops->complete(task) != 0) {
                assert(!diag_is_empty(diag_get()));
                diag_move(diag_get(), diag);
                goto fail;
@@ -1472,8 +1476,8 @@ vy_scheduler_complete_task(struct vy_scheduler *scheduler,
        return 0;
 fail:
        if (task->ops->abort)
-               task->ops->abort(scheduler, task);
-       diag_move(diag, &scheduler->diag);
+               task->ops->abort(task);
+       diag_move(diag, &task->scheduler->diag);
        return -1;
 }
 
@@ -1510,11 +1514,11 @@ vy_scheduler_f(va_list va)
 
                /* Complete and delete all processed tasks. */
                stailq_foreach_entry_safe(task, next, &output_queue, link) {
-                       if (vy_scheduler_complete_task(scheduler, task) != 0)
+                       if (vy_task_complete(task) != 0)
                                tasks_failed++;
                        else
                                tasks_done++;
-                       vy_task_delete(&scheduler->task_pool, task);
+                       vy_task_delete(task);
                        scheduler->workers_available++;
                        assert(scheduler->workers_available <=
                               scheduler->worker_pool_size);
@@ -1615,7 +1619,7 @@ vy_worker_f(void *arg)
                assert(task != NULL);
 
                /* Execute task */
-               task->status = task->ops->execute(scheduler, task);
+               task->status = task->ops->execute(task);
                if (task->status != 0) {
                        struct diag *diag = diag_get();
                        assert(!diag_is_empty(diag));
-- 
2.11.0


Other related posts: