[PATCH] lib: Add np1_flow_read and np1_flow_write calls

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Sun, 27 Mar 2022 11:09:43 +0200

Reading/writing to (N + 1)-flows from the IPCP was using a raw QoS flow
to bypass some functions in the ipcp_flow_read call. But this call was
broken for keepalive packets.  Fixing the ipcp_flow_read call for
(N - 1) flows causes the IPCPs to drop 0-byte keepalive packets coming from
(N + 1) client flows.

From now on, there is a dedicated call for (N + 1) reads/writes from
the IPCPs that's more efficient and cleaner. The (N + 1) flow internal
QoS is now also defaulted to a qos_np1 qosspec, instead of tampering
with the qosspec requested by the (N + 1) client.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 include/ouroboros/ipcp-dev.h |  6 +++
 include/ouroboros/np1_flow.h | 19 ++++++--
 src/ipcpd/eth/eth.c          | 11 +++--
 src/ipcpd/ipcp.c             | 10 ++--
 src/ipcpd/udp/main.c         | 20 ++++++--
 src/ipcpd/unicast/dt.c       |  2 +-
 src/ipcpd/unicast/fa.c       |  2 +-
 src/ipcpd/unicast/psched.c   |  7 ++-
 src/ipcpd/unicast/psched.h   |  6 ++-
 src/lib/dev.c                | 91 +++++++++++++++++++++++++++++++-----
 10 files changed, 140 insertions(+), 34 deletions(-)

diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h
index 6472c9fe..307cf3a2 100644
--- a/include/ouroboros/ipcp-dev.h
+++ b/include/ouroboros/ipcp-dev.h
@@ -47,6 +47,12 @@ int    ipcp_flow_read(int                   fd,
 int    ipcp_flow_write(int                  fd,
                        struct shm_du_buff * sdb);
 
+int    np1_flow_read(int                   fd,
+                     struct shm_du_buff ** sdb);
+
+int    np1_flow_write(int                  fd,
+                      struct shm_du_buff * sdb);
+
 int    ipcp_flow_fini(int fd);
 
 int    ipcp_flow_get_qoscube(int         fd,
diff --git a/include/ouroboros/np1_flow.h b/include/ouroboros/np1_flow.h
index b764de91..fdef443b 100644
--- a/include/ouroboros/np1_flow.h
+++ b/include/ouroboros/np1_flow.h
@@ -27,13 +27,24 @@
 
 #include <unistd.h>
 
-int  np1_flow_alloc(pid_t     n_pid,
-                    int       flow_id,
-                    qosspec_t qs);
+int  np1_flow_alloc(pid_t n_pid,
+                    int   flow_id);
 
 int  np1_flow_resp(int flow_id);
 
-int  np1_flow_dealloc(int flow_id,
+int  np1_flow_dealloc(int    flow_id,
                       time_t timeo);
 
+static const qosspec_t qos_np1 = {
+        .delay        = UINT32_MAX,
+        .bandwidth    = 0,
+        .availability = 0,
+        .loss         = UINT32_MAX,
+        .ber          = UINT32_MAX,
+        .in_order     = 0,
+        .max_gap      = UINT32_MAX,
+        .cypher_s     = 0,
+        .timeout      = 0
+};
+
 #endif /* OUROBOROS_NP1_FLOW_H */
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index 53dc3b69..b7b3a41d 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -1016,10 +1016,15 @@ static void * eth_ipcp_packet_reader(void * o)
 #ifndef HAVE_NETMAP
                         shm_du_buff_head_release(sdb, ETH_HEADER_TOT_SIZE);
                         shm_du_buff_truncate(sdb, length);
-                        ipcp_flow_write(fd, sdb);
 #else
-                        flow_write(fd, &e_frame->payload, length);
+                        if (ipcp_sdb_reserve(&sdb, length))
+                                continue;
+
+                        buf = shm_du_buff_head(sdb);
+                        memcpy(buf, &e_frame->payload, length);
 #endif
+                        if (np1_flow_write(fd, sdb) < 0)
+                                ipcp_sdb_release(sdb);
                 }
         }
 
@@ -1062,7 +1067,7 @@ static void * eth_ipcp_packet_writer(void * o)
                         if (fqueue_type(fq) != FLOW_PKT)
                                 continue;
 
-                        if (ipcp_flow_read(fd, &sdb)) {
+                        if (np1_flow_read(fd, &sdb)) {
                                 log_dbg("Bad read from fd %d.", fd);
                                 continue;
                         }
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index d19d8e43..2426fbab 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -514,10 +514,8 @@ static void * mainloop(void * o)
                                 break;
                         }
 
-                        qs = msg_to_spec(msg->qosspec);
                         fd = np1_flow_alloc(msg->pid,
-                                            msg->flow_id,
-                                            qs);
+                                            msg->flow_id);
                         if (fd < 0) {
                                 log_err("Failed allocating fd on flow_id %d.",
                                         msg->flow_id);
@@ -525,6 +523,7 @@ static void * mainloop(void * o)
                                 break;
                         }
 
+                        qs = msg_to_spec(msg->qosspec);
                         ret_msg.result =
                                 ipcpi.ops->ipcp_flow_alloc(fd,
                                                            msg->hash.data,
@@ -549,10 +548,8 @@ static void * mainloop(void * o)
                                 break;
                         }
 
-                        qs = msg_to_spec(msg->qosspec);
                         fd = np1_flow_alloc(msg->pid,
-                                            msg->flow_id,
-                                            qs);
+                                            msg->flow_id);
                         if (fd < 0) {
                                 log_err("Failed allocating fd on flow_id %d.",
                                         msg->flow_id);
@@ -560,6 +557,7 @@ static void * mainloop(void * o)
                                 break;
                         }
 
+                        qs = msg_to_spec(msg->qosspec);
                         ret_msg.result =
                                 ipcpi.ops->ipcp_flow_join(fd,
                                                           msg->hash.data,
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index d3104163..6e32638d 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -450,9 +450,11 @@ static void * ipcp_udp_packet_reader(void * o)
         eid_p = (uint32_t *) buf;
 
         while (true) {
-                struct mgmt_frame * frame;
-                struct sockaddr_in  r_saddr;
-                socklen_t           len;
+                struct mgmt_frame *  frame;
+                struct sockaddr_in   r_saddr;
+                socklen_t            len;
+                struct shm_du_buff * sdb;
+                uint8_t *            head;
 
                 len = sizeof(r_saddr);
 
@@ -493,7 +495,15 @@ static void * ipcp_udp_packet_reader(void * o)
                         continue;
                 }
 
-                flow_write(eid, data, n - sizeof(eid));
+                n-= sizeof(eid);
+
+                if (ipcp_sdb_reserve(&sdb, n))
+                        continue;
+
+                head = shm_du_buff_head(sdb);
+                memcpy(head, data, n);
+                if (np1_flow_write(eid, sdb) < 0)
+                        ipcp_sdb_release(sdb);
         }
 
         return 0;
@@ -536,7 +546,7 @@ static void * ipcp_udp_packet_writer(void * o)
                         if (fqueue_type(fq) != FLOW_PKT)
                                 continue;
 
-                        if (ipcp_flow_read(fd, &sdb)) {
+                        if (np1_flow_read(fd, &sdb)) {
                                 log_dbg("Bad read from fd %d.", fd);
                                 continue;
                         }
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 9c16e5d0..9cc53edc 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -713,7 +713,7 @@ void dt_fini(void)
 
 int dt_start(void)
 {
-        dt.psched = psched_create(packet_handler);
+        dt.psched = psched_create(packet_handler, ipcp_flow_read);
         if (dt.psched == NULL) {
                 log_err("Failed to create N-1 packet scheduler.");
                 return -1;
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index 5f3dd1a7..345d4031 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -756,7 +756,7 @@ int fa_start(void)
         int                 pol;
         int                 max;
 
-        fa.psched = psched_create(packet_handler);
+        fa.psched = psched_create(packet_handler, np1_flow_read);
         if (fa.psched == NULL) {
                 log_err("Failed to start packet scheduler.");
                 goto fail_psched;
diff --git a/src/ipcpd/unicast/psched.c b/src/ipcpd/unicast/psched.c
index 33ac5afe..bb452726 100644
--- a/src/ipcpd/unicast/psched.c
+++ b/src/ipcpd/unicast/psched.c
@@ -50,6 +50,7 @@ static int qos_prio [] = {
 struct psched {
         fset_t *         set[QOS_CUBE_MAX];
         next_packet_fn_t callback;
+        read_fn_t        read;
         pthread_t        readers[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
 };
 
@@ -101,7 +102,7 @@ static void * packet_reader(void * o)
                                 notifier_event(NOTIFY_DT_FLOW_UP, &fd);
                                 break;
                         case FLOW_PKT:
-                                if (ipcp_flow_read(fd, &sdb))
+                                if (sched->read(fd, &sdb) < 0)
                                         continue;
 
                                 sched->callback(fd, qc, sdb);
@@ -117,7 +118,8 @@ static void * packet_reader(void * o)
         return (void *) 0;
 }
 
-struct psched * psched_create(next_packet_fn_t callback)
+struct psched * psched_create(next_packet_fn_t callback,
+                              read_fn_t        read)
 {
         struct psched *       psched;
         struct sched_info *   infos[QOS_CUBE_MAX * IPCP_SCHED_THR_MUL];
@@ -131,6 +133,7 @@ struct psched * psched_create(next_packet_fn_t callback)
                 goto fail_malloc;
 
         psched->callback = callback;
+        psched->read     = read;
 
         for (i = 0; i < QOS_CUBE_MAX; ++i) {
                 psched->set[i] = fset_create();
diff --git a/src/ipcpd/unicast/psched.h b/src/ipcpd/unicast/psched.h
index 1f22b34b..654d73d9 100644
--- a/src/ipcpd/unicast/psched.h
+++ b/src/ipcpd/unicast/psched.h
@@ -30,7 +30,11 @@ typedef void (* next_packet_fn_t)(int                  fd,
                                   qoscube_t            qc,
                                   struct shm_du_buff * sdb);
 
-struct psched * psched_create(next_packet_fn_t callback);
+typedef int (* read_fn_t)(int                   fd,
+                          struct shm_du_buff ** sdb);
+
+struct psched * psched_create(next_packet_fn_t callback,
+                              read_fn_t        read);
 
 void            psched_destroy(struct psched * psched);
 
diff --git a/src/lib/dev.c b/src/lib/dev.c
index b9f098dc..845999a0 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -38,6 +38,7 @@
 #include <ouroboros/sockets.h>
 #include <ouroboros/fccntl.h>
 #include <ouroboros/bitmap.h>
+#include <ouroboros/np1_flow.h>
 #include <ouroboros/pthread.h>
 #include <ouroboros/random.h>
 #include <ouroboros/shm_flow_set.h>
@@ -1328,7 +1329,7 @@ ssize_t flow_read(int    fd,
 
                         idx = flow_rx_sdb(flow, &sdb, block, &tictime);
                         if (idx < 0) {
-                                if (idx != -ETIMEDOUT)
+                                if (idx != -ETIMEDOUT && idx != -EAGAIN)
                                         return idx;
 
                                 if (abstime != NULL
@@ -1738,12 +1739,9 @@ ssize_t fevent(struct flow_set *       set,
 /* ipcp-dev functions. */
 
 int np1_flow_alloc(pid_t     n_pid,
-                   int       flow_id,
-                   qosspec_t qs)
+                   int       flow_id)
 {
-        qs.cypher_s = 0; /* No encryption ctx for np1 */
-        qs.in_order = 0; /* No frct for np1           */
-        return flow_init(flow_id, n_pid, qs, NULL, 0);
+        return flow_init(flow_id, n_pid, qos_np1, NULL, 0);
 }
 
 int np1_flow_dealloc(int    flow_id,
@@ -1853,9 +1851,7 @@ int ipcp_flow_req_arr(const uint8_t * dst,
                 return -1;
         }
 
-        qs.cypher_s = 0; /* No encryption ctx for np1 */
-        qs.in_order = 0; /* No frct for np1           */
-        fd = flow_init(recv_msg->flow_id, recv_msg->pid, qs, NULL, 0);
+        fd = flow_init(recv_msg->flow_id, recv_msg->pid, qos_np1, NULL, 0);
 
         irm_msg__free_unpacked(recv_msg, NULL);
 
@@ -1926,8 +1922,14 @@ int ipcp_flow_read(int                   fd,
                 pthread_rwlock_unlock(&ai.lock);
 
                 idx = flow_rx_sdb(flow, sdb, false, NULL);
-                if (idx < 0)
+                if (idx < 0) {
+                        if (idx == -EAGAIN) {
+                                pthread_rwlock_rdlock(&ai.lock);
+                                continue;
+                        }
+
                         return idx;
+                }
 
                 pthread_rwlock_rdlock(&ai.lock);
 
@@ -1962,7 +1964,74 @@ int ipcp_flow_write(int                  fd,
                 return -EPERM;
         }
 
-        ret = flow_tx_sdb(flow, sdb, false, NULL);
+        pthread_rwlock_unlock(&ai.lock);
+
+        ret = flow_tx_sdb(flow, sdb, true, NULL);
+
+        return ret;
+}
+
+int np1_flow_read(int                   fd,
+                  struct shm_du_buff ** sdb)
+{
+        struct flow *    flow;
+        ssize_t          idx = -1;
+
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+        assert(sdb);
+
+        flow = &ai.flows[fd];
+
+        assert(flow->flow_id >= 0);
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        idx = shm_rbuff_read(flow->rx_rb);;
+        if (idx < 0) {
+                pthread_rwlock_unlock(&ai.lock);
+                return idx;
+        }
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
+
+        return 0;
+}
+
+int np1_flow_write(int                  fd,
+                   struct shm_du_buff * sdb)
+{
+        struct flow * flow;
+        int           ret;
+        ssize_t       idx;
+
+        assert(fd >= 0 && fd < SYS_MAX_FLOWS);
+        assert(sdb);
+
+        flow = &ai.flows[fd];
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        if (flow->flow_id < 0) {
+                pthread_rwlock_unlock(&ai.lock);
+                return -ENOTALLOC;
+        }
+
+        if ((flow->oflags & FLOWFACCMODE) == FLOWFRDONLY) {
+                pthread_rwlock_unlock(&ai.lock);
+                return -EPERM;
+        }
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        idx = shm_du_buff_get_idx(sdb);
+
+        ret = shm_rbuff_write_b(flow->tx_rb, idx, NULL);
+        if (ret < 0)
+                shm_rdrbuff_remove(ai.rdrb, idx);
+        else
+                shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
 
         return ret;
 }
-- 
2.35.1


Other related posts:

  • » [PATCH] lib: Add np1_flow_read and np1_flow_write calls - Dimitri Staessens