[PATCH] ipcpd: Refactor flow allocator message handling

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Sat, 19 Feb 2022 16:20:22 +0100

This refactors the single long function that handles incoming packets
destined for the flow allocator.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 src/ipcpd/unicast/fa.c | 337 ++++++++++++++++++++++++-----------------
 1 file changed, 197 insertions(+), 140 deletions(-)

diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index ef6adae6..7143a346 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -435,167 +435,224 @@ static void fa_post_packet(void *               comp,
         pthread_mutex_unlock(&fa.mtx);
 }
 
-static void * fa_handle_packet(void * o)
+static size_t fa_wait_for_fa_msg(struct fa_msg * msg)
+{
+        struct cmd * cmd;
+        size_t       len;
+
+        pthread_mutex_lock(&fa.mtx);
+
+        pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx);
+
+        while (list_is_empty(&fa.cmds))
+                pthread_cond_wait(&fa.cond, &fa.mtx);
+
+        cmd = list_last_entry(&fa.cmds, struct cmd, next);
+        list_del(&cmd->next);
+
+        pthread_cleanup_pop(true);
+
+        len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
+        if (len > MSGBUFSZ || len < sizeof(*msg)) {
+                log_warn("Invalid flow allocation message (len: %zd)\n", len);
+                free(cmd);
+                return 0; /* No valid message */
+        }
+
+        memcpy(msg, shm_du_buff_head(cmd->sdb), len);
+
+        ipcp_sdb_release(cmd->sdb);
+
+        free(cmd);
+
+        return len;
+}
+
+static int fa_wait_irmd_alloc(uint8_t *    dst,
+                              qosspec_t    qs,
+                              const void * data,
+                              size_t       len)
 {
         struct timespec ts  = {0, TIMEOUT * 1000};
+        struct timespec abstime;
+        int             fd;
 
+        clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+
+        pthread_mutex_lock(&ipcpi.alloc_lock);
+
+        while (ipcpi.alloc_id != -1 && ipcp_get_state() == IPCP_OPERATIONAL) {
+                ts_add(&abstime, &ts, &abstime);
+                pthread_cond_timedwait(&ipcpi.alloc_cond,
+                                       &ipcpi.alloc_lock,
+                                       &abstime);
+        }
+
+        if (ipcp_get_state() != IPCP_OPERATIONAL) {
+                pthread_mutex_unlock(&ipcpi.alloc_lock);
+                log_dbg("Won't allocate over non-operational IPCP.");
+                return -EIPCPSTATE;
+        }
+
+        assert(ipcpi.alloc_id == -1);
+
+        fd = ipcp_flow_req_arr(dst, ipcp_dir_hash_len(), qs, data, len);
+        if (fd < 0) {
+                pthread_mutex_unlock(&ipcpi.alloc_lock);
+                log_dbg("Failed to get fd for flow.");
+                return -ENOTALLOC;
+        }
+
+        ipcpi.alloc_id = fd;
+        pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+        pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+        return fd;
+}
+
+static int fa_handle_flow_req(struct fa_msg * msg,
+                              size_t          len)
+{
+        size_t           msg_len;
+        int              fd;
+        qosspec_t        qs;
+        struct fa_flow * flow;
+        uint8_t *        data;  /* Piggbacked data on flow alloc request. */
+        size_t           dlen;  /* Length of piggybacked data.            */
+
+        msg_len = sizeof(*msg) + ipcp_dir_hash_len();
+        if (len < msg_len) {
+                log_err("Invalid flow allocation request");
+                return -EPERM;
+        }
+
+        data = (uint8_t *) msg + msg_len;
+        dlen = len - msg_len;
+
+        qs.delay        = ntoh32(msg->delay);
+        qs.bandwidth    = ntoh64(msg->bandwidth);
+        qs.availability = msg->availability;
+        qs.loss         = ntoh32(msg->loss);
+        qs.ber          = ntoh32(msg->ber);
+        qs.in_order     = msg->in_order;
+        qs.max_gap      = ntoh32(msg->max_gap);
+        qs.cypher_s     = ntoh16(msg->cypher_s);
+
+        fd = fa_wait_irmd_alloc((uint8_t *) (msg + 1), qs, data, dlen);
+        if (fd < 0)
+                return fd;
+
+        flow = &fa.flows[fd];
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        fa_flow_init(flow);
+
+        flow->s_eid  = gen_eid(fd);
+        flow->r_eid  = ntoh64(msg->s_eid);
+        flow->r_addr = ntoh64(msg->s_addr);
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        return fd;
+}
+
+static int fa_handle_flow_reply(struct fa_msg * msg,
+                                size_t          len)
+{
+        int              fd;
+        struct fa_flow * flow;
+        uint8_t *        data;  /* Piggbacked data on flow alloc request. */
+        size_t           dlen;  /* Length of piggybacked data.            */
+
+        assert(len >= sizeof(*msg));
+
+        data = (uint8_t *) msg + sizeof(*msg);
+        dlen = len - sizeof(*msg);
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        fd = eid_to_fd(ntoh64(msg->r_eid));
+        if (fd < 0) {
+                pthread_rwlock_unlock(&fa.flows_lock);
+                return -ENOTALLOC;
+        }
+
+        flow = &fa.flows[fd];
+
+        flow->r_eid = ntoh64(msg->s_eid);
+
+        if (msg->response < 0)
+                fa_flow_fini(flow);
+        else
+                psched_add(fa.psched, fd);
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        if (ipcp_flow_alloc_reply(fd, msg->response, data, dlen))
+                return -EIRMD;
+
+        return 0;
+}
+
+static int fa_handle_flow_update(struct fa_msg * msg,
+                                 size_t          len)
+{
+        struct fa_flow * flow;
+        int              fd;
+
+        assert(len >= sizeof(*msg));
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        fd = eid_to_fd(ntoh64(msg->r_eid));
+        if (fd < 0) {
+                pthread_rwlock_unlock(&fa.flows_lock);
+                return -EPERM;
+        }
+
+        flow = &fa.flows[fd];
+#ifdef IPCP_FLOW_STATS
+        flow->u_rcv++;
+#endif
+        ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        return 0;
+}
+
+static void * fa_handle_packet(void * o)
+{
         (void) o;
 
         while (true) {
-                struct timespec  abstime;
-                int              fd;
                 uint8_t          buf[MSGBUFSZ];
                 struct fa_msg *  msg;
-                qosspec_t        qs;
-                struct cmd *     cmd;
                 size_t           len;
-                size_t           msg_len;
-                struct fa_flow * flow;
-
-                pthread_mutex_lock(&fa.mtx);
-
-                pthread_cleanup_push(__cleanup_mutex_unlock, &fa.mtx);
-
-                while (list_is_empty(&fa.cmds))
-                        pthread_cond_wait(&fa.cond, &fa.mtx);
-
-                cmd = list_last_entry(&fa.cmds, struct cmd, next);
-                list_del(&cmd->next);
-
-                pthread_cleanup_pop(true);
-
-                len = shm_du_buff_tail(cmd->sdb) - shm_du_buff_head(cmd->sdb);
-
-                if (len > MSGBUFSZ) {
-                        log_err("Message over buffer size.");
-                        free(cmd);
-                        continue;
-                }
 
                 msg = (struct fa_msg *) buf;
 
-                /* Depending on the message call the function in ipcp-dev.h */
-
-                memcpy(msg, shm_du_buff_head(cmd->sdb), len);
-
-                ipcp_sdb_release(cmd->sdb);
-
-                free(cmd);
+                len = fa_wait_for_fa_msg(msg);
+                if (len == 0)
+                        continue;
 
                 switch (msg->code) {
                 case FLOW_REQ:
-                        msg_len = sizeof(*msg) + ipcp_dir_hash_len();
-
-                        assert(len >= msg_len);
-
-                        clock_gettime(PTHREAD_COND_CLOCK, &abstime);
-
-                        pthread_mutex_lock(&ipcpi.alloc_lock);
-
-                        while (ipcpi.alloc_id != -1 &&
-                               ipcp_get_state() == IPCP_OPERATIONAL) {
-                                ts_add(&abstime, &ts, &abstime);
-                                pthread_cond_timedwait(&ipcpi.alloc_cond,
-                                                       &ipcpi.alloc_lock,
-                                                       &abstime);
-                        }
-
-                        if (ipcp_get_state() != IPCP_OPERATIONAL) {
-                                pthread_mutex_unlock(&ipcpi.alloc_lock);
-                                log_dbg("Won't allocate over non-operational"
-                                        "IPCP.");
-                                continue;
-                        }
-
-                        assert(ipcpi.alloc_id == -1);
-
-                        qs.delay        = ntoh32(msg->delay);
-                        qs.bandwidth    = ntoh64(msg->bandwidth);
-                        qs.availability = msg->availability;
-                        qs.loss         = ntoh32(msg->loss);
-                        qs.ber          = ntoh32(msg->ber);
-                        qs.in_order     = msg->in_order;
-                        qs.max_gap      = ntoh32(msg->max_gap);
-                        qs.cypher_s     = ntoh16(msg->cypher_s);
-
-                        fd = ipcp_flow_req_arr((uint8_t *) (msg + 1),
-                                               ipcp_dir_hash_len(),
-                                               qs,
-                                               buf + msg_len,
-                                               len - msg_len);
-                        if (fd < 0) {
-                                pthread_mutex_unlock(&ipcpi.alloc_lock);
-                                log_err("Failed to get fd for flow.");
-                                continue;
-                        }
-
-                        flow = &fa.flows[fd];
-
-                        pthread_rwlock_wrlock(&fa.flows_lock);
-
-                        fa_flow_init(flow);
-
-                        flow->s_eid  = gen_eid(fd);
-                        flow->r_eid  = ntoh64(msg->s_eid);
-                        flow->r_addr = ntoh64(msg->s_addr);
-
-                        pthread_rwlock_unlock(&fa.flows_lock);
-
-                        ipcpi.alloc_id = fd;
-                        pthread_cond_broadcast(&ipcpi.alloc_cond);
-
-                        pthread_mutex_unlock(&ipcpi.alloc_lock);
-
+                        if (fa_handle_flow_req(msg, len) < 0)
+                                log_err("Error handling flow alloc request.");
                         break;
                 case FLOW_REPLY:
-                        assert(len >= sizeof(*msg));
-
-                        pthread_rwlock_wrlock(&fa.flows_lock);
-
-                        fd = eid_to_fd(ntoh64(msg->r_eid));
-                        if (fd < 0) {
-                                pthread_rwlock_unlock(&fa.flows_lock);
-                                break;
-                        }
-
-                        flow = &fa.flows[fd];
-
-                        flow->r_eid = ntoh64(msg->s_eid);
-
-                        if (msg->response < 0)
-                                fa_flow_fini(flow);
-                        else
-                                psched_add(fa.psched, fd);
-
-                        pthread_rwlock_unlock(&fa.flows_lock);
-
-                        ipcp_flow_alloc_reply(fd,
-                                              msg->response,
-                                              buf + sizeof(*msg),
-                                              len - sizeof(*msg));
+                        if (fa_handle_flow_reply(msg, len) < 0)
+                                log_err("Error handling flow reply.");
                         break;
                 case FLOW_UPDATE:
-                        assert(len >= sizeof(*msg));
-
-                        pthread_rwlock_wrlock(&fa.flows_lock);
-
-                        fd = eid_to_fd(ntoh64(msg->r_eid));
-                        if (fd < 0) {
-                                pthread_rwlock_unlock(&fa.flows_lock);
-                                break;
-                        }
-
-                        flow = &fa.flows[fd];
-#ifdef IPCP_FLOW_STATS
-                        flow->u_rcv++;
-#endif
-                        ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
-
-                        pthread_rwlock_unlock(&fa.flows_lock);
-
+                        if (fa_handle_flow_update(msg, len) < 0)
+                                log_err("Error handling flow update.");
                         break;
                 default:
-                        log_err("Got an unknown flow allocation message.");
+                        log_warn("Recieved unknown flow allocation message.");
                         break;
                 }
         }
-- 
2.35.1


Other related posts:

  • » [PATCH] ipcpd: Refactor flow allocator message handling - Dimitri Staessens