[PATCH] ipcpd: Flow liveness monitoring for unicast IPCP

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Sun, 20 Feb 2022 14:49:18 +0100

This adds FLM (Flow Liveness Monitoring) to the unicast IPCP. The
IPCPs will now send periodic 'pings' to each other every X seconds, and
if no activity is recorded for 4 periods, the flow is marked as DOWN.

This is a first implementation, and the period is set to 30 seconds,
making a flow time out after about 120 seconds of peer inactivity. I
will make this configurable via the flow QoS parameters. It is
currently only available on the unicast IPCP; the udp/eth/local IPCPs
may also need this mechanism at some point.

Because of the way that IPCPs currently deal with FRCP keepalive, the
flow monitor will keep the flow alive while FRCP is timing out. So if
there is a 5-minute Delta-t, the reliable flow will be kept alive
during these 5 minutes, and only then the IPCP will flag the flow as
gone and stop updating the peer. We can improve on this by keeping the
flow alive in the IPCP only to wait for the delta-t timers, but
flagging it down to the application. This does mean passing the
timeout all the way down into each IPCP's deallocation
implementation, which is the opposite direction of my initial idea of
keeping the timeout in the IRMd and only deallocating at the IPCP when
all is timed out. But I think it is an optimization that will be
needed. If the flow is down, it's down and there is no need for the
application to sit there and wait for Godot. This will come in a
future update.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 src/ipcpd/ipcp.c       |   2 +-
 src/ipcpd/unicast/fa.c | 195 +++++++++++++++++++++++++++++++++++++----
 2 files changed, 178 insertions(+), 19 deletions(-)

diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 12caac17..870e1e49 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -620,7 +620,7 @@ static void * mainloop(void * o)
                         fd = np1_flow_dealloc(msg->flow_id, msg->timeo_sec);
                         if (fd < 0) {
                                 log_warn("Could not deallocate flow_id %d.",
-                                        msg->flow_id);
+                                         msg->flow_id);
                                 ret_msg.result = -1;
                                 break;
                         }
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index dcc79031..4a1fc841 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -35,6 +35,7 @@
 #include <ouroboros/fqueue.h>
 #include <ouroboros/errno.h>
 #include <ouroboros/dev.h>
+#include <ouroboros/fccntl.h>
 #include <ouroboros/ipcp-dev.h>
 #include <ouroboros/rib.h>
 #include <ouroboros/random.h>
@@ -55,11 +56,13 @@
 #define CLOCK_REALTIME_COARSE CLOCK_REALTIME
 #endif
 
-#define TIMEOUT 10000 /* nanoseconds */
+#define TIMEOUT     10000 /* nanoseconds */
+#define FLOWTIMEOUT 120   /* seconds     */
 
 #define FLOW_REQ    0
 #define FLOW_REPLY  1
 #define FLOW_UPDATE 2
+#define FLOW_FLM    3
 #define MSGBUFSZ    2048
 
 #define STAT_FILE_LEN 0
@@ -72,14 +75,14 @@ struct fa_msg {
         int8_t   response;
         uint16_t ece;
         /* QoS parameters from spec, aligned */
-        uint8_t  availability;
-        uint8_t  in_order;
         uint32_t delay;
         uint64_t bandwidth;
         uint32_t loss;
         uint32_t ber;
         uint32_t max_gap;
         uint16_t cypher_s;
+        uint8_t  availability;
+        uint8_t  in_order;
 } __attribute__((packed));
 
 struct cmd {
@@ -100,11 +103,14 @@ struct fa_flow {
         size_t   b_rcv_f;  /* Bytes received fail            */
         size_t   u_snd;    /* Flow updates sent              */
         size_t   u_rcv;    /* Flow updates received          */
+
 #endif
-        uint64_t s_eid;  /* Local endpoint id                */
-        uint64_t r_eid;  /* Remote endpoint id               */
-        uint64_t r_addr; /* Remote address                   */
-        void *   ctx;    /* Congestion avoidance context     */
+        uint64_t s_eid;    /* Local endpoint id              */
+        uint64_t r_eid;    /* Remote endpoint id             */
+        time_t   s_act;    /* Local (send) activity          */
+        time_t   r_act;    /* Remote (recv) activity         */
+        uint64_t r_addr;   /* Remote address                 */
+        void *   ctx;      /* Congestion avoidance context   */
 };
 
 struct {
@@ -119,6 +125,7 @@ struct {
         pthread_cond_t   cond;
         pthread_mutex_t  mtx;
         pthread_t        worker;
+        pthread_t        monitor;
 
         struct psched *  psched;
 } fa;
@@ -330,12 +337,15 @@ static void packet_handler(int                  fd,
                            qoscube_t            qc,
                            struct shm_du_buff * sdb)
 {
+        struct timespec  now;
         struct fa_flow * flow;
         uint64_t         r_addr;
         uint64_t         r_eid;
         ca_wnd_t         wnd;
         size_t           len;
 
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
         flow = &fa.flows[fd];
 
         pthread_rwlock_wrlock(&fa.flows_lock);
@@ -346,6 +356,8 @@ static void packet_handler(int                  fd,
         ++flow->p_snd;
         flow->b_snd += len;
 #endif
+        flow->r_act  = now.tv_sec;
+
         wnd = ca_ctx_update_snd(flow->ctx, len);
 
         r_addr = flow->r_addr;
@@ -370,22 +382,24 @@ static void packet_handler(int                  fd,
 
 static int fa_flow_init(struct fa_flow * flow)
 {
-#ifdef IPCP_FLOW_STATS
         struct timespec now;
-#endif
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
         memset(flow, 0, sizeof(*flow));
 
         flow->r_eid  = -1;
         flow->s_eid  = -1;
         flow->r_addr = INVALID_ADDR;
+        flow->s_act  = now.tv_sec;
+        flow->r_act  = now.tv_sec;
 
         flow->ctx = ca_ctx_create();
         if (flow->ctx == NULL)
                 return -1;
 
-#ifdef IPCP_FLOW_STATS
-        clock_gettime(CLOCK_REALTIME_COARSE, &now);
 
+#ifdef IPCP_FLOW_STATS
         flow->stamp = now.tv_sec;
 
         ++fa.n_flows;
@@ -630,9 +644,12 @@ static int fa_handle_flow_reply(struct fa_msg * msg,
 static int fa_handle_flow_update(struct fa_msg * msg,
                                  size_t          len)
 {
+        struct timespec  now;
         struct fa_flow * flow;
         int              fd;
 
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
         (void) len;
         assert(len >= sizeof(*msg));
 
@@ -648,6 +665,8 @@ static int fa_handle_flow_update(struct fa_msg * msg,
 #ifdef IPCP_FLOW_STATS
         flow->u_rcv++;
 #endif
+        flow->r_act = now.tv_sec;
+
         ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
 
         pthread_rwlock_unlock(&fa.flows_lock);
@@ -655,6 +674,35 @@ static int fa_handle_flow_update(struct fa_msg * msg,
         return 0;
 }
 
+static int fa_handle_flow_flm(struct fa_msg * msg,
+                              size_t          len)
+{
+        struct timespec  now;
+        struct fa_flow * flow;
+        int              fd;
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        (void) len;
+        assert(len >= sizeof(*msg));
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        fd = eid_to_fd(ntoh64(msg->r_eid));
+        if (fd < 0) {
+                pthread_rwlock_unlock(&fa.flows_lock);
+                return 0; /* Flow unknown/deallocated. We're fine. */
+        }
+
+        flow = &fa.flows[fd];
+
+        flow->r_act = now.tv_sec;
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        return 0;
+}
+
 static void * fa_handle_packet(void * o)
 {
         (void) o;
@@ -683,8 +731,12 @@ static void * fa_handle_packet(void * o)
                         if (fa_handle_flow_update(msg, len) < 0)
                                 log_err("Error handling flow update.");
                         break;
+                case FLOW_FLM:
+                        if (fa_handle_flow_flm(msg, len) < 0)
+                                log_err("Error handling flow monitor update.");
+                        break;
                 default:
-                        log_warn("Recieved unknown flow allocation message.");
+                        log_warn("Received unknown flow allocation message.");
                         break;
                 }
         }
@@ -692,6 +744,103 @@ static void * fa_handle_packet(void * o)
         return (void *) 0;
 }
 
+static int fa_send_flm(int fd)
+{
+        struct timespec      now;
+        struct fa_msg *      msg;
+        struct shm_du_buff * sdb;
+        qoscube_t            qc = QOS_CUBE_BE;
+        struct fa_flow *     flow;
+        uint64_t             r_addr;
+
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        if (ipcp_sdb_reserve(&sdb, sizeof(*msg)))
+                return -ENOMEM;
+
+        msg = (struct fa_msg *) shm_du_buff_head(sdb);
+
+        memset(msg, 0, sizeof(*msg));
+
+        flow = &fa.flows[fd];
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        msg->code  = FLOW_FLM;
+        msg->r_eid = hton64(flow->r_eid);
+
+        r_addr = flow->r_addr;
+
+        flow->s_act= now.tv_sec;
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        if (dt_write_packet(r_addr, qc, fa.eid, sdb)) {
+                ipcp_sdb_release(sdb);
+                return -1;
+        }
+
+        return 0;
+}
+
+static void * fa_monitor(void * o)
+{
+        bool   updates[PROG_MAX_FLOWS];
+        bool   dead[PROG_MAX_FLOWS];
+
+        (void) o;
+
+        while (true) {
+                struct timespec  now;
+                struct fa_flow * flow;
+                int              i;
+
+                memset(updates, 0, sizeof(updates));
+                memset(dead, 0, sizeof(updates));
+
+                clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+                pthread_rwlock_rdlock(&fa.flows_lock);
+
+                for (i = 0; i < PROG_MAX_FLOWS; ++i) {
+                        time_t s_delta;
+                        time_t r_delta;
+
+                        flow = &fa.flows[i];
+
+                        if (flow->r_addr == INVALID_ADDR)
+                                continue;
+
+                        s_delta = now.tv_sec - flow->s_act;
+                        r_delta = now.tv_sec - flow->r_act;
+
+                        if (s_delta > FLOWTIMEOUT / 4)
+                                updates[i] = true;
+
+                        if (r_delta > FLOWTIMEOUT) {
+                                log_info("Flow %d down: Unresponsive peer.", i);
+                                dead[i] = true;
+                        }
+                }
+
+                pthread_rwlock_unlock(&fa.flows_lock);
+
+                for (i = 0; i < PROG_MAX_FLOWS; ++i) {
+                        if (updates[i])
+                                fa_send_flm(i);
+                        if (dead[i]) {
+                                uint32_t flags;
+                                fccntl(i, FLOWGFLAGS, &flags);
+                                fccntl(i, FLOWSFLAGS, flags | FLOWFDOWN);
+                        }
+                }
+
+                sleep(FLOWTIMEOUT / 8);
+        }
+
+        return (void *) 0;
+}
+
 int fa_init(void)
 {
         pthread_condattr_t cattr;
@@ -763,6 +912,11 @@ int fa_start(void)
                 goto fail_thread;
         }
 
+        if (pthread_create(&fa.monitor, NULL, fa_monitor, NULL)) {
+                log_err("Failed to create worker thread.");
+                goto fail_thread;
+        }
+
         if (pthread_getschedparam(fa.worker, &pol, &par)) {
                 log_err("Failed to get worker thread scheduling parameters.");
                 goto fail_sched;
@@ -795,7 +949,9 @@ int fa_start(void)
 
 void fa_stop(void)
 {
+        pthread_cancel(fa.monitor);
         pthread_cancel(fa.worker);
+        pthread_join(fa.monitor, NULL);
         pthread_join(fa.worker, NULL);
 
         psched_destroy(fa.psched);
@@ -822,7 +978,7 @@ int fa_alloc(int             fd,
         len = sizeof(*msg) + ipcp_dir_hash_len();
 
         if (ipcp_sdb_reserve(&sdb, len + dlen))
-                return -1;
+                return -ENOMEM;
 
         msg = (struct fa_msg *) shm_du_buff_head(sdb);
         memset(msg, 0, sizeof(*msg));
@@ -879,7 +1035,7 @@ int fa_alloc_resp(int          fd,
 
         if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) {
                 fa_flow_fini(flow);
-                return -1;
+                return -ENOMEM;
         }
 
         msg = (struct fa_msg *) shm_du_buff_head(sdb);
@@ -934,15 +1090,17 @@ int fa_dealloc(int fd)
 static int fa_update_remote(int      fd,
                             uint16_t ece)
 {
+        struct timespec      now;
         struct fa_msg *      msg;
         struct shm_du_buff * sdb;
         qoscube_t            qc = QOS_CUBE_BE;
         struct fa_flow *     flow;
         uint64_t             r_addr;
 
-        if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) {
-                return -1;
-        }
+        clock_gettime(CLOCK_REALTIME_COARSE, &now);
+
+        if (ipcp_sdb_reserve(&sdb, sizeof(*msg)))
+                return -ENOMEM;
 
         msg = (struct fa_msg *) shm_du_buff_head(sdb);
 
@@ -960,8 +1118,9 @@ static int fa_update_remote(int      fd,
 #ifdef IPCP_FLOW_STATS
         flow->u_snd++;
 #endif
-        pthread_rwlock_unlock(&fa.flows_lock);
+        flow->s_act= now.tv_sec;
 
+        pthread_rwlock_unlock(&fa.flows_lock);
 
         if (dt_write_packet(r_addr, qc, fa.eid, sdb)) {
                 ipcp_sdb_release(sdb);
-- 
2.35.1


Other related posts:

  • » [PATCH] ipcpd: Flow liveness monitoring for unicast IPCP - Dimitri Staessens