[tarantool-patches] [RFC PATCH 20/23] vinyl: use cbus for communication between scheduler and worker threads

  • From: Vladimir Davydov <vdavydov.dev@xxxxxxxxx>
  • To: kostja@xxxxxxxxxxxxx
  • Date: Sun, 8 Jul 2018 19:48:51 +0300

We need cbus for forwarding deferred DELETE statements generated in a
worker thread during primary index compaction to the tx thread where
they can be inserted into secondary indexes. Since pthread mutex/cond
and cbus are incompatible by their nature, let's rework the communication
channel between the tx and worker threads using cbus.

Needed for #2129
---
 src/box/vy_scheduler.c | 215 ++++++++++++++++++++++++++++++-------------------
 src/box/vy_scheduler.h |  25 +-----
 2 files changed, 134 insertions(+), 106 deletions(-)

diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 4d84f9bc..bd3ad4be 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -46,6 +46,7 @@
 #include "errinj.h"
 #include "fiber.h"
 #include "fiber_cond.h"
+#include "cbus.h"
 #include "salad/stailq.h"
 #include "say.h"
 #include "vy_lsm.h"
@@ -55,14 +56,34 @@
 #include "vy_run.h"
 #include "vy_write_iterator.h"
 #include "trivia/util.h"
-#include "tt_pthread.h"
 
 /* Min and max values for vy_scheduler::timeout. */
 #define VY_SCHEDULER_TIMEOUT_MIN       1
 #define VY_SCHEDULER_TIMEOUT_MAX       60
 
-static void *vy_worker_f(void *);
+static int vy_worker_f(va_list);
 static int vy_scheduler_f(va_list);
+static void vy_task_execute_f(struct cmsg *);
+static void vy_task_complete_f(struct cmsg *);
+
+static const struct cmsg_hop vy_task_execute_route[] = {
+       { vy_task_execute_f, NULL },
+};
+
+static const struct cmsg_hop vy_task_complete_route[] = {
+       { vy_task_complete_f, NULL },
+};
+
+/** Vinyl worker thread. */
+struct vy_worker {
+       struct cord cord;
+       /** Pipe from tx to the worker thread. */
+       struct cpipe worker_pipe;
+       /** Pipe from the worker thread to tx. */
+       struct cpipe tx_pipe;
+       /** Link in vy_scheduler::idle_workers. */
+       struct stailq_entry in_idle;
+};
 
 struct vy_task;
 
@@ -89,10 +110,22 @@ struct vy_task_ops {
 };
 
 struct vy_task {
+       /**
+        * CBus message used for sending the task to/from
+        * a worker thread.
+        */
+       struct cmsg cmsg;
        /** Virtual method table. */
        const struct vy_task_ops *ops;
        /** Pointer to the scheduler. */
        struct vy_scheduler *scheduler;
+       /** Worker thread this task is assigned to. */
+       struct vy_worker *worker;
+       /**
+        * Fiber that is currently executing this task in
+        * a worker thread.
+        */
+       struct fiber *fiber;
        /** Return code of ->execute. */
        int status;
        /** If ->execute fails, the error is stored here. */
@@ -126,8 +159,6 @@ struct vy_task {
         */
        double bloom_fpr;
        int64_t page_size;
-       /** Link in vy_scheduler::pending_tasks. */
-       struct stailq_entry in_pending;
        /** Link in vy_scheduler::processed_tasks. */
        struct stailq_entry in_processed;
 };
@@ -241,16 +272,6 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b)
 #undef HEAP_NAME
 
 static void
-vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events)
-{
-       (void)loop;
-       (void)events;
-       struct vy_scheduler *scheduler = container_of(watcher,
-                       struct vy_scheduler, scheduler_async);
-       fiber_cond_signal(&scheduler->scheduler_cond);
-}
-
-static void
 vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 {
        assert(!scheduler->is_worker_pool_running);
@@ -260,17 +281,19 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)
        scheduler->is_worker_pool_running = true;
        scheduler->idle_worker_count = scheduler->worker_pool_size;
        scheduler->worker_pool = calloc(scheduler->worker_pool_size,
-                                       sizeof(struct cord));
+                                       sizeof(*scheduler->worker_pool));
        if (scheduler->worker_pool == NULL)
                panic("failed to allocate vinyl worker pool");
 
-       ev_async_start(scheduler->scheduler_loop, &scheduler->scheduler_async);
        for (int i = 0; i < scheduler->worker_pool_size; i++) {
                char name[FIBER_NAME_MAX];
                snprintf(name, sizeof(name), "vinyl.writer.%d", i);
-               if (cord_start(&scheduler->worker_pool[i], name,
-                                vy_worker_f, scheduler) != 0)
+               struct vy_worker *worker = &scheduler->worker_pool[i];
+               if (cord_costart(&worker->cord, name, vy_worker_f, worker) != 0)
                        panic("failed to start vinyl worker thread");
+               cpipe_create(&worker->worker_pipe, name);
+               stailq_add_tail_entry(&scheduler->idle_workers,
+                                     worker, in_idle);
        }
 }
 
@@ -280,16 +303,12 @@ vy_scheduler_stop_workers(struct vy_scheduler *scheduler)
        assert(scheduler->is_worker_pool_running);
        scheduler->is_worker_pool_running = false;
 
-       /* Wake up worker threads. */
-       tt_pthread_mutex_lock(&scheduler->mutex);
-       pthread_cond_broadcast(&scheduler->worker_cond);
-       tt_pthread_mutex_unlock(&scheduler->mutex);
-
-       /* Wait for worker threads to exit. */
-       for (int i = 0; i < scheduler->worker_pool_size; i++)
-               cord_join(&scheduler->worker_pool[i]);
-       ev_async_stop(scheduler->scheduler_loop, &scheduler->scheduler_async);
-
+       for (int i = 0; i < scheduler->worker_pool_size; i++) {
+               struct vy_worker *worker = &scheduler->worker_pool[i];
+               cbus_stop_loop(&worker->worker_pipe);
+               cpipe_destroy(&worker->worker_pipe);
+               cord_join(&worker->cord);
+       }
        free(scheduler->worker_pool);
        scheduler->worker_pool = NULL;
 }
@@ -310,19 +329,14 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
        if (scheduler->scheduler_fiber == NULL)
                panic("failed to allocate vinyl scheduler fiber");
 
-       scheduler->scheduler_loop = loop();
        fiber_cond_create(&scheduler->scheduler_cond);
-       ev_async_init(&scheduler->scheduler_async, vy_scheduler_async_cb);
 
        scheduler->worker_pool_size = write_threads;
        mempool_create(&scheduler->task_pool, cord_slab_cache(),
                       sizeof(struct vy_task));
-       stailq_create(&scheduler->pending_tasks);
+       stailq_create(&scheduler->idle_workers);
        stailq_create(&scheduler->processed_tasks);
 
-       tt_pthread_cond_init(&scheduler->worker_cond, NULL);
-       tt_pthread_mutex_init(&scheduler->mutex, NULL);
-
        vy_dump_heap_create(&scheduler->dump_heap);
        vy_compact_heap_create(&scheduler->compact_heap);
 
@@ -344,9 +358,6 @@ vy_scheduler_destroy(struct vy_scheduler *scheduler)
        if (scheduler->is_worker_pool_running)
                vy_scheduler_stop_workers(scheduler);
 
-       tt_pthread_cond_destroy(&scheduler->worker_cond);
-       tt_pthread_mutex_destroy(&scheduler->mutex);
-
        diag_destroy(&scheduler->diag);
        mempool_destroy(&scheduler->task_pool);
        fiber_cond_destroy(&scheduler->dump_cond);
@@ -647,6 +658,8 @@ vy_run_discard(struct vy_run *run)
 static int
 vy_task_write_run(struct vy_task *task)
 {
+       enum { YIELD_LOOPS = 32 };
+
        struct vy_lsm *lsm = task->lsm;
        struct vy_stmt_stream *wi = task->wi;
 
@@ -668,6 +681,7 @@ vy_task_write_run(struct vy_task *task)
        if (wi->iface->start(wi) != 0)
                goto fail_abort_writer;
        int rc;
+       int loops = 0;
        struct tuple *stmt = NULL;
        while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) {
                inj = errinj(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, ERRINJ_DOUBLE);
@@ -678,7 +692,9 @@ vy_task_write_run(struct vy_task *task)
                if (rc != 0)
                        break;
 
-               if (!task->scheduler->is_worker_pool_running) {
+               if (++loops % YIELD_LOOPS == 0)
+                       fiber_sleep(0);
+               if (fiber_is_cancelled()) {
                        diag_set(FiberIsCancelled);
                        rc = -1;
                        break;
@@ -1316,6 +1332,62 @@ err_task:
 }
 
 /**
+ * Fiber function that actually executes a vinyl task.
+ * After finishing a task, it sends it back to tx.
+ */
+static int
+vy_task_f(va_list va)
+{
+       struct vy_task *task = va_arg(va, struct vy_task *);
+       task->status = task->ops->execute(task);
+       if (task->status != 0) {
+               struct diag *diag = diag_get();
+               assert(!diag_is_empty(diag));
+               diag_move(diag, &task->diag);
+       }
+       cmsg_init(&task->cmsg, vy_task_complete_route);
+       cpipe_push(&task->worker->tx_pipe, &task->cmsg);
+       task->fiber = NULL;
+       return 0;
+}
+
+/**
+ * Callback invoked by a worker thread upon receiving a task.
+ * It schedules a fiber which actually executes the task, so
+ * as not to block the event loop.
+ */
+static void
+vy_task_execute_f(struct cmsg *cmsg)
+{
+       struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
+       assert(task->fiber == NULL);
+       task->fiber = fiber_new("task", vy_task_f);
+       if (task->fiber == NULL) {
+               task->status = -1;
+               diag_move(diag_get(), &task->diag);
+               cmsg_init(&task->cmsg, vy_task_complete_route);
+               cpipe_push(&task->worker->tx_pipe, &task->cmsg);
+       } else {
+               fiber_start(task->fiber, task);
+       }
+}
+
+/**
+ * Callback invoked by the tx thread upon receiving an executed
+ * task from a worker thread. It adds the task to the processed
+ * task queue and wakes up the scheduler so that it can complete
+ * it.
+ */
+static void
+vy_task_complete_f(struct cmsg *cmsg)
+{
+       struct vy_task *task = container_of(cmsg, struct vy_task, cmsg);
+       stailq_add_tail_entry(&task->scheduler->processed_tasks,
+                             task, in_processed);
+       fiber_cond_signal(&task->scheduler->scheduler_cond);
+}
+
+/**
  * Create a task for dumping an LSM tree. The new task is returned
  * in @ptask. If there's no LSM tree that needs to be dumped @ptask
  * is set to NULL.
@@ -1503,13 +1575,10 @@ vy_scheduler_f(va_list va)
                struct stailq processed_tasks;
                struct vy_task *task, *next;
                int tasks_failed = 0, tasks_done = 0;
-               bool was_empty;
 
                /* Get the list of processed tasks. */
                stailq_create(&processed_tasks);
-               tt_pthread_mutex_lock(&scheduler->mutex);
                stailq_concat(&processed_tasks, &scheduler->processed_tasks);
-               tt_pthread_mutex_unlock(&scheduler->mutex);
 
                /* Complete and delete all processed tasks. */
                stailq_foreach_entry_safe(task, next, &processed_tasks,
@@ -1518,6 +1587,8 @@ vy_scheduler_f(va_list va)
                                tasks_failed++;
                        else
                                tasks_done++;
+                       stailq_add_entry(&scheduler->idle_workers,
+                                        task->worker, in_idle);
                        vy_task_delete(task);
                        scheduler->idle_worker_count++;
                        assert(scheduler->idle_worker_count <=
@@ -1553,15 +1624,13 @@ vy_scheduler_f(va_list va)
                        goto wait;
 
                /* Queue the task and notify workers if necessary. */
-               tt_pthread_mutex_lock(&scheduler->mutex);
-               was_empty = stailq_empty(&scheduler->pending_tasks);
-               stailq_add_tail_entry(&scheduler->pending_tasks,
-                                     task, in_pending);
-               if (was_empty)
-                       tt_pthread_cond_signal(&scheduler->worker_cond);
-               tt_pthread_mutex_unlock(&scheduler->mutex);
-
+               assert(!stailq_empty(&scheduler->idle_workers));
+               task->worker = stailq_shift_entry(&scheduler->idle_workers,
+                                                 struct vy_worker, in_idle);
                scheduler->idle_worker_count--;
+               cmsg_init(&task->cmsg, vy_task_execute_route);
+               cpipe_push(&task->worker->worker_pipe, &task->cmsg);
+
                fiber_reschedule();
                continue;
 error:
@@ -1597,41 +1666,17 @@ wait:
        return 0;
 }
 
-static void *
-vy_worker_f(void *arg)
+static int
+vy_worker_f(va_list ap)
 {
-       struct vy_scheduler *scheduler = arg;
-       struct vy_task *task = NULL;
-
-       tt_pthread_mutex_lock(&scheduler->mutex);
-       while (scheduler->is_worker_pool_running) {
-               /* Wait for a task */
-               if (stailq_empty(&scheduler->pending_tasks)) {
-                       /* Wake scheduler up if there are no more tasks */
-                       ev_async_send(scheduler->scheduler_loop,
-                                     &scheduler->scheduler_async);
-                       tt_pthread_cond_wait(&scheduler->worker_cond,
-                                            &scheduler->mutex);
-                       continue;
-               }
-               task = stailq_shift_entry(&scheduler->pending_tasks,
-                                         struct vy_task, in_pending);
-               tt_pthread_mutex_unlock(&scheduler->mutex);
-               assert(task != NULL);
-
-               /* Execute task */
-               task->status = task->ops->execute(task);
-               if (task->status != 0) {
-                       struct diag *diag = diag_get();
-                       assert(!diag_is_empty(diag));
-                       diag_move(diag, &task->diag);
-               }
-
-               /* Return processed task to scheduler */
-               tt_pthread_mutex_lock(&scheduler->mutex);
-               stailq_add_tail_entry(&scheduler->processed_tasks,
-                                     task, in_processed);
-       }
-       tt_pthread_mutex_unlock(&scheduler->mutex);
-       return NULL;
+       struct vy_worker *worker = va_arg(ap, struct vy_worker *);
+       struct cbus_endpoint endpoint;
+
+       cpipe_create(&worker->tx_pipe, "tx");
+       cbus_endpoint_create(&endpoint, cord_name(&worker->cord),
+                            fiber_schedule_cb, fiber());
+       cbus_loop(&endpoint);
+       cbus_endpoint_destroy(&endpoint, cbus_process);
+       cpipe_destroy(&worker->tx_pipe);
+       return 0;
 }
diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h
index 284f666e..a235aa6f 100644
--- a/src/box/vy_scheduler.h
+++ b/src/box/vy_scheduler.h
@@ -42,16 +42,15 @@
 #define HEAP_FORWARD_DECLARATION
 #include "salad/heap.h"
 #include "salad/stailq.h"
-#include "tt_pthread.h"
 
 #if defined(__cplusplus)
 extern "C" {
 #endif /* defined(__cplusplus) */
 
-struct cord;
 struct fiber;
 struct vy_lsm;
 struct vy_run_env;
+struct vy_worker;
 struct vy_scheduler;
 
 typedef void
@@ -61,42 +60,26 @@ typedef void
 struct vy_scheduler {
        /** Scheduler fiber. */
        struct fiber *scheduler_fiber;
-       /** Scheduler event loop. */
-       struct ev_loop *scheduler_loop;
        /** Used to wake up the scheduler fiber from TX. */
        struct fiber_cond scheduler_cond;
-       /** Used to wake up the scheduler from a worker thread. */
-       struct ev_async scheduler_async;
        /**
         * Array of worker threads used for performing
         * dump/compaction tasks.
         */
-       struct cord *worker_pool;
+       struct vy_worker *worker_pool;
        /** Set if the worker threads are running. */
        bool is_worker_pool_running;
        /** Total number of worker threads. */
        int worker_pool_size;
        /** Number worker threads that are currently idle. */
        int idle_worker_count;
+       /** List of idle workers, linked by vy_worker::in_idle. */
+       struct stailq idle_workers;
        /** Memory pool used for allocating vy_task objects. */
        struct mempool task_pool;
-       /** Queue of pending tasks, linked by vy_task::in_pending. */
-       struct stailq pending_tasks;
        /** Queue of processed tasks, linked by vy_task::in_processed. */
        struct stailq processed_tasks;
        /**
-        * Signaled to wake up a worker when there is
-        * a pending task in the input queue. Also used
-        * to stop worker threads on shutdown.
-        */
-       pthread_cond_t worker_cond;
-       /**
-        * Mutex protecting input and output queues and
-        * the condition variable used to wake up worker
-        * threads.
-        */
-       pthread_mutex_t mutex;
-       /**
         * Heap of LSM trees, ordered by dump priority,
         * linked by vy_lsm::in_dump.
         */
-- 
2.11.0


Other related posts: