[PATCH 1/3] ipcpd: Decouple flow allocator from dt thread

  • From: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Thu, 11 Oct 2018 01:44:28 +0200

The flow allocator passed a blocking callback to the forwarding
component, which blocks packet processing.

Signed-off-by: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
---
 src/ipcpd/normal/fa.c | 258 ++++++++++++++++++++++++++++--------------
 1 file changed, 172 insertions(+), 86 deletions(-)

diff --git a/src/ipcpd/normal/fa.c b/src/ipcpd/normal/fa.c
index 027223b..2a67831 100644
--- a/src/ipcpd/normal/fa.c
+++ b/src/ipcpd/normal/fa.c
@@ -68,13 +68,23 @@ struct fa_msg {
         uint32_t max_gap;
 } __attribute__((packed));
 
+struct cmd {
+        struct list_head     next;
+        struct shm_du_buff * sdb;
+};
+
 struct {
-        pthread_rwlock_t   flows_lock;
-        int                r_eid[PROG_MAX_FLOWS];
-        uint64_t           r_addr[PROG_MAX_FLOWS];
-        int                fd;
+        pthread_rwlock_t flows_lock;
+        int              r_eid[PROG_MAX_FLOWS];
+        uint64_t         r_addr[PROG_MAX_FLOWS];
+        int              fd;
 
-        struct psched *    psched;
+        struct list_head cmds;
+        pthread_cond_t   cond;
+        pthread_mutex_t  mtx;
+        pthread_t        worker;
+
+        struct psched *  psched;
 } fa;
 
 static void packet_handler(int                  fd,
@@ -100,109 +110,157 @@ static void destroy_conn(int fd)
 }
 
 static void fa_post_packet(void *               comp,
-                        struct shm_du_buff * sdb)
+                           struct shm_du_buff * sdb)
 {
-        struct timespec ts  = {0, TIMEOUT * 1000};
-        struct timespec abstime;
-        int             fd;
-        uint8_t *       buf;
-        struct fa_msg * msg;
-        qosspec_t       qs;
-
-        (void) comp;
+        struct cmd * cmd;
 
         assert(comp == &fa);
-        assert(sdb);
 
-        buf = malloc(sizeof(*msg) + ipcp_dir_hash_len());
-        if (buf == NULL)
+        (void) comp;
+
+        cmd = malloc(sizeof(*cmd));
+        if (cmd == NULL) {
+                log_err("Command failed. Out of memory.");
                 return;
+        }
 
-        msg = (struct fa_msg *) buf;
+        cmd->sdb = sdb;
 
-        /* Depending on the message call the function in ipcp-dev.h */
+        pthread_mutex_lock(&fa.mtx);
 
-        memcpy(msg, shm_du_buff_head(sdb),
-               shm_du_buff_tail(sdb) - shm_du_buff_head(sdb));
+        list_add(&cmd->next, &fa.cmds);
 
-        ipcp_sdb_release(sdb);
+        pthread_cond_signal(&fa.cond);
 
-        switch (msg->code) {
-        case FLOW_REQ:
-                clock_gettime(PTHREAD_COND_CLOCK, &abstime);
+        pthread_mutex_unlock(&fa.mtx);
+}
 
-                pthread_mutex_lock(&ipcpi.alloc_lock);
+static void * fa_handle_packet(void * o)
+{
+        struct timespec ts  = {0, TIMEOUT * 1000};
 
-                while (ipcpi.alloc_id != -1 &&
-                       ipcp_get_state() == IPCP_OPERATIONAL) {
-                        ts_add(&abstime, &ts, &abstime);
-                        pthread_cond_timedwait(&ipcpi.alloc_cond,
-                                               &ipcpi.alloc_lock,
-                                               &abstime);
-                }
+        (void) o;
 
-                if (ipcp_get_state() != IPCP_OPERATIONAL) {
-                        log_dbg("Won't allocate over non-operational IPCP.");
-                        pthread_mutex_unlock(&ipcpi.alloc_lock);
-                        free(msg);
-                        return;
-                }
+        while (true) {
+                struct timespec abstime;
+                int             fd;
+                uint8_t *       buf;
+                struct fa_msg * msg;
+                qosspec_t       qs;
+                struct cmd *    cmd;
 
-                assert(ipcpi.alloc_id == -1);
-
-                qs.delay        = ntoh32(msg->delay);
-                qs.bandwidth    = ntoh64(msg->bandwidth);
-                qs.availability = msg->availability;
-                qs.loss         = ntoh32(msg->loss);
-                qs.ber          = ntoh32(msg->ber);
-                qs.in_order     = msg->in_order;
-                qs.max_gap      = ntoh32(msg->max_gap);
-
-                fd = ipcp_flow_req_arr(getpid(),
-                                       (uint8_t *) (msg + 1),
-                                       ipcp_dir_hash_len(),
-                                       qs);
-                if (fd < 0) {
-                        pthread_mutex_unlock(&ipcpi.alloc_lock);
-                        log_err("Failed to get fd for flow.");
-                        free(msg);
-                        return;
+                pthread_mutex_lock(&fa.mtx);
+
+                pthread_cleanup_push((void *)(void *) pthread_mutex_unlock,
+                                     &fa.mtx);
+
+                while (list_is_empty(&fa.cmds))
+                        pthread_cond_wait(&fa.cond, &fa.mtx);
+
+                cmd = list_last_entry(&fa.cmds, struct cmd, next);
+                list_del(&cmd->next);
+
+                pthread_cleanup_pop(true);
+
+                buf = malloc(sizeof(*msg) + ipcp_dir_hash_len());
+                if (buf == NULL) {
+                        log_err("Failed to allocate memory.");
+                        continue;
                 }
 
-                pthread_rwlock_wrlock(&fa.flows_lock);
+                msg = (struct fa_msg *) buf;
 
-                fa.r_eid[fd]  = ntoh32(msg->s_eid);
-                fa.r_addr[fd] = ntoh64(msg->s_addr);
+                /* Depending on the message call the function in ipcp-dev.h */
 
-                pthread_rwlock_unlock(&fa.flows_lock);
+                assert(sizeof(*msg) + ipcp_dir_hash_len() >=
+                       shm_du_buff_tail(cmd->sdb) - 
shm_du_buff_head(cmd->sdb));
 
-                ipcpi.alloc_id = fd;
-                pthread_cond_broadcast(&ipcpi.alloc_cond);
+                memcpy(msg, shm_du_buff_head(cmd->sdb),
+                       shm_du_buff_tail(cmd->sdb) - 
shm_du_buff_head(cmd->sdb));
 
-                pthread_mutex_unlock(&ipcpi.alloc_lock);
+                ipcp_sdb_release(cmd->sdb);
 
-                break;
-        case FLOW_REPLY:
-                pthread_rwlock_wrlock(&fa.flows_lock);
+                free(cmd);
 
-                fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+                switch (msg->code) {
+                case FLOW_REQ:
+                        clock_gettime(PTHREAD_COND_CLOCK, &abstime);
 
-                ipcp_flow_alloc_reply(ntoh32(msg->r_eid), msg->response);
+                        pthread_mutex_lock(&ipcpi.alloc_lock);
 
-                if (msg->response < 0)
-                        destroy_conn(ntoh32(msg->r_eid));
-                else
-                        psched_add(fa.psched, ntoh32(msg->r_eid));
+                        while (ipcpi.alloc_id != -1 &&
+                               ipcp_get_state() == IPCP_OPERATIONAL) {
+                                ts_add(&abstime, &ts, &abstime);
+                                pthread_cond_timedwait(&ipcpi.alloc_cond,
+                                                       &ipcpi.alloc_lock,
+                                                       &abstime);
+                        }
 
-                pthread_rwlock_unlock(&fa.flows_lock);
+                        if (ipcp_get_state() != IPCP_OPERATIONAL) {
+                                log_dbg("Won't allocate over non-operational"
+                                        "IPCP.");
+                                pthread_mutex_unlock(&ipcpi.alloc_lock);
+                                free(msg);
+                                continue;
+                        }
 
-                break;
-        default:
-                log_err("Got an unknown flow allocation message.");
-                break;
-        }
+                        assert(ipcpi.alloc_id == -1);
+
+                        qs.delay        = ntoh32(msg->delay);
+                        qs.bandwidth    = ntoh64(msg->bandwidth);
+                        qs.availability = msg->availability;
+                        qs.loss         = ntoh32(msg->loss);
+                        qs.ber          = ntoh32(msg->ber);
+                        qs.in_order     = msg->in_order;
+                        qs.max_gap      = ntoh32(msg->max_gap);
 
-        free(msg);
+                        fd = ipcp_flow_req_arr(getpid(),
+                                               (uint8_t *) (msg + 1),
+                                               ipcp_dir_hash_len(),
+                                               qs);
+                        if (fd < 0) {
+                                pthread_mutex_unlock(&ipcpi.alloc_lock);
+                                log_err("Failed to get fd for flow.");
+                                free(msg);
+                                continue;
+                        }
+
+                        pthread_rwlock_wrlock(&fa.flows_lock);
+
+                        fa.r_eid[fd]  = ntoh32(msg->s_eid);
+                        fa.r_addr[fd] = ntoh64(msg->s_addr);
+
+                        pthread_rwlock_unlock(&fa.flows_lock);
+
+                        ipcpi.alloc_id = fd;
+                        pthread_cond_broadcast(&ipcpi.alloc_cond);
+
+                        pthread_mutex_unlock(&ipcpi.alloc_lock);
+
+                        break;
+                case FLOW_REPLY:
+                        pthread_rwlock_wrlock(&fa.flows_lock);
+
+                        fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+
+                        ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
+                                              msg->response);
+
+                        if (msg->response < 0)
+                                destroy_conn(ntoh32(msg->r_eid));
+                        else
+                                psched_add(fa.psched, ntoh32(msg->r_eid));
+
+                        pthread_rwlock_unlock(&fa.flows_lock);
+
+                        break;
+                default:
+                        log_err("Got an unknown flow allocation message.");
+                        break;
+                }
+
+                free(msg);
+        }
 }
 
 int fa_init(void)
@@ -213,31 +271,59 @@ int fa_init(void)
                 destroy_conn(i);
 
         if (pthread_rwlock_init(&fa.flows_lock, NULL))
-                return -1;
+                goto fail_rwlock;
+
+        if (pthread_mutex_init(&fa.mtx, NULL))
+                goto fail_mtx;
+
+        if (pthread_cond_init(&fa.cond, NULL))
+                goto fail_cond;
+
+        list_head_init(&fa.cmds);
 
         fa.fd = dt_reg_comp(&fa, &fa_post_packet, FA);
 
         return 0;
+
+ fail_cond:
+        pthread_mutex_destroy(&fa.mtx);
+ fail_mtx:
+        pthread_rwlock_destroy(&fa.flows_lock);
+ fail_rwlock:
+        log_err("Failed to initialize flow allocator.");
+        return -1;
 }
 
 void fa_fini(void)
 {
+        pthread_cond_destroy(&fa.cond);;
+        pthread_mutex_destroy(&fa.mtx);
         pthread_rwlock_destroy(&fa.flows_lock);
 }
 
 int fa_start(void)
 {
         fa.psched = psched_create(packet_handler);
-        if (fa.psched == NULL) {
-                log_err("Failed to create packet scheduler.");
-                return -1;
-        }
+        if (fa.psched == NULL)
+                goto fail_psched;
+
+        if (pthread_create(&fa.worker, NULL, fa_handle_packet, NULL))
+                goto fail_thread;
 
         return 0;
+
+ fail_thread:
+        psched_destroy(fa.psched);
+ fail_psched:
+        log_err("Failed to start flow allocator.");
+        return -1;
 }
 
 void fa_stop(void)
 {
+        pthread_cancel(fa.worker);
+        pthread_join(fa.worker, NULL);
+
         psched_destroy(fa.psched);
 }
 
-- 
2.19.1


Other related posts:

  • » [PATCH 1/3] ipcpd: Decouple flow allocator from dt thread - Dimitri Staessens