[PATCH] ipcpd: Add congestion avoidance policies

  • From: Dimitri Staessens <dimitri@ouroboros.rocks>
  • To: ouroboros@xxxxxxxxxxxxx
  • Date: Sun, 29 Nov 2020 12:25:11 +0100

This adds congestion avoidance policies to the unicast IPCP.  The
default policy is a multi-bit explicit congestion avoidance algorithm
based on data-center TCP congestion avoidance (DCTCP) to relay
information about the maximum queue depth that packets experienced to
the receiver. There's also a "nop" policy to disable congestion
avoidance for testing and benchmarking puroposed.

The (initial) API for congestion avoidance policies is:

        void *   (* ctx_create)(void);

        void     (* ctx_destroy)(void * ctx);

These calls create / and or destroy a context for congestion control
for a specific flow. Thread-safety of the context is the
responsability of the flow allocator (operations on the ctx should be
performed under a lock).

        ca_wnd_t (* ctx_update_snd)(void *               ctx,
                                    struct shm_du_buff * sdb);

This is the sender call to update the context, and should be called
for every packet that is sent on the flow. I'm still hesitant to pass
the actual packet instead of just its length, but this is all the
information that is available at the sender side, so the most
flexible. It returns an opaque union type that is used for the call to
check/wait if the congestion window is open or closed (and allowing to
release locks before waiting).

        bool     (* ctx_update_rcv)(void *     ctx,
                                    uint8_t    ecn,
                                    uint16_t * ece);

This is the call to update the flow congestion context on the receiver
side. It should be called for every received packet.  It gets the ecn
value from the packet, and returns the ECE (explicit congestion
experienced) value to be sent to the sender in case of congestion. The
boolean returned signals whether or not a congestion update needs to be sent.

        void     (* ctx_update_ece)(void *   ctx,
                                    uint16_t ece);

This is the call for the sending side top update the context when it
receives an ECE update from the receiver.

        void     (* wnd_wait)(ca_wnd_t wnd);

This is a (blocking) call that waits for the congestion window to
clear. It should be stateless (to avoid waiting under locks). This may
change later on if passing the context is needed for different algorithms.

        uint8_t  (* calc_ecn)(int                  fd,
                              struct shm_du_buff * sdb);

This is the call that intermediate IPCPs(routers) should use to update
the ECN field on passing packets.

The multi-bit ECN policy bases the value for the ECN field on the
depth of the rbuff queue packets will be sent on. I created another
call to grab the queue depth as fccntl is write-locking the
application. We can further optimize this to avoid most locking on the
rbuff.

Signed-off-by: Dimitri Staessens <dimitri@ouroboros.rocks>
---
 doc/man/ouroboros.8                |  13 +-
 include/ouroboros/ipcp-dev.h       |  42 +++---
 include/ouroboros/ipcp.h           |  32 +++--
 src/ipcpd/ipcp.c                   |   4 +-
 src/ipcpd/unicast/CMakeLists.txt   |   3 +
 src/ipcpd/unicast/ca.c             |  98 +++++++++++++
 src/ipcpd/unicast/ca.h             |  60 ++++++++
 src/ipcpd/unicast/dt.c             |  25 ++--
 src/ipcpd/unicast/fa.c             | 218 ++++++++++++++++++++++-------
 src/ipcpd/unicast/fa.h             |   3 +
 src/ipcpd/unicast/main.c           |  10 ++
 src/ipcpd/unicast/pol-ca-ops.h     |  49 +++++++
 src/ipcpd/unicast/pol/ca-mb-ecn.c  | 216 ++++++++++++++++++++++++++++
 src/ipcpd/unicast/pol/ca-mb-ecn.h  |  49 +++++++
 src/ipcpd/unicast/pol/ca-nop.c     |  91 ++++++++++++
 src/ipcpd/unicast/pol/ca-nop.h     |  49 +++++++
 src/ipcpd/unicast/pol/link_state.c |   6 +-
 src/lib/dev.c                      |  15 ++
 src/lib/ipcp_config.proto          |  13 +-
 src/lib/irm.c                      |   2 +
 src/tools/irm/irm_ipcp_bootstrap.c |  69 +++++----
 21 files changed, 936 insertions(+), 131 deletions(-)
 create mode 100644 src/ipcpd/unicast/ca.c
 create mode 100644 src/ipcpd/unicast/ca.h
 create mode 100644 src/ipcpd/unicast/pol-ca-ops.h
 create mode 100644 src/ipcpd/unicast/pol/ca-mb-ecn.c
 create mode 100644 src/ipcpd/unicast/pol/ca-mb-ecn.h
 create mode 100644 src/ipcpd/unicast/pol/ca-nop.c
 create mode 100644 src/ipcpd/unicast/pol/ca-nop.h

diff --git a/doc/man/ouroboros.8 b/doc/man/ouroboros.8
index 5a09df8..adb652b 100644
--- a/doc/man/ouroboros.8
+++ b/doc/man/ouroboros.8
@@ -2,7 +2,7 @@
 .\" Dimitri Staessens <dimitri.staessens@xxxxxxxx>
 .\" Sander Vrijders <sander.vrijders@xxxxxxxx>
 
-.TH OUROBOROS 8 2018-03-10 Ouroboros "Ouroboros User Manual"
+.TH OUROBOROS 8 2020-11-29 Ouroboros "Ouroboros User Manual"
 
 .SH NAME
 
@@ -258,6 +258,17 @@ ecmp: equal-cost multipath routing.
 .br
 default: link_state.
 .PP
+[congestion \fIpolicy\fR] specifies the congestion avoidance policy.
+.br
+\fIpolicy\fR:
+.RS 4
+none: no congestion avoidance.
+.br
+mb-ecn: Multi-bit explicit congestion notification and avoidance.
+.RE
+.br
+default: mb-ecn.
+.PP
 [hash \fIpolicy\fR] specifies the hash function used for the directory.
 .br
 \fIpolicy\fR: SHA3_224, SHA3_256, SHA3_384, SHA3_512.
diff --git a/include/ouroboros/ipcp-dev.h b/include/ouroboros/ipcp-dev.h
index d60e0b4..b57cec0 100644
--- a/include/ouroboros/ipcp-dev.h
+++ b/include/ouroboros/ipcp-dev.h
@@ -26,33 +26,35 @@
 #ifndef OUROBOROS_IPCP_DEV_H
 #define OUROBOROS_IPCP_DEV_H
 
-int  ipcp_create_r(int result);
+int   ipcp_create_r(int result);
 
-int  ipcp_flow_req_arr(const uint8_t * dst,
-                       size_t          len,
-                       qosspec_t       qs,
-                       const void *    data,
-                       size_t          dlen);
+int   ipcp_flow_req_arr(const uint8_t * dst,
+                        size_t           len,
+                        qosspec_t        qs,
+                        const void *     data,
+                        size_t           dlen);
 
-int  ipcp_flow_alloc_reply(int          fd,
-                           int          response,
-                           const void * data,
-                           size_t       len);
+int    ipcp_flow_alloc_reply(int        fd,
+                             int          response,
+                             const void * data,
+                             size_t       len);
 
-int  ipcp_flow_read(int                   fd,
-                    struct shm_du_buff ** sdb);
+int    ipcp_flow_read(int                   fd,
+                      struct shm_du_buff ** sdb);
 
-int  ipcp_flow_write(int                  fd,
-                     struct shm_du_buff * sdb);
+int    ipcp_flow_write(int                  fd,
+                       struct shm_du_buff * sdb);
 
-int  ipcp_flow_fini(int fd);
+int    ipcp_flow_fini(int fd);
 
-int  ipcp_flow_get_qoscube(int         fd,
-                           qoscube_t * cube);
+int    ipcp_flow_get_qoscube(int         fd,
+                             qoscube_t * cube);
 
-int  ipcp_sdb_reserve(struct shm_du_buff ** sdb,
-                      size_t                len);
+size_t ipcp_flow_queued(int fd);
 
-void ipcp_sdb_release(struct shm_du_buff * sdb);
+int    ipcp_sdb_reserve(struct shm_du_buff ** sdb,
+                        size_t                len);
+
+void   ipcp_sdb_release(struct shm_du_buff * sdb);
 
 #endif /* OUROBOROS_IPCP_DEV_H */
diff --git a/include/ouroboros/ipcp.h b/include/ouroboros/ipcp.h
index 6494d9b..86dfd2d 100644
--- a/include/ouroboros/ipcp.h
+++ b/include/ouroboros/ipcp.h
@@ -55,6 +55,11 @@ enum pol_routing {
         ROUTING_LINK_STATE_ECMP
 };
 
+enum pol_cong_avoid {
+        CA_NONE = 0,
+        CA_MB_ECN
+};
+
 enum pol_dir_hash {
         DIR_HASH_SHA3_224 = 0,
         DIR_HASH_SHA3_256,
@@ -70,29 +75,30 @@ struct layer_info {
 
 /* Structure to configure the first IPCP */
 struct ipcp_config {
-        struct layer_info  layer_info;
+        struct layer_info   layer_info;
 
-        enum ipcp_type     type;
+        enum ipcp_type      type;
 
         /* Unicast */
-        uint8_t            addr_size;
-        uint8_t            eid_size;
-        uint8_t            max_ttl;
+        uint8_t             addr_size;
+        uint8_t             eid_size;
+        uint8_t             max_ttl;
 
-        enum pol_addr_auth addr_auth_type;
-        enum pol_routing   routing_type;
+        enum pol_addr_auth  addr_auth_type;
+        enum pol_routing    routing_type;
+        enum pol_cong_avoid cong_avoid;
 
         /* UDP */
-        uint32_t           ip_addr;
-        uint32_t           dns_addr;
-        uint16_t           clt_port;
-        uint16_t           srv_port;
+        uint32_t            ip_addr;
+        uint32_t            dns_addr;
+        uint16_t            clt_port;
+        uint16_t            srv_port;
 
         /* Ethernet */
-        char *             dev;
+        char *              dev;
 
         /* Ethernet DIX */
-        uint16_t           ethertype;
+        uint16_t            ethertype;
 };
 
 #endif /* OUROBOROS_IPCP_H */
diff --git a/src/ipcpd/ipcp.c b/src/ipcpd/ipcp.c
index 95d2f78..c8b5f84 100644
--- a/src/ipcpd/ipcp.c
+++ b/src/ipcpd/ipcp.c
@@ -241,6 +241,7 @@ static void * mainloop(void * o)
                                 conf.max_ttl        = conf_msg->max_ttl;
                                 conf.addr_auth_type = conf_msg->addr_auth_type;
                                 conf.routing_type   = conf_msg->routing_type;
+                                conf.cong_avoid     = conf_msg->cong_avoid;
                                 break;
                         case IPCP_ETH_DIX:
                                 conf.ethertype = conf_msg->ethertype;
@@ -261,7 +262,8 @@ static void * mainloop(void * o)
                                 layer_info.dir_hash_algo      = HASH_SHA3_256;
                                 break;
                         default:
-                                log_err("Unknown IPCP type: %d.", 
conf_msg->ipcp_type);
+                                log_err("Unknown IPCP type: %d.",
+                                        conf_msg->ipcp_type);
                         }
 
                         /* UDP and broadcast use fixed hash algorithm. */
diff --git a/src/ipcpd/unicast/CMakeLists.txt b/src/ipcpd/unicast/CMakeLists.txt
index c0c5551..035ee5f 100644
--- a/src/ipcpd/unicast/CMakeLists.txt
+++ b/src/ipcpd/unicast/CMakeLists.txt
@@ -33,6 +33,7 @@ endif ()
 set(SOURCE_FILES
   # Add source files here
   addr_auth.c
+  ca.c
   connmgr.c
   dht.c
   dir.c
@@ -51,6 +52,8 @@ set(SOURCE_FILES
   pol/simple_pff.c
   pol/alternate_pff.c
   pol/multipath_pff.c
+  pol/ca-mb-ecn.c
+  pol/ca-nop.c
   )
 
 add_executable(ipcpd-unicast ${SOURCE_FILES} ${IPCP_SOURCES}
diff --git a/src/ipcpd/unicast/ca.c b/src/ipcpd/unicast/ca.c
new file mode 100644
index 0000000..e3d1b36
--- /dev/null
+++ b/src/ipcpd/unicast/ca.c
@@ -0,0 +1,98 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Congestion Avoidance
+ *
+ *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
+ *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#define OUROBOROS_PREFIX "ca"
+
+#include <ouroboros/logs.h>
+
+#include "ca.h"
+#include "pol-ca-ops.h"
+#include "pol/ca-mb-ecn.h"
+#include "pol/ca-nop.h"
+
+struct {
+        struct pol_ca_ops * ops;
+} ca;
+
+int ca_init(enum pol_cong_avoid pol)
+{
+        switch(pol) {
+        case CA_NONE:
+                log_dbg("Disabling congestion control.");
+                ca.ops = &nop_ca_ops;
+                break;
+        case CA_MB_ECN:
+                log_dbg("Using multi-bit ECN.");
+                ca.ops = &mb_ecn_ca_ops;
+                break;
+        default:
+                return -1;
+        }
+
+        return 0;
+}
+
+
+void ca_fini(void)
+{
+        ca.ops = NULL;
+}
+
+void * ca_ctx_create(void)
+{
+        return ca.ops->ctx_create();
+}
+
+void ca_ctx_destroy(void * ctx)
+{
+        return ca.ops->ctx_destroy(ctx);
+}
+
+ca_wnd_t ca_ctx_update_snd(void *               ctx,
+                           struct shm_du_buff * sdb)
+{
+        return ca.ops->ctx_update_snd(ctx, sdb);
+}
+
+bool ca_ctx_update_rcv(void *     ctx,
+                       uint8_t    ecn,
+                       uint16_t * ece)
+{
+        return ca.ops->ctx_update_rcv(ctx, ecn, ece);
+}
+
+void ca_ctx_update_ece(void *   ctx,
+                       uint16_t ece)
+{
+        return ca.ops->ctx_update_ece(ctx, ece);
+}
+
+void ca_wnd_wait(ca_wnd_t wnd)
+{
+        return ca.ops->wnd_wait(wnd);
+}
+
+uint8_t ca_calc_ecn(int                  fd,
+                    struct shm_du_buff * sdb)
+{
+        return ca.ops->calc_ecn(fd, sdb);
+}
diff --git a/src/ipcpd/unicast/ca.h b/src/ipcpd/unicast/ca.h
new file mode 100644
index 0000000..a3280c6
--- /dev/null
+++ b/src/ipcpd/unicast/ca.h
@@ -0,0 +1,60 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Congestion avoidance
+ *
+ *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
+ *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CA_H
+#define OUROBOROS_IPCPD_UNICAST_CA_H
+
+#include <ouroboros/ipcp.h>
+#include <ouroboros/shm_du_buff.h>
+
+#include <stdbool.h>
+
+typedef union {
+        time_t wait;
+} ca_wnd_t;
+
+int      ca_init(enum pol_cong_avoid ca);
+
+void     ca_fini(void);
+
+
+/* OPS */
+void *   ca_ctx_create(void);
+
+void     ca_ctx_destroy(void * ctx);
+
+ca_wnd_t ca_ctx_update_snd(void *               ctx,
+                           struct shm_du_buff * sdb);
+
+bool     ca_ctx_update_rcv(void *     ctx,
+                           uint8_t    ecn,
+                           uint16_t * ece);
+
+void     ca_ctx_update_ece(void *   ctx,
+                           uint16_t ece);
+
+void     ca_wnd_wait(ca_wnd_t wnd);
+
+uint8_t  ca_calc_ecn(int                  fd,
+                     struct shm_du_buff * sdb);
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CA_H */
diff --git a/src/ipcpd/unicast/dt.c b/src/ipcpd/unicast/dt.c
index 7db766a..6b2e984 100644
--- a/src/ipcpd/unicast/dt.c
+++ b/src/ipcpd/unicast/dt.c
@@ -41,6 +41,7 @@
 #include <ouroboros/fccntl.h>
 #endif
 
+#include "ca.h"
 #include "connmgr.h"
 #include "ipcp.h"
 #include "dt.h"
@@ -115,16 +116,12 @@ static void dt_pci_ser(uint8_t *       head,
 
 }
 
-static void dt_pci_des(struct shm_du_buff * sdb,
-                       struct dt_pci *      dt_pci)
+static void dt_pci_des(uint8_t *       head,
+                       struct dt_pci * dt_pci)
 {
-        uint8_t * head;
-
-        assert(sdb);
+        assert(head);
         assert(dt_pci);
 
-        head = shm_du_buff_head(sdb);
-
         /* Decrease TTL */
         --*(head + dt_pci_info.ttl_o);
 
@@ -226,7 +223,6 @@ static int dt_stat_read(const char * path,
                 "Queued packets (rx):      %20zu\n"
                 "Queued packets (tx):      %20zu\n\n",
                 tmstr, addrstr, rxqlen, txqlen);
-
         for (i = 0; i < QOS_CUBE_MAX; ++i) {
                 sprintf(str,
                         "Qos cube %3d:\n"
@@ -434,6 +430,7 @@ static void packet_handler(int                  fd,
         struct dt_pci dt_pci;
         int           ret;
         int           ofd;
+        uint8_t *     head;
 #ifndef IPCP_FLOW_STATS
         (void)        fd;
 #else
@@ -449,7 +446,10 @@ static void packet_handler(int                  fd,
         pthread_mutex_unlock(&dt.stat[fd].lock);
 #endif
         memset(&dt_pci, 0, sizeof(dt_pci));
-        dt_pci_des(sdb, &dt_pci);
+
+        head = shm_du_buff_head(sdb);
+
+        dt_pci_des(head, &dt_pci);
         if (dt_pci.dst_addr != ipcpi.dt_addr) {
                 if (dt_pci.ttl == 0) {
                         log_dbg("TTL was zero.");
@@ -481,6 +481,8 @@ static void packet_handler(int                  fd,
                         return;
                 }
 
+                *(head + dt_pci_info.ecn_o) |= ca_calc_ecn(ofd, sdb);
+
                 ret = ipcp_flow_write(ofd, sdb);
                 if (ret < 0) {
                         log_dbg("Failed to write packet to fd %d.", ofd);
@@ -508,6 +510,9 @@ static void packet_handler(int                  fd,
         } else {
                 dt_pci_shrink(sdb);
                 if (dt_pci.eid >= PROG_RES_FDS) {
+                        uint8_t ecn = *(head + dt_pci_info.ecn_o);
+                        fa_ecn_update(dt_pci.eid, ecn);
+
                         if (ipcp_flow_write(dt_pci.eid, sdb)) {
                                 ipcp_sdb_release(sdb);
 #ifdef IPCP_FLOW_STATS
@@ -832,7 +837,7 @@ int dt_write_packet(uint64_t             dst_addr,
         dt_pci.dst_addr = dst_addr;
         dt_pci.qc       = qc;
         dt_pci.eid      = np1_fd;
-        dt_pci.ecn      = 0;
+        dt_pci.ecn      = ca_calc_ecn(fd, sdb);
 
         dt_pci_ser(head, &dt_pci);
 #ifdef IPCP_FLOW_STATS
diff --git a/src/ipcpd/unicast/fa.c b/src/ipcpd/unicast/fa.c
index e154d78..ad5d5bf 100644
--- a/src/ipcpd/unicast/fa.c
+++ b/src/ipcpd/unicast/fa.c
@@ -42,6 +42,7 @@
 #include "psched.h"
 #include "ipcp.h"
 #include "dt.h"
+#include "ca.h"
 
 #include <pthread.h>
 #include <stdlib.h>
@@ -49,9 +50,10 @@
 
 #define TIMEOUT 10000 /* nanoseconds */
 
-#define FLOW_REQ   0
-#define FLOW_REPLY 1
-#define MSGBUFSZ   2048
+#define FLOW_REQ    0
+#define FLOW_REPLY  1
+#define FLOW_UPDATE 2
+#define MSGBUFSZ    2048
 
 struct fa_msg {
         uint64_t s_addr;
@@ -59,6 +61,7 @@ struct fa_msg {
         uint32_t s_eid;
         uint8_t  code;
         int8_t   response;
+        uint16_t ece;
         /* QoS parameters from spec, aligned */
         uint8_t  availability;
         uint8_t  in_order;
@@ -75,10 +78,16 @@ struct cmd {
         struct shm_du_buff * sdb;
 };
 
+struct fa_flow {
+        int      r_eid;  /* remote endpoint id               */
+        uint64_t r_addr; /* remote address                   */
+        void *   ctx;    /* congestion avoidance context     */
+};
+
 struct {
         pthread_rwlock_t flows_lock;
-        int              r_eid[PROG_MAX_FLOWS];
-        uint64_t         r_addr[PROG_MAX_FLOWS];
+        struct fa_flow   flows[PROG_MAX_FLOWS];
+
         int              fd;
 
         struct list_head cmds;
@@ -93,22 +102,53 @@ static void packet_handler(int                  fd,
                            qoscube_t            qc,
                            struct shm_du_buff * sdb)
 {
-        pthread_rwlock_rdlock(&fa.flows_lock);
+        struct fa_flow * flow;
+        uint64_t         r_addr;
+        uint32_t         r_eid;
+        ca_wnd_t         wnd;
 
-        if (dt_write_packet(fa.r_addr[fd], qc, fa.r_eid[fd], sdb)) {
-                pthread_rwlock_unlock(&fa.flows_lock);
+        flow = &fa.flows[fd];
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        wnd = ca_ctx_update_snd(flow->ctx, sdb);
+
+        r_addr = flow->r_addr;
+        r_eid  = flow->r_eid;
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        ca_wnd_wait(wnd);
+
+        if (dt_write_packet(r_addr, qc, r_eid, sdb)) {
                 ipcp_sdb_release(sdb);
                 log_warn("Failed to forward packet.");
                 return;
         }
+}
+
+static int fa_flow_init(struct fa_flow * flow)
+{
+        memset(flow, 0, sizeof(*flow));
+
+        flow->r_eid  = -1;
+        flow->r_addr = INVALID_ADDR;
 
-        pthread_rwlock_unlock(&fa.flows_lock);
+        flow->ctx = ca_ctx_create();
+        if (flow->ctx == NULL)
+                return -1;
+
+        return 0;
 }
 
-static void destroy_conn(int fd)
+static void fa_flow_fini(struct fa_flow * flow)
 {
-        fa.r_eid[fd]  = -1;
-        fa.r_addr[fd] = INVALID_ADDR;
+        ca_ctx_destroy(flow->ctx);
+
+        memset(flow, 0, sizeof(*flow));
+
+        flow->r_eid  = -1;
+        flow->r_addr = INVALID_ADDR;
 }
 
 static void fa_post_packet(void *               comp,
@@ -145,14 +185,15 @@ static void * fa_handle_packet(void * o)
         (void) o;
 
         while (true) {
-                struct timespec abstime;
-                int             fd;
-                uint8_t         buf[MSGBUFSZ];
-                struct fa_msg * msg;
-                qosspec_t       qs;
-                struct cmd *    cmd;
-                size_t          len;
-                size_t          msg_len;
+                struct timespec  abstime;
+                int              fd;
+                uint8_t          buf[MSGBUFSZ];
+                struct fa_msg *  msg;
+                qosspec_t        qs;
+                struct cmd *     cmd;
+                size_t           len;
+                size_t           msg_len;
+                struct fa_flow * flow;
 
                 pthread_mutex_lock(&fa.mtx);
 
@@ -232,10 +273,14 @@ static void * fa_handle_packet(void * o)
                                 continue;
                         }
 
+                        flow = &fa.flows[fd];
+
                         pthread_rwlock_wrlock(&fa.flows_lock);
 
-                        fa.r_eid[fd]  = ntoh32(msg->s_eid);
-                        fa.r_addr[fd] = ntoh64(msg->s_addr);
+                        fa_flow_init(flow);
+
+                        flow->r_eid  = ntoh32(msg->s_eid);
+                        flow->r_addr = ntoh64(msg->s_addr);
 
                         pthread_rwlock_unlock(&fa.flows_lock);
 
@@ -248,19 +293,32 @@ static void * fa_handle_packet(void * o)
                 case FLOW_REPLY:
                         assert(len >= sizeof(*msg));
 
+                        flow = &fa.flows[ntoh32(msg->r_eid)];
+
                         pthread_rwlock_wrlock(&fa.flows_lock);
 
-                        fa.r_eid[ntoh32(msg->r_eid)] = ntoh32(msg->s_eid);
+                        flow->r_eid = ntoh32(msg->s_eid);
+
+                        if (msg->response < 0)
+                                fa_flow_fini(flow);
+                        else
+                                psched_add(fa.psched, ntoh32(msg->r_eid));
+
+                        pthread_rwlock_unlock(&fa.flows_lock);
 
                         ipcp_flow_alloc_reply(ntoh32(msg->r_eid),
                                               msg->response,
                                               buf + sizeof(*msg),
                                               len - sizeof(*msg));
+                        break;
+                case FLOW_UPDATE:
+                        assert(len >= sizeof(*msg));
 
-                        if (msg->response < 0)
-                                destroy_conn(ntoh32(msg->r_eid));
-                        else
-                                psched_add(fa.psched, ntoh32(msg->r_eid));
+                        flow = &fa.flows[ntoh32(msg->r_eid)];
+
+                        pthread_rwlock_wrlock(&fa.flows_lock);
+
+                        ca_ctx_update_ece(flow->ctx, ntoh16(msg->ece));
 
                         pthread_rwlock_unlock(&fa.flows_lock);
 
@@ -275,10 +333,6 @@ static void * fa_handle_packet(void * o)
 int fa_init(void)
 {
         pthread_condattr_t cattr;
-        int                i;
-
-        for (i = 0; i < PROG_MAX_FLOWS; ++i)
-                destroy_conn(i);
 
         if (pthread_rwlock_init(&fa.flows_lock, NULL))
                 goto fail_rwlock;
@@ -383,9 +437,10 @@ int fa_alloc(int             fd,
              size_t          dlen)
 {
         struct fa_msg *      msg;
+        struct shm_du_buff * sdb;
+        struct fa_flow *     flow;
         uint64_t             addr;
-        struct shm_du_buff * sdb;
-        qoscube_t            qc;
+        qoscube_t            qc = QOS_CUBE_BE;
         size_t               len;
 
         addr = dir_query(dst);
@@ -397,7 +452,9 @@ int fa_alloc(int             fd,
         if (ipcp_sdb_reserve(&sdb, len + dlen))
                 return -1;
 
-        msg               = (struct fa_msg *) shm_du_buff_head(sdb);
+        msg = (struct fa_msg *) shm_du_buff_head(sdb);
+        memset(msg, 0, sizeof(*msg));
+
         msg->code         = FLOW_REQ;
         msg->s_eid        = hton32(fd);
         msg->s_addr       = hton64(ipcpi.dt_addr);
@@ -413,17 +470,17 @@ int fa_alloc(int             fd,
         memcpy(msg + 1, dst, ipcp_dir_hash_len());
         memcpy(shm_du_buff_head(sdb) + len, data, dlen);
 
-        qc = qos_spec_to_cube(qs);
-
         if (dt_write_packet(addr, qc, fa.fd, sdb)) {
                 ipcp_sdb_release(sdb);
                 return -1;
         }
 
+        flow = &fa.flows[fd];
+
         pthread_rwlock_wrlock(&fa.flows_lock);
 
-        assert(fa.r_eid[fd] == -1);
-        fa.r_addr[fd] = addr;
+        fa_flow_init(flow);
+        flow->r_addr = addr;
 
         pthread_rwlock_unlock(&fa.flows_lock);
 
@@ -439,10 +496,13 @@ int fa_alloc_resp(int          fd,
         struct timespec      abstime;
         struct fa_msg *      msg;
         struct shm_du_buff * sdb;
-        qoscube_t            qc;
+        struct fa_flow *     flow;
+        qoscube_t            qc = QOS_CUBE_BE;
 
         clock_gettime(PTHREAD_COND_CLOCK, &abstime);
 
+        flow = &fa.flows[fd];
+
         pthread_mutex_lock(&ipcpi.alloc_lock);
 
         while (ipcpi.alloc_id != fd && ipcp_get_state() == IPCP_OPERATIONAL) {
@@ -463,33 +523,31 @@ int fa_alloc_resp(int          fd,
         pthread_mutex_unlock(&ipcpi.alloc_lock);
 
         if (ipcp_sdb_reserve(&sdb, sizeof(*msg) + len)) {
-                destroy_conn(fd);
+                fa_flow_fini(flow);
                 return -1;
         }
 
+        msg = (struct fa_msg *) shm_du_buff_head(sdb);
+        memset(msg, 0, sizeof(*msg));
+
         pthread_rwlock_wrlock(&fa.flows_lock);
 
-        msg           = (struct fa_msg *) shm_du_buff_head(sdb);
         msg->code     = FLOW_REPLY;
-        msg->r_eid    = hton32(fa.r_eid[fd]);
+        msg->r_eid    = hton32(flow->r_eid);
         msg->s_eid    = hton32(fd);
         msg->response = response;
 
         memcpy(msg + 1, data, len);
 
         if (response < 0) {
-                destroy_conn(fd);
+                fa_flow_fini(flow);
                 ipcp_sdb_release(sdb);
         } else {
                 psched_add(fa.psched, fd);
         }
 
-        ipcp_flow_get_qoscube(fd, &qc);
-
-        assert(qc >= 0 && qc < QOS_CUBE_MAX);
-
-        if (dt_write_packet(fa.r_addr[fd], qc, fa.fd, sdb)) {
-                destroy_conn(fd);
+        if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+                fa_flow_fini(flow);
                 pthread_rwlock_unlock(&fa.flows_lock);
                 ipcp_sdb_release(sdb);
                 return -1;
@@ -505,11 +563,11 @@ int fa_dealloc(int fd)
         if (ipcp_flow_fini(fd) < 0)
                 return 0;
 
-        pthread_rwlock_wrlock(&fa.flows_lock);
-
         psched_del(fa.psched, fd);
 
-        destroy_conn(fd);
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        fa_flow_fini(&fa.flows[fd]);
 
         pthread_rwlock_unlock(&fa.flows_lock);
 
@@ -517,3 +575,59 @@ int fa_dealloc(int fd)
 
         return 0;
 }
+
+static int fa_update_remote(int      fd,
+                            uint16_t ece)
+{
+        struct fa_msg *      msg;
+        struct shm_du_buff * sdb;
+        qoscube_t            qc = QOS_CUBE_BE;
+        struct fa_flow *     flow;
+
+        if (ipcp_sdb_reserve(&sdb, sizeof(*msg))) {
+                return -1;
+        }
+
+        msg = (struct fa_msg *) shm_du_buff_head(sdb);
+
+        memset(msg, 0, sizeof(*msg));
+
+        flow = &fa.flows[fd];
+
+        pthread_rwlock_rdlock(&fa.flows_lock);
+
+        msg->code  = FLOW_UPDATE;
+        msg->r_eid = hton32(flow->r_eid);
+        msg->ece   = hton16(ece);
+
+        if (dt_write_packet(flow->r_addr, qc, fa.fd, sdb)) {
+                pthread_rwlock_unlock(&fa.flows_lock);
+                ipcp_sdb_release(sdb);
+                return -1;
+        }
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+
+        return 0;
+}
+
+void  fa_ecn_update(int     eid,
+                    uint8_t ecn)
+{
+        struct fa_flow * flow;
+        bool             update;
+        uint16_t         ece;
+
+        flow = &fa.flows[eid];
+
+        pthread_rwlock_wrlock(&fa.flows_lock);
+
+        update = ca_ctx_update_rcv(flow->ctx, ecn, &ece);
+
+        pthread_rwlock_unlock(&fa.flows_lock);
+
+        if (update)
+                fa_update_remote(eid, ece);
+
+}
diff --git a/src/ipcpd/unicast/fa.h b/src/ipcpd/unicast/fa.h
index 12a10a0..70e5c89 100644
--- a/src/ipcpd/unicast/fa.h
+++ b/src/ipcpd/unicast/fa.h
@@ -47,4 +47,7 @@ int  fa_alloc_resp(int          fd,
 
 int  fa_dealloc(int fd);
 
+void fa_ecn_update(int     eid,
+                   uint8_t ecn);
+
 #endif /* OUROBOROS_IPCPD_UNICAST_FA_H */
diff --git a/src/ipcpd/unicast/main.c b/src/ipcpd/unicast/main.c
index 0ab37d2..8284c9f 100644
--- a/src/ipcpd/unicast/main.c
+++ b/src/ipcpd/unicast/main.c
@@ -39,6 +39,7 @@
 #include <ouroboros/time_utils.h>
 
 #include "addr_auth.h"
+#include "ca.h"
 #include "connmgr.h"
 #include "dir.h"
 #include "dt.h"
@@ -83,6 +84,11 @@ static int initialize_components(const struct ipcp_config * 
conf)
 
         log_dbg("IPCP got address %" PRIu64 ".", ipcpi.dt_addr);
 
+        if (ca_init(conf->cong_avoid)) {
+                log_err("Failed to initialize data transfer component.");
+                goto fail_ca;
+        }
+
         if (dt_init(conf->routing_type,
                     conf->addr_size,
                     conf->eid_size,
@@ -110,6 +116,8 @@ static int initialize_components(const struct ipcp_config * 
conf)
  fail_fa:
         dt_fini();
  fail_dt:
+        ca_fini();
+ fail_ca:
         addr_auth_fini();
  fail_addr_auth:
         free(ipcpi.layer_name);
@@ -125,6 +133,8 @@ static void finalize_components(void)
 
         dt_fini();
 
+        ca_fini();
+
         addr_auth_fini();
 
         free(ipcpi.layer_name);
diff --git a/src/ipcpd/unicast/pol-ca-ops.h b/src/ipcpd/unicast/pol-ca-ops.h
new file mode 100644
index 0000000..e19ce0e
--- /dev/null
+++ b/src/ipcpd/unicast/pol-ca-ops.h
@@ -0,0 +1,49 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Congestion avoidance policy ops
+ *
+ *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
+ *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H
+#define OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H
+
+#include "ca.h"
+
+struct pol_ca_ops {
+        void *   (* ctx_create)(void);
+
+        void     (* ctx_destroy)(void * ctx);
+
+        ca_wnd_t (* ctx_update_snd)(void *               ctx,
+                                    struct shm_du_buff * sdb);
+
+        bool     (* ctx_update_rcv)(void *     ctx,
+                                    uint8_t    ecn,
+                                    uint16_t * ece);
+
+        void     (* ctx_update_ece)(void *   ctx,
+                                    uint16_t ece);
+
+        void     (* wnd_wait)(ca_wnd_t wnd);
+
+        uint8_t  (* calc_ecn)(int                  fd,
+                              struct shm_du_buff * sdb);
+};
+
+#endif /* OUROBOROS_IPCPD_UNICAST_POL_CA_OPS_H */
diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.c 
b/src/ipcpd/unicast/pol/ca-mb-ecn.c
new file mode 100644
index 0000000..edf272c
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-mb-ecn.c
@@ -0,0 +1,216 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Multi-bit ECN Congestion Avoidance
+ *
+ *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
+ *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#if defined(__linux__) || defined(__CYGWIN__)
+#define _DEFAULT_SOURCE
+#else
+#define _POSIX_C_SOURCE 200809L
+#endif
+
+#include "config.h"
+
+#include <ouroboros/ipcp-dev.h>
+#include <ouroboros/time_utils.h>
+
+#include "ca-mb-ecn.h"
+
+#include <stdlib.h>
+#include <string.h>
+
+/* congestion avoidance constants */
+#define CA_SHFT      5
+#define CA_WND       (1 << CA_SHFT)
+#define CA_UPD       (1 << (CA_SHFT - 3))
+#define CA_SLOT      18
+#define CA_AI        20000
+#define ECN_Q_SHFT   5
+#define ts_to_ns(ts) (ts.tv_sec * BILLION + ts.tv_nsec)
+
+struct mb_ecn_ctx {
+        uint16_t        rx_ece; /* level of congestion (upstream)   */
+        size_t          rx_ctr; /* receiver side packet counter     */
+
+        uint16_t        tx_ece; /* level of congestion (downstream) */
+        size_t          tx_ctr; /* sender side packet counter       */
+        size_t          tx_aps; /* average packet size              */
+        time_t          tx_wnd; /* tgt time to send packets (ns)    */
+        bool            tx_cav; /* Congestion avoidance             */
+        size_t          tx_slot;
+
+        struct timespec t_sent; /* last sent packet                 */
+};
+
+struct pol_ca_ops mb_ecn_ca_ops = {
+        .ctx_create     = mb_ecn_ctx_create,
+        .ctx_destroy    = mb_ecn_ctx_destroy,
+        .ctx_update_snd = mb_ecn_ctx_update_snd,
+        .ctx_update_rcv = mb_ecn_ctx_update_rcv,
+        .ctx_update_ece = mb_ecn_ctx_update_ece,
+        .wnd_wait       = mb_ecn_wnd_wait,
+        .calc_ecn       = mb_ecn_calc_ecn
+};
+
+void * mb_ecn_ctx_create(void)
+{
+
+        struct mb_ecn_ctx * ctx;
+
+        ctx = malloc(sizeof(*ctx));
+        if (ctx == NULL)
+                return NULL;
+
+        memset(ctx, 0, sizeof(*ctx));
+
+        return (void *) ctx;
+}
+
+void mb_ecn_ctx_destroy(void * ctx)
+{
+        free(ctx);
+}
+
+ca_wnd_t mb_ecn_ctx_update_snd(void *               _ctx,
+                               struct shm_du_buff * sdb)
+{
+        struct timespec  now;
+        size_t           len;
+        size_t           slot;
+        time_t           gap;
+        ca_wnd_t         wnd;
+
+        struct mb_ecn_ctx * ctx = _ctx;
+
+        clock_gettime(PTHREAD_COND_CLOCK, &now);
+
+        len = shm_du_buff_tail(sdb) - shm_du_buff_head(sdb);
+
+        if (ctx->tx_wnd == 0) { /* 10 ms initial window estimate */
+                ctx->tx_wnd = 10 * MILLION;
+                gap          = ctx->tx_wnd >> CA_SHFT;
+                ctx->tx_aps = len >> CA_SHFT;
+                ctx->tx_slot = ts_to_ns(now) >> CA_SLOT;
+        } else {
+                gap           = ts_diff_ns(&ctx->t_sent, &now);
+                ctx->tx_aps -= ctx->tx_aps >> CA_SHFT;
+                ctx->tx_aps += len;
+        }
+
+        ctx->t_sent = now;
+
+        slot = ts_to_ns(now) >> CA_SLOT;
+
+        ctx->tx_ctr++;
+
+        if (slot - ctx->tx_slot > 0) {
+                ctx->tx_slot = slot;
+
+                if (ctx->tx_ctr > CA_WND)
+                        ctx->tx_ece = 0;
+
+                /* Slow start */
+                if (!ctx->tx_cav) {
+                        ctx->tx_wnd >>= 1;
+                /* Multiplicative Decrease */
+                } else if (ctx->tx_ece) { /* MD */
+                        ctx->tx_wnd += (ctx->tx_wnd * ctx->tx_ece)
+                                >> (CA_SHFT + 8);
+                /* Additive Increase */
+                } else {
+                        size_t bw = ctx->tx_aps * BILLION / ctx->tx_wnd;
+                        bw += CA_AI;
+                        ctx->tx_wnd = ctx->tx_aps * BILLION / bw;
+                }
+        }
+
+        wnd.wait = (ctx->tx_wnd >> CA_SHFT) - gap;
+
+        return wnd;
+}
+
+void mb_ecn_wnd_wait(ca_wnd_t wnd)
+{
+        if (wnd.wait > 0) {
+                struct timespec s = {0, 0};
+                if (wnd.wait > BILLION) /* Don't care throttling < 1pps */
+                        s.tv_sec = 1;
+                else
+                        s.tv_nsec = wnd.wait;
+
+                nanosleep(&s, NULL);
+        }
+}
+
+bool mb_ecn_ctx_update_rcv(void *     _ctx,
+                           uint8_t    ecn,
+                           uint16_t * ece)
+{
+        struct mb_ecn_ctx* ctx = _ctx;
+        bool               update;
+
+        if ((ctx->rx_ece | ecn) == 0)
+                return false;
+
+        if (ecn == 0) {
+                /* end of congestion */
+                ctx->rx_ece >>= 2;
+                update = ctx->rx_ece == 0;
+        } else {
+                if (ctx->rx_ece == 0) {
+                        /* start of congestion */
+                        ctx->rx_ece = ecn;
+                        ctx->rx_ctr = 0;
+                        update = true;
+                } else {
+                        /* congestion update */
+                        ctx->rx_ece -= ctx->rx_ece >> CA_SHFT;
+                        ctx->rx_ece += ecn;
+                        update = (ctx->rx_ctr++ & (CA_UPD - 1)) == true;
+                }
+        }
+
+        *ece = ctx->rx_ece;
+
+        return update;
+}
+
+
+void mb_ecn_ctx_update_ece(void *   _ctx,
+                           uint16_t ece)
+{
+        struct mb_ecn_ctx* ctx = _ctx;
+
+        ctx->tx_ece = ece;
+        ctx->tx_ctr = 0;
+        ctx->tx_cav = true;
+}
+
+uint8_t mb_ecn_calc_ecn(int                  fd,
+                        struct shm_du_buff * sdb)
+{
+        size_t q;
+
+        (void) sdb;
+
+        q = ipcp_flow_queued(fd);
+
+        return (uint8_t) (q >> ECN_Q_SHFT);
+}
diff --git a/src/ipcpd/unicast/pol/ca-mb-ecn.h 
b/src/ipcpd/unicast/pol/ca-mb-ecn.h
new file mode 100644
index 0000000..2eb0b8c
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-mb-ecn.h
@@ -0,0 +1,49 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Multi-bit ECN Congestion Avoidance
+ *
+ *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
+ *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H
+#define OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H
+
+#include "pol-ca-ops.h"
+
+void *   mb_ecn_ctx_create(void);
+
+void     mb_ecn_ctx_destroy(void * ctx);
+
+ca_wnd_t mb_ecn_ctx_update_snd(void *               ctx,
+                               struct shm_du_buff * sdb);
+
+bool     mb_ecn_ctx_update_rcv(void *     ctx,
+                               uint8_t    ecn,
+                               uint16_t * ece);
+
+void     mb_ecn_ctx_update_ece(void *   ctx,
+                               uint16_t ece);
+
+void     mb_ecn_wnd_wait(ca_wnd_t wnd);
+
+uint8_t  mb_ecn_calc_ecn(int                  fd,
+                         struct shm_du_buff * sdb);
+
+extern struct pol_ca_ops mb_ecn_ca_ops;
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CA_MB_ECN_H */
diff --git a/src/ipcpd/unicast/pol/ca-nop.c b/src/ipcpd/unicast/pol/ca-nop.c
new file mode 100644
index 0000000..28d9754
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-nop.c
@@ -0,0 +1,91 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Dummy Congestion Avoidance
+ *
+ *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
+ *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#include "ca-nop.h"
+
+#include <string.h>
+
+struct pol_ca_ops nop_ca_ops = {
+        .ctx_create     = nop_ctx_create,
+        .ctx_destroy    = nop_ctx_destroy,
+        .ctx_update_snd = nop_ctx_update_snd,
+        .ctx_update_rcv = nop_ctx_update_rcv,
+        .ctx_update_ece = nop_ctx_update_ece,
+        .wnd_wait       = nop_wnd_wait,
+        .calc_ecn       = nop_calc_ecn
+};
+
+void * nop_ctx_create(void)
+{
+        return (void *) 1;
+}
+
+void nop_ctx_destroy(void * ctx)
+{
+        (void) ctx;
+}
+
+ca_wnd_t nop_ctx_update_snd(void *               ctx,
+                            struct shm_du_buff * sdb)
+{
+        ca_wnd_t wnd;
+
+        (void) ctx;
+        (void) sdb;
+
+        memset(&wnd, 0, sizeof(wnd));
+
+        return wnd;
+}
+
+void nop_wnd_wait(ca_wnd_t wnd)
+{
+        (void) wnd;
+}
+
+bool nop_ctx_update_rcv(void *     ctx,
+                        uint8_t    ecn,
+                        uint16_t * ece)
+{
+        (void) ctx;
+        (void) ecn;
+        (void) ece;
+
+        return false;
+}
+
+void nop_ctx_update_ece(void *   ctx,
+                        uint16_t ece)
+{
+        (void) ctx;
+        (void) ece;
+}
+
+
+uint8_t nop_calc_ecn(int                  fd,
+                     struct shm_du_buff * sdb)
+{
+        (void) fd;
+        (void) sdb;
+
+        return 0;
+}
diff --git a/src/ipcpd/unicast/pol/ca-nop.h b/src/ipcpd/unicast/pol/ca-nop.h
new file mode 100644
index 0000000..1275a6f
--- /dev/null
+++ b/src/ipcpd/unicast/pol/ca-nop.h
@@ -0,0 +1,49 @@
+/*
+ * Ouroboros - Copyright (C) 2016 - 2020
+ *
+ * Dummy Congestion Avoidance
+ *
+ *    Dimitri Staessens <dimitri.staessens@xxxxxxxx>
+ *    Sander Vrijders   <sander.vrijders@xxxxxxxx>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., http://www.fsf.org/about/contact/.
+ */
+
+#ifndef OUROBOROS_IPCPD_UNICAST_CA_NOP_H
+#define OUROBOROS_IPCPD_UNICAST_CA_NOP_H
+
+#include "pol-ca-ops.h"
+
+void *   nop_ctx_create(void);
+
+void     nop_ctx_destroy(void * ctx);
+
+ca_wnd_t nop_ctx_update_snd(void *               ctx,
+                            struct shm_du_buff * sdb);
+
+bool     nop_ctx_update_rcv(void *     ctx,
+                            uint8_t    ecn,
+                            uint16_t * ece);
+
+void     nop_ctx_update_ece(void *   ctx,
+                            uint16_t ece);
+
+void     nop_wnd_wait(ca_wnd_t wnd);
+
+uint8_t  nop_calc_ecn(int                  fd,
+                      struct shm_du_buff * sdb);
+
+extern struct pol_ca_ops nop_ca_ops;
+
+#endif /* OUROBOROS_IPCPD_UNICAST_CA_NOP_H */
diff --git a/src/ipcpd/unicast/pol/link_state.c 
b/src/ipcpd/unicast/pol/link_state.c
index d948287..ca8a7c5 100644
--- a/src/ipcpd/unicast/pol/link_state.c
+++ b/src/ipcpd/unicast/pol/link_state.c
@@ -812,8 +812,12 @@ static void handle_event(void *       self,
         switch (event) {
         case NOTIFY_DT_CONN_ADD:
                 pthread_rwlock_rdlock(&ls.db_lock);
+
+                pthread_cleanup_push((void (*) (void *)) pthread_rwlock_unlock,
+                                     (void *) &ls.db_lock);
+
                 send_lsm(ipcpi.dt_addr, c->conn_info.addr, 0);
-                pthread_rwlock_unlock(&ls.db_lock);
+                pthread_cleanup_pop(true);
 
                 if (lsdb_add_nb(c->conn_info.addr, c->flow_info.fd, NB_DT))
                         log_dbg("Failed to add neighbor to LSDB.");
diff --git a/src/lib/dev.c b/src/lib/dev.c
index 4b78c1d..a6be762 100644
--- a/src/lib/dev.c
+++ b/src/lib/dev.c
@@ -1768,6 +1768,21 @@ int ipcp_flow_get_qoscube(int         fd,
         return 0;
 }
 
+size_t ipcp_flow_queued(int fd)
+{
+        size_t q;
+
+        pthread_rwlock_rdlock(&ai.lock);
+
+        assert(ai.flows[fd].flow_id >= 0);
+
+        q = shm_rbuff_queued(ai.flows[fd].tx_rb);
+
+        pthread_rwlock_unlock(&ai.lock);
+
+        return q;
+}
+
 ssize_t local_flow_read(int fd)
 {
         ssize_t ret;
diff --git a/src/lib/ipcp_config.proto b/src/lib/ipcp_config.proto
index 23c65e9..7bf5329 100644
--- a/src/lib/ipcp_config.proto
+++ b/src/lib/ipcp_config.proto
@@ -36,15 +36,16 @@ message ipcp_config_msg {
         optional uint32 max_ttl            =  5;
         optional uint32 addr_auth_type     =  6;
         optional uint32 routing_type       =  7;
+        optional uint32 cong_avoid         =  8;
         // Config for UDP
-        optional uint32 ip_addr            =  8;
-        optional uint32 dns_addr           =  9;
-        optional uint32 clt_port           = 10;
-        optional uint32 srv_port           = 11;
+        optional uint32 ip_addr            =  9;
+        optional uint32 dns_addr           = 10;
+        optional uint32 clt_port           = 11;
+        optional uint32 srv_port           = 12;
         // Config for the Ethernet
-        optional string dev                = 12;
+        optional string dev                = 13;
         // Config for DIX Ethernet
-        optional uint32 ethertype          = 13;
+        optional uint32 ethertype          = 14;
 }
 
 enum enroll_code {
diff --git a/src/lib/irm.c b/src/lib/irm.c
index 08dffb6..42ad74f 100644
--- a/src/lib/irm.c
+++ b/src/lib/irm.c
@@ -132,6 +132,8 @@ int irm_bootstrap_ipcp(pid_t                      pid,
                 config.addr_auth_type     = conf->addr_auth_type;
                 config.has_routing_type   = true;
                 config.routing_type       = conf->routing_type;
+                config.has_cong_avoid     = true;
+                config.cong_avoid         = conf->cong_avoid;
                 break;
         case IPCP_UDP:
                 config.has_ip_addr  = true;
diff --git a/src/tools/irm/irm_ipcp_bootstrap.c 
b/src/tools/irm/irm_ipcp_bootstrap.c
index 84b6759..ba57a50 100644
--- a/src/tools/irm/irm_ipcp_bootstrap.c
+++ b/src/tools/irm/irm_ipcp_bootstrap.c
@@ -50,7 +50,7 @@
 #include <sys/socket.h>
 #endif
 
-#define UNICAST                 "unicast"
+#define UNICAST                "unicast"
 #define BROADCAST              "broadcast"
 #define UDP                    "udp"
 #define ETH_LLC                "eth-llc"
@@ -70,6 +70,7 @@
 #define DEFAULT_TTL            60
 #define DEFAULT_ADDR_AUTH      ADDR_AUTH_FLAT_RANDOM
 #define DEFAULT_ROUTING        ROUTING_LINK_STATE
+#define DEFAULT_CONG_AVOID     CA_MB_ECN
 #define DEFAULT_HASH_ALGO      DIR_HASH_SHA3_256
 #define DEFAULT_ETHERTYPE      0xA000
 #define DEFAULT_CLIENT_PORT    0x0000 /* random port */
@@ -79,6 +80,8 @@
 #define LINK_STATE_ROUTING     "link_state"
 #define LINK_STATE_LFA_ROUTING "lfa"
 #define LINK_STATE_ECM_ROUTING "ecmp"
+#define NONE_CA                "none"
+#define MB_ECN_CA              "mb-ecn"
 
 static void usage(void)
 {
@@ -95,11 +98,13 @@ static void usage(void)
                "                [ttl (max time-to-live value, default: %d)]\n"
                "                [addr_auth <ADDRESS_POLICY> (default: %s)]\n"
                "                [routing <ROUTING_POLICY> (default: %s)]\n"
+               "                [congestion <CONG_POLICY> (default: %s)]\n"
                "                [hash [ALGORITHM] (default: %s)]\n"
                "                [autobind]\n"
-               "where ADDRESS_POLICY = {"FLAT_RANDOM_ADDR_AUTH"}\n"
-               "      ROUTING_POLICY = {"LINK_STATE_ROUTING " "
+               "where ADDRESS_POLICY = {" FLAT_RANDOM_ADDR_AUTH "}\n"
+               "      ROUTING_POLICY = {" LINK_STATE_ROUTING " "
                LINK_STATE_LFA_ROUTING " " LINK_STATE_ECM_ROUTING "}\n"
+               "      CONG_POLICY = {" NONE_CA " " MB_ECN_CA "}\n"
                "      ALGORITHM = {" SHA3_224 " " SHA3_256 " "
                SHA3_384 " " SHA3_512 "}\n\n"
                "if TYPE == " UDP "\n"
@@ -130,7 +135,7 @@ static void usage(void)
                "if TYPE == " BROADCAST "\n"
                "                [autobind]\n\n",
                DEFAULT_ADDR_SIZE, DEFAULT_EID_SIZE, DEFAULT_TTL,
-               FLAT_RANDOM_ADDR_AUTH, LINK_STATE_ROUTING,
+               FLAT_RANDOM_ADDR_AUTH, LINK_STATE_ROUTING, MB_ECN_CA,
                SHA3_256, DEFAULT_SERVER_PORT, SHA3_256, 0xA000, SHA3_256,
                SHA3_256, SHA3_256);
 }
@@ -138,29 +143,30 @@ static void usage(void)
 int do_bootstrap_ipcp(int     argc,
                       char ** argv)
 {
-        char *             ipcp           = NULL;
-        pid_t              pid            = -1;
-        struct ipcp_config conf;
-        uint8_t            addr_size      = DEFAULT_ADDR_SIZE;
-        uint8_t            eid_size       = DEFAULT_EID_SIZE;
-        uint8_t            max_ttl        = DEFAULT_TTL;
-        enum pol_addr_auth addr_auth_type = DEFAULT_ADDR_AUTH;
-        enum pol_routing   routing_type   = DEFAULT_ROUTING;
-        enum pol_dir_hash  hash_algo      = DEFAULT_HASH_ALGO;
-        uint32_t           ip_addr        = 0;
-        uint32_t           dns_addr       = DEFAULT_DDNS;
-        char *             ipcp_type      = NULL;
-        enum ipcp_type     type           = IPCP_INVALID;
-        char *             layer          = NULL;
-        char *             dev            = NULL;
-        uint16_t           ethertype      = DEFAULT_ETHERTYPE;
-        struct ipcp_info * ipcps;
-        ssize_t            len            = 0;
-        int                i              = 0;
-        bool               autobind       = false;
-        int                cargs;
-        int                cport          = DEFAULT_CLIENT_PORT;
-        int                sport          = DEFAULT_SERVER_PORT;
+        char *              ipcp           = NULL;
+        pid_t               pid            = -1;
+        struct ipcp_config  conf;
+        uint8_t             addr_size      = DEFAULT_ADDR_SIZE;
+        uint8_t             eid_size       = DEFAULT_EID_SIZE;
+        uint8_t             max_ttl        = DEFAULT_TTL;
+        enum pol_addr_auth  addr_auth_type = DEFAULT_ADDR_AUTH;
+        enum pol_routing    routing_type   = DEFAULT_ROUTING;
+        enum pol_dir_hash   hash_algo      = DEFAULT_HASH_ALGO;
+        enum pol_cong_avoid cong_avoid     = DEFAULT_CONG_AVOID;
+        uint32_t            ip_addr        = 0;
+        uint32_t            dns_addr       = DEFAULT_DDNS;
+        char *              ipcp_type      = NULL;
+        enum ipcp_type      type           = IPCP_INVALID;
+        char *              layer          = NULL;
+        char *              dev            = NULL;
+        uint16_t            ethertype      = DEFAULT_ETHERTYPE;
+        struct ipcp_info *  ipcps;
+        ssize_t             len            = 0;
+        int                 i              = 0;
+        bool                autobind       = false;
+        int                 cargs;
+        int                 cport          = DEFAULT_CLIENT_PORT;
+        int                 sport          = DEFAULT_SERVER_PORT;
 
         while (argc > 0) {
                 cargs = 2;
@@ -230,6 +236,14 @@ int do_bootstrap_ipcp(int     argc,
                                 routing_type = ROUTING_LINK_STATE_ECMP;
                         else
                                 goto unknown_param;
+                } else if (matches(*argv, "congestion") == 0) {
+                        if (strcmp(NONE_CA, *(argv + 1)) == 0)
+                                cong_avoid = CA_NONE;
+                        else if (strcmp(MB_ECN_CA,
+                                        *(argv + 1)) == 0)
+                                cong_avoid = CA_MB_ECN;
+                        else
+                                goto unknown_param;
                 } else {
                         printf("Unknown option: \"%s\".\n", *argv);
                         return -1;
@@ -315,6 +329,7 @@ int do_bootstrap_ipcp(int     argc,
                                 conf.max_ttl        = max_ttl;
                                 conf.addr_auth_type = addr_auth_type;
                                 conf.routing_type   = routing_type;
+                                conf.cong_avoid     = cong_avoid;
                                 break;
                         case IPCP_UDP:
                                 if (ip_addr == 0)
-- 
2.29.2


Other related posts: