[tarantool-patches] [PATCH v3 1/5] wal: preallocate disk space before writing rows

  • From: Vladimir Davydov <vdavydov.dev@xxxxxxxxx>
  • To: kostja@xxxxxxxxxxxxx
  • Date: Wed, 24 Oct 2018 16:43:13 +0300

This function introduces a new xlog method xlog_fallocate() that makes
sure that the requested amount of disk space is available at the current
write position. It does that with posix_fallocate(). The new method is
called before writing anything to WAL, see wal_fallocate(). In order not
to invoke the system call too often, wal_fallocate() allocates disk
space in big chunks (1 MB).

The reason why I'm doing this is that I want to have a single and
clearly defined point in the code to handle ENOSPC errors, where
I could delete old WALs and retry (this is what #3397 is about).

Needed for #3397
---
 CMakeLists.txt            |  1 +
 src/box/journal.c         |  1 +
 src/box/journal.h         |  4 ++++
 src/box/txn.c             |  1 +
 src/box/wal.c             | 46 ++++++++++++++++++++++++++++++++++++++++++++++
 src/box/xlog.c            | 45 +++++++++++++++++++++++++++++++++++++++++++++
 src/box/xlog.h            | 16 ++++++++++++++++
 src/box/xrow.h            | 13 +++++++++++++
 src/trivia/config.h.cmake |  1 +
 9 files changed, 128 insertions(+)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 439a2750..c61d5569 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -82,6 +82,7 @@ check_symbol_exists(fdatasync unistd.h HAVE_FDATASYNC)
 check_symbol_exists(pthread_yield pthread.h HAVE_PTHREAD_YIELD)
 check_symbol_exists(sched_yield sched.h HAVE_SCHED_YIELD)
 check_symbol_exists(posix_fadvise fcntl.h HAVE_POSIX_FADVISE)
+check_symbol_exists(posix_fallocate fcntl.h HAVE_POSIX_FALLOCATE)
 check_symbol_exists(mremap sys/mman.h HAVE_MREMAP)
 
 check_function_exists(sync_file_range HAVE_SYNC_FILE_RANGE)
diff --git a/src/box/journal.c b/src/box/journal.c
index fd4f9539..7498ba19 100644
--- a/src/box/journal.c
+++ b/src/box/journal.c
@@ -66,6 +66,7 @@ journal_entry_new(size_t n_rows)
                diag_set(OutOfMemory, size, "region", "struct journal_entry");
                return NULL;
        }
+       entry->approx_len = 0;
        entry->n_rows = n_rows;
        entry->res = -1;
        entry->fiber = fiber();
diff --git a/src/box/journal.h b/src/box/journal.h
index 1d64a7bd..e5231688 100644
--- a/src/box/journal.h
+++ b/src/box/journal.h
@@ -59,6 +59,10 @@ struct journal_entry {
         */
        struct fiber *fiber;
        /**
+        * Approximate size of this request when encoded.
+        */
+       size_t approx_len;
+       /**
         * The number of rows in the request.
         */
        int n_rows;
diff --git a/src/box/txn.c b/src/box/txn.c
index 17d97d76..1c75ed6b 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -272,6 +272,7 @@ txn_write_to_wal(struct txn *txn)
                if (stmt->row == NULL)
                        continue; /* A read (e.g. select) request */
                *row++ = stmt->row;
+               req->approx_len += xrow_approx_len(stmt->row);
        }
        assert(row == req->rows + req->n_rows);
 
diff --git a/src/box/wal.c b/src/box/wal.c
index f87b40ae..b7acb50e 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -44,6 +44,17 @@
 #include "coio_task.h"
 #include "replication.h"
 
+enum {
+       /**
+        * Size of disk space to preallocate with xlog_fallocate().
+        * Obviously, we want to call this function as infrequent as
+        * possible to avoid the overhead associated with a system
+        * call, however at the same time we do not want to call it
+        * to allocate too big chunks, because this may increase tx
+        * latency. 1 MB seems to be a well balanced choice.
+        */
+       WAL_FALLOCATE_LEN = 1024 * 1024,
+};
 
 const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
 
