[PATCH v2] lib: Add initial flow liveness monitoring

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Wed, 23 Feb 2022 21:14:26 +0100

This adds flow liveness monitoring, with a fixed timeout of 120s. I
will make the timeout configurable at flow allocation later on (it
needs to be communicated to the peer). If one peer dies, or stops
making IPC calls (flow_write/flow_read/fevent), it will stop sending
keepalives and the other peer's reads/writes will fail with
-EFLOWDOWN after the timeout expires.
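
From the application's point of view nothing new is needed: a dead
peer simply surfaces as an error on the next I/O call. A minimal
sketch of the receiving side (the headers, buffer size and the
flow_dealloc() cleanup are illustrative, not part of this patch):

    #include <ouroboros/dev.h>
    #include <ouroboros/errno.h>

    static ssize_t read_or_bail(int fd)
    {
            char    buf[2048];
            ssize_t n;

            /* Blocks until data arrives or, after 120s without any
             * activity from the peer, fails with -EFLOWDOWN. */
            n = flow_read(fd, buf, sizeof(buf));
            if (n == -EFLOWDOWN)
                    flow_dealloc(fd); /* peer is gone, clean up */

            return n;
    }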

Packets without a payload (0-length packets) are interpreted as
keepalive packets for the flow. They can be sent from any application,
but they will not trigger a message read at the receiver side (a
return value of 0 from flow_read indicates that a previous partial
read completed at exactly the buffer size).
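
Internally a keepalive is just such a zero-length write (see
flow_send_keepalive() below), so an application that is idle on the
send side can also refresh the flow by hand; a sketch:

    /* A 0-byte write is carried as a keepalive: it refreshes the
     * peer's receive-activity timestamp but is dropped before
     * flow_read() on the other side, so it never surfaces as data. */
    if (flow_write(fd, NULL, 0) < 0)
            return -1; /* flow is down or the write failed */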

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 src/lib/crypt.c |   3 +
 src/lib/dev.c   | 182 ++++++++++++++++++++++++++++++++++++++++++------
 src/lib/frct.c  |   1 +
 3 files changed, 163 insertions(+), 23 deletions(-)

diff --git a/src/lib/crypt.c b/src/lib/crypt.c
index 070f5113..043eae13 100644
--- a/src/lib/crypt.c
+++ b/src/lib/crypt.c
@@ -217,6 +217,9 @@ static int openssl_encrypt(struct flow *        f,
         in = shm_du_buff_head(sdb);
         in_sz = shm_du_buff_tail(sdb) - in;
 
+        if (in_sz == 0)
+                return 0;
+
         if (random_buffer(iv, IVSZ) < 0)
                 goto fail_iv;
 
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 0acc7455..4c21fcdf 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -68,6 +68,7 @@
 #define SECMEMSZ  16384
 #define SYMMKEYSZ 32
 #define MSGBUFSZ  2048
+#define FLOWTIMEO 120 /* seconds */
 
 enum port_state {
         PORT_NULL = 0,
@@ -102,6 +103,9 @@ struct flow {
 
         pid_t                 pid;
 
+        struct timespec       snd_act;
+        struct timespec       rcv_act;
+
         bool                  snd_timesout;
         bool                  rcv_timesout;
         struct timespec       snd_timeo;
@@ -119,6 +123,8 @@ struct flow_set_entry {
 struct flow_set {
         size_t idx;
 
+        struct timespec  chk;   /* Last keepalive check */
+
         struct list_head flows;
         pthread_rwlock_t lock;
 };
@@ -300,8 +306,11 @@ static int flow_init(int       flow_id,
                      qosspec_t qs,
                      uint8_t * s)
 {
-        int fd;
-        int err = -ENOMEM;
+        struct timespec now;
+        int             fd;
+        int             err = -ENOMEM;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
 
         pthread_rwlock_wrlock(&ai.lock);
 
@@ -328,6 +337,8 @@ static int flow_init(int       flow_id,
         ai.flows[fd].pid      = pid;
         ai.flows[fd].part_idx = NO_PART;
         ai.flows[fd].qs       = qs;
+        ai.flows[fd].snd_act  = now;
+        ai.flows[fd].rcv_act  = now;
 
         if (qs.cypher_s > 0) {
                 assert(s != NULL);
@@ -1033,6 +1044,43 @@ static int add_crc(struct shm_du_buff * sdb)
         return 0;
 }
 
+static void flow_send_keepalive(int fd)
+{
+        flow_write(fd, NULL, 0);
+}
+
+static int flow_keepalive(int fd)
+{
+        struct timespec    now;
+        struct timespec    s_act;
+        struct timespec    r_act;
+        struct flow *      flow;
+        int                flow_id;
+
+        flow = &ai.flows[fd];
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        s_act = flow->snd_act;
+        r_act = flow->rcv_act;
+
+        flow_id = flow->flow_id;
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        if (ts_diff_ns(&r_act, &now) > FLOWTIMEO * BILLION) {
+                shm_flow_set_notify(ai.fqset, flow_id, FLOW_PKT);
+                return -EFLOWDOWN;
+        }
+
+        if (ts_diff_ns(&s_act, &now) > (FLOWTIMEO / 4) * BILLION)
+                flow_send_keepalive(fd);
+
+        return 0;
+}
+
 ssize_t flow_write(int          fd,
                    const void * buf,
                    size_t       count)
@@ -1048,17 +1096,17 @@ ssize_t flow_write(int          fd,
         struct shm_du_buff * sdb;
         uint8_t *            ptr;
 
-        if (buf == NULL)
+        if (buf == NULL && count != 0)
                 return 0;
 
-        if (fd < 0 || fd > PROG_MAX_FLOWS)
+        if (fd < 0 || fd >= PROG_MAX_FLOWS)
                 return -EBADF;
 
         flow = &ai.flows[fd];
 
         clock_gettime(PTHREAD_COND_CLOCK, &abs);
 
-        pthread_rwlock_rdlock(&ai.lock);
+        pthread_rwlock_wrlock(&ai.lock);
 
         if (flow->flow_id < 0) {
                 pthread_rwlock_unlock(&ai.lock);
@@ -1091,6 +1139,9 @@ ssize_t flow_write(int          fd,
                         if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0)
                                 return -ETIMEDOUT;
 
+                        if (flow_keepalive(fd))
+                                return -EFLOWDOWN;
+
                         frcti_tick(flow->frcti);
 
                         ts_add(&tictime, &tic, &tictime);
@@ -1101,11 +1152,20 @@ ssize_t flow_write(int          fd,
         if (idx < 0)
                 return idx;
 
-        memcpy(ptr, buf, count);
+        clock_gettime(PTHREAD_COND_CLOCK, &abs);
+
+        pthread_rwlock_wrlock(&ai.lock);
+
+        flow->snd_act = abs;
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        if (count > 0)
+                memcpy(ptr, buf, count);
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        if (frcti_snd(flow->frcti, sdb) < 0)
+        if (count != 0 && frcti_snd(flow->frcti, sdb) < 0)
                 goto enomem;
 
         if (flow->qs.cypher_s > 0 && crypt_encrypt(flow, sdb) < 0)
@@ -1114,6 +1174,8 @@ ssize_t flow_write(int          fd,
         if (flow->qs.ber == 0 && add_crc(sdb) != 0)
                 goto enomem;
 
+        pthread_cleanup_push(__cleanup_rwlock_unlock, &ai.lock);
+
         if (flags & FLOWFWNOBLOCK)
                 ret = shm_rbuff_write(flow->tx_rb, idx);
         else
@@ -1124,7 +1186,7 @@ ssize_t flow_write(int          fd,
         else
                 shm_flow_set_notify(flow->set, flow->flow_id, FLOW_PKT);
 
-        pthread_rwlock_unlock(&ai.lock);
+        pthread_cleanup_pop(true);
 
         return ret < 0 ? (ssize_t) ret : (ssize_t) count;
 
@@ -1144,6 +1206,7 @@ ssize_t flow_read(int    fd,
         struct shm_rbuff *   rb;
         struct shm_du_buff * sdb;
         struct timespec      abs;
+        struct timespec      now;
         struct timespec      tic = {0, TICTIME};
         struct timespec      tictime;
         struct timespec *    abstime = NULL;
@@ -1156,7 +1219,7 @@ ssize_t flow_read(int    fd,
 
         flow = &ai.flows[fd];
 
-        clock_gettime(PTHREAD_COND_CLOCK, &abs);
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
 
         pthread_rwlock_rdlock(&ai.lock);
 
@@ -1175,15 +1238,14 @@ ssize_t flow_read(int    fd,
         noblock = flow->oflags & FLOWFRNOBLOCK;
         partrd = !(flow->oflags & FLOWFRNOPART);
 
-        ts_add(&tic, &abs, &tictime);
+        ts_add(&now, &tic, &tictime);
 
         if (ai.flows[fd].rcv_timesout) {
-                ts_add(&abs, &flow->rcv_timeo, &abs);
+                ts_add(&now, &flow->rcv_timeo, &abs);
                 abstime = &abs;
         }
 
         idx = flow->part_idx;
-
         if (idx < 0) {
                 while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
                         pthread_rwlock_unlock(&ai.lock);
@@ -1200,20 +1262,28 @@ ssize_t flow_read(int    fd,
                                     && ts_diff_ns(&tictime, &abs) <= 0)
                                         return -ETIMEDOUT;
 
+                                if (flow_keepalive(fd) < 0)
+                                        return -EFLOWDOWN;
+
                                 ts_add(&tictime, &tic, &tictime);
-                                pthread_rwlock_rdlock(&ai.lock);
+
+                                pthread_rwlock_wrlock(&ai.lock);
                                 continue;
                         }
 
                         sdb = shm_rdrbuff_get(ai.rdrb, idx);
-                        if (flow->qs.ber == 0 && chk_crc(sdb) != 0) {
-                                pthread_rwlock_rdlock(&ai.lock);
+
+                        pthread_rwlock_wrlock(&ai.lock);
+
+                        flow->rcv_act = tictime;
+
+                        if ((flow->qs.ber == 0 && chk_crc(sdb) != 0) ||
+                            shm_du_buff_head(sdb) == shm_du_buff_tail(sdb)) {
                                 shm_rdrbuff_remove(ai.rdrb, idx);
+                                idx = -EAGAIN;
                                 continue;
                         }
 
-                        pthread_rwlock_rdlock(&ai.lock);
-
                         if (flow->qs.cypher_s > 0
                             && crypt_decrypt(flow, sdb) < 0) {
                                 pthread_rwlock_unlock(&ai.lock);
@@ -1242,6 +1312,8 @@ ssize_t flow_read(int    fd,
                 flow->part_idx = (partrd && n == (ssize_t) count) ?
                         DONE_PART : NO_PART;
 
+                flow->rcv_act = now;
+
                 pthread_rwlock_unlock(&ai.lock);
                 return n;
         } else {
@@ -1251,6 +1323,9 @@ ssize_t flow_read(int    fd,
                         shm_du_buff_head_release(sdb, n);
                         pthread_rwlock_wrlock(&ai.lock);
                         flow->part_idx = idx;
+
+                        flow->rcv_act = now;
+
                         pthread_rwlock_unlock(&ai.lock);
                         return count;
                 } else {
@@ -1265,6 +1340,9 @@ ssize_t flow_read(int    fd,
 struct flow_set * fset_create()
 {
         struct flow_set * set;
+        struct timespec   now;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
 
         set = malloc(sizeof(*set));
         if (set == NULL)
@@ -1281,6 +1359,8 @@ struct flow_set * fset_create()
         if (!bmp_is_id_valid(ai.fqueues, set->idx))
                 goto fail_bmp_alloc;
 
+        set->chk = now;
+
         pthread_rwlock_unlock(&ai.lock);
 
         list_head_init(&set->flows);
@@ -1453,6 +1533,48 @@ bool fset_has(const struct flow_set * set,
         return ret;
 }
 
+static void fset_keepalive(struct flow_set * set)
+{
+        struct timespec    now;
+        struct list_head * p;
+        struct list_head * h;
+        struct list_head   copy;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        pthread_rwlock_wrlock(&set->lock);
+
+        if (ts_diff_ns(&now, &set->chk) < (FLOWTIMEO / 4) * BILLION) {
+                pthread_rwlock_unlock(&set->lock);
+                return;
+        }
+
+        set->chk = now;
+
+        list_head_init(&copy);
+
+        list_for_each(p, &set->flows) {
+                struct flow_set_entry * c;
+                struct flow_set_entry * e;
+                e = list_entry(p, struct flow_set_entry, next);
+                c = malloc(sizeof(*c));
+                if (c == NULL)
+                        continue;
+                c->fd = e->fd;
+                list_add_tail(&c->next, &copy);
+        }
+
+        pthread_rwlock_unlock(&set->lock);
+
+        list_for_each_safe(p, h, &copy) {
+                struct flow_set_entry * e;
+                e = list_entry(p, struct flow_set_entry, next);
+                flow_send_keepalive(e->fd);
+                list_del(&e->next);
+                free(e);
+        }
+}
+
 int fqueue_next(struct fqueue * fq)
 {
         int fd;
@@ -1525,6 +1647,7 @@ ssize_t fevent(struct flow_set *       set,
                         ts_add(t, &tic, t);
                         pthread_rwlock_rdlock(&ai.lock);
                         timerwheel_move();
+                        fset_keepalive(set);
                         pthread_rwlock_unlock(&ai.lock);
                         continue;
                 }
@@ -1707,6 +1830,7 @@ int ipcp_flow_alloc_reply(int          fd,
 int ipcp_flow_read(int                   fd,
                    struct shm_du_buff ** sdb)
 {
+        struct timespec    now;
         struct flow *      flow;
         struct shm_rbuff * rb;
         ssize_t            idx = -1;
@@ -1729,11 +1853,18 @@ int ipcp_flow_read(int                   fd,
                 if (idx < 0)
                         return idx;
 
-                pthread_rwlock_rdlock(&ai.lock);
+                clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+                pthread_rwlock_wrlock(&ai.lock);
 
                 *sdb = shm_rdrbuff_get(ai.rdrb, idx);
-                if (flow->qs.ber == 0 && chk_crc(*sdb) != 0)
+                if ((flow->qs.ber == 0 && chk_crc(*sdb) != 0) ||
+                    (shm_du_buff_head(*sdb) == shm_du_buff_tail(*sdb))) {
+                        shm_rdrbuff_remove(ai.rdrb, idx);
                         continue;
+                }
+
+                flow->rcv_act = now;
 
                 frcti_rcv(flow->frcti, *sdb);
         }
@@ -1750,16 +1881,19 @@ int ipcp_flow_read(int                   fd,
 int ipcp_flow_write(int                  fd,
                     struct shm_du_buff * sdb)
 {
-        struct flow * flow;
-        int           ret;
-        ssize_t       idx;
+        struct timespec now;
+        struct flow *   flow;
+        int             ret;
+        ssize_t         idx;
 
         assert(fd >= 0 && fd < SYS_MAX_FLOWS);
         assert(sdb);
 
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
         flow = &ai.flows[fd];
 
-        pthread_rwlock_rdlock(&ai.lock);
+        pthread_rwlock_wrlock(&ai.lock);
 
         if (flow->flow_id < 0) {
                 pthread_rwlock_unlock(&ai.lock);
@@ -1792,6 +1926,8 @@ int ipcp_flow_write(int                  fd,
         else
                 shm_rdrbuff_remove(ai.rdrb, idx);
 
+        flow->snd_act = now;
+
         pthread_rwlock_unlock(&ai.lock);
 
         assert(ret <= 0);
diff --git a/src/lib/frct.c b/src/lib/frct.c
index ff938aec..e9741aaf 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -667,6 +667,7 @@ static int __frcti_snd(struct frcti *       frcti,
         bool              rtx;
 
         assert(frcti);
+        assert(shm_du_buff_head(sdb) != shm_du_buff_tail(sdb));
 
         snd_cr = &frcti->snd_cr;
         rcv_cr = &frcti->rcv_cr;
-- 
2.35.1

