[PATCH v2] lib: Add initial rtt estimator to FRCT

  • From: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Fri, 8 Feb 2019 10:47:42 +0100

This adds an simple round-trip time estimator to FRCT. The estimate is
a weighted average with deviation. The retransmission is scheduled
after rtt + 2 times the deviation. A retransmit doubles the rtt
estimate to avoid the no-update case when rtt suddenly increases.  The
rtt is estimated in microseconds and the granularity for retransmits
is 256 microseconds.

Signed-off-by: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
---
 include/ouroboros/utils.h |  1 +
 src/lib/frct.c            | 75 ++++++++++++++++++++++++++++++++-------
 src/lib/rxmwheel.c        | 32 +++++++++++++----
 3 files changed, 90 insertions(+), 18 deletions(-)

diff --git a/include/ouroboros/utils.h b/include/ouroboros/utils.h
index f5b6686..d40a178 100644
--- a/include/ouroboros/utils.h
+++ b/include/ouroboros/utils.h
@@ -28,6 +28,7 @@
 
 #define MIN(a,b) (((a) < (b)) ? (a) : (b))
 #define MAX(a,b) (((a) > (b)) ? (a) : (b))
+#define ABS(a)   ((a) > 0 ? (a) : -(a))
 
 typedef struct {
         uint8_t * data;
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 02c101c..bd08d86 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -50,7 +50,11 @@ struct frcti {
         time_t           a;
         time_t           r;
 
-        time_t           rto; /* ms */
+        time_t           srtt_us;     /* smoothed rtt */
+        time_t           mdev_us;     /* mdev         */
+        uint32_t         rttseq;
+        struct timespec  t_probe;     /* probe time   */
+        bool             probe;       /* probe active */
 
         struct frct_cr   snd_cr;
         struct frct_cr   rcv_cr;
@@ -110,8 +114,11 @@ static struct frcti * frcti_create(int fd)
 
         frcti->snd_cr.inact  = 3 * delta_t;
         frcti->snd_cr.act    = now.tv_sec - (frcti->snd_cr.inact + 1);
-        /* Initial rto. FIXME: recalc using Karn algorithm. */
-        frcti->rto           = 120;
+        /* rtt estimator. rto is currently srtt + 2 * mdev */
+        frcti->srtt_us       = 0;      /* updated on first ACK */
+        frcti->mdev_us       = 100000; /* initial rxm will be after 200 ms */
+        frcti->rttseq        = 0;
+        frcti->probe         = false;
 
         if (ai.flows[fd].spec.loss == 0) {
                 frcti->snd_cr.cflags |= FRCTFRTX;
@@ -200,6 +207,18 @@ static struct frct_pci * frcti_alloc_head(struct 
shm_du_buff * sdb)
         return pci;
 }
 
+static bool before(uint32_t seq1,
+                   uint32_t seq2)
+{
+        return (int32_t)(seq1 - seq2) < 0;
+}
+
+static bool after(uint32_t seq1,
+                  uint32_t seq2)
+{
+        return (int32_t)(seq2 - seq1) < 0;
+}
+
 static int __frcti_snd(struct frcti *       frcti,
                        struct shm_du_buff * sdb)
 {
@@ -219,7 +238,7 @@ static int __frcti_snd(struct frcti *       frcti,
         if (pci == NULL)
                 return -1;
 
-        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+        clock_gettime(CLOCK_REALTIME, &now);
 
         pthread_rwlock_wrlock(&frcti->lock);
 
@@ -244,12 +263,20 @@ static int __frcti_snd(struct frcti *       frcti,
         pci->seqno = hton32(snd_cr->seqno);
         if (!(snd_cr->cflags & FRCTFRTX)) {
                 snd_cr->lwe++;
-        } else if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
-                rxmwheel_add(frcti, snd_cr->seqno, sdb);
-                if (rcv_cr->lwe <= rcv_cr->seqno) {
-                        pci->flags |= FRCT_ACK;
-                        pci->ackno = hton32(rcv_cr->seqno);
-                        rcv_cr->lwe = rcv_cr->seqno;
+        } else {
+                if (!frcti->probe) {
+                        frcti->rttseq  = snd_cr->seqno;
+                        frcti->t_probe = now;
+                        frcti->probe   = true;
+                }
+
+                if (now.tv_sec - rcv_cr->act <= rcv_cr->inact) {
+                        rxmwheel_add(frcti, snd_cr->seqno, sdb);
+                        if (rcv_cr->lwe <= rcv_cr->seqno) {
+                                pci->flags |= FRCT_ACK;
+                                pci->ackno = hton32(rcv_cr->seqno);
+                                rcv_cr->lwe = rcv_cr->seqno;
+                        }
                 }
         }
 
@@ -261,6 +288,26 @@ static int __frcti_snd(struct frcti *       frcti,
         return 0;
 }
 
+static void rtt_estimator(struct frcti * frcti,
+                          time_t         mrtt_us)
+{
+        time_t srtt = frcti->srtt_us;
+        time_t mdev = frcti->mdev_us;
+
+        if (srtt != 0) {
+                srtt -= (srtt >> 3);
+                srtt += mrtt_us >> 3;   /* rtt = 7/8 rtt + 1/8 new */
+                mdev -=  (mdev >> 2);
+                mdev += ABS(srtt - mrtt_us) >> 2;
+        } else {
+                srtt = mrtt_us << 3;    /* take the measured time to be rtt */
+                mdev = mrtt_us >> 1;    /* take half mrtt_us as deviation */
+        }
+
+        frcti->srtt_us = MAX(1U, srtt);
+        frcti->mdev_us = MAX(1U, mdev);
+}
+
 /* Returns 0 when idx contains a packet for the application. */
 static int __frcti_rcv(struct frcti *       frcti,
                        struct shm_du_buff * sdb)
@@ -280,7 +327,7 @@ static int __frcti_rcv(struct frcti *       frcti,
 
         pci = (struct frct_pci *) shm_du_buff_head_release(sdb, FRCT_PCILEN);
 
-        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+        clock_gettime(CLOCK_REALTIME, &now);
 
         pthread_rwlock_wrlock(&frcti->lock);
 
@@ -300,7 +347,7 @@ static int __frcti_rcv(struct frcti *       frcti,
         if (seqno == rcv_cr->seqno) {
                 ++rcv_cr->seqno;
         } else { /* Out of order. */
-                if ((int32_t)(seqno - rcv_cr->seqno) < 0)
+                if (before(seqno, rcv_cr->seqno))
                         goto drop_packet;
 
                 if (rcv_cr->cflags & FRCTFRTX) {
@@ -321,6 +368,10 @@ static int __frcti_rcv(struct frcti *       frcti,
                 /* Check for duplicate (old) acks. */
                 if ((int32_t)(ackno - snd_cr->lwe) >= 0)
                         snd_cr->lwe = ackno;
+                if (frcti->probe && after(ackno, frcti->rttseq)) {
+                        rtt_estimator(frcti, ts_diff_us(&frcti->t_probe, 
&now));
+                        frcti->probe = false;
+                }
         }
 
         rcv_cr->act = now.tv_sec;
diff --git a/src/lib/rxmwheel.c b/src/lib/rxmwheel.c
index 395a5d8..56ac729 100644
--- a/src/lib/rxmwheel.c
+++ b/src/lib/rxmwheel.c
@@ -22,15 +22,16 @@
 
 #include <ouroboros/list.h>
 
-#define RXMQ_S     12                 /* defines #slots     */
-#define RXMQ_M     15                 /* defines max delay  */
+#define RXMQ_S     16                 /* defines #slots     */
+#define RXMQ_M     24                 /* defines max delay  */
 #define RXMQ_R     (RXMQ_M - RXMQ_S)  /* defines resolution */
 #define RXMQ_SLOTS (1 << RXMQ_S)
 #define RXMQ_MAX   (1 << RXMQ_M)      /* ms                 */
 
 /* Small inacurracy to avoid slow division by MILLION. */
 #define ts_to_ms(ts) (ts.tv_sec * 1000 + (ts.tv_nsec >> 20))
-#define ts_to_slot(ts) ((ts_to_ms(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
+#define ts_to_us(ts) (ts.tv_sec * MILLION + (ts.tv_nsec >> 10))
+#define ts_to_slot(ts) ((ts_to_us(ts) >> RXMQ_R) & (RXMQ_SLOTS - 1))
 
 struct rxm {
         struct list_head     next;
@@ -109,6 +110,21 @@ static void rxmwheel_clear(int fd)
         pthread_mutex_unlock(&rw.lock);
 }
 
+static void check_probe(struct frcti * frcti,
+                        uint32_t       seqno)
+{
+        /* disable rtt probe if this packet */
+
+        /* TODO: This should be locked, but lock reversal! */
+
+        if (frcti->probe && ((frcti->rttseq + 1) == seqno)) {
+                /* Backoff to avoid never updating rtt */
+                frcti->srtt_us <<= 1;
+                frcti->probe = false;
+        }
+}
+
+#define rto(frcti) (frcti->srtt_us + (frcti->mdev_us << 1))
 /* Return fd on r-timer expiry. */
 static int rxmwheel_move(void)
 {
@@ -148,6 +164,10 @@ static int rxmwheel_move(void)
                                 free(r);
                                 continue;
                         }
+
+                        /* Disable using this seqno as rto probe. */
+                        check_probe(r->frcti, r->seqno);
+
                         /* Check for r-timer expiry. */
                         if (ts_to_ms(now) - r->t0 > r->frcti->r) {
                                 int fd = r->frcti->fd;
@@ -201,7 +221,7 @@ static int rxmwheel_move(void)
                         r->tail = shm_du_buff_tail(sdb);
                         r->sdb  = sdb;
 
-                        newtime = ts_to_ms(now) + (f->frcti->rto << ++r->mul);
+                        newtime = ts_to_us(now) + rto(f->frcti);
                         rslot   = (newtime >> RXMQ_R) & (RXMQ_SLOTS - 1);
 
                         list_add_tail(&r->next, &rw.wheel[rslot]);
@@ -231,7 +251,7 @@ static int rxmwheel_add(struct frcti *       frcti,
 
         pthread_mutex_lock(&rw.lock);
 
-        r->t0    = ts_to_ms(now);
+        r->t0    = ts_to_us(now);
         r->mul   = 0;
         r->seqno = seqno;
         r->sdb   = sdb;
@@ -239,7 +259,7 @@ static int rxmwheel_add(struct frcti *       frcti,
         r->tail  = shm_du_buff_tail(sdb);
         r->frcti = frcti;
 
-        slot = ((r->t0 + frcti->rto) >> RXMQ_R) & (RXMQ_SLOTS - 1);
+        slot = ((r->t0 + rto(frcti)) >> RXMQ_R) & (RXMQ_SLOTS - 1);
 
         list_add_tail(&r->next, &rw.wheel[slot]);
 
-- 
2.20.1


Other related posts: