[PATCH] lib: Make flow liveness timeout configurable

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Fri, 25 Feb 2022 17:34:29 +0100

The qosspec_t now has a timeout value that sets the timeout value of
the flow. Flows with a peer that has timed out will now return
-EFLOWPEER on flow_read() or flow_write().

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 doc/man/flow_read.3        |  3 ++
 include/ouroboros/errno.h  |  5 +--
 include/ouroboros/fqueue.h |  3 +-
 include/ouroboros/qos.h    | 52 +++++++++++++++++++-----------
 src/ipcpd/eth/eth.c        |  3 ++
 src/ipcpd/udp/main.c       |  4 +++
 src/ipcpd/unicast/fa.c     |  7 ++--
 src/irmd/main.c            |  2 +-
 src/lib/dev.c              | 66 ++++++++++++++++++++++++++------------
 src/lib/qosspec.proto      | 17 +++++-----
 src/lib/sockets.c          |  2 ++
 11 files changed, 111 insertions(+), 53 deletions(-)

diff --git a/doc/man/flow_read.3 b/doc/man/flow_read.3
index 99f96544..e41ee374 100644
--- a/doc/man/flow_read.3
+++ b/doc/man/flow_read.3
@@ -58,6 +58,9 @@ The flow was not allocated.
 .B -EFLOWDOWN
 The flow has been reported down.
 
+.B -EFLOWPEER
+The flow's peer is unresponsive (flow timed out).
+
 .B -EMSGSIZE
 The buffer was too large to be written.
 
diff --git a/include/ouroboros/errno.h b/include/ouroboros/errno.h
index 06f33bef..b9f8dbc0 100644
--- a/include/ouroboros/errno.h
+++ b/include/ouroboros/errno.h
@@ -31,7 +31,8 @@
 #define EIPCP        1003 /* Failed to communicate with IPCP */
 #define EIPCPSTATE   1004 /* Target in wrong state           */
 #define EFLOWDOWN    1005 /* Flow is down                    */
-#define ECRYPT       1006 /* Encryption error                */
-#define ENAME        1007 /* Naming error                    */
+#define EFLOWPEER    1006 /* Flow is down (peer timed out)   */
+#define ECRYPT       1007 /* Encryption error                */
+#define ENAME        1008 /* Naming error                    */
 
 #endif /* OUROBOROS_ERRNO_H */
diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h
index f6828a4d..3c0ebf90 100644
--- a/include/ouroboros/fqueue.h
+++ b/include/ouroboros/fqueue.h
@@ -33,7 +33,8 @@ enum fqtype {
         FLOW_DOWN    = (1 << 1),
         FLOW_UP      = (1 << 2),
         FLOW_ALLOC   = (1 << 3),
-        FLOW_DEALLOC = (1 << 4)
+        FLOW_DEALLOC = (1 << 4),
+        FLOW_PEER    = (1 << 5)
 };
 
 struct flow_set;
diff --git a/include/ouroboros/qos.h b/include/ouroboros/qos.h
index 6391347a..b6b945d9 100644
--- a/include/ouroboros/qos.h
+++ b/include/ouroboros/qos.h
@@ -26,15 +26,18 @@
 #include <stdint.h>
 #include <stdbool.h>
 
+#define DEFAULT_PEER_TIMEOUT 120000
+
 typedef struct qos_spec {
-        uint32_t delay;         /* In ms */
-        uint64_t bandwidth;     /* In bits/s */
-        uint8_t  availability;  /* Class of 9s */
-        uint32_t loss;          /* Packet loss */
-        uint32_t ber;           /* Bit error rate, errors per billion bits */
-        uint8_t  in_order;      /* In-order delivery, enables FRCT */
-        uint32_t max_gap;       /* In ms */
-        uint16_t cypher_s;      /* Cypher strength, 0 = no encryption */
+        uint32_t delay;         /* In ms.                                     
*/
+        uint64_t bandwidth;     /* In bits/s.                                 
*/
+        uint8_t  availability;  /* Class of 9s.                               
*/
+        uint32_t loss;          /* Packet loss.                               
*/
+        uint32_t ber;           /* Bit error rate, errors per billion bits.   
*/
+        uint8_t  in_order;      /* In-order delivery, enables FRCT.           
*/
+        uint32_t max_gap;       /* In ms.                                     
*/
+        uint16_t cypher_s;      /* Cypher strength (bits), 0 = no encryption. 
*/
+        uint32_t timeout;       /* Peer timeout time, in ms, 0 = no timeout.  
*/
 } qosspec_t;
 
 static const qosspec_t qos_raw = {
@@ -45,7 +48,8 @@ static const qosspec_t qos_raw = {
         .ber          = 1,
         .in_order     = 0,
         .max_gap      = UINT32_MAX,
-        .cypher_s     = 0
+        .cypher_s     = 0,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_raw_no_errors = {
@@ -56,7 +60,8 @@ static const qosspec_t qos_raw_no_errors = {
         .ber          = 0,
         .in_order     = 0,
         .max_gap      = UINT32_MAX,
-        .cypher_s     = 0
+        .cypher_s     = 0,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_raw_crypt = {
@@ -67,7 +72,8 @@ static const qosspec_t qos_raw_crypt = {
         .ber          = 0,
         .in_order     = 0,
         .max_gap      = UINT32_MAX,
-        .cypher_s     = 256
+        .cypher_s     = 256,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_best_effort = {
@@ -78,7 +84,8 @@ static const qosspec_t qos_best_effort = {
         .ber          = 0,
         .in_order     = 1,
         .max_gap      = UINT32_MAX,
-        .cypher_s     = 0
+        .cypher_s     = 0,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_best_effort_crypt = {
@@ -89,7 +96,8 @@ static const qosspec_t qos_best_effort_crypt = {
         .ber          = 0,
         .in_order     = 1,
         .max_gap      = UINT32_MAX,
-        .cypher_s     = 256
+        .cypher_s     = 256,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_video   = {
@@ -100,7 +108,8 @@ static const qosspec_t qos_video   = {
         .ber          = 0,
         .in_order     = 1,
         .max_gap      = 100,
-        .cypher_s     = 0
+        .cypher_s     = 0,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_video_crypt   = {
@@ -111,7 +120,8 @@ static const qosspec_t qos_video_crypt   = {
         .ber          = 0,
         .in_order     = 1,
         .max_gap      = 100,
-        .cypher_s     = 256
+        .cypher_s     = 256,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_voice = {
@@ -122,7 +132,8 @@ static const qosspec_t qos_voice = {
         .ber          = 0,
         .in_order     = 1,
         .max_gap      = 50,
-        .cypher_s     = 0
+        .cypher_s     = 0,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_voice_crypt = {
@@ -133,7 +144,8 @@ static const qosspec_t qos_voice_crypt = {
         .ber          = 0,
         .in_order     = 1,
         .max_gap      = 50,
-        .cypher_s     = 256
+        .cypher_s     = 256,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_data = {
@@ -144,7 +156,8 @@ static const qosspec_t qos_data = {
         .ber          = 0,
         .in_order     = 1,
         .max_gap      = 2000,
-        .cypher_s     = 0
+        .cypher_s     = 0,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 static const qosspec_t qos_data_crypt = {
@@ -155,7 +168,8 @@ static const qosspec_t qos_data_crypt = {
         .ber          = 0,
         .in_order     = 1,
         .max_gap      = 2000,
-        .cypher_s     = 256
+        .cypher_s     = 256,
+        .timeout      = DEFAULT_PEER_TIMEOUT
 };
 
 #endif /* OUROBOROS_QOS_H */
diff --git a/src/ipcpd/eth/eth.c b/src/ipcpd/eth/eth.c
index e22dd7bc..8b34d303 100644
--- a/src/ipcpd/eth/eth.c
+++ b/src/ipcpd/eth/eth.c
@@ -164,6 +164,7 @@ struct mgmt_msg {
         uint32_t ber;
         uint32_t max_gap;
         uint32_t delay;
+        uint32_t timeout;
         uint16_t cypher_s;
         uint8_t  in_order;
 #if defined (BUILD_ETH_DIX)
@@ -492,6 +493,7 @@ static int eth_ipcp_alloc(const uint8_t * dst_addr,
         msg->in_order     = qs.in_order;
         msg->max_gap      = hton32(qs.max_gap);
         msg->cypher_s     = hton16(qs.cypher_s);
+        msg->timeout      = hton32(qs.timeout);
 
         memcpy(msg + 1, hash, ipcp_dir_hash_len());
         memcpy(buf + len + ETH_HEADER_TOT_SIZE, data, dlen);
@@ -753,6 +755,7 @@ static int eth_ipcp_mgmt_frame(const uint8_t * buf,
                 qs.in_order = msg->in_order;
                 qs.max_gap = ntoh32(msg->max_gap);
                 qs.cypher_s = ntoh16(msg->cypher_s);
+                qs.timeout = ntoh32(msg->timeout);
 
                 if (shim_data_reg_has(eth_data.shim_data,
                                       buf + sizeof(*msg))) {
diff --git a/src/ipcpd/udp/main.c b/src/ipcpd/udp/main.c
index b9f97e74..5c57e6b8 100644
--- a/src/ipcpd/udp/main.c
+++ b/src/ipcpd/udp/main.c
@@ -98,7 +98,9 @@ struct mgmt_msg {
         uint32_t loss;
         uint32_t ber;
         uint32_t max_gap;
+        uint32_t timeout;
         uint16_t cypher_s;
+
 } __attribute__((packed));
 
 struct mgmt_frame {
@@ -217,6 +219,7 @@ static int ipcp_udp_port_alloc(const struct sockaddr_in * 
r_saddr,
         msg->in_order     = qs.in_order;
         msg->max_gap      = hton32(qs.max_gap);
         msg->cypher_s     = hton16(qs.cypher_s);
+        msg->timeout      = hton32(qs.timeout);
 
         memcpy(msg + 1, dst, ipcp_dir_hash_len());
         memcpy(buf + len, data, dlen);
@@ -375,6 +378,7 @@ static int ipcp_udp_mgmt_frame(const uint8_t *    buf,
                 qs.in_order     = msg->in_order;
                 qs.max_gap      = ntoh32(msg->max_gap);
                 qs.cypher_s     = ntoh16(msg->cypher_s);
+                qs.timeout      = ntoh32(msg->timeout);
 
                 return ipcp_udp_port_req(&c_saddr, ntoh32(msg->s_eid),
                                          (uint8_t *) (msg + 1), qs,
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index dcc79031..d59b9760 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -72,14 +72,15 @@ struct fa_msg {
         int8_t   response;
         uint16_t ece;
         /* QoS parameters from spec, aligned */
-        uint8_t  availability;
-        uint8_t  in_order;
         uint32_t delay;
         uint64_t bandwidth;
         uint32_t loss;
         uint32_t ber;
         uint32_t max_gap;
+        uint32_t timeout;
         uint16_t cypher_s;
+        uint8_t  availability;
+        uint8_t  in_order;
 } __attribute__((packed));
 
 struct cmd {
@@ -569,6 +570,7 @@ static int fa_handle_flow_req(struct fa_msg * msg,
         qs.in_order     = msg->in_order;
         qs.max_gap      = ntoh32(msg->max_gap);
         qs.cypher_s     = ntoh16(msg->cypher_s);
+        qs.timeout      = ntoh32(msg->timeout);
 
         fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen);
         if (fd < 0)
@@ -840,6 +842,7 @@ int fa_alloc(int             fd,
         msg->in_order     = qs.in_order;
         msg->max_gap      = hton32(qs.max_gap);
         msg->cypher_s     = hton16(qs.cypher_s);
+        msg->timeout      = hton32(qs.timeout);
 
         memcpy(msg + 1, dst, ipcp_dir_hash_len());
         memcpy(shm_du_buff_head(sdb) + len, data, dlen);
diff --git a/src/irmd/main.c b/src/irmd/main.c
index f83e8e1e..a3acc78a 100644
--- a/src/irmd/main.c
+++ b/src/irmd/main.c
@@ -648,7 +648,7 @@ static int connect_ipcp(pid_t        pid,
         log_dbg("Connecting %s to %s.", component, dst);
 
         if (ipcp_connect(pid, dst, component, qs)) {
-                log_err("Could not connect IPCP.");
+                log_err("Could not connect IPCP %d to %s.", pid, dst);
                 return -EPERM;
         }
 
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 4c21fcdf..5c57a538 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -68,7 +68,6 @@
 #define SECMEMSZ  16384
 #define SYMMKEYSZ 32
 #define MSGBUFSZ  2048
-#define FLOWTIMEO 120 /* seconds */
 
 enum port_state {
         PORT_NULL = 0,
@@ -123,7 +122,8 @@ struct flow_set_entry {
 struct flow_set {
         size_t idx;
 
-        struct timespec  chk;   /* Last keepalive check */
+        struct timespec  chk;   /* Last keepalive check.          */
+        uint32_t         min;   /* Minimum keepalive time in set. */
 
         struct list_head flows;
         pthread_rwlock_t lock;
@@ -1056,6 +1056,7 @@ static int flow_keepalive(int fd)
         struct timespec    r_act;
         struct flow *      flow;
         int                flow_id;
+        uint32_t           timeo;
 
         flow = &ai.flows[fd];
 
@@ -1067,15 +1068,19 @@ static int flow_keepalive(int fd)
         r_act = flow->rcv_act;
 
         flow_id = flow->flow_id;
+        timeo   = flow->qs.timeout;
 
         pthread_rwlock_unlock(&ai.lock);
 
-        if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) {
-                shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT);
-                return -EFLOWDOWN;
+        if (timeo == 0)
+                return 0;
+
+        if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
+                shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
+                return -EFLOWPEER;
         }
 
-        if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION)
+        if (ts_diff_ns(&s_act, &now) > (timeo >> 2) * MILLION)
                 flow_send_keepalive(fd);
 
         return 0;
@@ -1140,7 +1145,7 @@ ssize_t flow_write(int          fd,
                                 return -ETIMEDOUT;
 
                         if (flow_keepalive(fd))
-                                return -EFLOWDOWN;
+                                return -EFLOWPEER;
 
                         frcti_tick(flow->frcti);
 
@@ -1165,14 +1170,16 @@ ssize_t flow_write(int          fd,
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        if (count != 0 && frcti_snd(flow->frcti, sdb) < 0)
-                goto enomem;
+        if (count > 0) {
+                if (frcti_snd(flow->frcti, sdb) < 0)
+                        goto enomem;
 
-        if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
-                goto enomem;
+                if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
+                        goto enomem;
 
-        if (flow->qs.ber == 0 && add_crc(sdb) != 0)
-                goto enomem;
+                if (flow->qs.ber == 0 && add_crc(sdb) != 0)
+                        goto enomem;
+        }
 
         pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
 
@@ -1263,7 +1270,7 @@ ssize_t flow_read(int    fd,
                                         return -ETIMEDOUT;
 
                                 if (flow_keepalive(fd) < 0)
-                                        return -EFLOWDOWN;
+                                        return -EFLOWPEER;
 
                                 ts_add(&tictime, &tic, &tictime);
 
@@ -1360,6 +1367,7 @@ struct flow_set * fset_create()
                 goto fail_bmp_alloc;
 
         set->chk = now;
+        set->min = UINT32_MAX;
 
         pthread_rwlock_unlock(&ai.lock);
 
@@ -1437,6 +1445,7 @@ void fset_zero(struct flow_set * set)
 int fset_add(struct flow_set * set,
              int               fd)
 {
+        struct flow *           flow;
         struct flow_set_entry * fse;
         int                     ret;
         size_t                  packets;
@@ -1445,13 +1454,17 @@ int fset_add(struct flow_set * set,
         if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
                 return -EINVAL;
 
+        flow = &ai.flows[fd];
+
         fse = malloc(sizeof(*fse));
         if (fse == NULL)
                 return -ENOMEM;
 
-        pthread_rwlock_wrlock(&ai.lock);
+        fse->fd = fd;
 
-        if (ai.flows[fd].flow_id < 0) {
+        pthread_rwlock_rdlock(&ai.lock);
+
+        if (flow->flow_id < 0) {
                 ret = -EINVAL;
                 goto fail;
         }
@@ -1462,6 +1475,9 @@ int fset_add(struct flow_set * set,
 
         pthread_rwlock_wrlock(&set->lock);
 
+        if (flow->qs.timeout != 0 && flow->qs.timeout < set->min)
+                set->min = flow->qs.timeout;
+
         list_add_tail(&fse->next, &set->flows);
 
         pthread_rwlock_unlock(&set->lock);
@@ -1485,14 +1501,20 @@ void fset_del(struct flow_set * set,
 {
         struct list_head * p;
         struct list_head * h;
+        struct flow *      flow;
+        uint32_t           min;
 
         if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
                 return;
 
+        flow = &ai.flows[fd];
+
+        min = UINT32_MAX;
+
         pthread_rwlock_rdlock(&ai.lock);
 
-        if (ai.flows[fd].flow_id >= 0)
-                shm_flow_set_del(ai.fqset, set->idx, ai.flows[fd].flow_id);
+        if (flow->flow_id >= 0)
+                shm_flow_set_del(ai.fqset, set->idx, flow->flow_id);
 
         pthread_rwlock_wrlock(&set->lock);
 
@@ -1502,10 +1524,14 @@ void fset_del(struct flow_set * set,
                 if (e->fd == fd) {
                         list_del(&e->next);
                         free(e);
-                        break;
+                } else {
+                        if (flow->qs.timeout != 0 && flow->qs.timeout < min)
+                                min = flow->qs.timeout;
                 }
         }
 
+        set->min = min;
+
         pthread_rwlock_unlock(&set->lock);
 
         pthread_rwlock_unlock(&ai.lock);
@@ -1544,7 +1570,7 @@ static void fset_keepalive(struct flow_set * set)
 
         pthread_rwlock_wrlock(&set->lock);
 
-        if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) {
+        if (ts_diff_ns(&now, &set->chk) < set->min >> 2) {
                 pthread_rwlock_unlock(&set->lock);
                 return;
         }
diff --git a/src/lib/qosspec.proto b/src/lib/qosspec.proto
index 8a355363..3ceedd87 100644
--- a/src/lib/qosspec.proto
+++ b/src/lib/qosspec.proto
@@ -23,12 +23,13 @@
 syntax = "proto2";
 
 message qosspec_msg {
-        required uint32 delay        = 1; /* In ms */
-        required uint64 bandwidth    = 2; /* In bits/s */
-        required uint32 availability = 3; /* Class of 9s */
-        required uint32 loss         = 4; /* Packet loss */
-        required uint32 ber          = 5; /* Bit error rate, ppb */
-        required uint32 in_order     = 6; /* In-order delivery */
-        required uint32 max_gap      = 7; /* In ms */
-        required uint32 cypher_s     = 8; /* Crypto strength in bits */
+        required uint32 delay        = 1; /* In ms.                   */
+        required uint64 bandwidth    = 2; /* In bits/s.               */
+        required uint32 availability = 3; /* Class of 9s.             */
+        required uint32 loss         = 4; /* Packet loss.             */
+        required uint32 ber          = 5; /* Bit error rate, ppb.     */
+        required uint32 in_order     = 6; /* In-order delivery.       */
+        required uint32 max_gap      = 7; /* In ms.                   */
+        required uint32 cypher_s     = 8; /* Crypto strength in bits. */
+        required uint32 timeout      = 9; /* Timeout in ms.           */
 };
diff --git a/src/lib/sockets.c b/src/lib/sockets.c
index 8179d2b3..48e95121 100644
--- a/src/lib/sockets.c
+++ b/src/lib/sockets.c
@@ -181,6 +181,7 @@ qosspec_msg_t spec_to_msg(const qosspec_t * qs)
         msg.in_order     = spec.in_order;
         msg.max_gap      = spec.max_gap;
         msg.cypher_s     = spec.cypher_s;
+        msg.timeout      = spec.timeout;
 
         return msg;
 }
@@ -199,6 +200,7 @@ qosspec_t msg_to_spec(const qosspec_msg_t * msg)
         spec.in_order     = msg->in_order;
         spec.max_gap      = msg->max_gap;
         spec.cypher_s     = msg->cypher_s;
+        spec.timeout      = msg->timeout;
 
         return spec;
 }
-- 
2.35.1


Other related posts:

  • » [PATCH] lib: Make flow liveness timeout configurable - Dimitri Staessens