@@ -126,6 +137,8 @@ struct wal_writer
 
 struct wal_msg {
        struct cmsg base;
+       /** Approximate size of this request when encoded. */
+       size_t approx_len;
        /** Input queue, on output contains all committed requests. */
        struct stailq commit;
        /**
@@ -168,6 +181,7 @@ static void
 wal_msg_create(struct wal_msg *batch)
 {
        cmsg_init(&batch->base, wal_request_route);
+       batch->approx_len = 0;
        stailq_create(&batch->commit);
        stailq_create(&batch->rollback);
 }
@@ -603,6 +617,31 @@ wal_opt_rotate(struct wal_writer *writer)
        return 0;
 }
 
+/**
+ * Make sure there's enough disk space to append @len bytes
+ * of data to the current WAL.
+ */
+static int
+wal_fallocate(struct wal_writer *writer, size_t len)
+{
+       struct xlog *l = &writer->current_wal;
+
+       /*
+        * The actual write size can be greater than the sum size
+        * of encoded rows (compression, fixheaders). Double the
+        * given length to get a rough upper bound estimate.
+        */
+       len *= 2;
+
+       if (l->allocated >= len)
+               return 0;
+       if (xlog_fallocate(l, MAX(len, WAL_FALLOCATE_LEN)) == 0)
+               return 0;
+
+       diag_log();
+       return -1;
+}
+
 static void
 wal_writer_clear_bus(struct cmsg *msg)
 {
@@ -689,6 +728,12 @@ wal_write_to_disk(struct cmsg *msg)
                return wal_writer_begin_rollback(writer);
        }
 
+       /* Ensure there's enough disk space before writing anything. */
+       if (wal_fallocate(writer, wal_msg->approx_len) != 0) {
+               stailq_concat(&wal_msg->rollback, &wal_msg->commit);
+               return wal_writer_begin_rollback(writer);
+       }
+
        /*
         * This code tries to write queued requests (=transactions) using as
         * few I/O syscalls and memory copies as possible. For this reason
@@ -858,6 +903,7 @@ wal_write(struct journal *journal, struct journal_entry 
*entry)
                stailq_add_tail_entry(&batch->commit, entry, fifo);
                cpipe_push(&wal_thread.wal_pipe, &batch->base);
        }
+       batch->approx_len += entry->approx_len;
        wal_thread.wal_pipe.n_input += entry->n_rows * XROW_IOVMAX;
        cpipe_flush_input(&wal_thread.wal_pipe);
        /**
diff --git a/src/box/xlog.c b/src/box/xlog.c
index cd2d6e1d..0f2f97b7 100644
--- a/src/box/xlog.c
+++ b/src/box/xlog.c
@@ -990,6 +990,26 @@ xdir_create_xlog(struct xdir *dir, struct xlog *xlog,
        return 0;
 }
 
+ssize_t
+xlog_fallocate(struct xlog *log, size_t len)
+{
+#ifdef HAVE_POSIX_FALLOCATE
+       int rc = posix_fallocate(log->fd, log->offset + log->allocated, len);
+       if (rc != 0) {
+               errno = rc;
+               diag_set(SystemError, "%s: can't allocate disk space",
+                        log->filename);
+               return -1;
+       }
+       log->allocated += len;
+       return 0;
+#else
+       (void)log;
+       (void)len;
+       return 0;
+#endif /* HAVE_POSIX_FALLOCATE */
+}
+
 /**
  * Write a sequence of uncompressed xrow objects.
  *
@@ -1179,8 +1199,13 @@ xlog_tx_write(struct xlog *log)
                if (lseek(log->fd, log->offset, SEEK_SET) < 0 ||
                    ftruncate(log->fd, log->offset) != 0)
                        panic_syserror("failed to truncate xlog after write 
error");
+               log->allocated = 0;
                return -1;
        }
+       if (log->allocated > (size_t)written)
+               log->allocated -= written;
+       else
+               log->allocated = 0;
        log->offset += written;
        log->rows += log->tx_rows;
        log->tx_rows = 0;
@@ -1378,6 +1403,17 @@ xlog_write_eof(struct xlog *l)
                diag_set(ClientError, ER_INJECTION, "xlog write injection");
                return -1;
        });
+
+       /*
+        * Free disk space preallocated with xlog_fallocate().
+        * Don't write the eof marker if this fails, otherwise
+        * we'll get "data after eof marker" error on recovery.
+        */
+       if (l->allocated > 0 && ftruncate(l->fd, l->offset) < 0) {
+               diag_set(SystemError, "ftruncate() failed");
+               return -1;
+       }
+
        if (fio_writen(l->fd, &eof_marker, sizeof(eof_marker)) < 0) {
                diag_set(SystemError, "write() failed");
                return -1;
@@ -1793,6 +1829,15 @@ xlog_cursor_next_tx(struct xlog_cursor *i)
                return -1;
        if (rc > 0)
                return 1;
+       if (load_u32(i->rbuf.rpos) == 0) {
+               /*
+                * Space preallocated with xlog_fallocate().
+                * Treat as eof and clear the buffer.
+                */
+               i->read_offset -= ibuf_used(&i->rbuf);
+               ibuf_reset(&i->rbuf);
+               return 1;
+       }
        if (load_u32(i->rbuf.rpos) == eof_marker) {
                /* eof marker found */
                goto eof_found;
diff --git a/src/box/xlog.h b/src/box/xlog.h
index e3c38486..e2fdfd74 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -314,6 +314,11 @@ struct xlog {
        /** The current offset in the log file, for writing. */
        off_t offset;
        /**
+        * Size of disk space preallocated at @offset with
+        * xlog_fallocate().
+        */
+       size_t allocated;
+       /**
         * Output buffer, works as row accumulator for
         * compression.
         */
@@ -434,6 +439,17 @@ int
 xlog_rename(struct xlog *l);
 
 /**
+ * Allocate @size bytes of disk space at the end of the given
+ * xlog file.
+ *
+ * Returns -1 on fallocate error and sets both diag and errno
+ * accordingly. On success returns 0. If the underlying OS
+ * does not support fallocate, this function also returns 0.
+ */
+ssize_t
+xlog_fallocate(struct xlog *log, size_t size);
+
+/**
  * Write a row to xlog, 
  *
  * @retval count of writen bytes
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 3fc007a8..32d4d54b 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -68,6 +68,19 @@ struct xrow_header {
 };
 
 /**
+ * Return the max size which the given row is going to take when
+ * encoded into a binary packet.
+ */
+static inline size_t
+xrow_approx_len(struct xrow_header *row)
+{
+       size_t len = XROW_HEADER_LEN_MAX;
+       for (int i = 0; i < row->bodycnt; i++)
+               len += row->body[i].iov_len;
+       return len;
+}
+
+/**
  * Encode xrow into a binary packet
  *
  * @param header xrow
diff --git a/src/trivia/config.h.cmake b/src/trivia/config.h.cmake
index 8894b436..53eae2fe 100644
--- a/src/trivia/config.h.cmake
+++ b/src/trivia/config.h.cmake
@@ -166,6 +166,7 @@
 #cmakedefine HAVE_PTHREAD_YIELD 1
 #cmakedefine HAVE_SCHED_YIELD 1
 #cmakedefine HAVE_POSIX_FADVISE 1
+#cmakedefine HAVE_POSIX_FALLOCATE 1
 #cmakedefine HAVE_MREMAP 1
 #cmakedefine HAVE_SYNC_FILE_RANGE 1
 
-- 
2.11.0


Other related posts: