[PATCH] lib: Support for rudimentary retransmission

  • From: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Fri, 27 Jul 2018 00:18:20 +0200

This adds rudimentary support for sending and processing
acknowledgments and doing retransmission.

It replaces the generic timerwheel with a specific one for
retransmission. This is currently a fixed wheel that allows
retransmissions to be scheduled up to about 32 seconds into the
future, with an 8 ms resolution. Both could be made configurable
in the future. Failures of the flow (i.e. rtx not working) are
indicated by the rxmwheel_move() function returning the fd of the
failed flow. This return value is not yet handled by the callers
(maybe just setting the state of the flow to FLOWDOWN is a better
solution).

The shm_rdrbuff tracks the number of users of a du_buff. One user is
the full stack; each retransmission increments the refs counter
(which effectively acts as a semaphore). The refs counter is
decremented when a packet is acked. The du_buff is only allowed to be
removed if there is only one user left (the "stack").

When a packet is retransmitted, it is copied in the rdrbuff. This is
to ensure integrity of the packet when multiple layers do
retransmission and it is passed down the stack again.

Signed-off-by: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
---
 include/ouroboros/shm_du_buff.h |   4 +
 include/ouroboros/timerwheel.h  |  47 ------
 src/lib/CMakeLists.txt          |   1 -
 src/lib/dev.c                   |  15 +-
 src/lib/frct.c                  |  73 +++++-----
 src/lib/rxmwheel.c              | 249 ++++++++++++++++++++++++++++++++
 src/lib/shm_rdrbuff.c           |  45 ++++--
 src/lib/tests/CMakeLists.txt    |   7 -
 src/lib/tests/timerwheel_test.c | 104 -------------
 src/lib/timerwheel.c            | 232 -----------------------------
 10 files changed, 327 insertions(+), 450 deletions(-)
 delete mode 100644 include/ouroboros/timerwheel.h
 create mode 100644 src/lib/rxmwheel.c
 delete mode 100644 src/lib/tests/timerwheel_test.c
 delete mode 100644 src/lib/timerwheel.c

diff --git a/include/ouroboros/shm_du_buff.h b/include/ouroboros/shm_du_buff.h
index 31090cd..066898d 100644
--- a/include/ouroboros/shm_du_buff.h
+++ b/include/ouroboros/shm_du_buff.h
@@ -49,4 +49,8 @@ uint8_t * shm_du_buff_tail_release(struct shm_du_buff * sdb,
 void      shm_du_buff_truncate(struct shm_du_buff * sdb,
                                size_t               len);
 
+int       shm_du_buff_wait_ack(struct shm_du_buff * sdb);
+
+int       shm_du_buff_ack(struct shm_du_buff * sdb);
+
 #endif /* OUROBOROS_SHM_DU_BUFF_H */
diff --git a/include/ouroboros/timerwheel.h b/include/ouroboros/timerwheel.h
deleted file mode 100644
index 231e810..0000000
--- a/include/ouroboros/timerwheel.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2018
- *
- * Ring buffer for incoming SDUs
- *
- *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
- *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_LIB_TIMERWHEEL_H
-#define OUROBOROS_LIB_TIMERWHEEL_H
-
-struct timerwheel;
-
-struct timerwheel * timerwheel_create(time_t resolution,
-                                      time_t max_delay);
-
-void                timerwheel_destroy(struct timerwheel * tw);
-
-struct tw_f *       timerwheel_start(struct timerwheel * tw,
-                                     void (* func)(void *),
-                                     void *              arg,
-                                     time_t              delay); /* ms */
-
-int                 timerwheel_restart(struct timerwheel * tw,
-                                       struct tw_f *       f,
-                                       time_t              delay); /* ms */
-
-void                timerwheel_stop(struct timerwheel * tw,
-                                    struct tw_f *       f);
-
-void                timerwheel_move(struct timerwheel * tw);
-
-#endif /* OUROBOROS_LIB_TIMERWHEEL_H */
diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index e7e0780..47e93d6 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -179,7 +179,6 @@ set(SOURCE_FILES_DEV
   # Add source files here
   cacep.c
   dev.c
-  timerwheel.c
   )
 
 set(SOURCE_FILES_IRM
diff --git a/src/lib/dev.c b/src/lib/dev.c
index dd908f7..e69fec2 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -40,7 +40,6 @@
 #include <ouroboros/utils.h>
 #include <ouroboros/fqueue.h>
 #include <ouroboros/qoscube.h>
-#include <ouroboros/timerwheel.h>
 
 #include <stdlib.h>
 #include <string.h>
@@ -83,6 +82,9 @@ struct port {
         pthread_cond_t  state_cond;
 };
 
+#define frcti_to_flow(frcti) \
+        ((struct flow *)((uint8_t *) frcti - offsetof(struct flow, frcti)))
+
 struct flow {
         struct shm_rbuff *    rx_rb;
         struct shm_rbuff *    tx_rb;
@@ -396,12 +398,12 @@ static void init(int     argc,
         if (pthread_rwlock_init(&ai.lock, NULL))
                 goto fail_lock;
 
-        if (frct_init())
-                goto fail_frct;
+        if (rxmwheel_init())
+                goto fail_rxmwheel;
 
         return;
 
- fail_frct:
+ fail_rxmwheel:
         pthread_rwlock_destroy(&ai.lock);
  fail_lock:
         for (i = 0; i < SYS_MAX_FLOWS; ++i)
@@ -437,7 +439,7 @@ static void fini(void)
         if (ai.fds == NULL)
                 return;
 
-        frct_fini();
+        rxmwheel_fini();
 
         if (ai.prog != NULL)
                 free(ai.prog);
@@ -463,9 +465,6 @@ static void fini(void)
 
         shm_rdrbuff_close(ai.rdrb);
 
-        if (ai.tw != NULL)
-                timerwheel_destroy(ai.tw);
-
         free(ai.flows);
         free(ai.ports);
 
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 296d5b2..0f3173c 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -42,8 +42,9 @@ struct frct_cr {
         bool     conf;
         uint8_t  cflags;
 
-        time_t   act;
-        time_t   inact;
+        time_t   rto;     /* ms */
+        time_t   act;     /* s */
+        time_t   inact;   /* s */
 };
 
 struct frcti {
@@ -57,16 +58,9 @@ struct frcti {
         struct frct_cr   rcv_cr;
 
         ssize_t          rq[RQ_SIZE];
-
-        struct timespec  rtt;
-
         pthread_rwlock_t lock;
 };
 
-struct {
-        struct timerwheel * tw;
-} frct;
-
 enum frct_flags {
         FRCT_DATA = 0x01, /* PDU carries data */
         FRCT_DRF  = 0x02, /* Data run flag    */
@@ -89,21 +83,7 @@ struct frct_pci {
         uint32_t ackno;
 } __attribute__((packed));
 
-static int frct_init(void)
-{
-        frct.tw = timerwheel_create(TW_RESOLUTION, TW_RESOLUTION * TW_ELEMENTS);
-        if (frct.tw == NULL)
-                return -1;
-
-        return 0;
-}
-
-static void frct_fini(void)
-{
-        assert(frct.tw);
-
-        timerwheel_destroy(frct.tw);
-}
+#include <rxmwheel.c>
 
 static struct frcti * frcti_create(int       fd,
                                    qoscube_t qc)
@@ -140,6 +120,8 @@ static struct frcti * frcti_create(int       fd,
         frcti->snd_cr.conf   = true;
         frcti->snd_cr.inact  = 3 * delta_t + 1;
         frcti->snd_cr.act    = now.tv_sec - (frcti->snd_cr.inact + 1);
+        /* Initial rto. FIXME: recalc using Karn algorithm. */
+        frcti->snd_cr.rto    = 120;
 
         frcti->rcv_cr.inact  = 2 * delta_t + 1;
         frcti->rcv_cr.act    = now.tv_sec - (frcti->rcv_cr.inact + 1);
@@ -159,6 +141,8 @@ static void frcti_destroy(struct frcti * frcti)
          * make sure everything is acked.
          */
 
+        rxmwheel_clear(frcti->fd);
+
         pthread_rwlock_destroy(&frcti->lock);
 
         free(frcti);
@@ -254,7 +238,7 @@ static void frct_add_crc(uint8_t * head,
 
 static struct frct_pci * frcti_alloc_head(struct shm_du_buff * sdb)
 {
-        struct frct_pci * pci = NULL;
+        struct frct_pci * pci;
 
         pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
         if (pci != NULL)
@@ -269,10 +253,14 @@ static int __frcti_snd(struct frcti *       frcti,
         struct frct_pci * pci;
         struct timespec   now;
         struct frct_cr *  snd_cr;
+        struct frct_cr *  rcv_cr;
 
         assert(frcti);
 
         snd_cr = &frcti->snd_cr;
+        rcv_cr = &frcti->rcv_cr;
+
+        rxmwheel_move();
 
         pci = frcti_alloc_head(sdb);
         if (pci == NULL)
@@ -310,7 +298,7 @@ static int __frcti_snd(struct frcti *       frcti,
         if (now.tv_sec - snd_cr->act > snd_cr->inact) {
                 /* There are no unacknowledged packets. */
                 assert(snd_cr->seqno == snd_cr->lwe);
-#ifdef OUROBOROS_CONFIG_DEBUG
+#ifdef CONFIG_OUROBOROS_DEBUG
                 frcti->snd_cr.seqno = 0;
 #else
                 random_buffer(&snd_cr->seqno, sizeof(snd_cr->seqno));
@@ -318,13 +306,16 @@ static int __frcti_snd(struct frcti *       frcti,
                 frcti->snd_cr.lwe = frcti->snd_cr.seqno;
         }
 
-        pci->seqno = hton32(snd_cr->seqno++);
-        if (!(snd_cr->cflags & FRCTFRTX))
-                snd_cr->lwe++;
-        else
-                /* TODO: update on ACK */
+        pci->seqno = hton32(snd_cr->seqno);
+        if (!(snd_cr->cflags & FRCTFRTX)) {
                 snd_cr->lwe++;
+        } else if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
+                rxmwheel_add(frcti, snd_cr->seqno, sdb);
+                pci->flags |= FRCT_ACK;
+                pci->ackno = hton32(rcv_cr->lwe);
+        }
 
+        snd_cr->seqno++;
         snd_cr->act  = now.tv_sec;
         snd_cr->conf = false;
 
@@ -340,6 +331,7 @@ static int __frcti_rcv(struct frcti *       frcti,
         ssize_t           idx;
         struct frct_pci * pci;
         struct timespec   now;
+        struct frct_cr *  snd_cr;
         struct frct_cr *  rcv_cr;
         uint32_t          seqno;
         int               ret = 0;
@@ -347,6 +339,7 @@ static int __frcti_rcv(struct frcti *       frcti,
         assert(frcti);
 
         rcv_cr = &frcti->rcv_cr;
+        snd_cr = &frcti->snd_cr;
 
         pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
 
@@ -369,18 +362,18 @@ static int __frcti_rcv(struct frcti *       frcti,
         if (now.tv_sec - rcv_cr->act > rcv_cr->inact) {
                 /* Inactive receiver, check for DRF. */
                 if (pci->flags & FRCT_DRF) /* New run. */
-                        rcv_cr->lwe = seqno;
+                        rcv_cr->lwe = seqno - 1;
                 else
                         goto drop_packet;
         }
 
-        if (seqno == rcv_cr->lwe) {
-                ++rcv_cr->lwe;
+        if (seqno == rcv_cr->lwe + 1) {
+                rcv_cr->lwe = seqno;
                 /* Check for online reconfiguration. */
                 if (pci->flags & FRCT_CFG)
                         rcv_cr->cflags = pci->cflags;
         } else { /* Out of order. */
-                if ((int32_t)(seqno - rcv_cr->lwe) < 0) /* Duplicate. */
+                if ((int32_t)(seqno - rcv_cr->lwe) <= 0) /* Duplicate. */
                         goto drop_packet;
 
                 if (rcv_cr->cflags & FRCTFRTX) {
@@ -396,6 +389,13 @@ static int __frcti_rcv(struct frcti *       frcti,
                 }
         }
 
+        if (rcv_cr->cflags & FRCTFRTX && pci->flags & FRCT_ACK) {
+                uint32_t ackno = ntoh32(pci->ackno);
+                /* Check for duplicate (old) acks. */
+                if ((int32_t)(ackno - snd_cr->lwe) >= 0)
+                        snd_cr->lwe = ackno;
+        }
+
         rcv_cr->act = now.tv_sec;
 
         if (!(pci->flags & FRCT_DATA))
@@ -403,10 +403,13 @@ static int __frcti_rcv(struct frcti *       frcti,
 
         pthread_rwlock_unlock(&frcti->lock);
 
+        rxmwheel_move();
+
         return ret;
 
  drop_packet:
         shm_rdrbuff_remove(ai.rdrb, idx);
         pthread_rwlock_unlock(&frcti->lock);
+        rxmwheel_move();
         return -EAGAIN;
 }
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
new file mode 100644
index 0000000..9540cb0
--- /dev/null
+++ b/src/lib/rxmwheel.c
@@ -0,0 +1,249 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2018
+ *
+ * Timerwheel for retransmission
+ *
+ *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
+ *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include <ouroboros/list.h>
+
+#define RXMQ_S     12                 /* defines #slots     */
+#define RXMQ_M     15                 /* defines max delay  */
+#define RXMQ_R     (RXMQ_M - RXMQ_S)  /* defines resolution */
+#define RXMQ_SLOTS (1 << RXMQ_S)
+#define RXMQ_MAX   (1 << RXMQ_M)      /* ms                 */
+
+/* Small inaccuracy to avoid slow division by MILLION. */
+#define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))
+#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
+
+struct rxm {
+        struct list_head     next;
+        uint32_t             seqno;
+        struct shm_du_buff * sdb;
+        uint8_t *            head;
+        uint8_t *            tail;
+        time_t               t0;    /* Time when original was sent (s).  */
+        size_t               mul;   /* RTO multiplier.                   */
+        struct frcti *       frcti;
+};
+
+struct {
+        struct list_head wheel[RXMQ_SLOTS];
+
+        size_t           prv; /* Last processed slot. */
+        pthread_mutex_t  lock;
+} rw;
+
+static void rxmwheel_fini(void)
+{
+        size_t             i;
+        struct list_head * p;
+        struct list_head * h;
+
+        for (i = 0; i < RXMQ_SLOTS; ++i) {
+                list_for_each_safe(p, h, &rw.wheel[i]) {
+                        struct rxm * rxm = list_entry(p, struct rxm, next);
+                        list_del(&rxm->next);
+                        free(rxm);
+                }
+        }
+}
+
+static int rxmwheel_init(void)
+{
+        struct timespec now;
+        size_t          i;
+
+        if (pthread_mutex_init(&rw.lock, NULL))
+                return -1;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        /* Mark the previous timeslot as the last one processed. */
+        rw.prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
+
+        for (i = 0; i < RXMQ_SLOTS; ++i)
+                list_head_init(&rw.wheel[i]);
+
+        return 0;
+}
+
+static void rxmwheel_clear(int fd)
+{
+        size_t i;
+
+        /* FIXME: Add list element to avoid looping over full rxmwheel */
+        pthread_mutex_lock(&rw.lock);
+
+        for (i = 0; i < RXMQ_SLOTS; ++i) {
+                struct list_head * p;
+                struct list_head * h;
+
+                list_for_each_safe(p, h, &rw.wheel[i]) {
+                        struct rxm * r = list_entry(p, struct rxm, next);
+                        if (r->frcti->fd == fd) {
+                                list_del(&r->next);
+                                shm_du_buff_ack(r->sdb);
+                                ipcp_sdb_release(r->sdb);
+                                free(r);
+                        }
+                }
+        }
+
+        pthread_mutex_unlock(&rw.lock);
+}
+
+/* Return fd on r-timer expiry or failed rtx buffer reservation. */
+static int rxmwheel_move(void)
+{
+        struct timespec    now;
+        struct list_head * p;
+        struct list_head * h;
+        size_t             slot;
+        size_t             i;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        slot = ts_to_slot(now);
+
+        pthread_mutex_lock(&rw.lock);
+
+        for (i = rw.prv; (ssize_t) (i - slot) <= 0; ++i) {
+                list_for_each_safe(p, h, &rw.wheel[i]) {
+                        struct rxm *         r;
+                        struct frct_cr *     snd_cr;
+                        struct frct_cr *     rcv_cr;
+                        size_t               rslot;
+                        time_t               newtime;
+                        ssize_t              idx;
+                        struct shm_du_buff * sdb;
+                        uint8_t *            head;
+                        struct flow *        f;
+
+                        r = list_entry(p, struct rxm, next);
+                        list_del(&r->next);
+
+                        snd_cr = &r->frcti->snd_cr;
+                        rcv_cr = &r->frcti->rcv_cr;
+                        /* Has been ack'd, remove. */
+                        if ((int) (r->seqno - snd_cr->lwe) <= 0) {
+                                shm_du_buff_ack(r->sdb);
+                                ipcp_sdb_release(r->sdb);
+                                free(r);
+                                continue;
+                        }
+                        /* Check for r-timer expiry. */
+                        if (now.tv_sec - r->t0 > snd_cr->inact) {
+                                int fd = r->frcti->fd;
+                                pthread_mutex_unlock(&rw.lock);
+                                shm_du_buff_ack(r->sdb);
+                                ipcp_sdb_release(r->sdb);
+                                free(r);
+                                return fd;
+                        }
+
+                        /* Copy the payload for safe rtx in other layers. */
+                        if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
+                                /* FIXME: reschedule send? */
+                                int fd = r->frcti->fd;
+                                pthread_mutex_unlock(&rw.lock);
+                                shm_du_buff_ack(r->sdb);
+                                ipcp_sdb_release(r->sdb);
+                                free(r);
+                                return fd;
+                        }
+
+                        idx = shm_du_buff_get_idx(sdb);
+
+                        head = shm_du_buff_head(sdb);
+                        memcpy(head, r->head, r->tail - r->head);
+
+                        /* Release the old copy */
+                        shm_du_buff_ack(r->sdb);
+                        ipcp_sdb_release(r->sdb);
+
+                        /* Update ackno */
+                        ((struct frct_pci *) head)->ackno = ntoh32(rcv_cr->lwe);
+
+                        f = &ai.flows[r->frcti->fd];
+
+                        /* Retransmit the copy. */
+                        if (shm_rbuff_write(f->tx_rb, idx)) {
+                                ipcp_sdb_release(sdb);
+                                continue;
+                        }
+
+                        shm_flow_set_notify(f->set, f->port_id, FLOW_PKT);
+
+                        /* Reschedule. */
+                        shm_du_buff_wait_ack(sdb);
+
+                        r->head = head;
+                        r->tail = shm_du_buff_tail(sdb);
+                        r->sdb  = sdb;
+
+                        newtime = ts_to_ms(now) + (snd_cr->rto << ++r->mul);
+                        rslot   = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1);
+
+                        list_add_tail(&r->next, &rw.wheel[rslot]);
+                }
+        }
+
+        rw.prv = slot;
+
+        pthread_mutex_unlock(&rw.lock);
+
+        return 0;
+}
+
+static int rxmwheel_add(struct frcti *       frcti,
+                        uint32_t             seqno,
+                        struct shm_du_buff * sdb)
+{
+        struct timespec now;
+        struct rxm *    r;
+        size_t          slot;
+
+        r = malloc(sizeof(*r));
+        if (r == NULL)
+                return -ENOMEM;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        pthread_mutex_lock(&rw.lock);
+
+        r->t0    = now.tv_sec;
+        r->mul   = 0;
+        r->seqno = seqno;
+        r->sdb   = sdb;
+        r->head  = shm_du_buff_head(sdb);
+        r->tail  = shm_du_buff_tail(sdb);
+        r->frcti = frcti;
+
+        slot = ((ts_to_ms(now) + frcti->snd_cr.rto) >> RXMQ_R)
+                & (RXMQ_SLOTS - 1);
+
+        list_add_tail(&r->next, &rw.wheel[slot]);
+
+        pthread_mutex_unlock(&rw.lock);
+
+        shm_du_buff_wait_ack(sdb);
+
+        return 0;
+}
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index 5ae2085..182ad08 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -65,11 +65,6 @@
 #define shm_rdrb_empty(rdrb)                                                   \
         (*rdrb->tail == *rdrb->head)
 
-enum shm_du_buff_flags {
-        SDB_VALID = 0,
-        SDB_NULL
-};
-
 struct shm_du_buff {
         size_t size;
 #ifdef SHM_RDRB_MULTI_BLOCK
@@ -77,7 +72,7 @@ struct shm_du_buff {
 #endif
         size_t du_head;
         size_t du_tail;
-        size_t flags;
+        size_t refs;
         size_t idx;
 };
 
@@ -96,11 +91,11 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)
 #ifdef SHM_RDRB_MULTI_BLOCK
         struct shm_du_buff * sdb;
         while (!shm_rdrb_empty(rdrb) &&
-               (sdb = get_tail_ptr(rdrb))->flags == SDB_NULL)
+               (sdb = get_tail_ptr(rdrb))->refs == 0)
                 *rdrb->tail = (*rdrb->tail + sdb->blocks)
                         & ((SHM_BUFFER_SIZE) - 1);
 #else
-        while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->flags == SDB_NULL)
+        while (!shm_rdrb_empty(rdrb) && get_tail_ptr(rdrb)->refs == 0)
                 *rdrb->tail = (*rdrb->tail + 1) & ((SHM_BUFFER_SIZE) - 1);
 #endif
         pthread_cond_broadcast(rdrb->healthy);
@@ -108,7 +103,7 @@ static void garbage_collect(struct shm_rdrbuff * rdrb)
 
 static void sanitize(struct shm_rdrbuff * rdrb)
 {
-        get_head_ptr(rdrb)->flags = SDB_NULL;
+        --get_head_ptr(rdrb)->refs;
         garbage_collect(rdrb);
         pthread_mutex_consistent(rdrb->lock);
 }
@@ -338,7 +333,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
                 sdb = get_head_ptr(rdrb);
                 sdb->size    = 0;
                 sdb->blocks  = padblocks;
-                sdb->flags   = SDB_NULL;
+                sdb->refs    = 0;
                 sdb->du_head = 0;
                 sdb->du_tail = 0;
                 sdb->idx     = *rdrb->head;
@@ -347,7 +342,7 @@ ssize_t shm_rdrbuff_write(struct shm_rdrbuff * rdrb,
         }
 #endif
         sdb        = get_head_ptr(rdrb);
-        sdb->flags = SDB_VALID;
+        sdb->refs  = 1;
         sdb->idx   = *rdrb->head;
 #ifdef SHM_RDRB_MULTI_BLOCK
         sdb->blocks  = blocks;
@@ -434,7 +429,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff *    rdrb,
                         sdb = get_head_ptr(rdrb);
                         sdb->size    = 0;
                         sdb->blocks  = padblocks;
-                        sdb->flags   = SDB_NULL;
+                        sdb->refs    = 0;
                         sdb->du_head = 0;
                         sdb->du_tail = 0;
                         sdb->idx     = *rdrb->head;
@@ -443,7 +438,7 @@ ssize_t shm_rdrbuff_write_b(struct shm_rdrbuff *    rdrb,
                 }
 #endif
                 sdb        = get_head_ptr(rdrb);
-                sdb->flags = SDB_VALID;
+                sdb->refs  = 1;
                 sdb->idx   = *rdrb->head;
 #ifdef SHM_RDRB_MULTI_BLOCK
                 sdb->blocks  = blocks;
@@ -497,6 +492,8 @@ struct shm_du_buff * shm_rdrbuff_get(struct shm_rdrbuff * rdrb,
 int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
                        size_t               idx)
 {
+        struct shm_du_buff * sdb;
+
         assert(rdrb);
         assert(idx < (SHM_BUFFER_SIZE));
 
@@ -508,10 +505,13 @@ int shm_rdrbuff_remove(struct shm_rdrbuff * rdrb,
 #endif
         assert(!shm_rdrb_empty(rdrb));
 
-        idx_to_du_buff_ptr(rdrb, idx)->flags = SDB_NULL;
+        sdb = idx_to_du_buff_ptr(rdrb, idx);
 
-        if (idx == *rdrb->tail)
-                garbage_collect(rdrb);
+        if (sdb->refs == 1) { /* only stack needs it, can be removed */
+                sdb->refs = 0;
+                if (idx == *rdrb->tail)
+                        garbage_collect(rdrb);
+        }
 
         pthread_mutex_unlock(rdrb->lock);
 
@@ -603,3 +603,16 @@ void shm_du_buff_truncate(struct shm_du_buff * sdb,
 
         sdb->du_tail = sdb->du_head + len;
 }
+
+int shm_du_buff_wait_ack(struct shm_du_buff * sdb)
+{
+        __sync_add_and_fetch(&sdb->refs, 1);
+
+        return 0;
+}
+
+int shm_du_buff_ack(struct shm_du_buff * sdb)
+{
+        __sync_sub_and_fetch(&sdb->refs, 1);
+        return 0;
+}
diff --git a/src/lib/tests/CMakeLists.txt b/src/lib/tests/CMakeLists.txt
index 7d5b365..cc51c20 100644
--- a/src/lib/tests/CMakeLists.txt
+++ b/src/lib/tests/CMakeLists.txt
@@ -1,12 +1,6 @@
 get_filename_component(PARENT_PATH ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
 get_filename_component(PARENT_DIR ${PARENT_PATH} NAME)
 
-if (NOT (APPLE OR GNU))
-  set(TIMERWHEEL_TEST "timerwheel_test.c")
-else ()
-  set(TIMERWHEEL_TEST "")
-endif ()
-
 create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
   # Add new tests here
   bitmap_test.c
@@ -16,7 +10,6 @@ create_test_sourcelist(${PARENT_DIR}_tests test_suite.c
   md5_test.c
   sha3_test.c
   time_utils_test.c
-  ${TIMERWHEEL_TEST}
   )
 
 add_executable(${PARENT_DIR}_test EXCLUDE_FROM_ALL ${${PARENT_DIR}_tests})
diff --git a/src/lib/tests/timerwheel_test.c b/src/lib/tests/timerwheel_test.c
deleted file mode 100644
index 0ec9831..0000000
--- a/src/lib/tests/timerwheel_test.c
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2018
- *
- * Test of the timer wheel
- *
- *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
- *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#include "timerwheel.c"
-
-#include <pthread.h>
-#include <time.h>
-#include <stdlib.h>
-#include <stdio.h>
-
-#define MAX_ELEMENTS   100
-#define MAX_RESOLUTION 10  /* ms */
-#define MAX_ADDITIONS  1000
-
-int total;
-
-int add(void * o)
-{
-        total += *((int *) o);
-        return 0;
-}
-
-int timerwheel_test(int argc, char ** argv)
-{
-        struct timerwheel * tw;
-        long resolution;
-        long elements;
-        struct timespec wait;
-
-        int additions;
-
-        int check_total = 0;
-
-        int i;
-        int var = 5;
-
-        struct tw_f * f;
-
-        (void) argc;
-        (void) argv;
-
-        total = 0;
-
-        srand(time(NULL));
-
-        resolution = rand() % (MAX_RESOLUTION - 1) + 1;
-        elements = rand() % (MAX_ELEMENTS - 10) + 10;
-
-        tw = timerwheel_create(resolution, resolution * elements);
-        if (tw == NULL) {
-                printf("Failed to create timerwheel.\n");
-                return -1;
-        }
-
-        wait.tv_sec = (resolution * elements) / 1000;
-        wait.tv_nsec = ((resolution * elements) % 1000) * MILLION;
-
-        additions = rand() % MAX_ADDITIONS + 1000;
-
-        for (i = 0; i < additions; ++i) {
-                int delay = rand() % (resolution * elements);
-                check_total += var;
-                f = timerwheel_start(tw,
-                                     (void (*)(void *)) add,
-                                     (void *) &var,
-                                     delay);
-                if (f == NULL) {
-                        printf("Failed to add function.");
-                        return -1;
-                }
-        }
-
-        nanosleep(&wait, NULL);
-
-        timerwheel_move(tw);
-
-        timerwheel_destroy(tw);
-
-        if (total != check_total) {
-                printf("Totals do not match: %d and %d.\n", total, check_total);
-                return -1;
-        }
-
-        return 0;
-}
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
deleted file mode 100644
index ef8489b..0000000
--- a/src/lib/timerwheel.c
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2018
- *
- * Timerwheel
- *
- *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
- *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#define _POSIX_C_SOURCE 200112L
-
-#include "config.h"
-
-#include <ouroboros/time_utils.h>
-#include <ouroboros/errno.h>
-#include <ouroboros/list.h>
-
-#include <pthread.h>
-#include <stdlib.h>
-#include <assert.h>
-#include <string.h>
-
-#define FRAC 10 /* accuracy of the timer */
-
-#define tw_used(tw) ((tw->head + tw->elements - tw->tail) & (tw->elements - 1));
-#define tw_free(tw) (tw_used(tw) + 1 < tw->elements)
-#define tw_empty(tw) (tw->head == tw->tail)
-
-struct tw_f {
-        struct list_head next;
-        void (* func)(void *);
-        void * arg;
-};
-
-struct tw_el {
-        struct list_head funcs;
-        struct timespec  expiry;
-};
-
-struct timerwheel {
-        struct tw_el *   wheel;
-
-        struct timespec  intv;
-
-        size_t           pos;
-
-        pthread_mutex_t  lock;
-
-        time_t           resolution;
-        size_t           elements;
-};
-
-static void tw_el_fini(struct tw_el * e)
-{
-        struct list_head * p;
-        struct list_head * h;
-
-        list_for_each_safe(p, h, &e->funcs) {
-                struct tw_f * f = list_entry(p, struct tw_f, next);
-                list_del(&f->next);
-        }
-}
-
-void timerwheel_move(struct timerwheel * tw)
-{
-        struct timespec now = {0, 0};
-        long ms = tw->resolution * tw->elements;
-        struct timespec total = {ms / 1000,
-                                 (ms % 1000) * MILLION};
-        struct list_head * p;
-        struct list_head * h;
-
-        clock_gettime(CLOCK_MONOTONIC, &now);
-
-        pthread_mutex_lock(&tw->lock);
-
-        while (ts_diff_us(&tw->wheel[tw->pos].expiry, &now) > 0) {
-                list_for_each_safe(p, h, &tw->wheel[tw->pos].funcs) {
-                        struct tw_f * f = list_entry(p, struct tw_f, next);
-                        list_del(&f->next);
-                        f->func(f->arg);
-                        free(f);
-                }
-
-                ts_add(&tw->wheel[tw->pos].expiry,
-                       &total,
-                       &tw->wheel[tw->pos].expiry);
-
-                tw->pos = (tw->pos + 1) & (tw->elements - 1);
-        }
-
-        pthread_mutex_unlock(&tw->lock);
-}
-
-struct timerwheel * timerwheel_create(time_t resolution,
-                                      time_t max_delay)
-{
-        struct timespec now = {0, 0};
-        struct timespec res_ts = {resolution / 1000,
-                                  (resolution % 1000) * MILLION};
-        size_t i;
-
-        struct timerwheel * tw;
-
-        assert(resolution != 0);
-
-        tw = malloc(sizeof(*tw));
-        if (tw == NULL)
-                return NULL;
-
-        if (pthread_mutex_init(&tw->lock, NULL))
-                return NULL;
-
-        tw->elements = 1;
-
-        while (tw->elements < (size_t) max_delay / resolution)
-                tw->elements <<= 1;
-
-        tw->wheel = malloc(sizeof(*tw->wheel) * tw->elements);
-        if (tw->wheel == NULL)
-                goto fail_wheel_malloc;
-
-        tw->resolution = resolution;
-
-        tw->intv.tv_sec = (tw->resolution / FRAC) / 1000;
-        tw->intv.tv_nsec = ((tw->resolution / FRAC) % 1000) * MILLION;
-
-        if (pthread_mutex_init(&tw->lock, NULL))
-                goto fail_lock_init;
-
-        tw->pos = 0;
-
-        clock_gettime(CLOCK_MONOTONIC, &now);
-        now.tv_nsec -= (now.tv_nsec % MILLION);
-
-        for (i = 0; i < tw->elements; ++i) {
-                list_head_init(&tw->wheel[i].funcs);
-                tw->wheel[i].expiry = now;
-                ts_add(&now, &res_ts, &now);
-        }
-
-        return tw;
-
- fail_lock_init:
-         free(tw->wheel);
- fail_wheel_malloc:
-         free(tw);
-         return NULL;
-}
-
-void timerwheel_destroy(struct timerwheel * tw)
-{
-        unsigned long i;
-
-        for (i = 0; i < tw->elements; ++i)
-                tw_el_fini(&tw->wheel[i]);
-
-        pthread_mutex_destroy(&tw->lock);
-        free(tw->wheel);
-        free(tw);
-}
-
-struct tw_f * timerwheel_start(struct timerwheel * tw,
-                               void (* func)(void *),
-                               void *              arg,
-                               time_t              delay)
-{
-        int pos;
-        struct tw_f * f = malloc(sizeof(*f));
-        if (f == NULL)
-                return NULL;
-
-        f->func = func;
-        f->arg = arg;
-
-        assert(delay < (time_t) tw->elements * tw->resolution);
-
-        pthread_mutex_lock(&tw->lock);
-
-        pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1);
-        list_add(&f->next, &tw->wheel[pos].funcs);
-
-        pthread_mutex_unlock(&tw->lock);
-
-        return f;
-}
-
-int timerwheel_restart(struct timerwheel * tw,
-                       struct tw_f *       f,
-                       time_t              delay)
-{
-        int pos;
-
-        assert(tw);
-        assert(delay < (time_t) tw->elements * tw->resolution);
-
-        pthread_mutex_lock(&tw->lock);
-
-        list_del(&f->next);
-        pos = (tw->pos + delay / tw->resolution) & (tw->elements - 1);
-        list_add(&f->next, &tw->wheel[pos].funcs);
-
-        pthread_mutex_unlock(&tw->lock);
-
-        return 0;
-}
-
-void timerwheel_stop(struct timerwheel * tw,
-                     struct tw_f *       f)
-{
-        assert(tw);
-
-        pthread_mutex_lock(&tw->lock);
-
-        list_del(&f->next);
-        free(f);
-
-        pthread_mutex_unlock(&tw->lock);
-}
-- 
2.18.0


Other related posts:

  • » [PATCH] lib: Support for rudimentary retransmission - Dimitri Staessens