-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathselect.lua
112 lines (93 loc) · 3.61 KB
/
select.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
local errors = require('errors')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local select_executor = require('crud.select.executor')
local select_filters = require('crud.compare.filters')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')
local SelectError = errors.new_class('SelectError')
local SELECT_FUNC_NAME = 'select_on_storage'
local select_module = require('crud.select.module')
function checkers.vshard_call_mode(p)
return p == 'write' or p == 'read'
end
local function select_on_storage(space_name, index_id, conditions, opts)
dev_checks('string', 'number', '?table', {
scan_value = 'table',
after_tuple = '?table',
tarantool_iter = 'number',
limit = 'number',
scan_condition_num = '?number',
field_names = '?table',
sharding_key_hash = '?number',
sharding_func_hash = '?number',
skip_sharding_hash_check = '?boolean',
yield_every = '?number',
fetch_latest_metadata = '?boolean',
})
local cursor = {}
if opts.fetch_latest_metadata then
local replica_schema_version
if box.info.schema_version ~= nil then
replica_schema_version = box.info.schema_version
else
replica_schema_version = box.internal.schema_version()
end
cursor.storage_info = {
replica_uuid = box.info().uuid,
replica_schema_version = replica_schema_version,
}
end
local space = box.space[space_name]
if space == nil then
return cursor, SelectError:new("Space %q doesn't exist", space_name)
end
local index = space.index[index_id]
if index == nil then
return cursor, SelectError:new("Index with ID %s doesn't exist", index_id)
end
local _, err = sharding.check_sharding_hash(space_name,
opts.sharding_func_hash,
opts.sharding_key_hash,
opts.skip_sharding_hash_check)
if err ~= nil then
return nil, err
end
local filter_func, err = select_filters.gen_func(space, conditions, {
tarantool_iter = opts.tarantool_iter,
scan_condition_num = opts.scan_condition_num,
})
if err ~= nil then
return cursor, SelectError:new("Failed to generate tuples filter: %s", err)
end
-- execute select
local resp, err = select_executor.execute(space, index, filter_func, {
scan_value = opts.scan_value,
after_tuple = opts.after_tuple,
tarantool_iter = opts.tarantool_iter,
limit = opts.limit,
yield_every = opts.yield_every,
})
if err ~= nil then
return cursor, SelectError:new("Failed to execute select: %s", err)
end
if resp.tuples_fetched < opts.limit or opts.limit == 0 then
cursor.is_end = true
else
local last_tuple = resp.tuples[#resp.tuples]
cursor.after_tuple = last_tuple:totable()
end
cursor.stats = {
tuples_lookup = resp.tuples_lookup,
tuples_fetched = resp.tuples_fetched,
}
-- getting tuples with user defined fields (if `fields` option is specified)
-- and fields that are needed for comparison on router (primary key + scan key)
local filtered_tuples = schema.filter_tuples_fields(resp.tuples, opts.field_names)
local result = {cursor, filtered_tuples}
return unpack(result)
end
function select_module.init(user)
utils.init_storage_call(user, SELECT_FUNC_NAME, select_on_storage)
end
return select_module