[PATCH v3] lib: Complete retransmission logic

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Sat, 19 Sep 2020 16:49:04 +0200

This completes the retransmission (automatic repeat request,
ARQ) logic, sending (delayed) ACK messages when needed.

On deallocation, flows will send a final ACK and try to retransmit any
remaining unacknowledged messages (unless the FRCTFLINGER flag is turned
off; it is on by default). Applications can safely shut down as soon as
everything is ACK'd (i.e. the current Delta-t run is done). The
activity timeout is now passed to the IPCP for it to sleep before
completing deallocation (and releasing the flow_id). That should be
moved to the IRMd in due time.

The timerwheel is revised to be multi-level to reduce memory
consumption. Each level has RXMQ_SLOTS (1 << 8 = 256) slots, and the
resolution coarsens by a factor of 16 (1 << RXMQ_BUMP) from one level
to the next. The lowest level has a resolution of (1 << RXMQ_RES) ns
with RXMQ_RES = 20, which is roughly a millisecond. Currently, 3 levels
are defined, so the largest delay we can schedule at each level is:

Level 0: 256ms
Level 1: 4s
Level 2: about a minute.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 include/ouroboros/fccntl.h   |   4 +-
 src/irmd/ipcp.c              |  13 +-
 src/irmd/ipcp.h              |   5 +-
 src/irmd/main.c              |  20 +-
 src/lib/dev.c                | 131 +++++++++--
 src/lib/frct.c               | 240 ++++++++++++--------
 src/lib/ipcpd_messages.proto |   3 +-
 src/lib/rxmwheel.c           | 261 ----------------------
 src/lib/timerwheel.c         | 409 +++++++++++++++++++++++++++++++++++
 9 files changed, 689 insertions(+), 397 deletions(-)
 delete mode 100644 src/lib/rxmwheel.c
 create mode 100644 src/lib/timerwheel.c

diff --git a/include/ouroboros/fccntl.h b/include/ouroboros/fccntl.h
index 965e281..ccd74b6 100644
--- a/include/ouroboros/fccntl.h
+++ b/include/ouroboros/fccntl.h
@@ -48,6 +48,7 @@
 /* FRCT flags */
 #define FRCTFRESCNTRL 00000001 /* Feedback from receiver */
 #define FRCTFRTX      00000002 /* Reliable flow          */
+#define FRCTFLINGER   00000004 /* Send unsent data       */
 
 /* Flow operations */
 #define FLOWSRCVTIMEO 00000001 /* Set read timeout       */
@@ -61,7 +62,8 @@
 #define FLOWGTXQLEN   00000011 /* Get queue length on tx */
 
 /* FRCT operations */
-#define FRCTGFLAGS    00001000 /* Get flags for FRCT     */
+#define FRCTSFLAGS    00001000 /* Set flags for FRCT     */
+#define FRCTGFLAGS    00002000 /* Get flags for FRCT     */
 
 __BEGIN_DECLS
 
diff --git a/src/irmd/ipcp.c b/src/irmd/ipcp.c
index 7840818..cbd9ee1 100644
--- a/src/irmd/ipcp.c
+++ b/src/irmd/ipcp.c
@@ -543,16 +543,19 @@ int ipcp_flow_alloc_resp(pid_t        pid,
         return ret;
 }
 
-int ipcp_flow_dealloc(pid_t pid,
-                      int   flow_id)
+int ipcp_flow_dealloc(pid_t  pid,
+                      int    flow_id,
+                      time_t timeo)
 {
         ipcp_msg_t   msg      = IPCP_MSG__INIT;
         ipcp_msg_t * recv_msg = NULL;
         int          ret      = -1;
 
-        msg.code        = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC;
-        msg.has_flow_id = true;
-        msg.flow_id     = flow_id;
+        msg.code          = IPCP_MSG_CODE__IPCP_FLOW_DEALLOC;
+        msg.has_flow_id   = true;
+        msg.flow_id       = flow_id;
+        msg.has_timeo_sec = true;
+        msg.timeo_sec     = timeo;
 
         recv_msg = send_recv_ipcp_msg(pid, &msg);
         if (recv_msg == NULL)
diff --git a/src/irmd/ipcp.h b/src/irmd/ipcp.h
index ae00792..652316b 100644
--- a/src/irmd/ipcp.h
+++ b/src/irmd/ipcp.h
@@ -85,7 +85,8 @@ int   ipcp_flow_alloc_resp(pid_t        pid,
                            const void * data,
                            size_t       len);
 
-int   ipcp_flow_dealloc(pid_t pid,
-                        int   flow_id);
+int   ipcp_flow_dealloc(pid_t  pid,
+                        int    flow_id,
+                        time_t timeo);
 
 #endif /* OUROBOROS_IRMD_IPCP_H */
diff --git a/src/irmd/main.c b/src/irmd/main.c
index 3709a3e..3a0ad54 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -68,10 +68,11 @@
 #endif
 
 #define IRMD_CLEANUP_TIMER ((IRMD_FLOW_TIMEOUT / 20) * MILLION) /* ns */
-#define SHM_SAN_HOLDOFF 1000 /* ms */
-#define IPCP_HASH_LEN(e) hash_len(e->dir_hash_algo)
-#define IB_LEN SOCK_BUF_SIZE
-#define BIND_TIMEOUT  10 /* ms */
+#define SHM_SAN_HOLDOFF    1000 /* ms */
+#define IPCP_HASH_LEN(e)   hash_len(e->dir_hash_algo)
+#define IB_LEN             SOCK_BUF_SIZE
+#define BIND_TIMEOUT       10   /* ms */
+#define DEALLOC_TIME       300  /*  s */
 
 enum init_state {
         IPCP_NULL = 0,
@@ -1475,7 +1476,8 @@ static int flow_alloc(pid_t              pid,
 }
 
 static int flow_dealloc(pid_t pid,
-                        int   flow_id)
+                        int   flow_id,
+                        time_t timeo)
 {
         pid_t n_1_pid = -1;
         int   ret = 0;
@@ -1521,7 +1523,7 @@ static int flow_dealloc(pid_t pid,
         pthread_rwlock_unlock(&irmd.flows_lock);
 
         if (n_1_pid != -1)
-                ret = ipcp_flow_dealloc(n_1_pid, flow_id);
+                ret = ipcp_flow_dealloc(n_1_pid, flow_id, timeo);
 
         return ret;
 }
@@ -1927,7 +1929,7 @@ void * irm_sanitize(void * o)
                                 ipcpi   = f->n_1_pid;
                                 flow_id = f->flow_id;
                                 pthread_rwlock_unlock(&irmd.flows_lock);
-                                ipcp_flow_dealloc(ipcpi, flow_id);
+                                ipcp_flow_dealloc(ipcpi, flow_id, DEALLOC_TIME);
                                 pthread_rwlock_wrlock(&irmd.flows_lock);
                                 continue;
                         }
@@ -2190,7 +2192,9 @@ static void * mainloop(void * o)
                         }
                         break;
                 case IRM_MSG_CODE__IRM_FLOW_DEALLOC:
-                        result = flow_dealloc(msg->pid, msg->flow_id);
+                        result = flow_dealloc(msg->pid,
+                                              msg->flow_id,
+                                              msg->timeo_sec);
                         break;
                 case IRM_MSG_CODE__IPCP_FLOW_REQ_ARR:
                         assert(msg->pk.len > 0 ? msg->pk.data != NULL
diff --git a/src/lib/dev.c b/src/lib/dev.c
index df616ea..083368e 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -63,6 +63,7 @@
 #define SECMEMSZ  16384
 #define SYMMKEYSZ 32
 #define MSGBUFSZ  2048
+#define TICTIME   1000000  /* ns */
 
 struct flow_set {
         size_t idx;
@@ -255,6 +256,9 @@ static void flow_fini(int fd)
                 bmp_release(ai.fds, fd);
         }
 
+        if (ai.flows[fd].frcti != NULL)
+                frcti_destroy(ai.flows[fd].frcti);
+
         if (ai.flows[fd].rx_rb != NULL) {
                 shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
                 shm_rbuff_close(ai.flows[fd].rx_rb);
@@ -272,9 +276,6 @@ static void flow_fini(int fd)
                 shm_flow_set_close(ai.flows[fd].set);
         }
 
-        if (ai.flows[fd].frcti != NULL)
-                frcti_destroy(ai.flows[fd].frcti);
-
         if (ai.flows[fd].ctx != NULL)
                 crypt_fini(ai.flows[fd].ctx);
 
@@ -433,8 +434,13 @@ static void init(int     argc,
         if (ai.fqset == NULL)
                 goto fail_fqset;
 
+        if (timerwheel_init() < 0)
+                goto fail_timerwheel;
+
         return;
 
+ fail_timerwheel:
+        shm_flow_set_close(ai.fqset);
  fail_fqset:
         pthread_rwlock_destroy(&ai.lock);
  fail_lock:
@@ -491,6 +497,8 @@ static void fini(void)
                 pthread_cond_destroy(&ai.ports[i].state_cond);
         }
 
+        timerwheel_fini();
+
         shm_rdrbuff_close(ai.rdrb);
 
         free(ai.flows);
@@ -747,25 +755,57 @@ int flow_join(const char *            dst,
 
 int flow_dealloc(int fd)
 {
-        irm_msg_t   msg = IRM_MSG__INIT;
-        irm_msg_t * recv_msg;
+        irm_msg_t     msg = IRM_MSG__INIT;
+        irm_msg_t *   recv_msg;
+        struct flow * f;
+        time_t        timeo;
 
         if (fd < 0 || fd >= SYS_MAX_FLOWS )
                 return -EINVAL;
 
-        msg.code         = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
-        msg.has_flow_id  = true;
-        msg.has_pid      = true;
-        msg.pid          = ai.pid;
+        msg.code           = IRM_MSG_CODE__IRM_FLOW_DEALLOC;
+        msg.has_flow_id    = true;
+        msg.has_pid        = true;
+        msg.pid            = ai.pid;
+        msg.has_timeo_sec  = true;
+        msg.has_timeo_nsec = true;
+        msg.timeo_nsec     = 0;
+
+        f = &ai.flows[fd];
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        if (ai.flows[fd].flow_id < 0) {
+        if (f->flow_id < 0) {
                 pthread_rwlock_unlock(&ai.lock);
                 return -ENOTALLOC;
         }
 
-        msg.flow_id = ai.flows[fd].flow_id;
+        msg.flow_id = f->flow_id;
+
+        timeo = frcti_dealloc(f->frcti);
+        while (timeo < 0) { /* keep the flow active for rtx */
+                ssize_t ret;
+                uint8_t buf[128];
+
+                f->oflags = FLOWFDEFAULT | FLOWFRNOPART;
+
+                f->rcv_timesout = true;
+                f->rcv_timeo.tv_sec = -timeo;
+                f->rcv_timeo.tv_nsec = 0;
+
+                pthread_rwlock_unlock(&ai.lock);
+
+                ret = flow_read(fd, buf, 128);
+
+                timeo = frcti_dealloc(f->frcti);
+
+                if (ret == -ETIMEDOUT && timeo < 0)
+                        timeo = -timeo;
+        }
+
+        msg.timeo_sec = timeo;
+
+        shm_rbuff_fini(ai.flows[fd].tx_rb);
 
         pthread_rwlock_unlock(&ai.lock);
 
@@ -904,13 +944,21 @@ int fccntl(int fd,
                         goto einval;
                 *fflags = flow->oflags;
                 break;
+        case FRCTSFLAGS:
+                cflags = va_arg(l, uint16_t *);
+                if (cflags == NULL)
+                        goto einval;
+                if (flow->frcti == NULL)
+                        goto eperm;
+                frcti_setflags(flow->frcti, *cflags);
+                break;
         case FRCTGFLAGS:
                 cflags = (uint16_t *) va_arg(l, int *);
                 if (cflags == NULL)
                         goto einval;
                 if (flow->frcti == NULL)
                         goto eperm;
-                *cflags = frcti_getconf(flow->frcti);
+                *cflags = frcti_getflags(flow->frcti);
                 break;
         default:
                 pthread_rwlock_unlock(&ai.lock);
@@ -1067,6 +1115,8 @@ ssize_t flow_read(int    fd,
         struct shm_rbuff *   rb;
         struct shm_du_buff * sdb;
         struct timespec      abs;
+        struct timespec      tic = {0, TICTIME};
+        struct timespec      tictime;
         struct timespec *    abstime = NULL;
         struct flow *        flow;
         bool                 noblock;
@@ -1096,6 +1146,8 @@ ssize_t flow_read(int    fd,
         noblock = flow->oflags & FLOWFRNOBLOCK;
         partrd = !(flow->oflags & FLOWFRNOPART);
 
+        ts_add(&tic, &abs, &tictime);
+
         if (ai.flows[fd].rcv_timesout) {
                 ts_add(&abs, &flow->rcv_timeo, &abs);
                 abstime = &abs;
@@ -1108,9 +1160,21 @@ ssize_t flow_read(int    fd,
                         pthread_rwlock_unlock(&ai.lock);
 
                         idx = noblock ? shm_rbuff_read(rb) :
-                                shm_rbuff_read_b(rb, abstime);
-                        if (idx < 0)
-                                return idx;
+                                shm_rbuff_read_b(rb, &tictime);
+                        if (idx < 0) {
+                                frcti_tick(flow->frcti);
+
+                                if (idx != -ETIMEDOUT)
+                                        return idx;
+
+                                if (abstime != NULL
+                                    && ts_diff_ns(&tictime, &abs) < 0)
+                                        return -ETIMEDOUT;
+
+                                ts_add(&tictime, &tic, &tictime);
+                                pthread_rwlock_rdlock(&ai.lock);
+                                continue;
+                        }
 
                         sdb = shm_rdrbuff_get(ai.rdrb, idx);
                         if (flow->qs.ber == 0 && chk_crc(sdb) != 0) {
@@ -1339,7 +1403,9 @@ ssize_t fevent(struct flow_set *       set,
                const struct timespec * timeo)
 {
         ssize_t           ret = 0;
-        struct timespec   abstime;
+        struct timespec   tic = {0, TICTIME};
+        struct timespec   tictime;
+        struct timespec   abs;
         struct timespec * t = NULL;
 
         if (set == NULL || fq == NULL)
@@ -1348,17 +1414,25 @@ ssize_t fevent(struct flow_set *       set,
         if (fq->fqsize > 0 && fq->next != fq->fqsize)
                 return fq->fqsize;
 
-        if (timeo != NULL) {
-                clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-                ts_add(&abstime, timeo, &abstime);
-                t = &abstime;
-        }
+        clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+        ts_add(&tic, &abs, &tictime);
+        t = &tictime;
+
+        if (timeo != NULL)
+                ts_add(&abs, timeo, &abs);
 
         while (ret == 0) {
                 ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
                 if (ret == -ETIMEDOUT) {
-                        fq->fqsize = 0;
-                        return -ETIMEDOUT;
+                        if (timeo != NULL && ts_diff_ns(t, &abs) < 0) {
+                                fq->fqsize = 0;
+                                return -ETIMEDOUT;
+                        }
+                        ret = 0;
+                        ts_add(t, &tic, t);
+                        timerwheel_move();
+                        continue;
                 }
 
                 fq->fqsize = ret << 1;
@@ -1382,10 +1456,19 @@ int np1_flow_alloc(pid_t     n_pid,
         return flow_init(flow_id, n_pid, qs, NULL);
 }
 
-int np1_flow_dealloc(int flow_id)
+int np1_flow_dealloc(int    flow_id,
+                     time_t timeo)
 {
         int fd;
 
+        /*
+         * TODO: Don't pass timeo to the IPCP but wait in IRMd.
+         * This will need async ops, waiting until we bootstrap
+         * the IRMd over ouroboros.
+         */
+
+        sleep(timeo);
+
         pthread_rwlock_rdlock(&ai.lock);
 
         fd = ai.ports[flow_id].fd;
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 2bd126f..0e6f019 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -21,7 +21,7 @@
  */
 
 /* Default Delta-t parameters */
-#define DELT_MPL       (60 * BILLION) /* ns */
+#define DELT_MPL        (5 * BILLION) /* ns */
 #define DELT_A          (1 * BILLION) /* ns */
 #define DELT_R         (20 * BILLION) /* ns */
 
@@ -59,8 +59,6 @@ struct frcti {
         struct frct_cr    snd_cr;
         struct frct_cr    rcv_cr;
 
-        struct rxmwheel * rw;
-
         ssize_t           rq[RQ_SIZE];
         pthread_rwlock_t  lock;
 };
@@ -86,7 +84,84 @@ struct frct_pci {
         uint32_t ackno;
 } __attribute__((packed));
 
-#include <rxmwheel.c>
+static bool before(uint32_t seq1,
+                   uint32_t seq2)
+{
+        return (int32_t)(seq1 - seq2) < 0;
+}
+
+static bool after(uint32_t seq1,
+                  uint32_t seq2)
+{
+        return (int32_t)(seq2 - seq1) < 0;
+}
+
+static void __send_ack(int fd,
+                       int ackno)
+{
+        struct shm_du_buff * sdb;
+        struct frct_pci *    pci;
+        ssize_t              idx;
+        struct flow *        f;
+
+        /* Raw calls needed to bypass frcti. */
+        idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
+        if (idx < 0)
+                return;
+
+        pci = (struct frct_pci *) shm_du_buff_head(sdb);
+        memset(pci, 0, sizeof(*pci));
+
+        pci->flags = FRCT_ACK;
+        pci->ackno = hton32(ackno);
+
+        f = &ai.flows[fd];
+
+        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+                ipcp_sdb_release(sdb);
+                return;
+        }
+
+        shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
+}
+
+static void frct_send_ack(struct frcti * frcti)
+{
+        struct timespec      now;
+        time_t               diff;
+        uint32_t             ackno;
+        int                  fd;
+
+        assert(frcti);
+
+        pthread_rwlock_rdlock(&frcti->lock);
+
+        if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
+                pthread_rwlock_unlock(&frcti->lock);
+                return;
+        }
+
+        ackno = frcti->rcv_cr.lwe;
+        fd    = frcti->fd;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
+
+        pthread_rwlock_unlock(&frcti->lock);
+
+        if (diff > frcti->a || diff < DELT_ACK)
+                return;
+
+        __send_ack(fd, ackno);
+
+        pthread_rwlock_wrlock(&frcti->lock);
+
+        if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
+                frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+
+        pthread_rwlock_unlock(&frcti->lock);
+}
 
 static struct frcti * frcti_create(int fd)
 {
@@ -123,14 +198,10 @@ static struct frcti * frcti_create(int fd)
         frcti->srtt = 0;            /* Updated on first ACK */
         frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
         frcti->rto  = 20 * MILLION; /* Initial rxm will be after 20 ms */
-        frcti->rw   = NULL;
 
         if (ai.flows[fd].qs.loss == 0) {
-                frcti->snd_cr.cflags |= FRCTFRTX;
+                frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
                 frcti->rcv_cr.cflags |= FRCTFRTX;
-                frcti->rw = rxmwheel_create();
-                if (frcti->rw == NULL)
-                        goto fail_rw;
         }
 
         frcti->snd_cr.inact  = (3 * mpl + a + r) / BILLION + 1; /* s */
@@ -141,8 +212,6 @@ static struct frcti * frcti_create(int fd)
 
         return frcti;
 
- fail_rw:
-        pthread_rwlock_destroy(&frcti->lock);
  fail_lock:
         free(frcti);
  fail_malloc:
@@ -151,20 +220,12 @@ static struct frcti * frcti_create(int fd)
 
 static void frcti_destroy(struct frcti * frcti)
 {
-        /*
-         * FIXME: In case of reliable transmission we should
-         * make sure everything we sent is acked.
-         */
-
-        if (frcti->rw != NULL)
-                rxmwheel_destroy(frcti->rw);
-
         pthread_rwlock_destroy(&frcti->lock);
 
         free(frcti);
 }
 
-static uint16_t frcti_getconf(struct frcti * frcti)
+static uint16_t frcti_getflags(struct frcti * frcti)
 {
         uint16_t ret;
 
@@ -179,6 +240,22 @@ static uint16_t frcti_getconf(struct frcti * frcti)
         return ret;
 }
 
+static void frcti_setflags(struct frcti * frcti,
+                           uint16_t       flags)
+{
+        flags |= FRCTFRESCNTRL | FRCTFRTX; /* Should not be set by command */
+
+        assert (frcti);
+
+        pthread_rwlock_rdlock(&frcti->lock);
+
+        frcti->snd_cr.cflags &= FRCTFRESCNTRL | FRCTFRTX; /* Zero other flags */
+
+        frcti->snd_cr.cflags &= flags;
+
+        pthread_rwlock_unlock(&frcti->lock);
+}
+
 #define frcti_queued_pdu(frcti)                         \
         (frcti == NULL ? idx : __frcti_queued_pdu(frcti))
 
@@ -189,8 +266,10 @@ static uint16_t frcti_getconf(struct frcti * frcti)
         (frcti == NULL ? 0 : __frcti_rcv(frcti, sdb))
 
 #define frcti_tick(frcti)                               \
-        (frcti == NULL ? 0 : __frcti_tick(frcti))
+        (frcti == NULL ? 0 : __frcti_tick())
 
+#define frcti_dealloc(frcti)                            \
+        (frcti == NULL ? 0 : __frcti_dealloc(frcti))
 
 static ssize_t __frcti_queued_pdu(struct frcti * frcti)
 {
@@ -233,78 +312,41 @@ static ssize_t __frcti_pdu_ready(struct frcti * frcti)
         return idx;
 }
 
-static bool before(uint32_t seq1,
-                   uint32_t seq2)
-{
-        return (int32_t)(seq1 - seq2) < 0;
-}
-
-static bool after(uint32_t seq1,
-                  uint32_t seq2)
-{
-        return (int32_t)(seq2 - seq1) < 0;
-}
+#include <timerwheel.c>
 
-static void frct_send_ack(struct frcti * frcti)
+/*
+ * Send a final ACK for everything that has not been ACK'd.
+ * If the flow should be kept active for retransmission,
+ * the returned time will be negative.
+ */
+static time_t __frcti_dealloc(struct frcti * frcti)
 {
-        struct shm_du_buff * sdb;
-        struct frct_pci *    pci;
-        ssize_t              idx;
-        struct timespec      now;
-        time_t               diff;
-        uint32_t             ackno;
-        struct flow *        f;
+        struct timespec now;
+        time_t          wait;
+        int             ackno;
+        int             fd = -1;
 
-        assert(frcti);
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
 
         pthread_rwlock_rdlock(&frcti->lock);
 
-        if (frcti->rcv_cr.lwe == frcti->rcv_cr.seqno) {
-                pthread_rwlock_unlock(&frcti->lock);
-                return;
-        }
-
         ackno = frcti->rcv_cr.lwe;
+        if (frcti->rcv_cr.lwe != frcti->rcv_cr.seqno)
+                fd = frcti->fd;
 
-        pthread_rwlock_unlock(&frcti->lock);
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        diff = ts_diff_ns(&frcti->rcv_cr.act, &now);
-
-        if (diff > frcti->a)
-                return;
-
-        if (diff < DELT_ACK)
-                return;
-
-        /* Raw calls needed to bypass frcti. */
-        idx = shm_rdrbuff_alloc_b(ai.rdrb, sizeof(*pci), NULL, &sdb, NULL);
-        if (idx < 0)
-                return;
+        wait = MAX(frcti->rcv_cr.inact - now.tv_sec + frcti->rcv_cr.act.tv_sec,
+                   frcti->snd_cr.inact - now.tv_sec + frcti->snd_cr.act.tv_sec);
 
-        pci = (struct frct_pci *) shm_du_buff_head(sdb);
-        memset(pci, 0, sizeof(*pci));
+        if (frcti->snd_cr.cflags & FRCTFLINGER
+            && before(frcti->snd_cr.lwe, frcti->snd_cr.seqno))
+                wait = -wait;
 
-        pci->flags = FRCT_ACK;
-        pci->ackno = hton32(ackno);
-
-        f = &ai.flows[frcti->fd];
-
-        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
-                pthread_rwlock_rdlock(&ai.lock);
-                ipcp_sdb_release(sdb);
-                return;
-        }
-
-        shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
-
-        pthread_rwlock_wrlock(&frcti->lock);
+        pthread_rwlock_unlock(&frcti->lock);
 
-        if (after(frcti->rcv_cr.lwe, frcti->rcv_cr.seqno))
-                frcti->rcv_cr.seqno = frcti->rcv_cr.lwe;
+        if (fd != -1)
+                __send_ack(fd, ackno);
 
-        pthread_rwlock_unlock(&frcti->lock);
+        return wait;
 }
 
 static int __frcti_snd(struct frcti *       frcti,
@@ -315,14 +357,14 @@ static int __frcti_snd(struct frcti *       frcti,
         struct frct_cr *  snd_cr;
         struct frct_cr *  rcv_cr;
         uint32_t          seqno;
+        bool              rtx;
 
         assert(frcti);
 
         snd_cr = &frcti->snd_cr;
         rcv_cr = &frcti->rcv_cr;
 
-        if (frcti->rw != NULL)
-                rxmwheel_move(frcti->rw);
+        timerwheel_move();
 
         pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
         if (pci == NULL)
@@ -334,6 +376,8 @@ static int __frcti_snd(struct frcti *       frcti,
 
         pthread_rwlock_wrlock(&frcti->lock);
 
+        rtx = snd_cr->cflags & FRCTFRTX;
+
         pci->flags |= FRCT_DATA;
 
         /* Set DRF if there are no unacknowledged packets. */
@@ -351,7 +395,7 @@ static int __frcti_snd(struct frcti *       frcti,
         seqno = snd_cr->seqno;
         pci->seqno = hton32(seqno);
 
-        if (!(snd_cr->cflags & FRCTFRTX)) {
+        if (!rtx) {
                 snd_cr->lwe++;
         } else {
                 if (!frcti->probe) {
@@ -372,8 +416,8 @@ static int __frcti_snd(struct frcti *       frcti,
 
         pthread_rwlock_unlock(&frcti->lock);
 
-        if (frcti->rw != NULL)
-                rxmwheel_add(frcti->rw, frcti, seqno, sdb);
+        if (rtx)
+                timerwheel_rxm(frcti, seqno, sdb);
 
         return 0;
 }
@@ -387,12 +431,10 @@ static void rtt_estimator(struct frcti * frcti,
         if (srtt == 0) { /* first measurement */
                 srtt   = mrtt;
                 rttvar = mrtt >> 1;
-
         } else {
                 time_t delta = mrtt - srtt;
                 srtt += (delta >> 3);
-                rttvar -= rttvar >> 2;
-                rttvar += ABS(delta) >> 2;
+                rttvar += (ABS(delta) - rttvar) >> 2;
         }
 
         frcti->srtt     = MAX(1000U, srtt);
@@ -401,12 +443,9 @@ static void rtt_estimator(struct frcti * frcti,
                               frcti->srtt + (frcti->mdev << 1));
 }
 
-static void __frcti_tick(struct frcti * frcti)
+static void __frcti_tick(void)
 {
-        if (frcti->rw != NULL) {
-                rxmwheel_move(frcti->rw);
-                frct_send_ack(frcti);
-        }
+        timerwheel_move();
 }
 
 /* Always queues the next application packet on the RQ. */
@@ -420,6 +459,7 @@ static void __frcti_rcv(struct frcti *       frcti,
         struct frct_cr *  rcv_cr;
         uint32_t          seqno;
         uint32_t          ackno;
+        int               fd = -1;
 
         assert(frcti);
 
@@ -456,8 +496,10 @@ static void __frcti_rcv(struct frcti *       frcti,
         if (!(pci->flags & FRCT_DATA))
                 goto drop_packet;
 
-        if (before(seqno, rcv_cr->lwe))
+        if (before(seqno, rcv_cr->lwe)) {
+                rcv_cr->seqno = seqno;
                 goto drop_packet;
+        }
 
         if (rcv_cr->cflags & FRCTFRTX) {
                 if ((seqno - rcv_cr->lwe) >= RQ_SIZE)
@@ -465,6 +507,8 @@ static void __frcti_rcv(struct frcti *       frcti,
 
                 if (frcti->rq[pos] != -1)
                         goto drop_packet; /* Duplicate in rq. */
+
+                fd = frcti->fd;
         } else {
                 rcv_cr->lwe = seqno;
         }
@@ -475,10 +519,16 @@ static void __frcti_rcv(struct frcti *       frcti,
 
         pthread_rwlock_unlock(&frcti->lock);
 
+        if (fd != -1)
+                timerwheel_ack(fd, frcti);
+
         return;
 
  drop_packet:
         pthread_rwlock_unlock(&frcti->lock);
+
+        frct_send_ack(frcti);
+
         shm_rdrbuff_remove(ai.rdrb, idx);
         return;
 }
diff --git a/src/lib/ipcpd_messages.proto b/src/lib/ipcpd_messages.proto
index b0efe9a..809117b 100644
--- a/src/lib/ipcpd_messages.proto
+++ b/src/lib/ipcpd_messages.proto
@@ -52,5 +52,6 @@ message ipcp_msg {
         optional layer_info_msg layer_info =  9;
         optional int32 response            = 10;
         optional string comp               = 11;
-        optional int32 result              = 12;
+        optional uint32 timeo_sec          = 12;
+        optional int32 result              = 13;
 };
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
deleted file mode 100644
index 0572c7b..0000000
--- a/src/lib/rxmwheel.c
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2020
- *
- * Timerwheel
- *
- *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
- *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#include <ouroboros/list.h>
-
-#define RXMQ_S     14                 /* defines #slots           */
-#define RXMQ_M     34                 /* defines max delay  (ns)  */
-#define RXMQ_R     (RXMQ_M - RXMQ_S)  /* defines resolution (ns)  */
-#define RXMQ_SLOTS (1 << RXMQ_S)
-#define RXMQ_MAX   (1 << RXMQ_M)      /* us                       */
-
-/* Overflow limits range to about 6 hours. */
-#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
-#define ts_to_slot(ts) ((ts_to_ns(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
-
-struct rxm {
-        struct list_head     next;
-        uint32_t             seqno;
-        struct shm_du_buff * sdb;
-        uint8_t *            head;
-        uint8_t *            tail;
-        time_t               t0;    /* Time when original was sent (us). */
-        size_t               mul;   /* RTO multiplier.                   */
-        struct frcti *       frcti;
-};
-
-struct rxmwheel {
-        struct list_head wheel[RXMQ_SLOTS];
-
-        size_t           prv; /* Last processed slot. */
-        pthread_mutex_t  lock;
-};
-
-static void rxmwheel_destroy(struct rxmwheel * rw)
-{
-        size_t             i;
-        struct list_head * p;
-        struct list_head * h;
-
-        pthread_mutex_destroy(&rw->lock);
-
-        for (i = 0; i < RXMQ_SLOTS; ++i) {
-                list_for_each_safe(p, h, &rw->wheel[i]) {
-                        struct rxm * rxm = list_entry(p, struct rxm, next);
-                        list_del(&rxm->next);
-                        shm_du_buff_ack(rxm->sdb);
-                        ipcp_sdb_release(rxm->sdb);
-                        free(rxm);
-                }
-        }
-}
-
-static struct rxmwheel * rxmwheel_create(void)
-{
-        struct rxmwheel * rw;
-        struct timespec   now;
-        size_t            i;
-
-        rw = malloc(sizeof(*rw));
-        if (rw == NULL)
-                return NULL;
-
-        if (pthread_mutex_init(&rw->lock, NULL)) {
-                free(rw);
-                return NULL;
-        }
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        /* Mark the previous timeslot as the last one processed. */
-        rw->prv = (ts_to_slot(now) - 1) & (RXMQ_SLOTS - 1);
-
-        for (i = 0; i < RXMQ_SLOTS; ++i)
-                list_head_init(&rw->wheel[i]);
-
-        return rw;
-}
-
-static void rxmwheel_move(struct rxmwheel * rw)
-{
-        struct timespec    now;
-        struct list_head * p;
-        struct list_head * h;
-        size_t             slot;
-        size_t             i;
-
-        pthread_mutex_lock(&rw->lock);
-
-        pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
-                             (void *) &rw->lock);
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        slot = ts_to_slot(now);
-
-        i = rw->prv;
-
-        if (slot < i)
-                slot += RXMQ_SLOTS;
-
-        while (i++ < slot) {
-                list_for_each_safe(p, h, &rw->wheel[i & (RXMQ_SLOTS - 1)]) {
-                        struct rxm *         r;
-                        struct frct_cr *     snd_cr;
-                        struct frct_cr *     rcv_cr;
-                        size_t               rslot;
-                        ssize_t              idx;
-                        struct shm_du_buff * sdb;
-                        uint8_t *            head;
-                        struct flow *        f;
-                        int                  fd;
-                        uint32_t             snd_lwe;
-                        uint32_t             rcv_lwe;
-                        time_t               rto;
-
-                        r = list_entry(p, struct rxm, next);
-
-                        list_del(&r->next);
-
-                        snd_cr = &r->frcti->snd_cr;
-                        rcv_cr = &r->frcti->rcv_cr;
-                        fd     = r->frcti->fd;
-                        f      = &ai.flows[fd];
-
-                        shm_du_buff_ack(r->sdb);
-
-                        pthread_rwlock_wrlock(&r->frcti->lock);
-
-                        snd_lwe = snd_cr->lwe;
-                        rcv_lwe = rcv_cr->lwe;
-                        rto     = r->frcti->rto;
-                        /* Assume last RTX is the one that's ACK'd. */
-                        if (r->frcti->probe
-                            && (r->frcti->rttseq + 1) == r->seqno)
-                                r->frcti->t_probe = now;
-
-                        pthread_rwlock_unlock(&r->frcti->lock);
-
-                        /* Has been ack'd, remove. */
-                        if ((int) (r->seqno - snd_lwe) < 0) {
-                                ipcp_sdb_release(r->sdb);
-                                free(r);
-                                continue;
-                        }
-
-                        /* Check for r-timer expiry. */
-                        if (ts_to_ns(now) - r->t0 > r->frcti->r) {
-                                ipcp_sdb_release(r->sdb);
-                                free(r);
-                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
-                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
-                                continue;
-                        }
-
-                        /* Copy the payload, safe rtx in other layers. */
-                        if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
-                                ipcp_sdb_release(r->sdb);
-                                free(r);
-                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
-                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
-                                continue;
-                        }
-
-                        idx = shm_du_buff_get_idx(sdb);
-
-                        head = shm_du_buff_head(sdb);
-                        memcpy(head, r->head, r->tail - r->head);
-
-                        ipcp_sdb_release(r->sdb);
-
-                        ((struct frct_pci *) head)->ackno = hton32(rcv_lwe);
-
-                        /* Retransmit the copy. */
-                        if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
-                                ipcp_sdb_release(sdb);
-                                free(r);
-                                shm_rbuff_set_acl(f->rx_rb, ACL_FLOWDOWN);
-                                shm_rbuff_set_acl(f->tx_rb, ACL_FLOWDOWN);
-                                continue;
-                        }
-
-                        /* Reschedule. */
-                        shm_du_buff_wait_ack(sdb);
-
-                        shm_flow_set_notify(f->set, f->flow_id, FLOW_PKT);
-
-                        r->head = head;
-                        r->tail = shm_du_buff_tail(sdb);
-                        r->sdb  = sdb;
-
-                        /* Schedule at least in the next time slot */
-                        rslot = (slot + MAX((rto >> RXMQ_R), 1))
-                                & (RXMQ_SLOTS - 1);
-
-                        list_add_tail(&r->next, &rw->wheel[rslot]);
-                }
-        }
-
-        rw->prv = slot & (RXMQ_SLOTS - 1);
-
-        pthread_cleanup_pop(true);
-}
-
-static int rxmwheel_add(struct rxmwheel *    rw,
-                        struct frcti *       frcti,
-                        uint32_t             seqno,
-                        struct shm_du_buff * sdb)
-{
-        struct timespec now;
-        struct rxm *    r;
-        size_t          slot;
-
-        r = malloc(sizeof(*r));
-        if (r == NULL)
-                return -ENOMEM;
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        r->t0    = ts_to_ns(now);
-        r->mul   = 0;
-        r->seqno = seqno;
-        r->sdb   = sdb;
-        r->head  = shm_du_buff_head(sdb);
-        r->tail  = shm_du_buff_tail(sdb);
-        r->frcti = frcti;
-
-        pthread_rwlock_rdlock(&r->frcti->lock);
-
-        slot = (((r->t0 + frcti->rto) >> RXMQ_R) + 1) & (RXMQ_SLOTS - 1);
-
-        pthread_rwlock_unlock(&r->frcti->lock);
-
-        pthread_mutex_lock(&rw->lock);
-
-        list_add_tail(&r->next, &rw->wheel[slot]);
-
-        shm_du_buff_wait_ack(sdb);
-
-        pthread_mutex_unlock(&rw->lock);
-
-        return 0;
-}
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
new file mode 100644
index 0000000..33fcbc1
--- /dev/null
+++ b/src/lib/timerwheel.c
@@ -0,0 +1,409 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Timerwheel
+ *
+ *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
+ *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public License
+ * version 2.1 as published by the Free Software Foundation.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include <ouroboros/list.h>
+
+#define RXMQ_SLOTS (1 << 8)           /* #slots / level.           */
+#define RXMQ_LVLS  3                  /* #levels, bump for DTN.    */
+#define RXMQ_BUMP  4                  /* log2 of per-lvl res bump. */
+#define RXMQ_RES   20                 /* log2(res in ns), lvl 0.   */
+
+#define ACKQ_SLOTS (1 << 7)           /* #slots for delayed ACK.   */
+#define ACKQ_RES   20                 /* log2(res in ns) for dACK. */
+
+/* Overflow limits range to about 6 hours. */
+#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
+#define ts_to_rxm_slot(ts) (ts_to_ns(ts) >> RXMQ_RES)
+#define ts_to_ack_slot(ts) (ts_to_ns(ts) >> ACKQ_RES)
+
+struct rxm {
+        struct list_head     next;
+        uint32_t             seqno;
+        struct shm_du_buff * sdb;
+        uint8_t *            head;
+        uint8_t *            tail;
+        time_t               t0;      /* Time when original was sent (ns). */
+        size_t               mul;     /* RTO multiplier.                   */
+        struct frcti *       frcti;
+        int                  fd;
+        int                  flow_id; /* Prevent rtx when fd reused.       */
+};
+
+struct ack {
+        struct list_head next;
+        struct frcti *   frcti;
+        int              fd;
+        int              flow_id;
+};
+
+struct {
+        /*
+         * At a 1 ms min resolution, every level bumps the
+         * resolution by a factor of 16.
+         */
+        struct list_head rxms[RXMQ_LVLS][RXMQ_SLOTS];
+
+        struct list_head acks[ACKQ_SLOTS];
+        bool             map[ACKQ_SLOTS][PROG_MAX_FLOWS];
+
+        size_t           prv_rxm; /* Last processed rxm slot at lvl 0. */
+        size_t           prv_ack; /* Last processed ack slot.          */
+        pthread_mutex_t  lock;
+} rw;
+
+static void timerwheel_fini(void)
+{
+        size_t             i;
+        size_t             j;
+        struct list_head * p;
+        struct list_head * h;
+
+        pthread_mutex_lock(&rw.lock);
+
+        for (i = 0; i < RXMQ_LVLS; ++i) {
+                for (j = 0; j < RXMQ_SLOTS; j++) {
+                        list_for_each_safe(p, h, &rw.rxms[i][j]) {
+                                struct rxm * rxm;
+                                rxm = list_entry(p, struct rxm, next);
+                                list_del(&rxm->next);
+                                shm_du_buff_ack(rxm->sdb);
+                                ipcp_sdb_release(rxm->sdb);
+                                free(rxm);
+                        }
+                }
+        }
+
+        for (i = 0; i < ACKQ_SLOTS; ++i) {
+                list_for_each_safe(p, h, &rw.acks[i]) {
+                        struct ack * a = list_entry(p, struct ack, next);
+                        list_del(&a->next);
+                        free(a);
+                }
+        }
+
+        pthread_mutex_unlock(&rw.lock);
+
+        pthread_mutex_destroy(&rw.lock);
+}
+
+static int timerwheel_init(void)
+{
+        struct timespec   now;
+        size_t            i;
+        size_t            j;
+
+        if (pthread_mutex_init(&rw.lock, NULL))
+                return -1;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        rw.prv_rxm = (ts_to_rxm_slot(now) - 1) & (RXMQ_SLOTS - 1);
+        for (i = 0; i < RXMQ_LVLS; ++i) {
+                for (j = 0; j < RXMQ_SLOTS; ++j)
+                        list_head_init(&rw.rxms[i][j]);
+        }
+
+        rw.prv_ack = (ts_to_ack_slot(now) - 1) & (ACKQ_SLOTS - 1);
+        for (i = 0; i < ACKQ_SLOTS; ++i)
+                list_head_init(&rw.acks[i]);
+
+        return 0;
+}
+
+static void timerwheel_move(void)
+{
+        struct timespec    now;
+        struct list_head * p;
+        struct list_head * h;
+        size_t             rxm_slot;
+        size_t             ack_slot;
+        size_t             i;
+        size_t             j;
+
+        pthread_mutex_lock(&rw.lock);
+
+        pthread_cleanup_push((void (*) (void *)) pthread_mutex_unlock,
+                             (void *) &rw.lock);
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        rxm_slot = ts_to_ns(now) >> RXMQ_RES;
+        j = rw.prv_rxm;
+        rw.prv_rxm = rxm_slot & (RXMQ_SLOTS - 1);
+
+        for (i = 0; i < RXMQ_LVLS; ++i) {
+                size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1);
+                if (j_max_slot < j)
+                        j_max_slot += RXMQ_SLOTS;
+
+                while (j++ < j_max_slot) {
+                        list_for_each_safe(p,
+                                           h,
+                                           &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
+                                struct rxm *         r;
+                                struct frct_cr *     snd_cr;
+                                struct frct_cr *     rcv_cr;
+                                size_t               rslot;
+                                ssize_t              idx;
+                                struct shm_du_buff * sdb;
+                                uint8_t *            head;
+                                struct flow *        f;
+                                uint32_t             snd_lwe;
+                                uint32_t             rcv_lwe;
+                                time_t               rto;
+
+                                r = list_entry(p, struct rxm, next);
+
+                                list_del(&r->next);
+
+                                snd_cr = &r->frcti->snd_cr;
+                                rcv_cr = &r->frcti->rcv_cr;
+                                f      = &ai.flows[r->fd];
+
+                                shm_du_buff_ack(r->sdb);
+
+                                if (f->frcti == NULL
+                                    || f->flow_id != r->flow_id) {
+                                        ipcp_sdb_release(r->sdb);
+                                        free(r);
+                                        continue;
+                                }
+
+                                pthread_rwlock_wrlock(&r->frcti->lock);
+
+                                snd_lwe = snd_cr->lwe;
+                                rcv_lwe = rcv_cr->lwe;
+                                rto     = r->frcti->rto;
+
+                                pthread_rwlock_unlock(&r->frcti->lock);
+
+                                /* Has been ack'd, remove. */
+                                if ((int) (r->seqno - snd_lwe) < 0) {
+                                        ipcp_sdb_release(r->sdb);
+                                        free(r);
+                                        continue;
+                                }
+
+                                /* Check for r-timer expiry. */
+                                if (ts_to_ns(now) - r->t0 > r->frcti->r) {
+                                        ipcp_sdb_release(r->sdb);
+                                        free(r);
+                                        shm_rbuff_set_acl(f->rx_rb,
+                                                          ACL_FLOWDOWN);
+                                        shm_rbuff_set_acl(f->tx_rb,
+                                                          ACL_FLOWDOWN);
+                                        continue;
+                                }
+
+                                if (r->frcti->probe
+                                    && (r->frcti->rttseq + 1) == r->seqno)
+                                        r->frcti->probe = false;
+
+                                /* Copy the data, safe rtx in other layers. */
+                                if (ipcp_sdb_reserve(&sdb, r->tail - r->head)) {
+                                        ipcp_sdb_release(r->sdb);
+                                        free(r);
+                                        shm_rbuff_set_acl(f->rx_rb,
+                                                          ACL_FLOWDOWN);
+                                        shm_rbuff_set_acl(f->tx_rb,
+                                                          ACL_FLOWDOWN);
+                                        continue;
+                                }
+
+                                idx = shm_du_buff_get_idx(sdb);
+
+                                head = shm_du_buff_head(sdb);
+                                memcpy(head, r->head, r->tail - r->head);
+
+                                ipcp_sdb_release(r->sdb);
+
+                                ((struct frct_pci *) head)->ackno =
+                                        hton32(rcv_lwe);
+
+                                /* Retransmit the copy. */
+                                if (shm_rbuff_write_b(f->tx_rb, idx, NULL)) {
+                                        ipcp_sdb_release(sdb);
+                                        free(r);
+                                        shm_rbuff_set_acl(f->rx_rb,
+                                                          ACL_FLOWDOWN);
+                                        shm_rbuff_set_acl(f->tx_rb,
+                                                          ACL_FLOWDOWN);
+                                        continue;
+                                }
+
+                                /* Reschedule. */
+                                shm_du_buff_wait_ack(sdb);
+
+                                shm_flow_set_notify(f->set,
+                                                    f->flow_id,
+                                                    FLOW_PKT);
+
+                                r->head = head;
+                                r->tail = shm_du_buff_tail(sdb);
+                                r->sdb  = sdb;
+                                r->mul++;
+
+                                /* Schedule at least in the next time slot. */
+                                rslot = (rxm_slot
+                                         + MAX(((rto * r->mul) >> RXMQ_RES), 1))
+                                        & (RXMQ_SLOTS - 1);
+
+                                list_add_tail(&r->next, &rw.rxms[i][rslot]);
+                        }
+                }
+                /* Move up a level in the wheel. */
+                rxm_slot >>= RXMQ_BUMP;
+        }
+
+        ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;
+
+        j = rw.prv_ack;
+
+        if (ack_slot < j)
+                ack_slot += ACKQ_SLOTS;
+
+        while (j++ < ack_slot) {
+                list_for_each_safe(p, h, &rw.acks[j & (ACKQ_SLOTS - 1)]) {
+                        struct ack *  a;
+                        struct flow * f;
+
+                        a = list_entry(p, struct ack, next);
+
+                        list_del(&a->next);
+
+                        f = &ai.flows[a->fd];
+
+                        rw.map[j & (ACKQ_SLOTS - 1)][a->fd] = false;
+
+                        if (f->flow_id == a->flow_id && f->frcti != NULL)
+                                frct_send_ack(a->frcti);
+
+                        free(a);
+
+                }
+        }
+
+        rw.prv_ack = ack_slot & (ACKQ_SLOTS - 1);
+
+        pthread_cleanup_pop(true);
+}
+
+static int timerwheel_rxm(struct frcti *       frcti,
+                          uint32_t             seqno,
+                          struct shm_du_buff * sdb)
+{
+        struct timespec now;
+        struct rxm *    r;
+        size_t          slot;
+        size_t          lvl = 0;
+        time_t          rto_slot;
+
+        r = malloc(sizeof(*r));
+        if (r == NULL)
+                return -ENOMEM;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        r->t0    = ts_to_ns(now);
+        r->mul   = 0;
+        r->seqno = seqno;
+        r->sdb   = sdb;
+        r->head  = shm_du_buff_head(sdb);
+        r->tail  = shm_du_buff_tail(sdb);
+        r->frcti = frcti;
+
+        pthread_rwlock_rdlock(&r->frcti->lock);
+
+        rto_slot = frcti->rto >> RXMQ_RES;
+        slot     = r->t0 >> RXMQ_RES;
+
+        r->fd      = frcti->fd;
+        r->flow_id = ai.flows[r->fd].flow_id;
+
+        pthread_rwlock_unlock(&r->frcti->lock);
+
+        while (rto_slot >= RXMQ_SLOTS) {
+                ++lvl;
+                rto_slot >>= RXMQ_BUMP;
+                slot >>= RXMQ_BUMP;
+        }
+
+        if (lvl >= RXMQ_LVLS) { /* Out of timerwheel range. */
+                free(r);
+                return -EPERM;
+        }
+
+        slot = (slot + rto_slot) & (RXMQ_SLOTS - 1);
+
+        pthread_mutex_lock(&rw.lock);
+
+        list_add_tail(&r->next, &rw.rxms[lvl][slot]);
+
+        shm_du_buff_wait_ack(sdb);
+
+        pthread_mutex_unlock(&rw.lock);
+
+        return 0;
+}
+
+static int timerwheel_ack(int            fd,
+                          struct frcti * frcti)
+{
+        struct timespec now;
+        struct ack *    a;
+        size_t          slot;
+
+        a = malloc(sizeof(*a));
+        if (a == NULL)
+                return -ENOMEM;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        slot = DELT_ACK >> ACKQ_RES;
+        if (slot >= ACKQ_SLOTS) { /* Out of timerwheel range. */
+                free(a);
+                return -EPERM;
+        }
+
+        slot = (((ts_to_ns(now) + DELT_ACK) >> ACKQ_RES) + 1)
+                & (ACKQ_SLOTS - 1);
+
+        a->fd    = fd;
+        a->frcti = frcti;
+        a->flow_id = ai.flows[fd].flow_id;
+
+        pthread_mutex_lock(&rw.lock);
+
+        if (rw.map[slot][fd]) {
+                pthread_mutex_unlock(&rw.lock);
+                free(a);
+                return 0;
+        }
+
+        rw.map[slot][fd] = true;
+
+        list_add_tail(&a->next, &rw.acks[slot]);
+
+        pthread_mutex_unlock(&rw.lock);
+
+        return 0;
+}
-- 
2.28.0


Other related posts: