[PATCH v2] lib: Revise FRCT reordering

  • From: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Mon, 4 Jun 2018 15:31:49 +0200

The reordering queue is replaced by a fixed ring buffer for speed and
simplicity.

Signed-off-by: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
---
 src/lib/CMakeLists.txt |   1 -
 src/lib/dev.c          |   3 -
 src/lib/frct.c         |  88 ++++++++++++-----------
 src/lib/rq.c           | 157 -----------------------------------------
 src/lib/rq.h           |  47 ------------
 5 files changed, 47 insertions(+), 249 deletions(-)
 delete mode 100644 src/lib/rq.c
 delete mode 100644 src/lib/rq.h

diff --git a/src/lib/CMakeLists.txt b/src/lib/CMakeLists.txt
index 4757f88..973c245 100644
--- a/src/lib/CMakeLists.txt
+++ b/src/lib/CMakeLists.txt
@@ -201,7 +201,6 @@ set(SOURCE_FILES_COMMON
   qoscube.c
   random.c
   rib.c
-  rq.c
   sha3.c
   shm_flow_set.c
   shm_rbuff.c
diff --git a/src/lib/dev.c b/src/lib/dev.c
index f126448..4a1c668 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -23,7 +23,6 @@
 #include <ouroboros/endian.h>
 
 #define _POSIX_C_SOURCE 200809L
-
 #include "config.h"
 
 #include <ouroboros/hash.h>
@@ -43,8 +42,6 @@
 #include <ouroboros/qoscube.h>
 #include <ouroboros/timerwheel.h>
 
-#include "rq.h"
-
 #include <stdlib.h>
 #include <string.h>
 #include <stdio.h>
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 54f822f..2eb79fb 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -25,7 +25,7 @@
 #define DELT_A         0     /* ms */
 #define DELT_R         2000  /* ms */
 
-#define RQ_SIZE        20
+#define RQ_SIZE        64
 
 #define TW_ELEMENTS    6000
 #define TW_RESOLUTION  1     /* ms */
@@ -56,7 +56,7 @@ struct frcti {
         struct frct_cr   snd_cr;
         struct frct_cr   rcv_cr;
 
-        struct rq *      rq;
+        size_t           rq[RQ_SIZE];
 
         struct timespec  rtt;
 
@@ -108,7 +108,8 @@ static void frct_fini(void)
 static struct frcti * frcti_create(int fd)
 {
         struct frcti * frcti;
-        time_t delta_t;
+        time_t         delta_t;
+        ssize_t        idx;
 
         frcti = malloc(sizeof(*frcti));
         if (frcti == NULL)
@@ -117,9 +118,8 @@ static struct frcti * frcti_create(int fd)
         if (pthread_rwlock_init(&frcti->lock, NULL))
                 goto fail_lock;
 
-        frcti->rq = rq_create(RQ_SIZE);
-        if (frcti->rq == NULL)
-                goto fail_rq;
+        for (idx = 0; idx < RQ_SIZE; ++idx)
+                frcti->rq[idx] = -1;
 
         frcti->mpl = DELT_MPL;
         frcti->a   = DELT_A;
@@ -138,18 +138,16 @@ static struct frcti * frcti_create(int fd)
         frcti->snd_cr.lwe    = 0;
         frcti->snd_cr.rwe    = 0;
         frcti->snd_cr.cflags = 0;
-        frcti->snd_cr.inact  = 2 * delta_t + 1;
+        frcti->snd_cr.inact  = 3 * delta_t + 1;
 
         frcti->rcv_cr.drf    = true;
         frcti->rcv_cr.lwe    = 0;
         frcti->rcv_cr.rwe    = 0;
         frcti->rcv_cr.cflags = 0;
-        frcti->rcv_cr.inact  = 3 * delta_t + 1;
+        frcti->rcv_cr.inact  = 2 * delta_t + 1;
 
         return frcti;
 
- fail_rq:
-        pthread_rwlock_destroy(&frcti->lock);
  fail_lock:
         free(frcti);
  fail_malloc:
@@ -165,7 +163,6 @@ static void frcti_destroy(struct frcti * frcti)
 
         pthread_rwlock_destroy(&frcti->lock);
 
-        rq_destroy(frcti->rq);
         free(frcti);
 }
 
@@ -213,18 +210,29 @@ static uint16_t frcti_getconf(struct frcti * frcti)
 
 static ssize_t __frcti_queued_pdu(struct frcti * frcti)
 {
-        ssize_t idx = -1;
+        ssize_t idx;
+        size_t  pos;
 
         assert(frcti);
 
         /* See if we already have the next PDU. */
         pthread_rwlock_wrlock(&frcti->lock);
 
-        if (!rq_is_empty(frcti->rq)) {
-                if (rq_peek(frcti->rq) == frcti->rcv_cr.lwe) {
-                        ++frcti->rcv_cr.lwe;
-                        idx = rq_pop(frcti->rq);
+        pos = frcti->rcv_cr.lwe & (RQ_SIZE - 1);
+        idx = frcti->rq[pos];
+        if (idx != -1) {
+                struct shm_du_buff * sdb;
+                struct frct_pci *    pci;
+
+                sdb = shm_rdrbuff_get(ai.rdrb, idx);
+                pci = (struct frct_pci *) shm_du_buff_head(sdb) - 1;
+                if (pci->flags & FRCT_CFG) {
+                        assert(pci->flags & FRCT_DRF);
+                        frcti->rcv_cr.cflags = pci->cflags;
                 }
+
+                ++frcti->rcv_cr.lwe;
+                frcti->rq[pos] = -1;
         }
 
         pthread_rwlock_unlock(&frcti->lock);
@@ -343,41 +351,41 @@ static int __frcti_rcv(struct frcti *       frcti,
         if (pci->flags & FRCT_CRC) {
                 uint8_t * tail = shm_du_buff_tail_release(sdb, FRCT_CRCLEN);
                 if (frct_chk_crc((uint8_t *) pci, tail))
-                        goto fail_clean;
+                        goto drop_packet;
         }
 
         /* Check if receiver inactivity is true. */
         if (!rcv_cr->drf && now.tv_sec - rcv_cr->act > rcv_cr->inact)
                 rcv_cr->drf = true;
 
-        /* When there is receiver inactivity and no DRF, drop the PDU. */
-        if (rcv_cr->drf && !(pci->flags & FRCT_DRF))
-                goto fail_clean;
-
         seqno = ntoh32(pci->seqno);
 
+        if (rcv_cr->drf) {
+                /* Inactive receiver, check for DRF. */
+                if (pci->flags & FRCT_DRF) /* New run. */
+                        rcv_cr->lwe = seqno;
+                else
+                        goto drop_packet;
+        }
+
         /* Queue the PDU if needed. */
         if (rcv_cr->cflags & FRCTFORDERING) {
-                if (seqno != frcti->rcv_cr.lwe) {
-                        /* NOTE: queued PDUs head/tail without PCI. */
-                        if (rq_push(frcti->rq, seqno, idx))
-                                shm_rdrbuff_remove(ai.rdrb, idx);
-                        goto fail;
+                if (seqno < rcv_cr->lwe || seqno > rcv_cr->lwe + RQ_SIZE)
+                        goto drop_packet;
+
+                if (seqno == rcv_cr->lwe) {
+                        ++rcv_cr->lwe;
+                        /* Check for online reconfiguration. */
+                        if (pci->flags & FRCT_CFG) {
+                                assert(pci->flags & FRCT_DRF);
+                                rcv_cr->cflags = pci->cflags;
+                        }
                 } else {
-                      ++rcv_cr->lwe;
+                        frcti->rq[seqno & (RQ_SIZE - 1)] = idx;
                 }
         }
 
-        /* If the DRF is set, reset the state of the connection. */
-        if (pci->flags & FRCT_DRF) {
-                rcv_cr->lwe = seqno;
-                if (pci->flags & FRCT_CFG)
-                        rcv_cr->cflags = pci->cflags;
-        }
-
-        if (rcv_cr->drf)
-                rcv_cr->drf = false;
-
+        rcv_cr->drf = false;
         rcv_cr->act = now.tv_sec;
 
         if (!(pci->flags & FRCT_DATA))
@@ -387,10 +395,8 @@ static int __frcti_rcv(struct frcti *       frcti,
 
         return 0;
 
- fail_clean:
-        if (!(pci->flags & FRCT_DATA))
-                shm_rdrbuff_remove(ai.rdrb, idx);
- fail:
+ drop_packet:
+        shm_rdrbuff_remove(ai.rdrb, idx);
         pthread_rwlock_unlock(&frcti->lock);
         return -EAGAIN;
 }
diff --git a/src/lib/rq.c b/src/lib/rq.c
deleted file mode 100644
index a1b832e..0000000
--- a/src/lib/rq.c
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2018
- *
- * Reordering queue
- *
- *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
- *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#include "rq.h"
-
-#include <assert.h>
-
-struct pdu {
-        uint64_t seqno;
-        size_t   idx;
-};
-
-struct rq {
-        struct pdu * items;
-        int          n_items;
-        int          size;
-};
-
-struct rq * rq_create(int size)
-{
-        struct rq * rq;
-
-        rq = malloc(sizeof(*rq));
-        if (rq == NULL)
-                return NULL;
-
-        rq->items = malloc(sizeof(struct pdu) * (size + 1));
-        if (rq->items == NULL) {
-                free(rq);
-                return NULL;
-        }
-
-        rq->size = size;
-        rq->n_items = 0;
-
-        return rq;
-}
-
-void rq_destroy(struct rq * rq)
-{
-        assert(rq);
-
-        free(rq->items);
-        free(rq);
-}
-
-int rq_push(struct rq * rq,
-            uint64_t    seqno,
-            size_t      idx)
-{
-        int i;
-        int j;
-
-        assert(rq);
-
-        /* Queue is full. */
-        if (rq->n_items == rq->size)
-                return -1;
-
-        i = ++rq->n_items;
-        j = i >> 1;
-        while (i > 1 && rq->items[j].seqno > seqno) {
-                rq->items[i] = rq->items[j];
-                i = j;
-                j >>= 1;
-        }
-
-        rq->items[i].seqno = seqno;
-        rq->items[i].idx = idx;
-
-        return 0;
-}
-
-uint64_t rq_peek(struct rq * rq)
-{
-        assert(rq);
-
-        return rq->items[1].seqno;
-}
-
-bool rq_is_empty(struct rq * rq)
-{
-        assert(rq);
-
-        return (rq->n_items == 0);
-}
-
-size_t rq_pop(struct rq * rq)
-{
-        size_t idx;
-        int    i;
-        int    j;
-        int    k;
-
-        assert(rq);
-
-        idx = rq->items[1].idx;
-
-        rq->items[1] = rq->items[rq->n_items];
-        rq->n_items--;
-
-        i = 1;
-        while (true) {
-                k = i;
-                j = i << 1;
-
-                if (j <= rq->n_items && rq->items[j].seqno < rq->items[k].seqno)
-                        k = j;
-
-                if (j + 1 <= rq->n_items &&
-                    rq->items[j + 1].seqno < rq->items[k].seqno)
-                        k = j + 1;
-
-                if (k == i)
-                        break;
-
-                rq->items[i] = rq->items[k];
-                i = k;
-        }
-
-        rq->items[i] = rq->items[rq->n_items + 1];
-
-        return idx;
-}
-
-bool rq_has(struct rq * rq,
-            uint64_t    seqno)
-{
-        int i;
-
-        assert(rq);
-
-        for (i = 1; i <= rq->n_items; i++)
-                if (rq->items[i].seqno == seqno)
-                        return true;
-
-        return false;
-}
diff --git a/src/lib/rq.h b/src/lib/rq.h
deleted file mode 100644
index 25c193d..0000000
--- a/src/lib/rq.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Ouroboros - Copyright (C) 2016 - 2018
- *
- * Reordering queue
- *
- *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
- *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public License
- * version 2.1 as published by the Free Software Foundation.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., http://www.fsf.org/about/contact/.
- */
-
-#ifndef OUROBOROS_LIB_RQ_H
-#define OUROBOROS_LIB_RQ_H
-
-#include <stdint.h>
-#include <stdlib.h>
-#include <stdbool.h>
-
-struct rq * rq_create(int size);
-
-void        rq_destroy(struct rq * rq);
-
-int         rq_push(struct rq * rq,
-                    uint64_t    seqno,
-                    size_t      idx);
-
-uint64_t    rq_peek(struct rq * rq);
-
-bool        rq_is_empty(struct rq * rq);
-
-size_t      rq_pop(struct rq * rq);
-
-bool        rq_has(struct rq * rq,
-                   uint64_t    seqno);
-
-#endif /* OUROBOROS_LIB_RQ_H */
-- 
2.17.1


Other related posts: