[PATCH] lib: Move flow monitoring to its own thread

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Sun, 27 Mar 2022 18:18:52 +0200

This adds a monitoring thread to handle flow keepalive management in
the application and removes the thread interruptions to schedule FRCT
calls within the regular IPC calls.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 src/lib/dev.c | 304 +++++++++++++++++++++-----------------------------
 1 file changed, 130 insertions(+), 174 deletions(-)

diff --git a/src/lib/dev.c b/src/lib/dev.c
index 845999a0..c76504d9 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -123,9 +123,6 @@ struct flow_set_entry {
 struct flow_set {
         size_t           idx;
 
-        struct timespec  chk;   /* Last keepalive check.          */
-        uint32_t         min;   /* Minimum keepalive time in set. */
-
         struct list_head flows;
         pthread_rwlock_t lock;
 };
@@ -146,6 +143,11 @@ struct {
         struct flow *         flows;
         struct port *         ports;
 
+        pthread_t             mon;
+        int                   min_timeo;
+        int                   min_fd;
+        int                   max_fd;
+
         pthread_t             tx;
         size_t                n_frcti;
 
@@ -253,14 +255,6 @@ static int proc_announce(char * prog)
         return ret;
 }
 
-static void flow_clear(int fd)
-{
-        memset(&ai.flows[fd], 0, sizeof(ai.flows[fd]));
-
-        ai.flows[fd].flow_id  = -1;
-        ai.flows[fd].pid      = -1;
-}
-
 #include "crypt.c"
 #include "frct.c"
 
@@ -279,6 +273,90 @@ void * frct_tx(void * o)
         return (void *) 0;
 }
 
+static void flow_send_keepalive(int fd)
+{
+        flow_write(fd, NULL, 0); /* Empty write; the result is ignored. */
+}
+
+static void flow_keepalive(int fd) /* One keepalive check for fd. */
+{
+        struct timespec    now;
+        struct timespec    s_act;   /* Copy of the flow's snd_act.   */
+        struct timespec    r_act;   /* Copy of the flow's rcv_act.   */
+        struct flow *      flow;
+        int                flow_id;
+        uint32_t           timeo;   /* Copy of qs.timeout; 0 skips.  */
+        struct shm_rbuff * rb;      /* rx_rb as read under ai.lock.  */
+        uint32_t           acl;
+
+        flow = &ai.flows[fd];
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        if (flow->flow_id < 0) { /* Negative flow_id: nothing to check. */
+                pthread_rwlock_unlock(&ai.lock);
+                return;
+        }
+
+        s_act = flow->snd_act;
+        r_act = flow->rcv_act;
+
+        flow_id = flow->flow_id;
+        timeo   = flow->qs.timeout;
+
+        rb = flow->rx_rb;
+
+        pthread_rwlock_unlock(&ai.lock); /* Only the copies are used below. */
+
+        acl = shm_rbuff_get_acl(rb);
+        if (timeo == 0 || (acl & (ACL_FLOWPEER | ACL_FLOWDOWN)))
+                return; /* No timeout set, or already flagged peer/down. */
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
+                shm_rbuff_set_acl(rb, ACL_FLOWPEER); /* Use the snapshot. */
+                shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
+                return;
+        }
+
+        if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2)
+                flow_send_keepalive(fd); /* Past a quarter of timeo. */
+}
+
+void * monitor(void * o) /* Thread body: periodic keepalive sweep. */
+{
+        struct timespec tic = {0, TICTIME}; /* Pause between sweeps. */
+
+        (void) o; /* Thread argument is unused. */
+
+        while (true) {
+                int i;
+                int min;
+                int max;
+
+                pthread_rwlock_rdlock(&ai.lock);
+                min = ai.min_fd; /* Copy the fd range under ai.lock. */
+                max = ai.max_fd;
+                pthread_rwlock_unlock(&ai.lock);
+
+                for (i = min; i <= max; ++i)
+                        flow_keepalive(i); /* Takes ai.lock itself. */
+
+                nanosleep(&tic, NULL);
+        }
+
+        return (void *) 0; /* Not reached: the loop has no exit. */
+}
+
+static void flow_clear(int fd)
+{
+        memset(&ai.flows[fd], 0, sizeof(ai.flows[fd])); /* Zero the slot. */
+
+        ai.flows[fd].flow_id  = -1; /* Zero is not used as the reset id. */
+        ai.flows[fd].pid      = -1;
+}
+
 static void flow_fini(int fd)
 {
         assert(fd >= 0 && fd < SYS_MAX_FLOWS);
@@ -297,7 +375,6 @@ static void flow_fini(int fd)
                 bmp_release(ai.fds, fd);
         }
 
-
         if (ai.flows[fd].rx_rb != NULL) {
                 shm_rbuff_set_acl(ai.flows[fd].rx_rb, ACL_FLOWDOWN);
                 shm_rbuff_close(ai.flows[fd].rx_rb);
@@ -319,6 +396,12 @@ static void flow_fini(int fd)
                 crypt_fini(ai.flows[fd].ctx);
 
         flow_clear(fd);
+
+        while (ai.flows[ai.max_fd].flow_id == -1 && ai.max_fd > ai.min_fd)
+                --ai.max_fd;
+
+        while (ai.flows[ai.min_fd].flow_id == -1 && ai.min_fd < ai.max_fd)
+                ++ai.min_fd;
 }
 
 static int flow_init(int       flow_id,
@@ -342,6 +425,12 @@ static int flow_init(int       flow_id,
                 goto fail_fds;
         }
 
+        if (fd > ai.max_fd)
+                ai.max_fd = fd;
+
+        if (fd < ai.min_fd)
+                ai.min_fd = fd;
+
         flow = &ai.flows[fd];
 
         flow->rx_rb = shm_rbuff_open(getpid(), flow_id);
@@ -447,6 +536,9 @@ static void init(int     argc,
         if (ai.fds == NULL)
                 goto fail_fds;
 
+        ai.min_fd = PROG_RES_FDS;
+        ai.max_fd = PROG_RES_FDS;
+
         ai.fqueues = bmp_create(PROG_MAX_FQUEUES, 0);
         if (ai.fqueues == NULL)
                 goto fail_fqueues;
@@ -506,12 +598,17 @@ static void init(int     argc,
                         goto fail_rib_init;
         }
 #endif
+        if (pthread_create(&ai.mon, NULL, monitor, NULL) != 0)
+                goto fail_monitor; /* Failure is a positive errno. */
+
         return;
 
+ fail_monitor:
 #if defined PROC_FLOW_STATS
+        rib_fini();
  fail_rib_init:
-        timerwheel_fini();
 #endif
+        timerwheel_fini();
  fail_timerwheel:
         shm_flow_set_close(ai.fqset);
  fail_fqset:
@@ -548,6 +645,9 @@ static void fini(void)
         if (ai.fds == NULL)
                 return;
 
+        pthread_cancel(ai.mon);
+        pthread_join(ai.mon, NULL);
+
         pthread_rwlock_wrlock(&ai.lock);
 
         for (i = 0; i < PROG_MAX_FLOWS; ++i) {
@@ -669,7 +769,7 @@ int flow_accept(qosspec_t *             qs,
         if (fd < 0)
                 return fd;
 
-        pthread_rwlock_wrlock(&ai.lock);
+        pthread_rwlock_rdlock(&ai.lock);
 
         if (qs != NULL)
                 *qs = ai.flows[fd].qs;
@@ -1056,48 +1156,6 @@ static int add_crc(struct shm_du_buff * sdb)
         return 0;
 }
 
-static void flow_send_keepalive(int fd)
-{
-        flow_write(fd, NULL, 0);
-}
-
-static int flow_keepalive(int fd)
-{
-        struct timespec    now;
-        struct timespec    s_act;
-        struct timespec    r_act;
-        struct flow *      flow;
-        int                flow_id;
-        uint32_t           timeo;
-
-        flow = &ai.flows[fd];
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        pthread_rwlock_rdlock(&ai.lock);
-
-        s_act = flow->snd_act;
-        r_act = flow->rcv_act;
-
-        flow_id = flow->flow_id;
-        timeo   = flow->qs.timeout;
-
-        pthread_rwlock_unlock(&ai.lock);
-
-        if (timeo == 0)
-                return 0;
-
-        if (ts_diff_ns(&r_act, &now) > timeo * MILLION) {
-                shm_flow_set_notify(ai.fqset, flow_id, FLOW_PEER);
-                return -EFLOWPEER;
-        }
-
-        if (ts_diff_ns(&s_act, &now) > (timeo * MILLION) >> 2)
-                flow_send_keepalive(fd);
-
-        return 0;
-}
-
 static int flow_tx_sdb(struct flow *        flow,
                        struct shm_du_buff * sdb,
                        bool                 block,
@@ -1162,8 +1220,6 @@ ssize_t flow_write(int          fd,
         int                  flags;
         struct timespec      abs;
         struct timespec *    abstime = NULL;
-        struct timespec      tic = {0, TICTIME};
-        struct timespec      tictime;
         struct shm_du_buff * sdb;
         uint8_t *            ptr;
 
@@ -1184,9 +1240,7 @@ ssize_t flow_write(int          fd,
                 return -ENOTALLOC;
         }
 
-        ts_add(&tic, &abs, &tictime);
-
-        if (ai.flows[fd].snd_timesout) {
+        if (flow->snd_timesout) {
                 ts_add(&abs, &flow->snd_timeo, &abs);
                 abstime = &abs;
         }
@@ -1203,17 +1257,9 @@ ssize_t flow_write(int          fd,
                         return -EAGAIN;
                 idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);
         } else {
-                while ((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) {
-                        if (ret != -ETIMEDOUT)
+                while ((ret = frcti_window_wait(flow->frcti, abstime)) < 0) {
+                        if (ret < 0)
                                 return ret;
-
-                        if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0)
-                                return -ETIMEDOUT;
-
-                        if (flow_keepalive(fd))
-                                return -EFLOWPEER;
-
-                        ts_add(&tictime, &tic, &tictime);
                 }
                 idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);
         }
@@ -1224,7 +1270,7 @@ ssize_t flow_write(int          fd,
         if (count > 0)
                 memcpy(ptr, buf, count);
 
-        ret = flow_tx_sdb(flow, sdb, flags & FLOWFWNOBLOCK, abstime);
+        ret = flow_tx_sdb(flow, sdb, !(flags & FLOWFWNOBLOCK), abstime);
 
         return ret < 0 ? (ssize_t) ret : (ssize_t) count;
 }
@@ -1257,8 +1303,6 @@ static ssize_t flow_rx_sdb(struct flow *         flow,
         if (idx < 0)
                 return idx;
 
-        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
-
         clock_gettime(PTHREAD_COND_CLOCK, &now);
 
         pthread_rwlock_wrlock(&ai.lock);
@@ -1267,6 +1311,7 @@ static ssize_t flow_rx_sdb(struct flow *         flow,
 
         pthread_rwlock_unlock(&ai.lock);
 
+        *sdb = shm_rdrbuff_get(ai.rdrb, idx);
         if (invalid_pkt(flow, *sdb)) {
                 shm_rdrbuff_remove(ai.rdrb, idx);
                 return -EAGAIN;
@@ -1285,8 +1330,6 @@ ssize_t flow_read(int    fd,
         struct shm_du_buff * sdb;
         struct timespec      abs;
         struct timespec      now;
-        struct timespec      tic = {0, TICTIME};
-        struct timespec      tictime;
         struct timespec *    abstime = NULL;
         struct flow *        flow;
         bool                 block;
@@ -1315,8 +1358,6 @@ ssize_t flow_read(int    fd,
         block  = !(flow->oflags & FLOWFRNOBLOCK);
         partrd = !(flow->oflags & FLOWFRNOPART);
 
-        ts_add(&now, &tic, &tictime);
-
         if (flow->rcv_timesout) {
                 ts_add(&now, &flow->rcv_timeo, &abs);
                 abstime = &abs;
@@ -1327,19 +1368,12 @@ ssize_t flow_read(int    fd,
                 while ((idx = frcti_queued_pdu(flow->frcti)) < 0) {
                         pthread_rwlock_unlock(&ai.lock);
 
-                        idx = flow_rx_sdb(flow, &sdb, block, &tictime);
+                        idx = flow_rx_sdb(flow, &sdb, block, abstime);
                         if (idx < 0) {
-                                if (idx != -ETIMEDOUT && idx != -EAGAIN)
+                                if (block && idx != -EAGAIN)
+                                        return idx;
+                                if (!block)
                                         return idx;
-
-                                if (abstime != NULL
-                                    && ts_diff_ns(&tictime, &abs) <= 0)
-                                        return -ETIMEDOUT;
-
-                                if (flow_keepalive(fd) < 0)
-                                        return -EFLOWPEER;
-
-                                ts_add(&tictime, &tic, &tictime);
 
                                 pthread_rwlock_rdlock(&ai.lock);
                                 continue;
@@ -1397,9 +1431,6 @@ ssize_t flow_read(int    fd,
 struct flow_set * fset_create()
 {
         struct flow_set * set;
-        struct timespec   now;
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
 
         set = malloc(sizeof(*set));
         if (set == NULL)
@@ -1416,9 +1447,6 @@ struct flow_set * fset_create()
         if (!bmp_is_id_valid(ai.fqueues, set->idx))
                 goto fail_bmp_alloc;
 
-        set->chk = now;
-        set->min = UINT32_MAX;
-
         pthread_rwlock_unlock(&ai.lock);
 
         list_head_init(&set->flows);
@@ -1523,9 +1551,6 @@ int fset_add(struct flow_set * set,
 
         pthread_rwlock_wrlock(&set->lock);
 
-        if (flow->qs.timeout != 0 && flow->qs.timeout < set->min)
-                set->min = flow->qs.timeout;
-
         list_add_tail(&fse->next, &set->flows);
 
         pthread_rwlock_unlock(&set->lock);
@@ -1549,15 +1574,12 @@ void fset_del(struct flow_set * set,
         struct list_head * p;
         struct list_head * h;
         struct flow *      flow;
-        uint32_t           min;
 
         if (set == NULL || fd < 0 || fd >= SYS_MAX_FLOWS)
                 return;
 
         flow = &ai.flows[fd];
 
-        min = UINT32_MAX;
-
         pthread_rwlock_rdlock(&ai.lock);
 
         if (flow->flow_id >= 0)
@@ -1571,14 +1593,10 @@ void fset_del(struct flow_set * set,
                 if (e->fd == fd) {
                         list_del(&e->next);
                         free(e);
-                } else {
-                        if (flow->qs.timeout != 0 && flow->qs.timeout < min)
-                                min = flow->qs.timeout;
+                        break;
                 }
         }
 
-        set->min = min;
-
         pthread_rwlock_unlock(&set->lock);
 
         pthread_rwlock_unlock(&ai.lock);
@@ -1606,48 +1624,6 @@ bool fset_has(const struct flow_set * set,
         return ret;
 }
 
-static void fset_keepalive(struct flow_set * set)
-{
-        struct timespec    now;
-        struct list_head * p;
-        struct list_head * h;
-        struct list_head   copy;
-
-        clock_gettime(PTHREAD_COND_CLOCK, &now);
-
-        pthread_rwlock_wrlock(&set->lock);
-
-        if (ts_diff_ns(&now, &set->chk) < set->min >> 2) {
-                pthread_rwlock_unlock(&set->lock);
-                return;
-        }
-
-        set->chk = now;
-
-        list_head_init(&copy);
-
-        list_for_each(p, &set->flows) {
-                struct flow_set_entry * c;
-                struct flow_set_entry * e;
-                e = list_entry(p, struct flow_set_entry, next);
-                c = malloc(sizeof(*c));
-                if (c == NULL)
-                        continue;
-                c->fd = e->fd;
-                list_add_tail(&c->next, &copy);
-        }
-
-        pthread_rwlock_unlock(&set->lock);
-
-        list_for_each_safe(p, h, &copy) {
-                struct flow_set_entry * e;
-                e = list_entry(p, struct flow_set_entry, next);
-                flow_send_keepalive(e->fd);
-                list_del(&e->next);
-                free(e);
-        }
-}
-
 int fqueue_next(struct fqueue * fq)
 {
         int fd;
@@ -1690,8 +1666,6 @@ ssize_t fevent(struct flow_set *       set,
                const struct timespec * timeo)
 {
         ssize_t           ret = 0;
-        struct timespec   tic = {0, TICTIME};
-        struct timespec   tictime;
         struct timespec   abs;
         struct timespec * t = NULL;
 
@@ -1703,27 +1677,15 @@ ssize_t fevent(struct flow_set *       set,
 
         clock_gettime(PTHREAD_COND_CLOCK, &abs);
 
-        ts_add(&tic, &abs, &tictime);
-        t = &tictime;
-
-        if (timeo != NULL)
+        if (timeo != NULL) {
                 ts_add(&abs, timeo, &abs);
+                t = &abs;
+        }
 
         while (ret == 0) {
                 ret = shm_flow_set_wait(ai.fqset, set->idx, fq->fqueue, t);
-                if (ret == -ETIMEDOUT) {
-                        if (timeo != NULL && ts_diff_ns(t, &abs) < 0) {
-                                fq->fqsize = 0;
-                                return -ETIMEDOUT;
-                        }
-                        ret = 0;
-                        ts_add(t, &tic, t);
-                        pthread_rwlock_rdlock(&ai.lock);
-                        timerwheel_move();
-                        fset_keepalive(set);
-                        pthread_rwlock_unlock(&ai.lock);
-                        continue;
-                }
+                if (ret == -ETIMEDOUT)
+                        return -ETIMEDOUT;
 
                 fq->fqsize = ret << 1;
                 fq->next   = 0;
@@ -1922,14 +1884,8 @@ int ipcp_flow_read(int                   fd,
                 pthread_rwlock_unlock(&ai.lock);
 
                 idx = flow_rx_sdb(flow, sdb, false, NULL);
-                if (idx < 0) {
-                        if (idx == -EAGAIN) {
-                                pthread_rwlock_rdlock(&ai.lock);
-                                continue;
-                        }
-
+                if (idx < 0)
                         return idx;
-                }
 
                 pthread_rwlock_rdlock(&ai.lock);
 
-- 
2.35.1


Other related posts:

  • » [PATCH] lib: Move flow monitoring to its own thread - Dimitri Staessens