[PATCH] lib: Fix retransmission scheduling

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Sun, 13 Mar 2022 17:45:03 +0100

There were still a couple of bugs in the timerwheel. If the future
schedule coincided with the slot currently being processed
(i.e. exactly RXMQ_SLOTS in the future), the list_add_tail caused an
infinite loop. Another bug caused the slots at higher levels to
be processed too soon.

Retransmissions should now schedule correctly.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 src/lib/timerwheel.c | 132 ++++++++++++++++++++-----------------------
 1 file changed, 60 insertions(+), 72 deletions(-)

diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 0a6e48e1..661cc456 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -30,14 +30,11 @@
 struct rxm {
         struct list_head     next;
         uint32_t             seqno;
-#ifdef RXM_BUFFER_ON_HEAP
-        uint8_t *            pkt;
-        size_t               pkt_len;
-#else
+#ifndef RXM_BUFFER_ON_HEAP
         struct shm_du_buff * sdb;
-        uint8_t *            head;
-        uint8_t *            tail;
 #endif
+        struct frct_pci *    pkt;
+        size_t               len;
         time_t               t0;      /* Time when original was sent (us). */
         size_t               mul;     /* RTO multiplier.                   */
         struct frcti *       frcti;
@@ -62,8 +59,8 @@ struct {
         struct list_head acks[ACKQ_SLOTS];
         bool             map[ACKQ_SLOTS][PROG_MAX_FLOWS];
 
-        size_t           prv_rxm; /* Last processed rxm slot at lvl 0. */
-        size_t           prv_ack; /* Last processed ack slot.          */
+        size_t           prv_rxm[RXMQ_LVLS]; /* Last processed rxm slots. */
+        size_t           prv_ack;            /* Last processed ack slot.  */
         pthread_mutex_t  lock;
 
         bool             in_use;
@@ -119,8 +116,10 @@ static int timerwheel_init(void)
 
         clock_gettime(PTHREAD_COND_CLOCK, &now);
 
-        rw.prv_rxm = (ts_to_rxm_slot(now) - 1) & (RXMQ_SLOTS - 1);
         for (i = 0; i < RXMQ_LVLS; ++i) {
+                rw.prv_rxm[i] = (ts_to_rxm_slot(now) - 1);
+                rw.prv_rxm[i] >>= (RXMQ_BUMP * i);
+                rw.prv_rxm[i] &= (RXMQ_SLOTS - 1);
                 for (j = 0; j < RXMQ_SLOTS; ++j)
                         list_head_init(&rw.rxms[i][j]);
         }
@@ -151,30 +150,30 @@ static void timerwheel_move(void)
 
         clock_gettime(PTHREAD_COND_CLOCK, &now);
 
-        rxm_slot = ts_to_ns(now) >> RXMQ_RES;
-        j = rw.prv_rxm;
-        rw.prv_rxm = rxm_slot & (RXMQ_SLOTS - 1);
+        rxm_slot = ts_to_rxm_slot(now);
 
         for (i = 0; i < RXMQ_LVLS; ++i) {
                 size_t j_max_slot = rxm_slot & (RXMQ_SLOTS - 1);
+                j = rw.prv_rxm[i];
                 if (j_max_slot < j)
                         j_max_slot += RXMQ_SLOTS;
-
                 while (j++ < j_max_slot) {
                         list_for_each_safe(p, h,
                                            &rw.rxms[i][j & (RXMQ_SLOTS - 1)]) {
                                 struct rxm *         r;
                                 struct frct_cr *     snd_cr;
                                 struct frct_cr *     rcv_cr;
+                                size_t               slot;
                                 size_t               rslot;
                                 ssize_t              idx;
                                 struct shm_du_buff * sdb;
-                                uint8_t *            head;
+                                struct frct_pci *    pci;
                                 struct flow *        f;
                                 uint32_t             snd_lwe;
                                 uint32_t             rcv_lwe;
                                 time_t               rto;
-                                size_t               new_i;
+                                size_t               lvl = 0;
+                                time_t               act;
 
                                 r = list_entry(p, struct rxm, next);
 
@@ -195,6 +194,7 @@ static void timerwheel_move(void)
                                 snd_lwe = snd_cr->lwe;
                                 rcv_lwe = rcv_cr->lwe;
                                 rto     = r->frcti->rto;
+                                act     = ts_to_ns(r->frcti->rcv_cr.act);
 
                                 pthread_rwlock_unlock(&r->frcti->lock);
 
@@ -209,75 +209,64 @@ static void timerwheel_move(void)
                                 pthread_rwlock_wrlock(&r->frcti->lock);
 
                                 if (r->frcti->probe
-                                    && (r->frcti->rttseq == r->seqno)) {
+                                    && (r->frcti->rttseq == r->seqno))
                                         r->frcti->probe = false;
-                                        r->frcti->rto += (rto >> 3);
-                                }
 
                                 r->frcti->n_rtx++;
 
                                 pthread_rwlock_unlock(&r->frcti->lock);
+
+                                if (ts_to_ns(now) - act > (rto << 2))
+                                        rto <<= r->mul++;
+                                else
+                                        r->mul = 0;
+
+                                /* Schedule at least in the next time slot. */
+                                slot = ts_to_ns(now) >> RXMQ_RES;
+                                rslot = rto >> RXMQ_RES;
+
+                                while (rslot >= RXMQ_SLOTS) {
+                                        ++lvl;
+                                        rslot >>= RXMQ_BUMP;
+                                        slot >>= RXMQ_BUMP;
+                                }
+
+                                if (lvl >= RXMQ_LVLS) /* Can't reschedule */
+                                        goto flow_down;
+
+                                rslot = (rslot + slot) & (RXMQ_SLOTS - 1);
+
 #ifdef RXM_BLOCKING
-  #ifdef RXM_BUFFER_ON_HEAP
-                                if (ipcp_sdb_reserve(&sdb, r->pkt_len) < 0)
-  #else
-                                if (ipcp_sdb_reserve(&sdb,
-                                                     r->tail - r->head) < 0)
-  #endif
+                                if (ipcp_sdb_reserve(&sdb, r->len) < 0)
 #else
-  #ifdef RXM_BUFFER_ON_HEAP
-                                if (shm_rdrbuff_alloc(ai.rdrb, r->pkt_len, NULL,
+                                if (shm_rdrbuff_alloc(ai.rdrb, r->len, NULL,
                                                       &sdb) < 0)
-  #else
-                                if (shm_rdrbuff_alloc(ai.rdrb,
-                                                      r->tail - r->head, NULL,
-                                                      &sdb) < 0)
-  #endif
 #endif
-                                        goto reschedule; /* rbuff full */
+                                        goto reschedule; /* rdrbuff full */
 
-                                idx = shm_du_buff_get_idx(sdb);
-
-                                head = shm_du_buff_head(sdb);
-#ifdef RXM_BUFFER_ON_HEAP
-                                memcpy(head, r->pkt, r->pkt_len);
-#else
-                                memcpy(head, r->head, r->tail - r->head);
+                                pci = (struct frct_pci *) shm_du_buff_head(sdb);
+                                memcpy(pci, r->pkt, r->len);
+#ifndef RXM_BUFFER_ON_HEAP
                                 ipcp_sdb_release(r->sdb);
-                                r->sdb  = sdb;
-                                r->head = head;
-                                r->tail = shm_du_buff_tail(sdb);
+                                r->sdb = sdb;
+                                r->pkt = pci;
                                 shm_du_buff_wait_ack(sdb);
 #endif
+                                idx = shm_du_buff_get_idx(sdb);
+
                                 /* Retransmit the copy. */
-                                ((struct frct_pci *) head)->ackno =
-                                        hton32(rcv_lwe);
+                                pci->ackno = hton32(rcv_lwe);
 #ifdef RXM_BLOCKING
-                                if (shm_rbuff_write_b(f->tx_rb, idx, NULL) == 0)
+                                if (shm_rbuff_write_b(f->tx_rb, idx, NULL) < 0)
 #else
-                                if (shm_rbuff_write(f->tx_rb, idx) == 0)
+                                if (shm_rbuff_write(f->tx_rb, idx) < 0)
 #endif
-                                        shm_flow_set_notify(f->set, f->flow_id,
-                                                            FLOW_PKT);
+                                        goto flow_down;
+                                shm_flow_set_notify(f->set, f->flow_id,
+                                                    FLOW_PKT);
 
                         reschedule:
-                                rslot = (rto << r->mul++) >> (RXMQ_RES * i);
-
-                                new_i = i;
-                                while (rslot >= RXMQ_SLOTS) {
-                                        ++ new_i;
-                                        rslot >>= RXMQ_BUMP;
-                                }
-
-                                if (new_i >= RXMQ_LVLS) /* Can't reschedule */
-                                        continue;
-
-                                /* Schedule at least in the next time slot. */
-                                rslot = ((rxm_slot >> (RXMQ_BUMP * (new_i - 1))) + MAX(rslot, 1))
-                                        & (RXMQ_SLOTS - 1);
-
-                                list_add_tail(&r->next, &rw.rxms[new_i][rslot]);
-
+                                list_add(&r->next, &rw.rxms[lvl][rslot]);
                                 continue;
 
                         flow_down:
@@ -292,9 +281,9 @@ static void timerwheel_move(void)
                                 free(r);
                         }
                 }
+                rw.prv_rxm[i] = rxm_slot & (RXMQ_SLOTS - 1);
                 /* Move up a level in the wheel. */
                 rxm_slot >>= RXMQ_BUMP;
-                j >>= RXMQ_BUMP;
         }
 
         ack_slot = ts_to_ack_slot(now) & (ACKQ_SLOTS - 1) ;
@@ -350,18 +339,17 @@ static int timerwheel_rxm(struct frcti *       frcti,
         r->mul   = 0;
         r->seqno = seqno;
         r->frcti = frcti;
+        r->len  = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
 #ifdef RXM_BUFFER_ON_HEAP
-        r->pkt_len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
-        r->pkt = malloc(r->pkt_len);
+        r->pkt = malloc(r->len);
         if (r->pkt == NULL) {
                 free(r);
                 return -ENOMEM;
         }
-        memcpy(r->pkt, shm_du_buff_head(sdb), r->pkt_len);
+        memcpy(r->pkt, shm_du_buff_head(sdb), r->len);
 #else
-        r->sdb   = sdb;
-        r->head  = shm_du_buff_head(sdb);
-        r->tail  = shm_du_buff_tail(sdb);
+        r->sdb = sdb;
+        r->pkt = (struct frct_pci *) shm_du_buff_head(sdb);
 #endif
         pthread_rwlock_rdlock(&r->frcti->lock);
 
-- 
2.35.1


Other related posts: