[PATCH v2] lib: Add event types to fqueue

  • From: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Mon, 14 May 2018 09:20:44 +0200

The event type of the current event in the fqueue can now be requested
using the fqueue_type() command. Currently events for packets
(FLOW_PKT), flows (FLOW_UP, FLOW_DOWN) and allocation (FLOW_ALLOC,
FLOW_DEALLOC) are specified. The implementation only tracks FLOW_PKT
at this point.

Signed-off-by: Dimitri Staessens <dimitri.staessens@xxxxxxxx>
---
 doc/man/fqueue.3                 | 31 +++++++++++++++++--
 include/ouroboros/fqueue.h       | 40 +++++++++++++++---------
 include/ouroboros/shm_flow_set.h |  3 +-
 src/lib/dev.c                    | 52 +++++++++++++++++++-------------
 src/lib/shm_flow_set.c           | 38 +++++++++++++----------
 5 files changed, 108 insertions(+), 56 deletions(-)

diff --git a/doc/man/fqueue.3 b/doc/man/fqueue.3
index 00c28d4..abd21cf 100644
--- a/doc/man/fqueue.3
+++ b/doc/man/fqueue.3
@@ -19,6 +19,8 @@ on flows
 
 \fBint fqueue_next(fqueue_t * \fIfq\fB);
 
+\fBint fqueue_type(fqueue_t * \fIfq\fB);
+
 \fBint fevent(fset_t * \fIset\fB, fqueue_t * \fIfq\fB,
 const struct timespec * \fItimeo\fB);
 
@@ -36,6 +38,22 @@ an \fBfqueue_t\fR \fIfq\fR.
 The \fBfqueue_next\fR() function retrieves the next event (a \fIflow
 descriptor\fR) that is ready within the event queue \fIfq\fR.
 
+The \fBfqueue_type\fR() function retrieves the type for the current
+event on the fd that was returned by \fBfqueue_next\fR(). Event types
+are:
+.RS 4
+FLOW_PKT: A new packet arrived on this flow and is ready for reading.
+
+FLOW_UP: The flow is now marked UP and ready for read/write.
+
+FLOW_DOWN: The flow is now marked DOWN and cannot be written to.
+
+FLOW_ALLOC: A pending flow is now allocated.
+
+FLOW_DEALLOC: The flow is deallocated by the other (N+1 or N-1)
+process.
+.RE
+
 The \fBfevent\fR() function retrieves all events that occured on any
 \fIflow descriptor\fR within \fIset\fR and returns them in the event
 queue \fBfq\fR. If a \fBstruct timespec *\fI timeo\fR can be provided,
@@ -50,7 +68,14 @@ On success, \fBfqueue_create\fR() returns a pointer to an
 
 \fBfqueue_destroy\fR() has no return value.
 
-On success, \fBfevent\fR() returns the number of events that occured in 
\fIset\fR.
+On success, \fBfevent\fR() returns the number of events that occured
+in \fIset\fR.
+
+On success, \fBfqueue_next\fR() returns the next file descriptor for
+which an event occurred.
+
+On success, \fBfqueue_type\fR() returns the event type for the last
+event returned by \fBfqueue_next\fR().
 
 .SH ERRORS
 
@@ -62,10 +87,10 @@ were available to create the \fBfqueue_t\fR.
 .B -EINVAL
 An invalid argument was passed (\fIfq\fR or \fIset\fR was \fINULL\fR).
 
-In addition, \fBfqueue_next\fR() can return
+In addition, \fBfqueue_next\fR() or \fBqueue_type\fR() can return
 
 .B -EPERM
-No more fds available in \fIfq\fR.
+No more fds available or no current event in \fIfq\fR.
 
 and \fBfevent\fR() can return
 
diff --git a/include/ouroboros/fqueue.h b/include/ouroboros/fqueue.h
index 1b10266..8a5dc98 100644
--- a/include/ouroboros/fqueue.h
+++ b/include/ouroboros/fqueue.h
@@ -28,6 +28,14 @@
 #include <stdbool.h>
 #include <time.h>
 
+enum fqtype {
+        FLOW_PKT = 0,
+        FLOW_DOWN,
+        FLOW_UP,
+        FLOW_ALLOC,
+        FLOW_DEALLOC
+};
+
 struct flow_set;
 
 struct fqueue;
@@ -37,30 +45,32 @@ typedef struct fqueue fqueue_t;
 
 __BEGIN_DECLS
 
-fset_t *   fset_create(void);
+fset_t *    fset_create(void);
+
+void        fset_destroy(fset_t * set);
 
-void       fset_destroy(fset_t * set);
+fqueue_t *  fqueue_create(void);
 
-fqueue_t * fqueue_create(void);
+void        fqueue_destroy(struct fqueue * fq);
 
-void       fqueue_destroy(struct fqueue * fq);
+void        fset_zero(fset_t * set);
 
-void       fset_zero(fset_t * set);
+int         fset_add(fset_t * set,
+                     int      fd);
 
-int        fset_add(fset_t * set,
-                    int      fd);
+bool        fset_has(const fset_t * set,
+                     int            fd);
 
-bool       fset_has(const fset_t * set,
-                    int            fd);
+void        fset_del(fset_t * set,
+                     int      fd);
 
-void       fset_del(fset_t * set,
-                    int      fd);
+int         fqueue_next(fqueue_t * fq);
 
-int        fqueue_next(fqueue_t * fq);
+enum fqtype fqueue_type(fqueue_t * fq);
 
-int        fevent(fset_t *                set,
-                  fqueue_t *              fq,
-                  const struct timespec * timeo);
+int         fevent(fset_t *                set,
+                   fqueue_t *              fq,
+                   const struct timespec * timeo);
 
 __END_DECLS
 
diff --git a/include/ouroboros/shm_flow_set.h b/include/ouroboros/shm_flow_set.h
index 7684913..ebf63af 100644
--- a/include/ouroboros/shm_flow_set.h
+++ b/include/ouroboros/shm_flow_set.h
@@ -53,7 +53,8 @@ void                  shm_flow_set_del(struct shm_flow_set * 
shm_set,
                                        int                   port_id);
 
 void                  shm_flow_set_notify(struct shm_flow_set * set,
-                                          int                   port_id);
+                                          int                   port_id,
+                                          int                   event);
 
 ssize_t               shm_flow_set_wait(const struct shm_flow_set * shm_set,
                                         size_t                      idx,
diff --git a/src/lib/dev.c b/src/lib/dev.c
index edcf56e..3d854c2 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -65,7 +65,7 @@ struct flow_set {
 };
 
 struct fqueue {
-        int    fqueue[SHM_BUFFER_SIZE]; /* Safe copy from shm. */
+        int    fqueue[2 * SHM_BUFFER_SIZE]; /* Safe copy from shm. */
         size_t fqsize;
         size_t next;
 };
@@ -875,7 +875,7 @@ ssize_t flow_write(int          fd,
         if (ret < 0)
                 shm_rdrbuff_remove(ai.rdrb, idx);
         else
-                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+                shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
 
         pthread_rwlock_unlock(&ai.lock);
 
@@ -1039,7 +1039,7 @@ void fset_zero(struct flow_set * set)
 int fset_add(struct flow_set * set,
              int               fd)
 {
-        int ret;
+        int    ret;
         size_t sdus;
         size_t i;
 
@@ -1052,7 +1052,7 @@ int fset_add(struct flow_set * set,
 
         sdus = shm_rbuff_queued(ai.flows[fd].rx_rb);
         for (i = 0; i < sdus; i++)
-                shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id);
+                shm_flow_set_notify(ai.fqset, ai.flows[fd].port_id, FLOW_PKT);
 
         pthread_rwlock_unlock(&ai.lock);
 
@@ -1102,23 +1102,31 @@ int fqueue_next(struct fqueue * fq)
         if (fq == NULL)
                 return -EINVAL;
 
-        if (fq->fqsize == 0)
+        if (fq->fqsize == 0 || fq->next == fq->fqsize)
                 return -EPERM;
 
         pthread_rwlock_rdlock(&ai.lock);
 
-        fd = ai.ports[fq->fqueue[fq->next++]].fd;
+        fd = ai.ports[fq->fqueue[fq->next]].fd;
 
-        pthread_rwlock_unlock(&ai.lock);
+        fq->next += 2;
 
-        if (fq->next == fq->fqsize) {
-                fq->fqsize = 0;
-                fq->next = 0;
-        }
+        pthread_rwlock_unlock(&ai.lock);
 
         return fd;
 }
 
+enum fqtype fqueue_type(struct fqueue * fq)
+{
+        if (fq == NULL)
+                return -EINVAL;
+
+        if (fq->fqsize == 0 || fq->next == 0)
+                return -EPERM;
+
+        return fq->fqueue[fq->next - 1];
+}
+
 int fevent(struct flow_set *       set,
            struct fqueue *         fq,
            const struct timespec * timeo)
@@ -1130,11 +1138,9 @@ int fevent(struct flow_set *       set,
         if (set == NULL || fq == NULL)
                 return -EINVAL;
 
-        if (fq->fqsize > 0)
+        if (fq->fqsize > 0 && fq->next != fq->fqsize)
                 return fq->fqsize;
 
-        assert(!fq->next);
-
         if (timeo != NULL) {
                 clock_gettime(PTHREAD_COND_CLOCK, &abstime);
                 ts_add(&abstime, timeo, &abstime);
@@ -1147,7 +1153,8 @@ int fevent(struct flow_set *       set,
                 return -ETIMEDOUT;
         }
 
-        fq->fqsize = ret;
+        fq->fqsize = ret << 1;
+        fq->next   = 0;
 
         assert(ret);
 
@@ -1365,9 +1372,9 @@ int ipcp_flow_write(int                  fd,
                 return -ENOMEM;
         }
 
-        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+        ret = shm_rbuff_write(flow->tx_rb, idx);
         if (ret == 0)
-                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+                shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
 
         pthread_rwlock_unlock(&ai.lock);
 
@@ -1454,20 +1461,23 @@ ssize_t local_flow_read(int fd)
 int local_flow_write(int    fd,
                      size_t idx)
 {
-        int ret;
+        struct flow * flow;
+        int           ret;
 
         assert(fd >= 0);
 
+        flow = &ai.flows[fd];
+
         pthread_rwlock_rdlock(&ai.lock);
 
-        if (ai.flows[fd].port_id < 0) {
+        if (flow->port_id < 0) {
                 pthread_rwlock_unlock(&ai.lock);
                 return -ENOTALLOC;
         }
 
-        ret = shm_rbuff_write(ai.flows[fd].tx_rb, idx);
+        ret = shm_rbuff_write(flow->tx_rb, idx);
         if (ret == 0)
-                shm_flow_set_notify(ai.flows[fd].set, ai.flows[fd].port_id);
+                shm_flow_set_notify(flow->set, flow->port_id, FLOW_PKT);
 
         pthread_rwlock_unlock(&ai.lock);
 
diff --git a/src/lib/shm_flow_set.c b/src/lib/shm_flow_set.c
index d2107fc..bb9e3ca 100644
--- a/src/lib/shm_flow_set.c
+++ b/src/lib/shm_flow_set.c
@@ -27,7 +27,6 @@
 #include <ouroboros/lockfile.h>
 #include <ouroboros/time_utils.h>
 #include <ouroboros/shm_flow_set.h>
-#include <ouroboros/fqueue.h>
 #include <ouroboros/errno.h>
 
 #include <pthread.h>
@@ -54,24 +53,29 @@
 
 #define FN_MAX_CHARS 255
 
-#define FQUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(int))
+#define QUEUESIZE ((SHM_BUFFER_SIZE) * sizeof(struct portevent))
 
 #define SHM_FLOW_SET_FILE_SIZE (SYS_MAX_FLOWS * sizeof(ssize_t)             \
                                 + PROG_MAX_FQUEUES * sizeof(size_t)         \
                                 + PROG_MAX_FQUEUES * sizeof(pthread_cond_t) \
-                                + PROG_MAX_FQUEUES * FQUEUESIZE             \
+                                + PROG_MAX_FQUEUES * QUEUESIZE              \
                                 + sizeof(pthread_mutex_t))
 
 #define fqueue_ptr(fs, idx) (fs->fqueues + (SHM_BUFFER_SIZE) * idx)
 
+struct portevent {
+        int port_id;
+        int event;
+};
+
 struct shm_flow_set {
-        ssize_t *         mtable;
-        size_t *          heads;
-        pthread_cond_t *  conds;
-        int *             fqueues;
-        pthread_mutex_t * lock;
+        ssize_t *          mtable;
+        size_t *           heads;
+        pthread_cond_t *   conds;
+        struct portevent * fqueues;
+        pthread_mutex_t *  lock;
 
-        pid_t             pid;
+        pid_t pid;
 };
 
 struct shm_flow_set * shm_flow_set_create()
@@ -125,7 +129,7 @@ struct shm_flow_set * shm_flow_set_create()
         set->mtable  = shm_base;
         set->heads   = (size_t *) (set->mtable + SYS_MAX_FLOWS);
         set->conds   = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
-        set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES);
+        set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES);
         set->lock    = (pthread_mutex_t *)
                 (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE));
 
@@ -191,10 +195,9 @@ struct shm_flow_set * shm_flow_set_open(pid_t pid)
         set->mtable  = shm_base;
         set->heads   = (size_t *) (set->mtable + SYS_MAX_FLOWS);
         set->conds   = (pthread_cond_t *)(set->heads + PROG_MAX_FQUEUES);
-        set->fqueues = (int *) (set->conds + PROG_MAX_FQUEUES);
+        set->fqueues = (struct portevent *) (set->conds + PROG_MAX_FQUEUES);
         set->lock    = (pthread_mutex_t *)
                 (set->fqueues + PROG_MAX_FQUEUES * (SHM_BUFFER_SIZE));
-
         set->pid = pid;
 
         return set;
@@ -316,7 +319,8 @@ int shm_flow_set_has(struct shm_flow_set * set,
 }
 
 void shm_flow_set_notify(struct shm_flow_set * set,
-                         int                   port_id)
+                         int                   port_id,
+                         int                   event)
 {
         assert(set);
         assert(!(port_id < 0) && port_id < SYS_MAX_FLOWS);
@@ -328,8 +332,10 @@ void shm_flow_set_notify(struct shm_flow_set * set,
                 return;
         }
 
-        *(fqueue_ptr(set, set->mtable[port_id]) +
-                     (set->heads[set->mtable[port_id]])++) = port_id;
+        (fqueue_ptr(set, set->mtable[port_id]) +
+         (set->heads[set->mtable[port_id]]))->port_id = port_id;
+        (fqueue_ptr(set, set->mtable[port_id]) +
+         (set->heads[set->mtable[port_id]])++)->event = event;
 
         pthread_cond_signal(&set->conds[set->mtable[port_id]]);
 
@@ -380,7 +386,7 @@ ssize_t shm_flow_set_wait(const struct shm_flow_set * set,
         if (ret != -ETIMEDOUT) {
                 memcpy(fqueue,
                        fqueue_ptr(set, idx),
-                       set->heads[idx] * sizeof(int));
+                       set->heads[idx] * sizeof(struct portevent));
                 ret = set->heads[idx];
                 set->heads[idx] = 0;
         }
-- 
2.17.0


Other related posts: