[PATCH] lib: Fix buffer allocation when retransmitting

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Thu, 10 Mar 2022 08:23:15 +0100

When the timerwheel retransmitted a packet, the error check on the
rbuff allocation tested for non-zero return values instead of negative
ones, so a successful allocation could still be treated as a failure.
The program then continued down the unhappy path (rescheduling) and
left the newly allocated packet stuck in the buffer unattended.

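Also fixes incorrectly scheduled retransmissions that caused packet
storms. FRCP is much more stable now, but it still needs some work for
high bandwidth-delay products (fast retransmit).

Other changes: raise the initial RTO from 20 ms to 1 s, compute the
RTO as SRTT + 4 * MDEV, count retransmissions in the flow statistics,
log flow allocation requests in the IRMd, and move the seconds-to-ns
conversion of the Delta-t timers into frcti_create().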

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 src/irmd/main.c       |  3 +++
 src/lib/config.h.in   |  5 ++---
 src/lib/frct.c        | 20 +++++++++++++++-----
 src/lib/shm_rdrbuff.c |  2 +-
 src/lib/timerwheel.c  | 38 +++++++++++++++++++++++++++++---------
 5 files changed, 50 insertions(+), 18 deletions(-)

diff --git a/src/irmd/main.c b/src/irmd/main.c
index fab9497d..6d7f7a2a 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -1401,6 +1401,9 @@ static int flow_alloc(pid_t              pid,
         int                 state;
         uint8_t *           hash;
 
+        log_info("Allocating flow for %d to %s.\n",
+                 pid, dst);
+
         ipcp = join ? get_ipcp_entry_by_layer(dst)
                     : get_ipcp_by_dst_name(dst, pid);
         if (ipcp == NULL) {
diff --git a/src/lib/config.h.in b/src/lib/config.h.in
index 5c5b6caf..d534cf77 100644
--- a/src/lib/config.h.in
+++ b/src/lib/config.h.in
@@ -69,9 +69,8 @@
 #define DU_BUFF_TAILSPACE   @DU_BUFF_TAILSPACE@
 
 /* Default Delta-t parameters */
-#define DELT_MPL            (@DELTA_T_MPL@ * BILLION)        /* ns */
-#define DELT_A              (@DELTA_T_ACK@ * BILLION)        /* ns */
-#define DELT_R              (@DELTA_T_RTX@ * BILLION)        /* ns */
+#define DELT_A              (@DELTA_T_ACK@)                  /* ns */
+#define DELT_R              (@DELTA_T_RTX@)                  /* ns */
 
 #define DELT_ACK            (@DELTA_T_ACK_DELAY@ * MILLION)  /* ns */
 
diff --git a/src/lib/frct.c b/src/lib/frct.c
index e9ee7718..5540ad2e 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -53,6 +53,8 @@ struct frcti {
         struct timespec   t_probe;     /* Probe time             */
         bool              probe;       /* Probe active           */
 
+        size_t            n_rtx;       /* Number of rxm packets  */
+
         struct frct_cr    snd_cr;
         struct frct_cr    rcv_cr;
 
@@ -130,7 +132,8 @@ static int frct_rib_read(const char * path,
                 "Receiver left window edge:       %20u\n"
                 "Receiver right window edge:      %20u\n"
                 "Receiver inactive (ns):          %20ld\n"
-                "Receiver last ack:               %20u\n",
+                "Receiver last ack:               %20u\n"
+                "Number of pkt retransmissions:   %20zu\n",
                 frcti->mpl,
                 frcti->a,
                 frcti->r,
@@ -144,7 +147,8 @@ static int frct_rib_read(const char * path,
                 frcti->rcv_cr.lwe,
                 frcti->rcv_cr.rwe,
                 ts_diff_ns(&frcti->rcv_cr.act, &now),
-                frcti->rcv_cr.seqno);
+                frcti->rcv_cr.seqno,
+                frcti->n_rtx);
 
         pthread_rwlock_unlock(&flow->frcti->lock);
 
@@ -310,6 +314,10 @@ static struct frcti * frcti_create(int    fd,
 #ifdef PROC_FLOW_STATS
         char                frctstr[FRCT_NAME_STRLEN + 1];
 #endif
+        mpl *= BILLION;
+        a   *= BILLION;
+        r   *= BILLION;
+
         frcti = malloc(sizeof(*frcti));
         if (frcti == NULL)
                 goto fail_malloc;
@@ -354,7 +362,9 @@ static struct frcti * frcti_create(int    fd,
 
         frcti->srtt = 0;            /* Updated on first ACK */
         frcti->mdev = 10 * MILLION; /* Initial rxm will be after 20 ms */
-        frcti->rto  = 20 * MILLION; /* Initial rxm will be after 20 ms */
+        frcti->rto  = BILLION;      /* Initial rxm will be after 1 s   */
+
+        frcti->n_rtx = 0;
 
         if (ai.flows[fd].qs.loss == 0) {
                 frcti->snd_cr.cflags |= FRCTFRTX | FRCTFLINGER;
@@ -739,7 +749,7 @@ static void rtt_estimator(struct frcti * frcti,
 
         frcti->srtt     = MAX(1000U, srtt);
         frcti->mdev     = MAX(100U, rttvar);
-        frcti->rto      = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 1));
+        frcti->rto      = MAX(RTO_MIN, frcti->srtt + (frcti->mdev << 2));
 }
 
 static void __frcti_tick(void)
@@ -803,7 +813,7 @@ static void __frcti_rcv(struct frcti *       frcti,
                 if (after(ackno, frcti->snd_cr.lwe))
                         frcti->snd_cr.lwe = ackno;
 
-                if (frcti->probe && !before(frcti->rttseq, ackno)) {
+                if (frcti->probe && after(ackno, frcti->rttseq)) {
                         rtt_estimator(frcti, ts_diff_ns(&frcti->t_probe, &now));
                         frcti->probe = false;
                 }
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index e3552100..dfa45af6 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -578,7 +578,7 @@ uint8_t * shm_du_buff_head_release(struct shm_du_buff * sdb,
 }
 
 uint8_t * shm_du_buff_tail_release(struct shm_du_buff * sdb,
-                              size_t               size)
+                                   size_t               size)
 {
         assert(sdb);
         assert(!(size > sdb->du_tail - sdb->du_head));
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 5c0fbfa0..0a6e48e1 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -174,6 +174,7 @@ static void timerwheel_move(void)
                                 uint32_t             snd_lwe;
                                 uint32_t             rcv_lwe;
                                 time_t               rto;
+                                size_t               new_i;
 
                                 r = list_entry(p, struct rxm, next);
 
@@ -205,26 +206,36 @@ static void timerwheel_move(void)
                                 if (ts_to_ns(now) - r->t0 > r->frcti->r)
                                         goto flow_down;
 
+                                pthread_rwlock_wrlock(&r->frcti->lock);
+
                                 if (r->frcti->probe
-                                    && (r->frcti->rttseq + 1) == r->seqno)
+                                    && (r->frcti->rttseq == r->seqno)) {
                                         r->frcti->probe = false;
+                                        r->frcti->rto += (rto >> 3);
+                                }
+
+                                r->frcti->n_rtx++;
+
+                                pthread_rwlock_unlock(&r->frcti->lock);
 #ifdef RXM_BLOCKING
   #ifdef RXM_BUFFER_ON_HEAP
-                                if (ipcp_sdb_reserve(&sdb, r->pkt_len))
+                                if (ipcp_sdb_reserve(&sdb, r->pkt_len) < 0)
   #else
-                                if (ipcp_sdb_reserve(&sdb, r->tail - r->head))
+                                if (ipcp_sdb_reserve(&sdb,
+                                                     r->tail - r->head) < 0)
   #endif
 #else
   #ifdef RXM_BUFFER_ON_HEAP
                                 if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL,
-                                                      &sdb))
+                                                      &sdb) < 0)
   #else
                                 if (shm_rdrbuff_alloc(ai.rdrb,
                                                       r->tail - r->head, NULL,
-                                                      &sdb))
+                                                      &sdb) < 0)
   #endif
 #endif
                                         goto reschedule; /* rbuff full */
+
                                 idx = shm_du_buff_get_idx(sdb);
 
                                 head = shm_du_buff_head(sdb);
@@ -248,15 +259,24 @@ static void timerwheel_move(void)
 #endif
                                         shm_flow_set_notify(f->set, f->flow_id,
                                                             FLOW_PKT);
+
                         reschedule:
-                                r->mul++;
+                                rslot = (rto << r->mul++) >> (RXMQ_RES * i);
+
+                                new_i = i;
+                                while (rslot >= RXMQ_SLOTS) {
+                                        ++ new_i;
+                                        rslot >>= RXMQ_BUMP;
+                                }
+
+                                if (new_i >= RXMQ_LVLS) /* Can't reschedule */
+                                        continue;
 
                                 /* Schedule at least in the next time slot. */
-                                rslot = (rxm_slot
-                                         + MAX(((rto * r->mul) >> RXMQ_RES), 1))
+                                rslot = ((rxm_slot >> (RXMQ_BUMP * (new_i - 1))) + MAX(rslot, 1))
                                         & (RXMQ_SLOTS - 1);
 
-                                list_add_tail(&r->next, &rw.rxms[i][rslot]);
+                                list_add_tail(&r->next, &rw.rxms[new_i][rslot]);
 
                                 continue;
 
-- 
2.35.1


Other related posts:

  • » [PATCH] lib: Fix buffer allocation when retransmitting - Dimitri Staessens