[tarantool-patches] [PATCH 5/5] session: introduce box.session.push

  • From: Vladislav Shpilevoy <v.shpilevoy@xxxxxxxxxxxxx>
  • To: tarantool-patches@xxxxxxxxxxxxx
  • Date: Mon, 19 Mar 2018 16:34:52 +0300

Box.session.push() allows to send a message to a client with no
finishing a main request.

Tarantool supports two push types: via text protocol and via
binary protocol. Text protocol push message contains "push:"
prefix followed by a YAML formatted text, where as a final
response always has '---' prefix. To catch pushed messages on a
client side use console.connect() on_push options which takes
a function with a single argument - message.

IProto message is encoded just like a regular response, but
instead of IPROTO_DATA it has IPROTO_PUSH with a single element -
pushed MessagePack encoded data.

Text push is trivial - it is just blocking write into a socket
with a prefix. Binary push is more complex.

TX thread to notify IProto thread about new data in obuf sends
a message 'push_msg'. IProto thread, got this message, notifies
libev about new data, and then sends 'push_msg' back with
updated write position. TX thread, received the message back,
updates its version of a write position. If IProto will not send
a write position, then TX will write to the same obuf again
and again, because it can not know that IProto already flushed
another obuf.

To avoid multiple 'push_msg' in fly between IProto and TX, the
only one 'push_msg' per connection is used. To deliver pushes,
appeared when 'push_msg' was in fly, TX thread sets a flag every
time when sees, that 'push_msg' is sent, and there is a new push.
When 'push_msg' returns, it checks this flag, and if it is set,
then the IProto is notified again.

Closes #2677

Signed-off-by: Vladislav Shpilevoy <v.shpilevoy@xxxxxxxxxxxxx>
---
 src/box/iproto.cc                        | 235 +++++++++++++++++---
 src/box/iproto_constants.c               |   3 +-
 src/box/iproto_constants.h               |   8 +
 src/box/lua/call.c                       |   1 +
 src/box/lua/console.c                    |   2 +-
 src/box/lua/console.h                    |   8 +
 src/box/lua/console.lua                  |   6 +-
 src/box/lua/net_box.c                    |  37 ++++
 src/box/lua/net_box.lua                  |  97 ++++++--
 src/box/lua/session.c                    | 171 +++++++++++++++
 src/box/port.c                           |   7 +
 src/box/port.h                           |  15 ++
 src/box/session.cc                       |  14 ++
 src/box/session.h                        |  17 ++
 src/box/xrow.c                           |  40 +++-
 src/box/xrow.h                           |  26 ++-
 src/fio.c                                |  12 +
 src/fio.h                                |  16 ++
 test/app-tap/console.test.lua            |   9 +-
 test/box/net.box.result                  |   2 +-
 test/box/net.box.test.lua                |   2 +-
 test/box/push.result                     | 364 +++++++++++++++++++++++++++++++
 test/box/push.test.lua                   | 163 ++++++++++++++
 test/replication/before_replace.result   |  14 ++
 test/replication/before_replace.test.lua |  11 +
 25 files changed, 1211 insertions(+), 69 deletions(-)
 create mode 100644 test/box/push.result
 create mode 100644 test/box/push.test.lua

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index ad4b1a757..4408293c1 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -63,6 +63,45 @@
 
 enum { IPROTO_SALT_SIZE = 32 };
 
+/**
+ * This structure represents a position in the output.
+ * Since we use rotating buffers to recycle memory,
+ * it includes not only a position in obuf, but also
+ * a pointer to obuf the position is for.
+ */
+struct iproto_wpos {
+       struct obuf *obuf;
+       struct obuf_svp svp;
+};
+
+static void
+iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
+{
+       wpos->obuf = out;
+       wpos->svp = obuf_create_svp(out);
+}
+
+/* {{{ IPROTO_PUSH declarations. */
+
+/**
+ * Message to notify IProto thread about new data in an output
+ * buffer. Struct iproto_msg is not used here, because push
+ * notification can be much more compact: it does not have
+ * request, ibuf, length, flags ...
+ */
+struct iproto_push_msg {
+       struct cmsg base;
+       /**
+        * Before sending to IProto thread, the wpos is set to a
+        * current position in an output buffer. Before IProto
+        * returns the message to TX, it sets wpos to the last
+        * flushed position (works like iproto_msg.wpos).
+        */
+       struct iproto_wpos wpos;
+       /** IProto connection to push into. */
+       struct iproto_connection *connection;
+};
+
 /** Owner of binary IProto sessions. */
 struct iproto_session_owner {
        struct session_owner base;
@@ -70,6 +109,37 @@ struct iproto_session_owner {
        char salt[IPROTO_SALT_SIZE];
        /** IProto connection. */
        struct iproto_connection *connection;
+       /**
+        * Is_push_in_progress is set, when a push_msg is sent to
+        * IProto thread, and reset, when the message is returned
+        * to TX. If a new push sees, that a push_msg is already
+        * sent to IProto, then has_new_pushes is set. After push
+        * notification is returned to TX, it checks
+        * has_new_pushes. If it is set, then the notification is
+        * sent again. This ping-pong continues, until TX stopped
+        * pushing. It allows to
+        * 1) avoid multiple push_msg from one session in fly,
+        * 2) do not block push() until a previous push() is
+        *    finished.
+        *
+        *      IProto                           TX
+        * -------------------------------------------------------
+        *                                 + [push message]
+        *    start socket <--- notification ----
+        *       write
+        *                                 + [push message]
+        *                                 + [push message]
+        *                                       ...
+        *      end socket
+        *        write    ----------------> check for new
+        *                                   pushes - found
+        *                 <--- notification ---
+        *                      ....
+        */
+       bool has_new_pushes;
+       bool is_push_in_progress;
+       /** Push notification for IProto thread. */
+       struct iproto_push_msg push_msg;
 };
 
 static struct session_owner *
@@ -78,10 +148,27 @@ iproto_session_owner_dup(struct session_owner *owner);
 static int
 iproto_session_owner_fd(const struct session_owner *owner);
 
+/**
+ * Push a message from @a port to a remote client.
+ * @param owner IProto session owner.
+ * @param sync Message sync. Must be the same as a request sync to
+ *        be able to detect their tie on a client side.
+ * @param port Port with data to send.
+ *
+ * @retval -1 Memory error.
+ * @retval  0 Success, a message is wrote to an output buffer. But
+ *          it is not guaranteed, that it will be sent
+ *          successfully.
+ */
+static int
+iproto_session_owner_push(struct session_owner *owner, uint64_t sync,
+                         struct port *port);
+
 static const struct session_owner_vtab iproto_session_owner_vtab = {
        /* .dup = */ iproto_session_owner_dup,
        /* .delete = */ (void (*)(struct session_owner *)) free,
        /* .fd = */ iproto_session_owner_fd,
+       /* .push = */ iproto_session_owner_push,
 };
 
 static struct session_owner *
@@ -105,6 +192,9 @@ iproto_session_owner_create(struct iproto_session_owner 
*owner,
        owner->base.type = SESSION_TYPE_BINARY;
        owner->base.vtab = &iproto_session_owner_vtab;
        owner->connection = connection;
+       owner->has_new_pushes = false;
+       owner->is_push_in_progress = false;
+       owner->push_msg.connection = connection;
        random_bytes(owner->salt, IPROTO_SALT_SIZE);
 }
 
@@ -117,6 +207,8 @@ iproto_session_salt(struct session *session)
        return session_owner->salt;
 }
 
+/** }}} */
+
 /* The number of iproto messages in flight */
 enum { IPROTO_MSG_MAX = 768 };
 
@@ -164,24 +256,6 @@ iproto_reset_input(struct ibuf *ibuf)
        }
 }
 
-/**
- * This structure represents a position in the output.
- * Since we use rotating buffers to recycle memory,
- * it includes not only a position in obuf, but also
- * a pointer to obuf the position is for.
- */
-struct iproto_wpos {
-       struct obuf *obuf;
-       struct obuf_svp svp;
-};
-
-static void
-iproto_wpos_create(struct iproto_wpos *wpos, struct obuf *out)
-{
-       wpos->obuf = out;
-       wpos->svp = obuf_create_svp(out);
-}
-
 /* {{{ iproto_msg - declaration */
 
 /**
@@ -1168,15 +1242,16 @@ tx_discard_input(struct iproto_msg *msg)
  *   not, the empty buffer is selected.
  * - if neither of the buffers are empty, the function
  *   does not rotate the buffer.
+ *
+ * @param con IProto connection.
+ * @param wpos Last flushed write position, received from IProto
+ *        thread.
  */
-static struct iproto_msg *
-tx_accept_msg(struct cmsg *m)
+static void
+tx_accept_msg(struct iproto_connection *con, struct iproto_wpos *wpos)
 {
-       struct iproto_msg *msg = (struct iproto_msg *) m;
-       struct iproto_connection *con = msg->connection;
-
        struct obuf *prev = &con->obuf[con->tx.p_obuf == con->obuf];
-       if (msg->wpos.obuf == con->tx.p_obuf) {
+       if (wpos->obuf == con->tx.p_obuf) {
                /*
                 * We got a message advancing the buffer which
                 * is being appended to. The previous buffer is
@@ -1194,7 +1269,6 @@ tx_accept_msg(struct cmsg *m)
                 */
                con->tx.p_obuf = prev;
        }
-       return msg;
 }
 
 /**
@@ -1217,7 +1291,8 @@ tx_reply_error(struct iproto_msg *msg)
 static void
 tx_reply_iproto_error(struct cmsg *m)
 {
-       struct iproto_msg *msg = tx_accept_msg(m);
+       struct iproto_msg *msg = (struct iproto_msg *) m;
+       tx_accept_msg(msg->connection, &msg->wpos);
        struct obuf *out = msg->connection->tx.p_obuf;
        iproto_reply_error(out, diag_last_error(&msg->diag),
                           msg->header.sync, ::schema_version);
@@ -1227,8 +1302,8 @@ tx_reply_iproto_error(struct cmsg *m)
 static void
 tx_process1(struct cmsg *m)
 {
-       struct iproto_msg *msg = tx_accept_msg(m);
-       struct obuf *out = msg->connection->tx.p_obuf;
+       struct iproto_msg *msg = (struct iproto_msg *) m;
+       tx_accept_msg(msg->connection, &msg->wpos);
 
        tx_fiber_init(msg->connection->session, msg->header.sync);
        if (tx_check_schema(msg->header.schema_version))
@@ -1236,8 +1311,11 @@ tx_process1(struct cmsg *m)
 
        struct tuple *tuple;
        struct obuf_svp svp;
-       if (box_process1(&msg->dml, &tuple) ||
-           iproto_prepare_select(out, &svp))
+       struct obuf *out;
+       if (box_process1(&msg->dml, &tuple) != 0)
+               goto error;
+       out = msg->connection->tx.p_obuf;
+       if (iproto_prepare_select(out, &svp) != 0)
                goto error;
        if (tuple && tuple_to_obuf(tuple, out))
                goto error;
@@ -1252,8 +1330,9 @@ error:
 static void
 tx_process_select(struct cmsg *m)
 {
-       struct iproto_msg *msg = tx_accept_msg(m);
-       struct obuf *out = msg->connection->tx.p_obuf;
+       struct iproto_msg *msg = (struct iproto_msg *) m;
+       tx_accept_msg(msg->connection, &msg->wpos);
+       struct obuf *out;
        struct obuf_svp svp;
        struct port port;
        int count;
@@ -1270,6 +1349,7 @@ tx_process_select(struct cmsg *m)
                        req->key, req->key_end, &port);
        if (rc < 0)
                goto error;
+       out = msg->connection->tx.p_obuf;
        if (iproto_prepare_select(out, &svp) != 0) {
                port_destroy(&port);
                goto error;
@@ -1305,7 +1385,8 @@ tx_process_call_on_yield(struct trigger *trigger, void 
*event)
 static void
 tx_process_call(struct cmsg *m)
 {
-       struct iproto_msg *msg = tx_accept_msg(m);
+       struct iproto_msg *msg = (struct iproto_msg *) m;
+       tx_accept_msg(msg->connection, &msg->wpos);
 
        tx_fiber_init(msg->connection->session, msg->header.sync);
 
@@ -1387,7 +1468,8 @@ error:
 static void
 tx_process_misc(struct cmsg *m)
 {
-       struct iproto_msg *msg = tx_accept_msg(m);
+       struct iproto_msg *msg = (struct iproto_msg *) m;
+       tx_accept_msg(msg->connection, &msg->wpos);
        struct obuf *out = msg->connection->tx.p_obuf;
        struct session *session = msg->connection->session;
 
@@ -1428,8 +1510,9 @@ error:
 static void
 tx_process_join_subscribe(struct cmsg *m)
 {
-       struct iproto_msg *msg = tx_accept_msg(m);
+       struct iproto_msg *msg = (struct iproto_msg *) m;
        struct iproto_connection *con = msg->connection;
+       tx_accept_msg(con, &msg->wpos);
 
        tx_fiber_init(con->session, msg->header.sync);
 
@@ -1612,6 +1695,88 @@ static const struct cmsg_hop connect_route[] = {
 
 /** }}} */
 
+/** {{{ IPROTO_PUSH implementation. */
+
+/**
+ * Send to IProto thread a notification about new pushes.
+ * @param owner IProto session owner.
+ */
+static void
+tx_begin_push(struct iproto_session_owner *owner);
+
+/**
+ * Create an event to send push.
+ * @param m IProto push message.
+ */
+static void
+net_push_msg(struct cmsg *m)
+{
+       struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+       struct iproto_connection *con = msg->connection;
+       con->wend = msg->wpos;
+       msg->wpos = con->wpos;
+       if (evio_has_fd(&con->output) && !ev_is_active(&con->output))
+               ev_feed_event(con->loop, &con->output, EV_WRITE);
+}
+
+/**
+ * After a message notifies IProto thread about pushed data, TX
+ * thread can already have a new push in one of obufs. This
+ * function checks for new pushes and possibly re-sends push
+ * notification to IProto thread.
+ */
+static void
+tx_check_for_new_push(struct cmsg *m)
+{
+       struct iproto_push_msg *msg = (struct iproto_push_msg *) m;
+       struct iproto_session_owner *owner =
+               container_of(msg, struct iproto_session_owner, push_msg);
+       tx_accept_msg(msg->connection, &msg->wpos);
+       owner->is_push_in_progress = false;
+       if (owner->has_new_pushes)
+               tx_begin_push(owner);
+}
+
+static const struct cmsg_hop push_route[] = {
+       { net_push_msg, &tx_pipe },
+       { tx_check_for_new_push, NULL }
+};
+
+static void
+tx_begin_push(struct iproto_session_owner *owner)
+{
+       assert(! owner->is_push_in_progress);
+       cmsg_init((struct cmsg *) &owner->push_msg, push_route);
+       iproto_wpos_create(&owner->push_msg.wpos, owner->connection->tx.p_obuf);
+       owner->has_new_pushes = false;
+       owner->is_push_in_progress = true;
+       cpipe_push(&net_pipe, (struct cmsg *) &owner->push_msg);
+}
+
+static int
+iproto_session_owner_push(struct session_owner *session_owner, uint64_t sync,
+                         struct port *port)
+{
+       struct iproto_session_owner *owner =
+               (struct iproto_session_owner *) session_owner;
+       struct iproto_connection *con = owner->connection;
+       struct obuf_svp svp;
+       if (iproto_prepare_push(con->tx.p_obuf, &svp) != 0)
+               return -1;
+       if (port_dump(port, con->tx.p_obuf) != 0) {
+               obuf_rollback_to_svp(con->tx.p_obuf, &svp);
+               return -1;
+       }
+       iproto_reply_push(con->tx.p_obuf, &svp, sync, ::schema_version);
+       if (! owner->is_push_in_progress)
+               tx_begin_push(owner);
+       else
+               owner->has_new_pushes = true;
+       return 0;
+}
+
+/** }}} */
+
 /**
  * Create a connection and start input.
  */
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index cd7b1d03b..893275c7b 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -174,7 +174,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
        NULL,               /* 0x2e */
        NULL,               /* 0x2f */
        "data",             /* 0x30 */
-       "error"             /* 0x31 */
+       "error",            /* 0x31 */
+       "push",             /* 0x32 */
 };
 
 const char *vy_page_info_key_strs[VY_PAGE_INFO_KEY_MAX] = {
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 951842485..05d262a0b 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -79,6 +79,14 @@ enum iproto_key {
        /* Leave a gap between request keys and response keys */
        IPROTO_DATA = 0x30,
        IPROTO_ERROR = 0x31,
+       /**
+        * Tarantool supports two push types: binary and text.
+        * A text push can be distinguished from a response by a
+        * prefix "push:".
+        * Binary push is encoded using IPROTO_PUSH key in a
+        * message body, which replaces IPROTO_DATA.
+        */
+       IPROTO_PUSH = 0x32,
        IPROTO_KEY_MAX
 };
 
diff --git a/src/box/lua/call.c b/src/box/lua/call.c
index be13812aa..fc70ed430 100644
--- a/src/box/lua/call.c
+++ b/src/box/lua/call.c
@@ -418,6 +418,7 @@ port_lua_destroy(struct port *base)
 static const struct port_vtab port_lua_vtab = {
        .dump = port_lua_dump,
        .dump_16 = port_lua_dump_16,
+       .dump_raw = NULL,
        .destroy = port_lua_destroy,
 };
 
diff --git a/src/box/lua/console.c b/src/box/lua/console.c
index 450745c90..3bc4b6425 100644
--- a/src/box/lua/console.c
+++ b/src/box/lua/console.c
@@ -329,7 +329,7 @@ lbox_console_add_history(struct lua_State *L)
        return 0;
 }
 
-static int
+int
 lbox_console_format(struct lua_State *L)
 {
        int arg_count = lua_gettop(L);
diff --git a/src/box/lua/console.h b/src/box/lua/console.h
index 208b31490..92a4af035 100644
--- a/src/box/lua/console.h
+++ b/src/box/lua/console.h
@@ -39,6 +39,14 @@ struct lua_State;
 void
 tarantool_lua_console_init(struct lua_State *L);
 
+/**
+ * Encode Lua object into YAML string.
+ * @param Lua object to encode on top of a stack.
+ * @retval Lua string.
+ */
+int
+lbox_console_format(struct lua_State *L);
+
 #if defined(__cplusplus)
 } /* extern "C" */
 #endif /* defined(__cplusplus) */
diff --git a/src/box/lua/console.lua b/src/box/lua/console.lua
index b4199ef85..295bfcbaa 100644
--- a/src/box/lua/console.lua
+++ b/src/box/lua/console.lua
@@ -59,7 +59,7 @@ end
 --
 -- Evaluate command on remote instance
 --
-local function remote_eval(self, line)
+local function remote_eval(self, line, opts)
     if not line or self.remote.state ~= 'active' then
         local err = self.remote.error
         self.remote:close()
@@ -74,7 +74,7 @@ local function remote_eval(self, line)
     --
     -- execute line
     --
-    local ok, res = pcall(self.remote.eval, self.remote, line)
+    local ok, res = pcall(self.remote.eval, self.remote, line, opts)
     return ok and res or format(false, res)
 end
 
@@ -310,7 +310,7 @@ local function connect(uri, opts)
 
     -- override methods
     self.remote = remote
-    self.eval = remote_eval
+    self.eval = function(s, l) return remote_eval(s, l, {on_push = 
opts.on_push}) end
     self.prompt = string.format("%s:%s", self.remote.host, self.remote.port)
     self.completion = function (str, pos1, pos2)
         local c = string.format(
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index db2d2dbb4..9fa7935f7 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -554,6 +554,41 @@ handle_error:
        return 2;
 }
 
+/**
+ * Search for a "push:" prefix in a message, received from a
+ * server using a text protocol.
+ */
+static int
+netbox_text_is_push(struct lua_State *L)
+{
+       assert(lua_gettop(L) == 2);
+       uint32_t ctypeid;
+       const char *text = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
+       assert(ctypeid == luaL_ctypeid(L, "char *"));
+       uint32_t len = (uint32_t) lua_tonumber(L, 2);
+       uint32_t push_len = strlen("push:");
+       lua_pushboolean(L, len >= 5 && memcmp(text, "push:", push_len) == 0);
+       return 1;
+}
+
+/**
+ * Search for IPROTO_PUSH key in a MessagePack encoded response
+ * body. It is needed without entire message decoding, when a user
+ * wants to store raw responses and pushes in its own buffer.
+ */
+static int
+netbox_body_is_push(struct lua_State *L)
+{
+       uint32_t ctypeid;
+       const char *body = *(const char **)luaL_checkcdata(L, 1, &ctypeid);
+       assert(ctypeid == luaL_ctypeid(L, "char *"));
+       assert(mp_typeof(*body) == MP_MAP);
+       lua_pushboolean(L, mp_decode_map(&body) == 1 &&
+                          mp_typeof(*body) == MP_UINT &&
+                          mp_decode_uint(&body) == IPROTO_PUSH);
+       return 1;
+}
+
 int
 luaopen_net_box(struct lua_State *L)
 {
@@ -571,6 +606,8 @@ luaopen_net_box(struct lua_State *L)
                { "encode_auth",    netbox_encode_auth },
                { "decode_greeting",netbox_decode_greeting },
                { "communicate",    netbox_communicate },
+               { "text_is_push",   netbox_text_is_push },
+               { "body_is_push",   netbox_body_is_push },
                { NULL, NULL}
        };
        /* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 87c8c548b..7ef6a6a2d 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -26,6 +26,8 @@ local communicate     = internal.communicate
 local encode_auth     = internal.encode_auth
 local encode_select   = internal.encode_select
 local decode_greeting = internal.decode_greeting
+local text_is_push    = internal.text_is_push
+local body_is_push    = internal.body_is_push
 
 local sequence_mt      = { __serialize = 'sequence' }
 local TIMEOUT_INFINITY = 500 * 365 * 86400
@@ -38,6 +40,7 @@ local IPROTO_SYNC_KEY      = 0x01
 local IPROTO_SCHEMA_VERSION_KEY = 0x05
 local IPROTO_DATA_KEY      = 0x30
 local IPROTO_ERROR_KEY     = 0x31
+local IPROTO_PUSH_KEY      = 0x32
 local IPROTO_GREETING_SIZE = 128
 
 -- select errors from box.error
@@ -229,7 +232,8 @@ local function create_transport(host, port, user, password, 
callback)
     end
 
     -- REQUEST/RESPONSE --
-    local function perform_request(timeout, buffer, method, schema_version, 
...)
+    local function perform_request(timeout, buffer, method, on_push,
+                                   schema_version, ...)
         if state ~= 'active' then
             return last_errno or E_NO_CONNECTION, last_error
         end
@@ -242,7 +246,7 @@ local function create_transport(host, port, user, password, 
callback)
         local id = next_request_id
         method_codec[method](send_buf, id, schema_version, ...)
         next_request_id = next_id(id)
-        local request = table_new(0, 6) -- reserve space for 6 keys
+        local request = table_new(0, 7) -- reserve space for 7 keys
         request.client = fiber_self()
         request.method = method
         request.schema_version = schema_version
@@ -254,6 +258,17 @@ local function create_transport(host, port, user, 
password, callback)
                 requests[id] = nil
                 return E_TIMEOUT, 'Timeout exceeded'
             end
+            if request.messages then
+                -- Multiple push messages can appear on a single
+                -- event loop iteration.
+                local messages = request.messages
+                request.messages = nil
+                if on_push then
+                    for _, m in pairs(messages) do
+                        on_push(m)
+                    end
+                end
+            end
         until requests[id] == nil -- i.e. completed (beware spurious wakeups)
         return request.errno, request.response
     end
@@ -270,12 +285,12 @@ local function create_transport(host, port, user, 
password, callback)
         if request == nil then -- nobody is waiting for the response
             return
         end
-        requests[id] = nil
         local status = hdr[IPROTO_STATUS_KEY]
         local body, body_end_check
 
         if status ~= 0 then
             -- Handle errors
+            requests[id] = nil
             body, body_end_check = decode(body_rpos)
             assert(body_end == body_end_check, "invalid xrow length")
             request.errno = band(status, IPROTO_ERRNO_MASK)
@@ -290,7 +305,16 @@ local function create_transport(host, port, user, 
password, callback)
             local body_len = body_end - body_rpos
             local wpos = buffer:alloc(body_len)
             ffi.copy(wpos, body_rpos, body_len)
-            request.response = tonumber(body_len)
+            if body_is_push(body_rpos) then
+                if request.messages then
+                    table.insert(request.messages, tonumber(body_len))
+                else
+                    request.messages = {tonumber(body_len)}
+                end
+            else
+                requests[id] = nil
+                request.response = tonumber(body_len)
+            end
             wakeup_client(request.client)
             return
         end
@@ -298,7 +322,18 @@ local function create_transport(host, port, user, 
password, callback)
         -- Decode xrow.body[DATA] to Lua objects
         body, body_end_check = decode(body_rpos)
         assert(body_end == body_end_check, "invalid xrow length")
-        request.response = body[IPROTO_DATA_KEY]
+        if body[IPROTO_PUSH_KEY] then
+            assert(#body[IPROTO_PUSH_KEY] == 1)
+            assert(not body[IPROTO_DATA_KEY])
+            if request.messages then
+                table.insert(request.messages, body[IPROTO_PUSH_KEY][1])
+            else
+                request.messages = {body[IPROTO_PUSH_KEY][1]}
+            end
+        else
+            requests[id] = nil
+            request.response = body[IPROTO_DATA_KEY]
+        end
         wakeup_client(request.client)
     end
 
@@ -345,9 +380,16 @@ local function create_transport(host, port, user, 
password, callback)
         if err then
             return err, delim_pos
         else
-            local response = ffi.string(recv_buf.rpos, delim_pos + #delim)
+            local response
+            local is_push = text_is_push(recv_buf.rpos, delim_pos + #delim)
+            if not is_push then
+                response = ffi.string(recv_buf.rpos, delim_pos + #delim)
+            else
+                -- 5 - len of 'push:' prefix of a message.
+                response = ffi.string(recv_buf.rpos + 5, delim_pos + #delim - 
5)
+            end
             recv_buf.rpos = recv_buf.rpos + delim_pos + #delim
-            return nil, response
+            return nil, response, is_push
         end
     end
 
@@ -408,18 +450,26 @@ local function create_transport(host, port, user, 
password, callback)
 
     console_sm = function(rid)
         local delim = '\n...\n'
-        local err, response = send_and_recv_console()
+        local err, response, is_push = send_and_recv_console()
         if err then
             return error_sm(err, response)
         else
             local request = requests[rid]
-            if request == nil then -- nobody is waiting for the response
-                return
+            if request then
+                if is_push then
+                    -- In a console mode it is impossible to
+                    -- get multiple pushes on a single event loop
+                    -- iteration.
+                    assert(not request.messages)
+                    request.messages = {response}
+                else
+                    requests[rid] = nil
+                    request.response = response
+                    rid = next_id(rid)
+                end
+                wakeup_client(request.client)
+                return console_sm(rid)
             end
-            requests[rid] = nil
-            request.response = response
-            wakeup_client(request.client)
-            return console_sm(next_id(rid))
         end
     end
 
@@ -761,7 +811,8 @@ function remote_methods:_request(method, opts, ...)
             timeout = deadline and max(0, deadline - fiber_clock())
         end
         err, res = perform_request(timeout, buffer, method,
-                                   self.schema_version, ...)
+                                   opts and opts.on_push, self.schema_version,
+                                   ...)
         if not err and buffer ~= nil then
             return res -- the length of xrow.body
         elseif not err then
@@ -791,7 +842,7 @@ function remote_methods:ping(opts)
         timeout = deadline and max(0, deadline - fiber_clock())
                             or (opts and opts.timeout)
     end
-    local err = self._transport.perform_request(timeout, nil, 'ping',
+    local err = self._transport.perform_request(timeout, nil, 'ping', nil,
                                                 self.schema_version)
     return not err or err == E_WRONG_SCHEMA_VERSION
 end
@@ -956,8 +1007,16 @@ console_methods.on_disconnect = 
remote_methods.on_disconnect
 console_methods.on_connect = remote_methods.on_connect
 console_methods.is_connected = remote_methods.is_connected
 console_methods.wait_state = remote_methods.wait_state
-function console_methods:eval(line, timeout)
+function console_methods:eval(line, opts)
     check_remote_arg(self, 'eval')
+    local timeout
+    local on_push
+    if type(opts) == 'table' then
+        timeout = opts.timeout or TIMEOUT_INFINITY
+        on_push = opts.on_push
+    else
+        timeout = opts
+    end
     local err, res
     local transport = self._transport
     local pr = transport.perform_request
@@ -968,10 +1027,10 @@ function console_methods:eval(line, timeout)
     end
     if self.protocol == 'Binary' then
         local loader = 'return require("console").eval(...)'
-        err, res = pr(timeout, nil, 'eval', nil, loader, {line})
+        err, res = pr(timeout, nil, 'eval', on_push, nil, loader, {line})
     else
         assert(self.protocol == 'Lua console')
-        err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n')
+        err, res = pr(timeout, nil, 'inject', on_push, nil, line..'$EOF$\n')
     end
     if err then
         box.error({code = err, reason = res})
diff --git a/src/box/lua/session.c b/src/box/lua/session.c
index af8411068..73cc9da8f 100644
--- a/src/box/lua/session.c
+++ b/src/box/lua/session.c
@@ -31,6 +31,7 @@
 #include "session.h"
 #include "lua/utils.h"
 #include "lua/trigger.h"
+#include "lua/msgpack.h"
 
 #include <lua.h>
 #include <lauxlib.h>
@@ -41,6 +42,10 @@
 #include "box/session.h"
 #include "box/user.h"
 #include "box/schema.h"
+#include "box/port.h"
+#include "box/lua/console.h"
+#include "fio.h"
+#include "small/obuf.h"
 
 /** Owner of a console session. */
 struct console_session_owner {
@@ -55,10 +60,26 @@ console_session_owner_dup(struct session_owner *owner);
 static int
 console_session_owner_fd(const struct session_owner *owner);
 
+/**
+ * Send "push:" prefix + message in a blocking mode, with no
+ * yields, to a console socket.
+ * @param owner Console session owner.
+ * @param sync Sync. It is unused since a text protocol has no
+ *        syncs.
+ * @param port Port with text to dump.
+ *
+ * @retval -1 Memory or IO error.
+ * @retval  0 Success.
+ */
+static int
+console_session_owner_push(struct session_owner *owner, uint64_t sync,
+                          struct port *port);
+
 static const struct session_owner_vtab console_session_owner_vtab = {
        /* .dup = */ console_session_owner_dup,
        /* .delete = */ (void (*)(struct session_owner *)) free,
        /* .fd = */ console_session_owner_fd,
+       /* .push = */ console_session_owner_push,
 };
 
 static struct session_owner *
@@ -90,6 +111,45 @@ console_session_owner_create(struct console_session_owner 
*owner, int fd)
        owner->fd = fd;
 }
 
+/**
+ * Write @a text into @a fd in a blocking mode, ignoring transient
+ * socket errors.
+ * @param fd Console descriptor.
+ * @param text Text to send.
+ * @param len Length of @a text.
+ */
+static inline int
+console_do_push(int fd, const char *text, uint32_t len)
+{
+       while (len > 0) {
+               int written = fio_write_silent(fd, text, len);
+               if (written < 0)
+                       return -1;
+               assert((uint32_t) written <= len);
+               len -= written;
+               text += written;
+       }
+       return 0;
+}
+
+static int
+console_session_owner_push(struct session_owner *owner, uint64_t sync,
+                          struct port *port)
+{
+       /* Console has no sync. */
+       (void) sync;
+       assert(owner->vtab == &console_session_owner_vtab);
+       int fd = console_session_owner_fd(owner);
+       uint32_t text_len;
+       const char *text = port_dump_raw(port, &text_len);
+       if (text == NULL ||
+           console_do_push(fd, "push:", strlen("push:")) != 0 ||
+           console_do_push(fd, text, text_len) != 0)
+               return -1;
+       else
+               return 0;
+}
+
 static const char *sessionlib_name = "box.session";
 
 /* Create session and pin it to fiber */
@@ -418,6 +478,116 @@ lbox_push_on_access_denied_event(struct lua_State *L, 
void *event)
        return 3;
 }
 
+/**
+ * Port to push a message from Lua.
+ */
+struct lua_push_port {
+       const struct port_vtab *vtab;
+       /**
+        * Lua state, containing data to dump on top of the stack.
+        */
+       struct lua_State *L;
+};
+
+/**
+ * Lua push port supports two dump types: usual and raw. Raw dump
+ * encodes a message as a YAML formatted text, usual dump encodes
+ * the message as MessagePack right into an output buffer.
+ */
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *out);
+
+static const char *
+lua_push_port_dump_text(struct port *port, uint32_t *size);
+
+static const struct port_vtab lua_push_port_vtab = {
+       .dump = lua_push_port_dump_msgpack,
+       /*
+        * Dump_16 has no sense, since push appears since 1.10
+        * protocol.
+        */
+       .dump_16 = NULL,
+       .dump_raw = lua_push_port_dump_text,
+       .destroy = NULL,
+};
+
+static void
+obuf_error_cb(void *ctx)
+{
+       *((int *)ctx) = -1;
+}
+
+static int
+lua_push_port_dump_msgpack(struct port *port, struct obuf *out)
+{
+       struct lua_push_port *lua_port = (struct lua_push_port *) port;
+       assert(lua_port->vtab == &lua_push_port_vtab);
+       struct mpstream stream;
+       int rc = 0;
+       /*
+        * Do not use luamp_error to allow a caller to clear the
+        * obuf, if it already has allocated something (for
+        * example, iproto push reserves memory for a header).
+        */
+       mpstream_init(&stream, out, obuf_reserve_cb, obuf_alloc_cb,
+                     obuf_error_cb, &rc);
+       luamp_encode(lua_port->L, luaL_msgpack_default, &stream, 1);
+       if (rc != 0)
+               return -1;
+       mpstream_flush(&stream);
+       return 0;
+}
+
+static const char *
+lua_push_port_dump_text(struct port *port, uint32_t *size)
+{
+       struct lua_push_port *lua_port = (struct lua_push_port *) port;
+       assert(lua_port->vtab == &lua_push_port_vtab);
+       lbox_console_format(lua_port->L);
+       assert(lua_isstring(lua_port->L, -1));
+       size_t len;
+       const char *result = lua_tolstring(lua_port->L, -1, &len);
+       *size = (uint32_t) len;
+       return result;
+}
+
+/**
+ * Push a message using a protocol, depending on a session type.
+ * @param data Data to push, first argument on a stack.
+ * @param opts Options. Now requires a single possible option -
+ *        sync. Second argument on a stack.
+ */
+static int
+lbox_session_push(struct lua_State *L)
+{
+       if (lua_gettop(L) != 2 || !lua_istable(L, 2)) {
+usage_error:
+               return luaL_error(L, "Usage: box.session.push(data, opts)");
+       }
+       lua_getfield(L, 2, "sync");
+       if (! lua_isnumber(L, 3))
+               goto usage_error;
+       double lua_sync = lua_tonumber(L, 3);
+       lua_pop(L, 1);
+       uint64_t sync = (uint64_t) lua_sync;
+       if (lua_sync != sync)
+               goto usage_error;
+       struct lua_push_port port;
+       port.vtab = &lua_push_port_vtab;
+       port.L = L;
+       /*
+        * Pop the opts - they must not be pushed. Leave only data
+        * on a stack.
+        */
+       lua_remove(L, 2);
+       if (session_push(current_session(), sync, (struct port *) &port) != 0) {
+               return luaT_error(L);
+       } else {
+               lua_pushboolean(L, true);
+               return 1;
+       }
+}
+
 /**
  * Sets trigger on_access_denied.
  * For test purposes only.
@@ -489,6 +659,7 @@ box_lua_session_init(struct lua_State *L)
                {"on_disconnect", lbox_session_on_disconnect},
                {"on_auth", lbox_session_on_auth},
                {"on_access_denied", lbox_session_on_access_denied},
+               {"push", lbox_session_push},
                {NULL, NULL}
        };
        luaL_register_module(L, sessionlib_name, sessionlib);
diff --git a/src/box/port.c b/src/box/port.c
index 03f6be79d..7f3cea5e7 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -143,6 +143,12 @@ port_dump_16(struct port *port, struct obuf *out)
        return port->vtab->dump_16(port, out);
 }
 
+const char *
+port_dump_raw(struct port *port, uint32_t *size)
+{
+       return port->vtab->dump_raw(port, size);
+}
+
 void
 port_init(void)
 {
@@ -159,5 +165,6 @@ port_free(void)
 const struct port_vtab port_tuple_vtab = {
        .dump = port_tuple_dump,
        .dump_16 = port_tuple_dump_16,
+       .dump_raw = NULL,
        .destroy = port_tuple_destroy,
 };
diff --git a/src/box/port.h b/src/box/port.h
index 7cf3339b5..d1db74909 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -76,6 +76,11 @@ struct port_vtab {
         * format.
         */
        int (*dump_16)(struct port *port, struct obuf *out);
+       /**
+        * Same as dump, but find a memory for an output buffer
+        * for itself.
+        */
+       const char *(*dump_raw)(struct port *port, uint32_t *size);
        /**
         * Destroy a port and release associated resources.
         */
@@ -158,6 +163,16 @@ port_dump(struct port *port, struct obuf *out);
 int
 port_dump_16(struct port *port, struct obuf *out);
 
+/**
+ * Same as port_dump(), but find a memory for an output buffer for
+ * itself.
+ * @param port Port to dump.
+ * @param[out] size Size of the data.
+ * @retval Data.
+ */
+const char *
+port_dump_raw(struct port *port, uint32_t *size);
+
 void
 port_init(void);
 
diff --git a/src/box/session.cc b/src/box/session.cc
index 908ec9c4e..793393fc8 100644
--- a/src/box/session.cc
+++ b/src/box/session.cc
@@ -63,6 +63,7 @@ static const struct session_owner_vtab 
generic_session_owner_vtab = {
        /* .dup = */ generic_session_owner_dup,
        /* .delete = */ (void (*)(struct session_owner *)) free,
        /* .fd = */ generic_session_owner_fd,
+       /* .push = */ generic_session_owner_push,
 };
 
 static struct session_owner *
@@ -95,6 +96,19 @@ session_owner_create(struct session_owner *owner, enum 
session_type type)
        owner->vtab = &generic_session_owner_vtab;
 }
 
+int
+generic_session_owner_push(struct session_owner *owner, uint64_t sync,
+                          struct port *port)
+{
+       (void) owner;
+       (void) sync;
+       (void) port;
+       const char *session =
+               tt_sprintf("Session '%s'", session_type_strs[owner->type]);
+       diag_set(ClientError, ER_UNSUPPORTED, session, "push()");
+       return -1;
+}
+
 static inline uint64_t
 sid_max()
 {
diff --git a/src/box/session.h b/src/box/session.h
index 105dcab17..b5f4955d8 100644
--- a/src/box/session.h
+++ b/src/box/session.h
@@ -59,6 +59,7 @@ enum session_type {
 extern const char *session_type_strs[];
 
 struct session_owner_vtab;
+struct port;
 
 /**
  * Object to store session type specific data. For example, IProto
@@ -78,8 +79,18 @@ struct session_owner_vtab {
        void (*free)(struct session_owner *);
        /** Get the descriptor of an owner, if has. Else -1. */
        int (*fd)(const struct session_owner *);
+       /** Push a port data into a session owner's channel. */
+       int (*push)(struct session_owner *, uint64_t, struct port *);
 };
 
+/**
+ * In a common case, a session does not support push. This
+ * function always returns -1 and sets ER_UNSUPPORTED error.
+ */
+int
+generic_session_owner_push(struct session_owner *owner, uint64_t sync,
+                          struct port *port);
+
 static inline struct session_owner *
 session_owner_dup(struct session_owner *owner)
 {
@@ -142,6 +153,12 @@ session_fd(const struct session *session)
        return session->owner->vtab->fd(session->owner);
 }
 
+static inline int
+session_push(struct session *session, uint64_t sync, struct port *port)
+{
+       return session->owner->vtab->push(session->owner, sync, port);
+}
+
 /**
  * Find a session by id.
  */
diff --git a/src/box/xrow.c b/src/box/xrow.c
index f48525645..f3e929f69 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -43,6 +43,9 @@
 #include "scramble.h"
 #include "iproto_constants.h"
 
+static_assert(IPROTO_DATA < 0x7f && IPROTO_PUSH < 0x7f,
+             "encoded IPROTO_BODY keys must fit into one byte");
+
 int
 xrow_header_decode(struct xrow_header *header, const char **pos,
                   const char *end)
@@ -231,6 +234,9 @@ struct PACKED iproto_body_bin {
        uint32_t v_data_len;               /* string length of array size */
 };
 
+static_assert(sizeof(struct iproto_body_bin) + IPROTO_HEADER_LEN ==
+             IPROTO_SELECT_HEADER_LEN, "size of the prepared select");
+
 static const struct iproto_body_bin iproto_body_bin = {
        0x81, IPROTO_DATA, 0xdd, 0
 };
@@ -239,6 +245,19 @@ static const struct iproto_body_bin iproto_error_bin = {
        0x81, IPROTO_ERROR, 0xdb, 0
 };
 
+struct PACKED iproto_body_push_bin {
+       uint8_t m_body;       /* MP_MAP */
+       uint8_t k_data;       /* IPROTO_PUSH */
+       uint8_t v_data;       /* 1-size MP_ARRAY */
+};
+
+static_assert(sizeof(struct iproto_body_push_bin) + IPROTO_HEADER_LEN ==
+             IPROTO_PUSH_HEADER_LEN, "size of the prepared push");
+
+static const struct iproto_body_push_bin iproto_push_bin = {
+       0x81, IPROTO_PUSH, 0x91
+};
+
 /** Return a 4-byte numeric error code, with status flags. */
 static inline uint32_t
 iproto_encode_error(uint32_t error)
@@ -337,23 +356,21 @@ iproto_write_error(int fd, const struct error *e, 
uint32_t schema_version,
        (void) write(fd, e->errmsg, msg_len);
 }
 
-enum { SVP_SIZE = IPROTO_HEADER_LEN  + sizeof(iproto_body_bin) };
-
 int
-iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp)
+iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size)
 {
        /**
         * Reserve memory before taking a savepoint.
         * This ensures that we get a contiguous chunk of memory
         * and the savepoint is pointing at the beginning of it.
         */
-       void *ptr = obuf_reserve(buf, SVP_SIZE);
+       void *ptr = obuf_reserve(buf, size);
        if (ptr == NULL) {
-               diag_set(OutOfMemory, SVP_SIZE, "obuf", "reserve");
+               diag_set(OutOfMemory, size, "obuf_reserve", "ptr");
                return -1;
        }
        *svp = obuf_create_svp(buf);
-       ptr = obuf_alloc(buf, SVP_SIZE);
+       ptr = obuf_alloc(buf, size);
        assert(ptr !=  NULL);
        return 0;
 }
@@ -373,6 +390,17 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp 
*svp, uint64_t sync,
        memcpy(pos + IPROTO_HEADER_LEN, &body, sizeof(body));
 }
 
+void
+iproto_reply_push(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+                 uint32_t schema_version)
+{
+       char *pos = (char *) obuf_svp_to_ptr(buf, svp);
+       iproto_header_encode(pos, IPROTO_OK, sync, schema_version,
+                            obuf_size(buf) - svp->used - IPROTO_HEADER_LEN);
+       memcpy(pos + IPROTO_HEADER_LEN, &iproto_push_bin,
+              sizeof(iproto_push_bin));
+}
+
 int
 xrow_decode_dml(struct xrow_header *row, struct request *request,
                uint64_t key_map)
diff --git a/src/box/xrow.h b/src/box/xrow.h
index d407d151b..d2ec394c5 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -50,6 +50,10 @@ enum {
        XROW_HEADER_LEN_MAX = 40,
        XROW_BODY_LEN_MAX = 128,
        IPROTO_HEADER_LEN = 28,
+       /** 7 = sizeof(iproto_body_bin). */
+       IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
+       /** 3 = sizeof(iproto_body_push_bin). */
+       IPROTO_PUSH_HEADER_LEN = IPROTO_HEADER_LEN + 3,
 };
 
 struct xrow_header {
@@ -344,8 +348,18 @@ iproto_header_encode(char *data, uint32_t type, uint64_t 
sync,
 struct obuf;
 struct obuf_svp;
 
+/**
+ * Reserve obuf space for a header, which depends on a response
+ * size.
+ */
 int
-iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp);
+iproto_prepare_header(struct obuf *buf, struct obuf_svp *svp, size_t size);
+
+static inline int
+iproto_prepare_select(struct obuf *buf, struct obuf_svp *svp)
+{
+       return iproto_prepare_header(buf, svp, IPROTO_SELECT_HEADER_LEN);
+}
 
 /**
  * Write select header to a preallocated buffer.
@@ -355,6 +369,16 @@ void
 iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
                    uint32_t schema_version, uint32_t count);
 
+static inline int
+iproto_prepare_push(struct obuf *buf, struct obuf_svp *svp)
+{
+       return iproto_prepare_header(buf, svp, IPROTO_PUSH_HEADER_LEN);
+}
+
+void
+iproto_reply_push(struct obuf *buf, struct obuf_svp *svp, uint64_t sync,
+                 uint32_t schema_version);
+
 /**
  * Encode iproto header with IPROTO_OK response code.
  * @param out Encode to.
diff --git a/src/fio.c b/src/fio.c
index b79d3d058..a813b7760 100644
--- a/src/fio.c
+++ b/src/fio.c
@@ -134,6 +134,18 @@ fio_writen(int fd, const void *buf, size_t count)
        return 0;
 }
 
+int
+fio_write_silent(int fd, const void *buf, size_t count)
+{
+       ssize_t nwr = write(fd, buf, count);
+       if (nwr >= 0)
+               return nwr;
+       if (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN)
+               return 0;
+       else
+               return -1;
+}
+
 ssize_t
 fio_writev(int fd, struct iovec *iov, int iovcnt)
 {
diff --git a/src/fio.h b/src/fio.h
index 12749afcb..c88e19258 100644
--- a/src/fio.h
+++ b/src/fio.h
@@ -108,6 +108,22 @@ fio_pread(int fd, void *buf, size_t count, off_t offset);
 int
 fio_writen(int fd, const void *buf, size_t count);
 
+/**
+ * Write the given buffer with no retrying for partial writes,
+ * and ignore transient write errors: EINTR, EWOULDBLOCK and
+ * EAGAIN - these errors are not logged, and the function returns
+ * 0.
+ * @param fd File descriptor to write to.
+ * @param buf Buffer to write.
+ * @param count Size of @a buf.
+ *
+ * @retval  -1 Non-transient error.
+ * @retval   0 Nothing to write, or a transient error.
+ * @retval > 0 Written byte count.
+ */
+int
+fio_write_silent(int fd, const void *buf, size_t count);
+
 /**
  * A simple wrapper around writev().
  * Re-tries write in case of EINTR.
diff --git a/test/app-tap/console.test.lua b/test/app-tap/console.test.lua
index 48d28bd6d..c91576dc2 100755
--- a/test/app-tap/console.test.lua
+++ b/test/app-tap/console.test.lua
@@ -21,7 +21,7 @@ local EOL = "\n...\n"
 
 test = tap.test("console")
 
-test:plan(59)
+test:plan(61)
 
 -- Start console and connect to it
 local server = console.listen(CONSOLE_SOCKET)
@@ -31,6 +31,13 @@ local handshake = client:read{chunk = 128}
 test:ok(string.match(handshake, '^Tarantool .*console') ~= nil, 'Handshake')
 test:ok(client ~= nil, "connect to console")
 
+--
+-- gh-2677: box.session.push, text protocol support.
+--
+client:write('box.session.push(200, {sync = 0})\n')
+test:is(client:read(EOL):find('push:'), 1, "read pushed value")
+test:is(client:read(EOL):find('true'), 7, "read box.session.push result")
+
 -- Execute some command
 client:write("1\n")
 test:is(yaml.decode(client:read(EOL))[1], 1, "eval")
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 46d85b327..ecf86af0d 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -28,7 +28,7 @@ function x_select(cn, space_id, index_id, iterator, offset, 
limit, key, opts)
     return cn:_request('select', opts, space_id, index_id, iterator,
                        offset, limit, key)
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, 
'\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, 
nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 ---
 ...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 87e26f84c..2c19ee479 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -11,7 +11,7 @@ function x_select(cn, space_id, index_id, iterator, offset, 
limit, key, opts)
     return cn:_request('select', opts, space_id, index_id, iterator,
                        offset, limit, key)
 end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, 
'\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, 
nil, '\x80') end
 test_run:cmd("setopt delimiter ''");
 
 LISTEN = require('uri').parse(box.cfg.listen)
diff --git a/test/box/push.result b/test/box/push.result
new file mode 100644
index 000000000..747503f59
--- /dev/null
+++ b/test/box/push.result
@@ -0,0 +1,364 @@
+--
+-- gh-2677: box.session.push.
+--
+--
+-- Usage.
+--
+box.session.push()
+---
+- error: 'Usage: box.session.push(data, opts)'
+...
+box.session.push(100)
+---
+- error: 'Usage: box.session.push(data, opts)'
+...
+box.session.push(100, {sync = -200})
+---
+- error: 'Usage: box.session.push(data, opts)'
+...
+--
+-- Test text protocol.
+--
+test_run = require('test_run').new()
+---
+...
+console = require('console')
+---
+...
+netbox = require('net.box')
+---
+...
+fiber = require('fiber')
+---
+...
+s = console.listen(3434)
+---
+...
+c = netbox.connect(3434, {console = true})
+---
+...
+c:eval('100')
+---
+- '---
+
+  - 100
+
+  ...
+
+'
+...
+messages = {}
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function on_push(message)
+    table.insert(messages, message)
+end;
+---
+...
+function do_pushes()
+    local sync = box.session.sync()
+    for i = 1, 5 do
+        box.session.push(i, {sync = sync})
+        fiber.sleep(0.01)
+    end
+    return 300
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+-- Ensure a pushed message can ignored on a client.
+c:eval('do_pushes()')
+---
+- '---
+
+  - 300
+
+  ...
+
+'
+...
+-- Now start catching pushes.
+c:eval('do_pushes()', {on_push = on_push})
+---
+- '---
+
+  - 300
+
+  ...
+
+'
+...
+messages
+---
+- - '---
+
+    - 1
+
+    ...
+
+'
+  - '---
+
+    - 2
+
+    ...
+
+'
+  - '---
+
+    - 3
+
+    ...
+
+'
+  - '---
+
+    - 4
+
+    ...
+
+'
+  - '---
+
+    - 5
+
+    ...
+
+'
+...
+c:close()
+---
+...
+s:close()
+---
+- true
+...
+--
+-- Test binary protocol.
+--
+box.schema.user.grant('guest','read,write,execute','universe')
+---
+...
+c = netbox.connect(box.cfg.listen)
+---
+...
+c:ping()
+---
+- true
+...
+messages = {}
+---
+...
+c:call('do_pushes', {}, {on_push = on_push})
+---
+- 300
+...
+messages
+---
+- - 1
+  - 2
+  - 3
+  - 4
+  - 5
+...
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+catchers = {}
+---
+...
+started = 0
+---
+...
+finished = 0
+---
+...
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+---
+...
+pk = s:create_index('pk')
+---
+...
+c:reload_schema()
+---
+...
+test_run:cmd("setopt delimiter ';'")
+---
+- true
+...
+function dml_push_and_dml(key)
+    local sync = box.session.sync()
+    box.session.push('started dml', {sync = sync})
+    s:replace{key}
+    box.session.push('continued dml', {sync = sync})
+    s:replace{-key}
+    box.session.push('finished dml', {sync = sync})
+    return key
+end;
+---
+...
+function do_pushes(val)
+    local sync = box.session.sync()
+    for i = 1, 5 do
+        box.session.push(i, {sync = sync})
+        fiber.yield()
+    end
+    return val
+end;
+---
+...
+function push_catcher_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = false}
+    catcher.retval = c:call('do_pushes', {started}, {on_push = 
function(message)
+        table.insert(catcher.messages, message)
+    end})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+---
+...
+function dml_push_and_dml_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = true}
+    catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = 
function(message)
+        table.insert(catcher.messages, message)
+    end})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+---
+...
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+---
+- 300
+...
+-- Then do stress.
+for i = 1, 200 do
+    fiber.create(dml_push_and_dml_f)
+    fiber.create(push_catcher_f)
+end;
+---
+...
+while finished ~= 400 do fiber.sleep(0.1) end;
+---
+...
+for _, c in pairs(catchers) do
+    if c.is_dml then
+        assert(#c.messages == 3)
+        assert(c.messages[1] == 'started dml')
+        assert(c.messages[2] == 'continued dml')
+        assert(c.messages[3] == 'finished dml')
+        assert(s:get{c.retval})
+        assert(s:get{-c.retval})
+    else
+        assert(c.retval)
+        assert(#c.messages == 5)
+        for k, v in pairs(c.messages) do
+            assert(k == v)
+        end
+    end
+end;
+---
+...
+test_run:cmd("setopt delimiter ''");
+---
+- true
+...
+#s:select{}
+---
+- 400
+...
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+---
+...
+msgpack = require('msgpack')
+---
+...
+messages = {}
+---
+...
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+---
+...
+resp_len
+---
+- 10
+...
+messages
+---
+- - 4
+  - 4
+  - 4
+  - 4
+  - 4
+...
+decoded = {}
+---
+...
+r = nil
+---
+...
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) 
table.insert(decoded, r) end
+---
+...
+decoded
+---
+- - {50: [1]}
+  - {50: [2]}
+  - {50: [3]}
+  - {50: [4]}
+  - {50: [5]}
+...
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+r
+---
+- {48: [300]}
+...
+c:close()
+---
+...
+s:drop()
+---
+...
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+---
+...
+--
+-- Ensure can not push in background.
+--
+ok = nil
+---
+...
+err = nil
+---
+...
+function back_push_f() ok, err = pcall(box.session.push, 100, {sync = 100}) end
+---
+...
+f = fiber.create(back_push_f)
+---
+...
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+---
+...
+ok, err
+---
+- false
+- Session 'background' does not support push()
+...
diff --git a/test/box/push.test.lua b/test/box/push.test.lua
new file mode 100644
index 000000000..02d571876
--- /dev/null
+++ b/test/box/push.test.lua
@@ -0,0 +1,163 @@
+--
+-- gh-2677: box.session.push.
+--
+
+--
+-- Usage.
+--
+box.session.push()
+box.session.push(100)
+box.session.push(100, {sync = -200})
+
+--
+-- Test text protocol.
+--
+test_run = require('test_run').new()
+console = require('console')
+netbox = require('net.box')
+fiber = require('fiber')
+s = console.listen(3434)
+c = netbox.connect(3434, {console = true})
+c:eval('100')
+messages = {}
+test_run:cmd("setopt delimiter ';'")
+function on_push(message)
+    table.insert(messages, message)
+end;
+
+function do_pushes()
+    local sync = box.session.sync()
+    for i = 1, 5 do
+        box.session.push(i, {sync = sync})
+        fiber.sleep(0.01)
+    end
+    return 300
+end;
+test_run:cmd("setopt delimiter ''");
+-- Ensure a pushed message can ignored on a client.
+c:eval('do_pushes()')
+-- Now start catching pushes.
+c:eval('do_pushes()', {on_push = on_push})
+messages
+
+c:close()
+s:close()
+
+--
+-- Test binary protocol.
+--
+box.schema.user.grant('guest','read,write,execute','universe')
+
+c = netbox.connect(box.cfg.listen)
+c:ping()
+messages = {}
+c:call('do_pushes', {}, {on_push = on_push})
+messages
+
+-- Add a little stress: many pushes with different syncs, from
+-- different fibers and DML/DQL requests.
+
+catchers = {}
+started = 0
+finished = 0
+s = box.schema.create_space('test', {format = {{'field1', 'integer'}}})
+pk = s:create_index('pk')
+c:reload_schema()
+test_run:cmd("setopt delimiter ';'")
+function dml_push_and_dml(key)
+    local sync = box.session.sync()
+    box.session.push('started dml', {sync = sync})
+    s:replace{key}
+    box.session.push('continued dml', {sync = sync})
+    s:replace{-key}
+    box.session.push('finished dml', {sync = sync})
+    return key
+end;
+function do_pushes(val)
+    local sync = box.session.sync()
+    for i = 1, 5 do
+        box.session.push(i, {sync = sync})
+        fiber.yield()
+    end
+    return val
+end;
+function push_catcher_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = false}
+    catcher.retval = c:call('do_pushes', {started}, {on_push = 
function(message)
+        table.insert(catcher.messages, message)
+    end})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+function dml_push_and_dml_f()
+    fiber.yield()
+    started = started + 1
+    local catcher = {messages = {}, retval = nil, is_dml = true}
+    catcher.retval = c:call('dml_push_and_dml', {started}, {on_push = 
function(message)
+        table.insert(catcher.messages, message)
+    end})
+    table.insert(catchers, catcher)
+    finished = finished + 1
+end;
+-- At first check that a pushed message can be ignored in a binary
+-- protocol too.
+c:call('do_pushes', {300});
+-- Then do stress.
+for i = 1, 200 do
+    fiber.create(dml_push_and_dml_f)
+    fiber.create(push_catcher_f)
+end;
+while finished ~= 400 do fiber.sleep(0.1) end;
+
+for _, c in pairs(catchers) do
+    if c.is_dml then
+        assert(#c.messages == 3)
+        assert(c.messages[1] == 'started dml')
+        assert(c.messages[2] == 'continued dml')
+        assert(c.messages[3] == 'finished dml')
+        assert(s:get{c.retval})
+        assert(s:get{-c.retval})
+    else
+        assert(c.retval)
+        assert(#c.messages == 5)
+        for k, v in pairs(c.messages) do
+            assert(k == v)
+        end
+    end
+end;
+test_run:cmd("setopt delimiter ''");
+
+#s:select{}
+
+--
+-- Test binary pushes.
+--
+ibuf = require('buffer').ibuf()
+msgpack = require('msgpack')
+messages = {}
+resp_len = c:call('do_pushes', {300}, {on_push = on_push, buffer = ibuf})
+resp_len
+messages
+decoded = {}
+r = nil
+for i = 1, #messages do r, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos) 
table.insert(decoded, r) end
+decoded
+r, _ = msgpack.decode_unchecked(ibuf.rpos)
+r
+
+c:close()
+s:drop()
+
+box.schema.user.revoke('guest', 'read,write,execute', 'universe')
+
+--
+-- Ensure can not push in background.
+--
+ok = nil
+err = nil
+function back_push_f() ok, err = pcall(box.session.push, 100, {sync = 100}) end
+f = fiber.create(back_push_f)
+while f:status() ~= 'dead' do fiber.sleep(0.01) end
+ok, err
diff --git a/test/replication/before_replace.result 
b/test/replication/before_replace.result
index d561b4813..7b0f2993f 100644
--- a/test/replication/before_replace.result
+++ b/test/replication/before_replace.result
@@ -49,7 +49,17 @@ test_run:cmd("switch autobootstrap3");
 ---
 - true
 ...
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
 _ = box.space.test:before_replace(function(old, new)
+    if box.session.type() == 'applier' and not push_err then
+        push_ok, push_err = pcall(box.session.push, 100, {sync = 100})
+    end
     if old ~= nil and new ~= nil then
         return new[2] > old[2] and new or old
     end
@@ -187,6 +197,10 @@ box.space.test:select()
   - [9, 90]
   - [10, 100]
 ...
+push_err
+---
+- Session 'applier' does not support push()
+...
 test_run:cmd('restart server autobootstrap3')
 box.space.test:select()
 ---
diff --git a/test/replication/before_replace.test.lua 
b/test/replication/before_replace.test.lua
index 2c6912d06..04e5a2172 100644
--- a/test/replication/before_replace.test.lua
+++ b/test/replication/before_replace.test.lua
@@ -26,7 +26,17 @@ _ = box.space.test:before_replace(function(old, new)
     end
 end);
 test_run:cmd("switch autobootstrap3");
+--
+-- gh-2677 - test that an applier can not push() messages. Applier
+-- session is available in Lua, so the test is here instead of
+-- box/push.test.lua.
+--
+push_ok = nil
+push_err = nil
 _ = box.space.test:before_replace(function(old, new)
+    if box.session.type() == 'applier' and not push_err then
+        push_ok, push_err = pcall(box.session.push, 100, {sync = 100})
+    end
     if old ~= nil and new ~= nil then
         return new[2] > old[2] and new or old
     end
@@ -62,6 +72,7 @@ test_run:cmd('restart server autobootstrap2')
 box.space.test:select()
 test_run:cmd("switch autobootstrap3")
 box.space.test:select()
+push_err
 test_run:cmd('restart server autobootstrap3')
 box.space.test:select()
 
-- 
2.14.3 (Apple Git-98)


Other related posts: