[tarantool-patches] [PATCH v6 1/2] replication: rename thread from tx to tx_prio

  • From: Konstantin Belyavskiy <k.belyavskiy@xxxxxxxxxxxxx>
  • To: tarantool-patches@xxxxxxxxxxxxx
  • Date: Thu, 12 Jul 2018 17:44:33 +0300

There are two different endpoints in the tx thread: 'tx' and
'tx_prio'. The latter does not support yield(). Rename the WAL
return pipe from tx_pipe to tx_prio_pipe to avoid misunderstanding.

Needed for #3397
---
 src/box/wal.c | 30 +++++++++++++++++-------------
 1 file changed, 17 insertions(+), 13 deletions(-)

diff --git a/src/box/wal.c b/src/box/wal.c
index 19c9138ee..b88353f36 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -61,8 +61,11 @@ struct wal_thread {
        struct cord cord;
        /** A pipe from 'tx' thread to 'wal' */
        struct cpipe wal_pipe;
-       /** Return pipe from 'wal' to tx' */
-       struct cpipe tx_pipe;
+       /**
+        * Return pipe from 'wal' to 'tx'. This is a
+        * priority pipe and DOES NOT support yield.
+        */
+       struct cpipe tx_prio_pipe;
 };
 
 /*
@@ -157,7 +160,7 @@ static void
 tx_schedule_commit(struct cmsg *msg);
 
 static struct cmsg_hop wal_request_route[] = {
-       {wal_write_to_disk, &wal_thread.tx_pipe},
+       {wal_write_to_disk, &wal_thread.tx_prio_pipe},
        {tx_schedule_commit, NULL},
 };
 
@@ -349,7 +352,7 @@ wal_open(struct wal_writer *writer)
         * thread.
         */
        struct cbus_call_msg msg;
-       if (cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg,
+       if (cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
                      wal_open_f, NULL, TIMEOUT_INFINITY) == 0) {
                /*
                 * Success: we can now append to
@@ -491,7 +494,7 @@ wal_checkpoint(struct vclock *vclock, bool rotate)
                return 0;
        }
        static struct cmsg_hop wal_checkpoint_route[] = {
-               {wal_checkpoint_f, &wal_thread.tx_pipe},
+               {wal_checkpoint_f, &wal_thread.tx_prio_pipe},
                {wal_checkpoint_done_f, NULL},
        };
        vclock_create(vclock);
@@ -531,7 +534,7 @@ wal_collect_garbage(int64_t lsn)
        struct wal_gc_msg msg;
        msg.lsn = lsn;
        bool cancellable = fiber_set_cancellable(false);
-       cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg.base,
+       cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg.base,
                  wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
        fiber_set_cancellable(cancellable);
 }
@@ -622,7 +625,7 @@ wal_writer_begin_rollback(struct wal_writer *writer)
                 * list.
                 */
                { wal_writer_clear_bus, &wal_thread.wal_pipe },
-               { wal_writer_clear_bus, &wal_thread.tx_pipe },
+               { wal_writer_clear_bus, &wal_thread.tx_prio_pipe },
                /*
                 * Step 2: writer->rollback queue contains all
                 * messages which need to be rolled back,
@@ -640,7 +643,7 @@ wal_writer_begin_rollback(struct wal_writer *writer)
         * all input until rollback mode is off.
         */
        cmsg_init(&writer->in_rollback, rollback_route);
-       cpipe_push(&wal_thread.tx_pipe, &writer->in_rollback);
+       cpipe_push(&wal_thread.tx_prio_pipe, &writer->in_rollback);
 }
 
 static void
@@ -770,7 +773,7 @@ wal_thread_f(va_list ap)
         * endpoint, to ensure that WAL messages are delivered
         * even when tx fiber pool is used up by net messages.
         */
-       cpipe_create(&wal_thread.tx_pipe, "tx_prio");
+       cpipe_create(&wal_thread.tx_prio_pipe, "tx_prio");
 
        cbus_loop(&endpoint);
 
@@ -799,7 +802,7 @@ wal_thread_f(va_list ap)
        if (xlog_is_open(&vy_log_writer.xlog))
                xlog_close(&vy_log_writer.xlog, false);
 
-       cpipe_destroy(&wal_thread.tx_pipe);
+       cpipe_destroy(&wal_thread.tx_prio_pipe);
        return 0;
 }
 
@@ -944,8 +947,9 @@ wal_write_vy_log(struct journal_entry *entry)
        struct wal_write_vy_log_msg msg;
        msg.entry= entry;
        bool cancellable = fiber_set_cancellable(false);
-       int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg.base,
-                          wal_write_vy_log_f, NULL, TIMEOUT_INFINITY);
+       int rc = cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe,
+                          &msg.base, wal_write_vy_log_f, NULL,
+                          TIMEOUT_INFINITY);
        fiber_set_cancellable(cancellable);
        return rc;
 }
@@ -964,7 +968,7 @@ wal_rotate_vy_log()
 {
        struct cbus_call_msg msg;
        bool cancellable = fiber_set_cancellable(false);
-       cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_pipe, &msg,
+       cbus_call(&wal_thread.wal_pipe, &wal_thread.tx_prio_pipe, &msg,
                  wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY);
        fiber_set_cancellable(cancellable);
 }
-- 
2.14.3 (Apple Git-98)


Other related posts:

  • » [tarantool-patches] [PATCH v6 1/2] replication: rename thread from tx to tx_prio - Konstantin Belyavskiy