[PATCH] lib: Refactor reading packet from rbuff

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Sat, 26 Mar 2022 09:28:41 +0100

Reading packets from the rbuff and checking their validity (non-zero
size, pass crc check, pass decryption) is now extracted into a
function.

Also adds a function to get the length of an sdu_du_buff instead of
subtracting the tail and head pointers.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 include/ouroboros/shm_du_buff.h |   2 +
 src/ipcpd/eth/eth.c             |   4 +-
 src/ipcpd/udp/main.c            |   2 +-
 src/ipcpd/unicast/dir/dht.c     |   2 +-
 src/ipcpd/unicast/dt.c          |   6 +-
 src/ipcpd/unicast/fa.c          |   6 +-
 src/lib/crypt.c                 |   3 +-
 src/lib/dev.c                   | 133 ++++++++++++++++++--------------
 src/lib/frct.c                  |   2 +-
 src/lib/shm_flow_set.c          |  12 +--
 src/lib/shm_rdrbuff.c           |   7 ++
 src/lib/timerwheel.c            |   2 +-
 12 files changed, 102 insertions(+), 79 deletions(-)

diff --git a/include/ouroboros/shm_du_buff.h b/include/ouroboros/shm_du_buff.h
index da350055..0b83f913 100644
--- a/include/ouroboros/shm_du_buff.h
+++ b/include/ouroboros/shm_du_buff.h
@@ -34,6 +34,8 @@ uint8_t * shm_du_buff_head(struct shm_du_buff * sdb);
 
 uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb);
 
+size_t    shm_du_buff_len(struct shm_du_buff * sdb);
+
 uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb,
                                  size_t               size);
 
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index 25f42fc8..53dc3b69 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -950,7 +950,7 @@ static void * eth_ipcp_packet_reader(void * o)
                 deid = ntohs(e_frame->eid);
                 if (deid == MGMT_EID) {
 #elif defined (BUILD_ETH_LLC)
-                if (length > 0x05FF) {/* DIX */
+                if (length > 0x05FF) { /* DIX */
 #ifndef HAVE_NETMAP
                         ipcp_sdb_release(sdb);
 #endif
@@ -1067,7 +1067,7 @@ static void * eth_ipcp_packet_writer(void * o)
                                 continue;
                         }
 
-                        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+                        len = shm_du_buff_len(sdb);
 
                         if (shm_du_buff_head_alloc(sdb, ETH_HEADER_TOT_SIZE)
                             == NULL) {
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index 7def856b..d3104163 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -541,7 +541,7 @@ static void * ipcp_udp_packet_writer(void * o)
                                 continue;
                         }
 
-                        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+                        len = shm_du_buff_len(sdb);
                         if (len > IPCP_UDP_MAX_PACKET_SIZE) {
                                 log_dbg("Packet length exceeds MTU.");
                                 ipcp_sdb_release(sdb);
diff --git a/src/ipcpd/unicast/dir/dht.c b/src/ipcpd/unicast/dir/dht.c
index a847edc0..65d7c3ec 100644
--- a/src/ipcpd/unicast/dir/dht.c
+++ b/src/ipcpd/unicast/dir/dht.c
@@ -2461,7 +2461,7 @@ static void * dht_handle_packet(void * o)
 
                 pthread_cleanup_pop(true);
 
-                i = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
+                i = shm_du_buff_len(cmd->sdb);
 
                 msg = kad_msg__unpack(NULL, i, shm_du_buff_head(cmd->sdb));
 #ifndef __DHT_TEST__
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index f2013809..9c16e5d0 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -435,7 +435,7 @@ static void packet_handler(int                  fd,
         uint8_t *     head;
         size_t        len;
 
-        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+        len = shm_du_buff_len(sdb);
 
 #ifndef IPCP_FLOW_STATS
         (void)        fd;
@@ -781,7 +781,7 @@ int dt_write_packet(uint64_t             dst_addr,
         assert(sdb);
         assert(dst_addr != ipcpi.dt_addr);
 
-        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+        len = shm_du_buff_len(sdb);
 
 #ifdef IPCP_FLOW_STATS
         if (eid < PROG_RES_FDS) {
@@ -815,7 +815,7 @@ int dt_write_packet(uint64_t             dst_addr,
                 goto fail_write;
         }
 
-        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+        len = shm_du_buff_len(sdb);
 
         dt_pci.dst_addr = dst_addr;
         dt_pci.qc       = qc;
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 508f2d73..5f3dd1a7 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -341,7 +341,7 @@ static void packet_handler(int                  fd,
 
         pthread_rwlock_wrlock(&fa.flows_lock);
 
-        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+        len = shm_du_buff_len(sdb);
 
 #ifdef IPCP_FLOW_STATS
         ++flow->p_snd;
@@ -453,7 +453,7 @@ static size_t fa_wait_for_fa_msg(struct fa_msg * msg)
 
         pthread_cleanup_pop(true);
 
-        len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
+        len = shm_du_buff_len(cmd->sdb);
         if (len > MSGBUFSZ || len < sizeof(*msg)) {
                 log_warn("Invalid flow allocation message (len: %zd)\n", len);
                 free(cmd);
@@ -988,7 +988,7 @@ void  fa_np1_rcv(uint64_t             eid,
         int              fd;
         size_t           len;
 
-        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+        len = shm_du_buff_len(sdb);
 
         pthread_rwlock_wrlock(&fa.flows_lock);
 
diff --git a/src/lib/crypt.c b/src/lib/crypt.c
index e19981bc..2985fc6a 100644
--- a/src/lib/crypt.c
+++ b/src/lib/crypt.c
@@ -282,8 +282,7 @@ static int openssl_decrypt(struct flow *        f,
         int       in_sz;
         int       tmp_sz;
 
-        in = shm_du_buff_head(sdb);
-        in_sz = shm_du_buff_tail(sdb) - in;
+        in_sz = shm_du_buff_len(sdb);
         if (in_sz < IVSZ)
                 return -ECRYPT;
 
diff --git a/src/lib/dev.c b/src/lib/dev.c
index d73205e2..b6fa57aa 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -28,6 +28,10 @@
 
 #include "config.h"
 
+#define OUROBOROS_PREFIX "dev"
+
+#include <ouroboros/logs.h>
+
 #include <ouroboros/hash.h>
 #include <ouroboros/cacep.h>
 #include <ouroboros/errno.h>
@@ -93,7 +97,7 @@ struct flow {
         struct shm_rbuff *    tx_rb;
         struct shm_flow_set * set;
         int                   flow_id;
-        int                   oflags;
+        uint16_t              oflags;
         qosspec_t             qs;
         ssize_t               part_idx;
 
@@ -1214,6 +1218,52 @@ ssize_t flow_write(int          fd,
         return -ENOMEM;
 }
 
+static bool invalid_pkt(struct flow *        flow,
+                        struct shm_du_buff * sdb)
+{
+        if (shm_du_buff_len(sdb) == 0)
+                return true;
+
+        if (flow->qs.ber == 0 && chk_crc(sdb) != 0)
+                return true;
+
+        if (flow->qs.cypher_s > 0 && crypt_decrypt(flow, sdb) < 0)
+                return true;
+
+        return false;
+}
+
+static ssize_t flow_rx_sdb(struct flow *         flow,
+                           struct shm_du_buff ** sdb,
+                           bool                  block,
+                           struct timespec *     abstime)
+{
+        ssize_t         idx;
+        struct timespec now;
+
+        idx = block ? shm_rbuff_read_b(flow->rx_rb, abstime) :
+                shm_rbuff_read(flow->rx_rb);
+        if (idx < 0)
+                return idx;
+
+        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+        if (invalid_pkt(flow, *sdb)) {
+                shm_rdrbuff_remove(ai.rdrb, idx);
+                return -EAGAIN;
+        }
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        pthread_rwlock_wrlock(&ai.lock);
+
+        flow->rcv_act = now;
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        return idx;
+}
+
 ssize_t flow_read(int    fd,
                   void * buf,
                   size_t count)
@@ -1221,7 +1271,6 @@ ssize_t flow_read(int    fd,
         ssize_t              idx;
         ssize_t              n;
         uint8_t *            packet;
-        struct shm_rbuff *   rb;
         struct shm_du_buff * sdb;
         struct timespec      abs;
         struct timespec      now;
@@ -1229,10 +1278,10 @@ ssize_t flow_read(int    fd,
         struct timespec      tictime;
         struct timespec *    abstime = NULL;
         struct flow *        flow;
-        bool                 noblock;
+        bool                 block;
         bool                 partrd;
 
-        if (fd < 0 || fd > PROG_MAX_FLOWS)
+        if (fd < 0 || fd >= PROG_MAX_FLOWS)
                 return -EBADF;
 
         flow = &ai.flows[fd];
@@ -1241,24 +1290,23 @@ ssize_t flow_read(int    fd,
 
         pthread_rwlock_rdlock(&ai.lock);
 
+        if (flow->flow_id < 0) {
+                pthread_rwlock_unlock(&ai.lock);
+                return -ENOTALLOC;
+        }
+
         if (flow->part_idx == DONE_PART) {
                 pthread_rwlock_unlock(&ai.lock);
                 flow->part_idx = NO_PART;
                 return 0;
         }
 
-        if (flow->flow_id < 0) {
-                pthread_rwlock_unlock(&ai.lock);
-                return -ENOTALLOC;
-        }
-
-        rb   = flow->rx_rb;
-        noblock = flow->oflags & FLOWFRNOBLOCK;
+        block  = !(flow->oflags & FLOWFRNOBLOCK);
         partrd = !(flow->oflags & FLOWFRNOPART);
 
         ts_add(&now, &tic, &tictime);
 
-        if (ai.flows[fd].rcv_timesout) {
+        if (flow->rcv_timesout) {
                 ts_add(&now, &flow->rcv_timeo, &abs);
                 abstime = &abs;
         }
@@ -1268,8 +1316,7 @@ ssize_t flow_read(int    fd,
                 while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
                         pthread_rwlock_unlock(&ai.lock);
 
-                        idx = noblock ? shm_rbuff_read(rb) :
-                                shm_rbuff_read_b(rb, &tictime);
+                        idx = flow_rx_sdb(flow, &sdb, block, &tictime);
                         if (idx < 0) {
                                 frcti_tick(flow->frcti);
 
@@ -1285,45 +1332,31 @@ ssize_t flow_read(int    fd,
 
                                 ts_add(&tictime, &tic, &tictime);
 
-                                pthread_rwlock_wrlock(&ai.lock);
+                                pthread_rwlock_rdlock(&ai.lock);
                                 continue;
                         }
 
-                        sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
-                        pthread_rwlock_wrlock(&ai.lock);
-
-                        flow->rcv_act = tictime;
-
-                        if ((flow->qs.ber == 0 && chk_crc(sdb) != 0) ||
-                            shm_du_buff_head(sdb) == shm_du_buff_tail(sdb)) {
-                                shm_rdrbuff_remove(ai.rdrb, idx);
-                                idx = -EAGAIN;
-                                continue;
-                        }
-
-                        if (flow->qs.cypher_s > 0
-                            && crypt_decrypt(flow, sdb) < 0) {
-                                pthread_rwlock_unlock(&ai.lock);
-                                shm_rdrbuff_remove(ai.rdrb, idx);
-                                return -ENOMEM;
-                        }
+                        pthread_rwlock_rdlock(&ai.lock);
 
                         frcti_rcv(flow->frcti, sdb);
                 }
         }
 
+        sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
         frcti_tick(flow->frcti);
 
         pthread_rwlock_unlock(&ai.lock);
 
-        n = shm_rdrbuff_read(&packet, ai.rdrb, idx);
+        packet = shm_du_buff_head(sdb);
+
+        n = shm_du_buff_len(sdb);
 
         assert(n >= 0);
 
         if (n <= (ssize_t) count) {
                 memcpy(buf, packet, n);
-                shm_rdrbuff_remove(ai.rdrb, idx);
+                ipcp_sdb_release(sdb);
 
                 pthread_rwlock_wrlock(&ai.lock);
 
@@ -1337,7 +1370,6 @@ ssize_t flow_read(int    fd,
         } else {
                 if (partrd) {
                         memcpy(buf, packet, count);
-                        sdb = shm_rdrbuff_get(ai.rdrb, idx);
                         shm_du_buff_head_release(sdb, n);
                         pthread_rwlock_wrlock(&ai.lock);
                         flow->part_idx = idx;
@@ -1347,7 +1379,7 @@ ssize_t flow_read(int    fd,
                         pthread_rwlock_unlock(&ai.lock);
                         return count;
                 } else {
-                        shm_rdrbuff_remove(ai.rdrb, idx);
+                        ipcp_sdb_release(sdb);
                         return -EMSGSIZE;
                 }
         }
@@ -1870,10 +1902,8 @@ int ipcp_flow_alloc_reply(int          fd,
 int ipcp_flow_read(int                   fd,
                    struct shm_du_buff ** sdb)
 {
-        struct timespec    now;
-        struct flow *      flow;
-        struct shm_rbuff * rb;
-        ssize_t            idx = -1;
+        struct flow *   flow;
+        ssize_t         idx = -1;
 
         assert(fd >= 0 && fd < SYS_MAX_FLOWS);
         assert(sdb);
@@ -1884,27 +1914,14 @@ int ipcp_flow_read(int                   fd,
 
         assert(flow->flow_id >= 0);
 
-        rb = flow->rx_rb;
-
         while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
                 pthread_rwlock_unlock(&ai.lock);
 
-                idx = shm_rbuff_read(rb);
+                idx = flow_rx_sdb(flow, sdb, false, NULL);
                 if (idx < 0)
                         return idx;
 
-                clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-                pthread_rwlock_wrlock(&ai.lock);
-
-                *sdb = shm_rdrbuff_get(ai.rdrb, idx);
-                if ((flow->qs.ber == 0 && chk_crc(*sdb) != 0) ||
-                    (shm_du_buff_head(*sdb) == shm_du_buff_tail(*sdb))) {
-                        shm_rdrbuff_remove(ai.rdrb, idx);
-                        continue;
-                }
-
-                flow->rcv_act = now;
+                pthread_rwlock_rdlock(&ai.lock);
 
                 frcti_rcv(flow->frcti, *sdb);
         }
@@ -1913,8 +1930,6 @@ int ipcp_flow_read(int                   fd,
 
         pthread_rwlock_unlock(&ai.lock);
 
-        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
         return 0;
 }
 
diff --git a/src/lib/frct.c b/src/lib/frct.c
index 99962868..a93a1006 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -681,7 +681,7 @@ static int __frcti_snd(struct frcti *       frcti,
         bool              rtx;
 
         assert(frcti);
-        assert(shm_du_buff_head(sdb) != shm_du_buff_tail(sdb));
+        assert(shm_du_buff_len(sdb) != 0);
 
         snd_cr = &frcti->snd_cr;
         rcv_cr = &frcti->rcv_cr;
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
index 5a9bee6c..d325a253 100644
--- a/src/lib/shm_flow_set.c
+++ b/src/lib/shm_flow_set.c
@@ -96,10 +96,8 @@ static struct shm_flow_set * flow_set_create(pid_t pid,
         if (shm_fd == -1)
                 goto fail_shm_open;
 
-        if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0) {
-                close(shm_fd);
-                goto fail_shm_open;
-        }
+        if (ftruncate(shm_fd, SHM_FLOW_SET_FILE_SIZE - 1) < 0)
+                goto fail_truncate;
 
         shm_base = mmap(NULL,
                         SHM_FLOW_SET_FILE_SIZE,
@@ -108,11 +106,11 @@ static struct shm_flow_set * flow_set_create(pid_t pid,
                         shm_fd,
                         0);
 
-        close(shm_fd);
-
         if (shm_base == MAP_FAILED)
                 goto fail_mmap;
 
+        close(shm_fd);
+
         set->mtable  = shm_base;
         set->heads   = (size_t *) (set->mtable + SYS_MAX_FLOWS);
         set->conds   = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
@@ -125,6 +123,8 @@ static struct shm_flow_set * flow_set_create(pid_t pid,
  fail_mmap:
         if (flags & O_CREAT)
                 shm_unlink(fn);
+ fail_truncate:
+        close(shm_fd);
  fail_shm_open:
         free(set);
  fail_malloc:
diff --git a/src/lib/shm_rdrbuff.c b/src/lib/shm_rdrbuff.c
index dfa45af6..e283388f 100644
--- a/src/lib/shm_rdrbuff.c
+++ b/src/lib/shm_rdrbuff.c
@@ -532,6 +532,13 @@ uint8_t * shm_du_buff_tail(struct shm_du_buff * sdb)
         return (uint8_t *) (sdb + 1) + sdb->du_tail;
 }
 
+size_t shm_du_buff_len(struct shm_du_buff * sdb)
+{
+        assert(sdb);
+
+        return sdb->du_tail - sdb->du_head;
+}
+
 uint8_t * shm_du_buff_head_alloc(struct shm_du_buff * sdb,
                                  size_t               size)
 {
diff --git a/src/lib/timerwheel.c b/src/lib/timerwheel.c
index 580f838d..c3be08e0 100644
--- a/src/lib/timerwheel.c
+++ b/src/lib/timerwheel.c
@@ -337,7 +337,7 @@ static int timerwheel_rxm(struct frcti *       frcti,
         r->mul   = 0;
         r->seqno = seqno;
         r->frcti = frcti;
-        r->len  = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+        r->len  = shm_du_buff_len(sdb);
 #ifdef RXM_BUFFER_ON_HEAP
         r->pkt = malloc(r->len);
         if (r->pkt == NULL) {
-- 
2.35.1


Other related posts: