Fixed for the case when we read into a non-empty buffer.
The patch is below.
WBR, Alexander Turenko.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 08bfb3444..c6ed3e138 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -554,18 +554,18 @@ local function create_transport(host, port, user,
password, callback,
if buffer ~= nil then
-- Copy xrow.body to user-provided buffer
local body_len = body_end - body_rpos
- local wpos = buffer:alloc(body_len)
- ffi.copy(wpos, body_rpos, body_len)
- body_len = tonumber(body_len)
if request.skip_header then
-- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
local map_len, key
- map_len, buffer.rpos = decode_map(buffer.rpos, buffer:size())
+ map_len, body_rpos = decode_map(body_rpos, body_len)
assert(map_len == 1)
- key, buffer.rpos = decode(buffer.rpos)
+ key, body_rpos = decode(body_rpos)
assert(key == IPROTO_DATA_KEY)
- body_len = buffer:size()
+ body_len = body_end - body_rpos
end
+ local wpos = buffer:alloc(body_len)
+ ffi.copy(wpos, body_rpos, body_len)
+ body_len = tonumber(body_len)
if status == IPROTO_OK_KEY then
request.response = body_len
requests[id] = nil
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 1cba78a5f..37f615323 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -1912,6 +1912,41 @@ result
---
- []
...
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
---
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 0fe948c29..9fda23088 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -737,6 +737,18 @@ c:eval("echo(...)", nil, {buffer = ibuf, skip_header =
true})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- make several request into a buffer with skip_header, then read
+-- results
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
c.space.test.index.primary:min({}, { buffer = ibuf})
On Fri, Feb 01, 2019 at 06:11:41PM +0300, Alexander Turenko wrote:
Split this patch into two:
* lua: add non-recursive msgpack decoding functions
* net.box: add skip_header option to use with buffer
I attached them at end of the email.
WBR, Alexander Turenko.
On Thu, Jan 10, 2019 at 08:29:33PM +0300, Vladimir Davydov wrote:
On Wed, Jan 09, 2019 at 11:20:13PM +0300, Alexander Turenko wrote:
Needed for #3276.
@TarantoolBot document
Title: net.box: helpers to decode msgpack headers
They allow skipping the iproto packet and msgpack array headers and passing
raw msgpack data to some other function, say, merger.
Contracts:
```
net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)
-> new_rpos
-> nil, err_msg
I'd prefer if this was done right in net.box.select or whatever function
writing the response to ibuf. Yes, this is going to break backward
compatibility, but IMO it's OK for 2.1 - I doubt anybody has used this
weird high perf API anyway.
1. This will break tarantool/shard.
2. Hey, Guido thinks it is okay to break compatibility between Python 2 and
Python 3, and it seems that Python 2 is still in use ten years or so later.
I can do it under a separate option: skip_iproto_header or skip_header.
It is not about a packet header, but a part of the body; however, I have
no better names to offer.
msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, [, arr_len])
-> new_rpos, arr_len
-> nil, err_msg
This seems to be OK, although I'm not sure if we really need to check
the length in this function. Looks like we will definitely need it
because of net.box.call, which wraps function return value in an array.
Not sure about the name either, because it doesn't just check the
msgpack - it decodes it, but I can't come up with anything substantially
better. Maybe msgpack.decode_array?
Re checking the length: the reason was to simplify the user's code, but OK,
it will not be much more complex if we factor this check out. Like so (an
excerpt from the merger's commit message):
```
conn:call('batch_select', <...>, {buffer = buf, skip_header = true})
local len, _
len, buf.rpos = msgpack.decode_array(buf.rpos, buf:size())
assert(len == 1)
_, buf.rpos = msgpack.decode_array(buf.rpos, buf:size())
```
Re name: now I understand: decode_unchecked() is like mp_decode(),
decode() is like mp_check() + mp_decode(). So it is worth renaming it to
decode_array(). Done.
Also, I changed the order of return values to match msgpack.decode()
(before, it matched msgpack.ibuf_decode()).
```
Below is an example with msgpack.decode() as the function that needs raw
msgpack data. It is just to illustrate the approach; there is no sense
in skipping iproto/array headers manually in Lua and then decoding the rest
in Lua. But it is worthwhile when the raw msgpack data is to be processed in
a C module.
```lua
local function single_select(space, ...)
return box.space[space]:select(...)
end
local function batch_select(spaces, ...)
local res = {}
for _, space in ipairs(spaces) do
table.insert(res, box.space[space]:select(...))
end
return res
end
_G.single_select = single_select
_G.batch_select = batch_select
local res
local buf = buffer.ibuf()
conn.space.s:select(nil, {buffer = buf})
-- check and skip iproto_data header
buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos -
buf.rpos))
-- check that we really got data from :select() as result
res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
-- check that the buffer ends
assert(buf.rpos == buf.wpos)
buf:recycle()
conn:call('single_select', {'s'}, {buffer = buf})
-- check and skip the iproto_data header
buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos -
buf.rpos))
-- check and skip the array around return values
buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, 1))
-- check that we really got data from :select() as result
res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
-- check that the buffer ends
assert(buf.rpos == buf.wpos)
buf:recycle()
local spaces = {'s', 't'}
conn:call('batch_select', {spaces}, {buffer = buf})
-- check and skip the iproto_data header
buf.rpos = assert(net_box.check_iproto_data(buf.rpos, buf.wpos -
buf.rpos))
-- check and skip the array around return values
buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, 1))
-- check and skip the array header before the first select result
buf.rpos = assert(msgpack.check_array(buf.rpos, buf.wpos - buf.rpos,
#spaces))
-- check that we really got data from s:select() as result
res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
-- t:select() data
res, buf.rpos = msgpack.decode(buf.rpos, buf.wpos - buf.rpos)
-- check that the buffer ends
assert(buf.rpos == buf.wpos)
```
---
src/box/lua/net_box.c | 49 +++++++++++
src/box/lua/net_box.lua | 1 +
src/lua/msgpack.c | 66 ++++++++++++++
test/app-tap/msgpack.test.lua | 157 +++++++++++++++++++++++++++++++++-
test/box/net.box.result | 58 +++++++++++++
test/box/net.box.test.lua | 26 ++++++
6 files changed, 356 insertions(+), 1 deletion(-)
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index c7063d9c8..d71f33768 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -51,6 +51,9 @@
#define cfg luaL_msgpack_default
+static uint32_t CTID_CHAR_PTR;
+static uint32_t CTID_CONST_CHAR_PTR;
+
static inline size_t
netbox_prepare_request(lua_State *L, struct mpstream *stream, uint32_t
r_type)
{
@@ -745,9 +748,54 @@ netbox_decode_execute(struct lua_State *L)
return 2;
}
+/**
+ * net_box.check_iproto_data(buf.rpos, buf.wpos - buf.rpos)
+ * -> new_rpos
+ * -> nil, err_msg
+ */
+int
+netbox_check_iproto_data(struct lua_State *L)
Instead of adding this function to net_box.c, I'd rather try to add
msgpack helpers for decoding a map, similar to msgpack.check_array added
by your patch, and use them in net_box.lua.
Done.
We discussed that we should add such helpers for all types like nil,
bool, number, string, maybe bin. I think we can reuse recursive
msgpack.decode() if we expect a scalar value.
+{
+ uint32_t ctypeid;
+ const char *data = *(const char **) luaL_checkcdata(L, 1, &ctypeid);
+ if (ctypeid != CTID_CHAR_PTR && ctypeid != CTID_CONST_CHAR_PTR)
+ return luaL_error(L,
+ "net_box.check_iproto_data: 'char *' or "
+ "'const char *' expected");
+
+ if (!lua_isnumber(L, 2))
+ return luaL_error(L, "net_box.check_iproto_data: number "
+ "expected as 2nd argument");
+ const char *end = data + lua_tointeger(L, 2);
+
+ int ok = data < end &&
+ mp_typeof(*data) == MP_MAP &&
+ mp_check_map(data, end) <= 0 &&
+ mp_decode_map(&data) == 1 &&
+ data < end &&
+ mp_typeof(*data) == MP_UINT &&
+ mp_check_uint(data, end) <= 0 &&
+ mp_decode_uint(&data) == IPROTO_DATA;
+
+ if (!ok) {
+ lua_pushnil(L);
+ lua_pushstring(L,
+ "net_box.check_iproto_data: wrong iproto data packet");
+ return 2;
+ }
+
+ *(const char **) luaL_pushcdata(L, ctypeid) = data;
+ return 1;
+}
+
int
luaopen_net_box(struct lua_State *L)
{
+ CTID_CHAR_PTR = luaL_ctypeid(L, "char *");
+ assert(CTID_CHAR_PTR != 0);
+ CTID_CONST_CHAR_PTR = luaL_ctypeid(L, "const char *");
+ assert(CTID_CONST_CHAR_PTR != 0);
+
static const luaL_Reg net_box_lib[] = {
{ "encode_ping", netbox_encode_ping },
{ "encode_call_16", netbox_encode_call_16 },
@@ -765,6 +813,7 @@ luaopen_net_box(struct lua_State *L)
{ "communicate", netbox_communicate },
{ "decode_select", netbox_decode_select },
{ "decode_execute", netbox_decode_execute },
+ { "check_iproto_data", netbox_check_iproto_data },
{ NULL, NULL}
};
/* luaL_register_module polutes _G */
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 2bf772aa8..0a38efa5a 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -1424,6 +1424,7 @@ local this_module = {
new = connect, -- Tarantool < 1.7.1 compatibility,
wrap = wrap,
establish_connection = establish_connection,
+ check_iproto_data = internal.check_iproto_data,
}
function this_module.timeout(timeout, ...)
diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index b47006038..fca440660 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -51,6 +51,7 @@ luamp_error(void *error_ctx)
}
static uint32_t CTID_CHAR_PTR;
+static uint32_t CTID_CONST_CHAR_PTR;
static uint32_t CTID_STRUCT_IBUF;
struct luaL_serializer *luaL_msgpack_default = NULL;
@@ -418,6 +419,68 @@ lua_ibuf_msgpack_decode(lua_State *L)
return 2;
}
+/**
+ * msgpack.check_array(buf.rpos, buf.wpos - buf.rpos, [, arr_len])
+ * -> new_rpos, arr_len
+ * -> nil, err_msg
+ */
+static int
+lua_check_array(lua_State *L)
+{
+ uint32_t ctypeid;
+ const char *data = *(const char **) luaL_checkcdata(L, 1, &ctypeid);
+ if (ctypeid != CTID_CHAR_PTR && ctypeid != CTID_CONST_CHAR_PTR)
Hm, msgpack.decode doesn't care about CTID_CONST_CHAR_PTR. Why should we?
It looks natural to support a const pointer where we allow a non-const
one. But I don't have an example where we can obtain a 'const char *'
buffer with msgpack in Lua (w/o ffi.cast()). Msgpackffi returns 'const
unsigned char *', but that is a bug and should be fixed in
https://github.com/tarantool/tarantool/issues/3926
+ return luaL_error(L, "msgpack.check_array: 'char *' or "
+ "'const char *' expected");
+
+ if (!lua_isnumber(L, 2))
+ return luaL_error(L, "msgpack.check_array: number expected as "
+ "2nd argument");
+ const char *end = data + lua_tointeger(L, 2);
+
+ if (!lua_isnoneornil(L, 3) && !lua_isnumber(L, 3))
+ return luaL_error(L, "msgpack.check_array: number or nil "
+ "expected as 3rd argument");
Why not simply luaL_checkinteger?
We can separately check lua_gettop() and use luaL_checkinteger(). It
looks shorter, now I see. Fixed.
+
+ static const char *end_of_buffer_msg = "msgpack.check_array: "
+ "unexpected end of buffer";
No point to make this variable static.
Ok. But now I removed it.
+
+ if (data >= end) {
+ lua_pushnil(L);
+ lua_pushstring(L, end_of_buffer_msg);
msgpack.decode throws an error when it fails to decode msgpack data, so
I think this function should throw too.
Or Lua code style states we should report errors with `nil, err`. But
this aspect is more about external modules as I see. It is quite unclear
what is the best option for built-in modules.
If one is likely to want to handle an error in Lua, the `nil, err` approach
looks better. As far as I know, at least some of our commercial projects
primarily use this approach and have to wrap many functions with pcall.
Not sure how much the overhead is.
But anyway other msgpack functions just raise an error and it seems the
new functions should have similar contract.
Changed.
+ return 2;
+ }
+
+ if (mp_typeof(*data) != MP_ARRAY) {
+ lua_pushnil(L);
+ lua_pushstring(L, "msgpack.check_array: wrong array header");
+ return 2;
+ }
+
+ if (mp_check_array(data, end) > 0) {
+ lua_pushnil(L);
+ lua_pushstring(L, end_of_buffer_msg);
+ return 2;
+ }
+
+ uint32_t len = mp_decode_array(&data);
+
+ if (!lua_isnoneornil(L, 3)) {
+ uint32_t exp_len = (uint32_t) lua_tointeger(L, 3);
IMO it would be better if you set exp_len when you checked the arguments
(using luaL_checkinteger).
Expected length was removed from the function as you suggested.
+ if (len != exp_len) {
+ lua_pushnil(L);
+ lua_pushfstring(L, "msgpack.check_array: expected "
+ "array of length %d, got length %d",
+ len, exp_len);
+ return 2;
+ }
+ }
+
+ *(const char **) luaL_pushcdata(L, ctypeid) = data;
+ lua_pushinteger(L, len);
+ return 2;
+}
+
static int
lua_msgpack_new(lua_State *L);
@@ -426,6 +489,7 @@ static const luaL_Reg msgpacklib[] = {
{ "decode", lua_msgpack_decode },
{ "decode_unchecked", lua_msgpack_decode_unchecked },
{ "ibuf_decode", lua_ibuf_msgpack_decode },
+ { "check_array", lua_check_array },
{ "new", lua_msgpack_new },
{ NULL, NULL }
};
@@ -447,6 +511,8 @@ luaopen_msgpack(lua_State *L)
assert(CTID_STRUCT_IBUF != 0);
CTID_CHAR_PTR = luaL_ctypeid(L, "char *");
assert(CTID_CHAR_PTR != 0);
+ CTID_CONST_CHAR_PTR = luaL_ctypeid(L, "const char *");
+ assert(CTID_CONST_CHAR_PTR != 0);
luaL_msgpack_default = luaL_newserializer(L, "msgpack", msgpacklib);
return 1;
}
----
commit 8c820dff279734d79e26591dcb771f7c6ab13639
Author: Alexander Turenko <alexander.turenko@xxxxxxxxxxxxx>
Date: Thu Jan 31 01:45:22 2019 +0300
lua: add non-recursive msgpack decoding functions
Needed for #3276.
@TarantoolBot document
Title: Non-recursive msgpack decoding functions
Contracts:
```
msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
```
These functions are intended to be used with a msgpack buffer received
from net.box. A user may want to skip the {[IPROTO_DATA_KEY] = ...} wrapper
and an array header before passing the buffer to some C function to decode.
See https://github.com/tarantool/tarantool/issues/2195 for more
information re this net.box's API.
Consider merger's docbot comment for usage examples.
diff --git a/src/lua/msgpack.c b/src/lua/msgpack.c
index b47006038..92a9efd25 100644
--- a/src/lua/msgpack.c
+++ b/src/lua/msgpack.c
@@ -418,6 +418,84 @@ lua_ibuf_msgpack_decode(lua_State *L)
return 2;
}
+/**
+ * Verify and set arguments: data and size.
+ *
+ * Always return 0. In case of any fail raise a Lua error.
+ */
+static int
+verify_decode_args(lua_State *L, const char *func_name, const char **data_p,
+ ptrdiff_t *size_p)
+{
+ /* Verify arguments count. */
+ if (lua_gettop(L) != 2)
+ return luaL_error(L, "Usage: %s(ptr, size)", func_name);
+
+ /* Verify ptr type. */
+ uint32_t ctypeid;
+ const char *data = *(char **) luaL_checkcdata(L, 1, &ctypeid);
+ if (ctypeid != CTID_CHAR_PTR)
+ return luaL_error(L, "%s: 'char *' expected", func_name);
+
+ /* Verify size type and value. */
+ ptrdiff_t size = (ptrdiff_t) luaL_checkinteger(L, 2);
+ if (size <= 0)
+ return luaL_error(L, "%s: non-positive size", func_name);
+
+ *data_p = data;
+ *size_p = size;
+
+ return 0;
+}
+
+/**
+ * msgpack.decode_array(buf.rpos, buf:size()) -> arr_len, new_rpos
+ */
+static int
+lua_decode_array(lua_State *L)
+{
+ const char *func_name = "msgpack.decode_array";
+ const char *data;
+ ptrdiff_t size;
+ verify_decode_args(L, func_name, &data, &size);
+
+ if (mp_typeof(*data) != MP_ARRAY)
+ return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+ if (mp_check_array(data, data + size) > 0)
+ return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+ uint32_t len = mp_decode_array(&data);
+
+ lua_pushinteger(L, len);
+ *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+ return 2;
+}
+
+/**
+ * msgpack.decode_map(buf.rpos, buf:size()) -> map_len, new_rpos
+ */
+static int
+lua_decode_map(lua_State *L)
+{
+ const char *func_name = "msgpack.decode_map";
+ const char *data;
+ ptrdiff_t size;
+ verify_decode_args(L, func_name, &data, &size);
+
+ if (mp_typeof(*data) != MP_MAP)
+ return luaL_error(L, "%s: unexpected msgpack type", func_name);
+
+ if (mp_check_map(data, data + size) > 0)
+ return luaL_error(L, "%s: unexpected end of buffer", func_name);
+
+ uint32_t len = mp_decode_map(&data);
+
+ lua_pushinteger(L, len);
+ *(const char **) luaL_pushcdata(L, CTID_CHAR_PTR) = data;
+ return 2;
+}
+
static int
lua_msgpack_new(lua_State *L);
@@ -426,6 +504,8 @@ static const luaL_Reg msgpacklib[] = {
{ "decode", lua_msgpack_decode },
{ "decode_unchecked", lua_msgpack_decode_unchecked },
{ "ibuf_decode", lua_ibuf_msgpack_decode },
+ { "decode_array", lua_decode_array },
+ { "decode_map", lua_decode_map },
{ "new", lua_msgpack_new },
{ NULL, NULL }
};
diff --git a/test/app-tap/msgpack.test.lua b/test/app-tap/msgpack.test.lua
index 0e1692ad9..ee215dfb1 100755
--- a/test/app-tap/msgpack.test.lua
+++ b/test/app-tap/msgpack.test.lua
@@ -49,9 +49,186 @@ local function test_misc(test, s)
test:ok(not st and e:match("null"), "null ibuf")
end
+local function test_decode_array_map(test, s)
+ local ffi = require('ffi')
+
+ local usage_err = 'Usage: msgpack%.decode_[^_(]+%(ptr, size%)'
+ local end_of_buffer_err = 'msgpack%.decode_[^_]+: unexpected end of
buffer'
+ local non_positive_size_err = 'msgpack.decode_[^_]+: non%-positive size'
+
+ local decode_cases = {
+ {
+ 'fixarray',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\x94'),
+ size = 1,
+ exp_len = 4,
+ exp_rewind = 1,
+ },
+ {
+ 'array 16',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdc\x00\x04'),
+ size = 3,
+ exp_len = 4,
+ exp_rewind = 3,
+ },
+ {
+ 'array 32',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdd\x00\x00\x00\x04'),
+ size = 5,
+ exp_len = 4,
+ exp_rewind = 5,
+ },
+ {
+ 'truncated array 16',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdc\x00'),
+ size = 2,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'truncated array 32',
+ func = s.decode_array,
+ data = ffi.cast('char *', '\xdd\x00\x00\x00'),
+ size = 4,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'fixmap',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\x84'),
+ size = 1,
+ exp_len = 4,
+ exp_rewind = 1,
+ },
+ {
+ 'map 16',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xde\x00\x04'),
+ size = 3,
+ exp_len = 4,
+ exp_rewind = 3,
+ },
+ {
+ 'array 32',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xdf\x00\x00\x00\x04'),
+ size = 5,
+ exp_len = 4,
+ exp_rewind = 5,
+ },
+ {
+ 'truncated map 16',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xde\x00'),
+ size = 2,
+ exp_err = end_of_buffer_err,
+ },
+ {
+ 'truncated map 32',
+ func = s.decode_map,
+ data = ffi.cast('char *', '\xdf\x00\x00\x00'),
+ size = 4,
+ exp_err = end_of_buffer_err,
+ },
+ }
+
+ local bad_api_cases = {
+ {
+ 'wrong msgpack type',
+ data = ffi.cast('char *', '\xc0'),
+ size = 1,
+ exp_err = 'msgpack.decode_[^_]+: unexpected msgpack type',
+ },
+ {
+ 'zero size buffer',
+ data = ffi.cast('char *', ''),
+ size = 0,
+ exp_err = non_positive_size_err,
+ },
+ {
+ 'negative size buffer',
+ data = ffi.cast('char *', ''),
+ size = -1,
+ exp_err = non_positive_size_err,
+ },
+ {
+ 'size is nil',
+ data = ffi.cast('char *', ''),
+ size = nil,
+ exp_err = 'bad argument',
+ },
+ {
+ 'no arguments',
+ args = {},
+ exp_err = usage_err,
+ },
+ {
+ 'one argument',
+ args = {ffi.cast('char *', '')},
+ exp_err = usage_err,
+ },
+ {
+ 'data is nil',
+ args = {nil, 1},
+ exp_err = 'expected cdata as 1 argument',
+ },
+ {
+ 'data is not cdata',
+ args = {1, 1},
+ exp_err = 'expected cdata as 1 argument',
+ },
+ {
+ 'data with wrong cdata type',
+ args = {box.tuple.new(), 1},
+ exp_err = "msgpack.decode_[^_]+: 'char %*' expected",
+ },
+ {
+ 'size has wrong type',
+ args = {ffi.cast('char *', ''), 'eee'},
+ exp_err = 'bad argument',
+ },
+ }
+
+ test:plan(#decode_cases + 2 * #bad_api_cases)
+
+ -- Decode cases.
+ for _, case in ipairs(decode_cases) do
+ if case.exp_err ~= nil then
+ local ok, err = pcall(case.func, case.data, case.size)
+ local description = ('bad; %s'):format(case[1])
+ test:ok(ok == false and err:match(case.exp_err), description)
+ else
+ local len, new_buf = case.func(case.data, case.size)
+ local rewind = new_buf - case.data
+ local description = ('good; %s'):format(case[1])
+ test:is_deeply({len, rewind}, {case.exp_len, case.exp_rewind},
+ description)
+ end
+ end
+
+ -- Bad api usage cases.
+ for _, func_name in ipairs({'decode_array', 'decode_map'}) do
+ for _, case in ipairs(bad_api_cases) do
+ local ok, err
+ if case.args ~= nil then
+ local args_len = table.maxn(case.args)
+ ok, err = pcall(s[func_name], unpack(case.args, 1, args_len))
+ else
+ ok, err = pcall(s[func_name], case.data, case.size)
+ end
+ local description = ('%s bad api usage; %s'):format(func_name,
+ case[1])
+ test:ok(ok == false and err:match(case.exp_err), description)
+ end
+ end
+end
+
tap.test("msgpack", function(test)
local serializer = require('msgpack')
- test:plan(10)
+ test:plan(11)
test:test("unsigned", common.test_unsigned, serializer)
test:test("signed", common.test_signed, serializer)
test:test("double", common.test_double, serializer)
@@ -62,4 +239,5 @@ tap.test("msgpack", function(test)
test:test("ucdata", common.test_ucdata, serializer)
test:test("offsets", test_offsets, serializer)
test:test("misc", test_misc, serializer)
+ test:test("decode_array_map", test_decode_array_map, serializer)
end)
----
commit 3868d5c2551c893f16bd05c79d4d52a564c6a833
Author: Alexander Turenko <alexander.turenko@xxxxxxxxxxxxx>
Date: Thu Jan 31 01:59:18 2019 +0300
net.box: add skip_header option to use with buffer
Needed for #3276.
@TarantoolBot document
Title: net.box: skip_header option
This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper
from a buffer. This may be needed to pass this buffer to some C function
when it expects some specific msgpack input.
See src/box/lua/net_box.lua for examples. Also consider merger's docbot
comment for more examples.
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 2bf772aa8..53c93cafb 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -15,6 +15,7 @@ local max = math.max
local fiber_clock = fiber.clock
local fiber_self = fiber.self
local decode = msgpack.decode_unchecked
+local decode_map = msgpack.decode_map
local table_new = require('table.new')
local check_iterator_type = box.internal.check_iterator_type
@@ -483,8 +484,8 @@ local function create_transport(host, port, user,
password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Future object.
--
- local function perform_async_request(buffer, method, on_push,
on_push_ctx,
- ...)
+ local function perform_async_request(buffer, skip_header, method,
on_push,
+ on_push_ctx, ...)
if state ~= 'active' and state ~= 'fetch_schema' then
return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
reason = last_error})
@@ -497,12 +498,13 @@ local function create_transport(host, port, user,
password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, ...)
next_request_id = next_id(id)
- -- Request in most cases has maximum 8 members:
- -- method, buffer, id, cond, errno, response, on_push,
- -- on_push_ctx.
- local request = setmetatable(table_new(0, 8), request_mt)
+ -- Request in most cases has maximum 9 members:
+ -- method, buffer, skip_header, id, cond, errno, response,
+ -- on_push, on_push_ctx.
+ local request = setmetatable(table_new(0, 9), request_mt)
request.method = method
request.buffer = buffer
+ request.skip_header = skip_header
request.id = id
request.cond = fiber.cond()
requests[id] = request
@@ -516,10 +518,11 @@ local function create_transport(host, port, user,
password, callback,
-- @retval nil, error Error occured.
-- @retval not nil Response object.
--
- local function perform_request(timeout, buffer, method, on_push,
- on_push_ctx, ...)
+ local function perform_request(timeout, buffer, skip_header, method,
+ on_push, on_push_ctx, ...)
local request, err =
- perform_async_request(buffer, method, on_push, on_push_ctx, ...)
+ perform_async_request(buffer, skip_header, method, on_push,
+ on_push_ctx, ...)
if not request then
return nil, err
end
@@ -554,6 +557,15 @@ local function create_transport(host, port, user,
password, callback,
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
body_len = tonumber(body_len)
+ if request.skip_header then
+ -- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
+ local map_len, key
+ map_len, buffer.rpos = decode_map(buffer.rpos, buffer:size())
+ assert(map_len == 1)
+ key, buffer.rpos = decode(buffer.rpos)
+ assert(key == IPROTO_DATA_KEY)
+ body_len = buffer:size()
+ end
if status == IPROTO_OK_KEY then
request.response = body_len
requests[id] = nil
@@ -1047,17 +1059,18 @@ end
function remote_methods:_request(method, opts, ...)
local transport = self._transport
- local on_push, on_push_ctx, buffer, deadline
+ local on_push, on_push_ctx, buffer, skip_header, deadline
-- Extract options, set defaults, check if the request is
-- async.
if opts then
buffer = opts.buffer
+ skip_header = opts.skip_header
if opts.is_async then
if opts.on_push or opts.on_push_ctx then
error('To handle pushes in an async request use
future:pairs()')
end
- return transport.perform_async_request(buffer, method,
table.insert,
- {}, ...)
+ return transport.perform_async_request(buffer, skip_header,
method,
+ table.insert, {}, ...)
end
if opts.timeout then
-- conn.space:request(, { timeout = timeout })
@@ -1079,8 +1092,9 @@ function remote_methods:_request(method, opts, ...)
transport.wait_state('active', timeout)
timeout = deadline and max(0, deadline - fiber_clock())
end
- local res, err = transport.perform_request(timeout, buffer, method,
- on_push, on_push_ctx, ...)
+ local res, err = transport.perform_request(timeout, buffer, skip_header,
+ method, on_push, on_push_ctx,
+ ...)
if err then
box.error(err)
end
@@ -1283,10 +1297,10 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
- res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
+ res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line})
else
assert(self.protocol == 'Lua console')
- res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
+ res, err = pr(timeout, nil, false, 'inject', nil, nil,
line..'$EOF$\n')
end
if err then
box.error(err)
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 2b5a84646..71d0e0a50 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -29,7 +29,7 @@ function x_select(cn, space_id, index_id, iterator, offset,
limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil,
nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false,
'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
---
...
@@ -1573,6 +1573,18 @@ result
---
- {48: [[2]]}
...
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[2]]
+...
-- insert
c.space.test:insert({3}, {buffer = ibuf})
---
@@ -1585,6 +1597,21 @@ result
---
- {48: [[3]]}
...
+-- insert + skip_header
+_ = space:delete({3})
+---
+...
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
---
@@ -1608,6 +1635,29 @@ result
---
- {48: [[3]]}
...
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header =
true})
+---
+- 7
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3]]
+...
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1620,6 +1670,18 @@ result
---
- {48: []}
...
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
---
@@ -1632,6 +1694,18 @@ result
---
- {48: []}
...
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
---
@@ -1644,6 +1718,18 @@ result
---
- {48: [[3], [2], [1, 'hello']]}
...
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf,
skip_header = true})
+---
+- 17
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [[3], [2], [1, 'hello']]
+...
-- select
len = c.space.test:select({}, {buffer = ibuf})
---
@@ -1667,6 +1753,29 @@ result
---
- {48: [[1, 'hello'], [2], [3], [4]]}
...
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+---
+...
+ibuf.rpos + len == ibuf.wpos
+---
+- true
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+ibuf.rpos == ibuf.wpos
+---
+- true
+...
+len
+---
+- 19
+...
+result
+---
+- [[1, 'hello'], [2], [3], [4]]
+...
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
---
@@ -1701,6 +1810,40 @@ result
---
- {48: []}
...
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 8
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- [1, 2, 3]
+...
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
---
@@ -1735,6 +1878,40 @@ result
---
- {48: []}
...
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+---
+- 5
+...
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+---
+...
+result
+---
+- []
+...
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
---
@@ -2571,7 +2748,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
c.state
@@ -3205,7 +3382,7 @@ c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
@@ -3340,7 +3517,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
---
...
while f:status() ~= 'dead' do fiber.sleep(0.01) end
@@ -3359,7 +3536,7 @@ c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
---
...
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
---
- null
- Peer closed
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index 96d822820..48cc7147d 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -12,7 +12,7 @@ function x_select(cn, space_id, index_id, iterator, offset, limit, key, opts)
offset, limit, key)
return ret
end
-function x_fatal(cn) cn._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80') end
+function x_fatal(cn) cn._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80') end
test_run:cmd("setopt delimiter ''");
LISTEN = require('uri').parse(box.cfg.listen)
@@ -615,11 +615,22 @@ c.space.test:replace({2}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- replace + skip_header
+c.space.test:replace({2}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- insert
c.space.test:insert({3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- insert + skip_header
+_ = space:delete({3})
+c.space.test:insert({3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- update
c.space.test:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -628,21 +639,44 @@ c.space.test.index.primary:update({3}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- update + skip_header
+c.space.test:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c.space.test.index.primary:update({3}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- upsert
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- upsert + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- delete
c.space.test:upsert({4}, {}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- delete + skip_header
+c.space.test:upsert({4}, {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- select + skip_header
+c.space.test.index.primary:select({3}, {iterator = 'LE', buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- select
len = c.space.test:select({}, {buffer = ibuf})
ibuf.rpos + len == ibuf.wpos
@@ -651,6 +685,14 @@ ibuf.rpos == ibuf.wpos
len
result
+-- select + skip_header
+len = c.space.test:select({}, {buffer = ibuf, skip_header = true})
+ibuf.rpos + len == ibuf.wpos
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+ibuf.rpos == ibuf.wpos
+len
+result
+
-- call
c:call("echo", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -662,6 +704,17 @@ c:call("echo", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- call + skip_header
+c:call("echo", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:call("echo", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- eval
c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
@@ -673,6 +726,17 @@ c:eval("echo(...)", nil, {buffer = ibuf})
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
+-- eval + skip_header
+c:eval("echo(...)", {1, 2, 3}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", {}, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+c:eval("echo(...)", nil, {buffer = ibuf, skip_header = true})
+result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
+result
+
-- unsupported methods
c.space.test:get({1}, { buffer = ibuf})
c.space.test.index.primary:min({}, { buffer = ibuf})
@@ -1063,7 +1127,7 @@ c.space.test:delete{1}
--
-- Break a connection to test reconnect_after.
--
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
c.state
while not c:is_connected() do fiber.sleep(0.01) end
c:ping()
@@ -1291,7 +1355,7 @@ finalize_long()
--
c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
-_ = c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+_ = c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
finalize_long()
future:wait_result(100)
@@ -1348,7 +1412,7 @@ c:ping()
-- new attempts to read any data - the connection is closed
-- already.
--
-f = fiber.create(c._transport.perform_request, nil, nil, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, 'inject', nil, nil, '\x80')
+f = fiber.create(c._transport.perform_request, nil, nil, false, 'call_17', nil, nil, 'long', {}) c._transport.perform_request(nil, nil, false, 'inject', nil, nil, '\x80')
while f:status() ~= 'dead' do fiber.sleep(0.01) end
c:close()
@@ -1358,7 +1422,7 @@ c:close()
--
c = net:connect(box.cfg.listen)
data = msgpack.encode(18400000000000000000)..'aaaaaaa'
-c._transport.perform_request(nil, nil, 'inject', nil, nil, data)
+c._transport.perform_request(nil, nil, false, 'inject', nil, nil, data)
c:close()
test_run:grep_log('default', 'too big packet size in the header') ~= nil