[PATCH 3/4] lib: Block on closed flow control window

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Sat, 10 Oct 2020 15:34:27 +0200

If the sending window for flow control is closed, the sending
application will now block until the window opens. Beware that until
the rendez-vous mechanism is implemented, shutting down a server while
the client is sending (with non-timed-out blocking write) will cause
the client to hang indefinitely because its window will close.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 src/lib/dev.c  |  38 +++++++++++------
 src/lib/frct.c | 109 ++++++++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 129 insertions(+), 18 deletions(-)

diff --git a/src/lib/dev.c b/src/lib/dev.c
index 5371527..14e6062 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1018,6 +1018,8 @@ ssize_t flow_write(int          fd,
         int                  flags;
         struct timespec      abs;
         struct timespec *    abstime = NULL;
+        struct timespec      tic = {0, TICTIME};
+        struct timespec      tictime;
         struct shm_du_buff * sdb;
         uint8_t *            ptr;
 
@@ -1038,6 +1040,8 @@ ssize_t flow_write(int          fd,
                 return -ENOTALLOC;
         }
 
+        ts_add(&tic, &abs, &tictime);
+
         if (ai.flows[fd].snd_timesout) {
                 ts_add(&abs, &flow->snd_timeo, &abs);
                 abstime = &abs;
@@ -1050,18 +1054,26 @@ ssize_t flow_write(int          fd,
         if ((flags & FLOWFACCMODE) == FLOWFRDONLY)
                 return -EPERM;
 
-        /* TODO: partial writes. */
-        if (flags & FLOWFWNOBLOCK)
-                idx = shm_rdrbuff_alloc(ai.rdrb,
-                                        count,
-                                        &ptr,
-                                        &sdb);
-        else  /* Blocking. */
-                idx = shm_rdrbuff_alloc_b(ai.rdrb,
-                                          count,
-                                          &ptr,
-                                          &sdb,
-                                          abstime);
+        if (flags & FLOWFWNOBLOCK) {
+                if (!frcti_is_window_open(flow->frcti))
+                        return -EAGAIN;
+                idx = shm_rdrbuff_alloc(ai.rdrb, count, &ptr, &sdb);
+        } else {
+                while((ret = frcti_window_wait(flow->frcti, &tictime)) < 0) {
+
+                        if (ret != -ETIMEDOUT)
+                                return ret;
+
+                        if (abstime != NULL && ts_diff_ns(&tictime, &abs) <= 0)
+                                return -ETIMEDOUT;
+
+                        frcti_tick(flow->frcti);
+
+                        ts_add(&tictime, &tic, &tictime);
+                }
+                idx = shm_rdrbuff_alloc_b(ai.rdrb, count, &ptr, &sdb, abstime);
+        }
+
         if (idx < 0)
                 return idx;
 
@@ -1161,7 +1173,7 @@ ssize_t flow_read(int    fd,
                                         return idx;
 
                                 if (abstime != NULL
-                                    && ts_diff_ns(&tictime, &abs) < 0)
+                                    && ts_diff_ns(&tictime, &abs) <= 0)
                                         return -ETIMEDOUT;
 
                                 ts_add(&tictime, &tic, &tictime);
diff --git a/src/lib/frct.c b/src/lib/frct.c
index fc6c5e8..26bb004 100644
--- a/src/lib/frct.c
+++ b/src/lib/frct.c
@@ -62,6 +62,12 @@ struct frcti {
 
         ssize_t           rq[RQ_SIZE];
         pthread_rwlock_t  lock;
+
+        bool              open;        /* Window open/closed     */
+        size_t            wnd;         /* Window size            */
+        struct timespec   t_wnd;       /* Window closed time     */
+        pthread_cond_t    cond;
+        pthread_mutex_t   mtx;
 };
 
 enum frct_flags {
@@ -181,6 +187,12 @@ static struct frcti * frcti_create(int fd)
         if (pthread_rwlock_init(&frcti->lock, NULL))
                 goto fail_lock;
 
+        if (pthread_mutex_init(&frcti->mtx, NULL))
+                goto fail_mutex;
+
+        if (pthread_cond_init(&frcti->cond, NULL))
+                goto fail_cond;
+
         for (idx = 0; idx < RQ_SIZE; ++idx)
                 frcti->rq[idx] = -1;
 
@@ -216,6 +228,10 @@ static struct frcti * frcti_create(int fd)
 
         return frcti;
 
+ fail_cond:
+        pthread_mutex_destroy(&frcti->mtx);
+ fail_mutex:
+        pthread_rwlock_destroy(&frcti->lock);
  fail_lock:
         free(frcti);
  fail_malloc:
@@ -224,6 +240,8 @@ static struct frcti * frcti_create(int fd)
 
 static void frcti_destroy(struct frcti * frcti)
 {
+        pthread_cond_destroy(&frcti->cond);
+        pthread_mutex_destroy(&frcti->mtx);
         pthread_rwlock_destroy(&frcti->lock);
 
         free(frcti);
@@ -275,6 +293,75 @@ static void frcti_setflags(struct frcti * frcti,
 #define frcti_dealloc(frcti)                            \
         (frcti == NULL ? 0 : __frcti_dealloc(frcti))
 
+#define frcti_is_window_open(frcti)                     \
+        (frcti == NULL ? true : __frcti_is_window_open(frcti))
+
+#define frcti_window_wait(frcti, abstime)               \
+        (frcti == NULL ? 0 : __frcti_window_wait(frcti, abstime))
+
+
+static bool __frcti_is_window_open(struct frcti * frcti)
+{
+        struct frct_cr * snd_cr = &frcti->snd_cr;
+        int ret                 = true;
+
+        pthread_rwlock_rdlock(&frcti->lock);
+
+        if (snd_cr->cflags & FRCTFRESCNTL)
+                ret = before(snd_cr->seqno, snd_cr->rwe);
+
+        if (!ret) {
+                pthread_mutex_lock(&frcti->mtx);
+                if (frcti->open) {
+                        clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd);
+                        frcti->open = false;
+                }
+                pthread_mutex_unlock(&frcti->mtx);
+        }
+
+        pthread_rwlock_unlock(&frcti->lock);
+
+        return ret;
+}
+
+static int __frcti_window_wait(struct frcti *    frcti,
+                               struct timespec * abstime)
+{
+        struct frct_cr * snd_cr = &frcti->snd_cr;
+        int ret                 = 0;
+
+        pthread_rwlock_rdlock(&frcti->lock);
+
+        if (!(snd_cr->cflags & FRCTFRESCNTL)) {
+                pthread_rwlock_unlock(&frcti->lock);
+                return 0;
+        }
+
+        while (snd_cr->seqno == snd_cr->rwe && ret != -ETIMEDOUT) {
+                pthread_rwlock_unlock(&frcti->lock);
+                pthread_mutex_lock(&frcti->mtx);
+
+                if (frcti->open) {
+                        clock_gettime(PTHREAD_COND_CLOCK, &frcti->t_wnd);
+                        frcti->open = false;
+                }
+
+                pthread_cleanup_push((void(*)(void *))pthread_mutex_unlock,
+                                     (void *) &frcti->mtx);
+
+                ret = -pthread_cond_timedwait(&frcti->cond,
+                                              &frcti->mtx,
+                                              abstime);
+
+                pthread_cleanup_pop(true);
+                pthread_rwlock_rdlock(&frcti->lock);
+        }
+
+        pthread_rwlock_unlock(&frcti->lock);
+
+        return ret;
+}
+
 static ssize_t __frcti_queued_pdu(struct frcti * frcti)
 {
         ssize_t idx;
@@ -373,7 +460,7 @@ static int __frcti_snd(struct frcti *       frcti,
 
         pci = (struct frct_pci *) shm_du_buff_head_alloc(sdb, FRCT_PCILEN);
         if (pci == NULL)
-                return -1;
+                return -ENOMEM;
 
         memset(pci, 0, sizeof(*pci));
 
@@ -509,11 +596,23 @@ static void __frcti_rcv(struct frcti *       frcti,
         }
 
         if (pci->flags & FRCT_FC) {
-                uint32_t rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF));
-                if (before(rwe, snd_cr->lwe & 0x00FFFFFF))
-                        snd_cr->rwe += 0x01000000;
+                uint32_t rwe;
 
-                snd_cr->rwe = (snd_cr->rwe & 0xFF000000) + rwe;
+                rwe = ntoh32(*((uint32_t *)pci) & hton32(0x00FFFFFF));
+                rwe |= snd_cr->rwe & 0xFF000000;
+
+                /* Rollover for 24 bit */
+                if (before(rwe, snd_cr->rwe) && snd_cr->rwe - rwe > 0x007FFFFF)
+                        rwe += 0x01000000;
+
+                snd_cr->rwe = rwe;
+
+                pthread_mutex_lock(&frcti->mtx);
+                if (!frcti->open) {
+                        frcti->open = true;
+                        pthread_cond_broadcast(&frcti->cond);
+                }
+                pthread_mutex_unlock(&frcti->mtx);
         }
 
         if (!(pci->flags & FRCT_DATA))
-- 
2.28.0


Other related posts:

  • » [PATCH 3/4] lib: Block on closed flow control window - Dimitri Staessens