diff --git a/.github/workflows/test_on_push.yaml b/.github/workflows/test_on_push.yaml index ade54e06..883e6375 100644 --- a/.github/workflows/test_on_push.yaml +++ b/.github/workflows/test_on_push.yaml @@ -11,23 +11,17 @@ jobs: github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name != github.repository strategy: matrix: - # We need 1.10.6 here to check that module works with - # old Tarantool versions that don't have "tuple-keydef"/"tuple-merger" support. # We test old metrics with Tarantool 2.10 because since Tarantool 2.11.1 # it uses its own metrics package. # We test old metrics with Cartridge 2.7.9 because since 2.8.0 it # requires metrics 1.0.0. - tarantool-version: ["1.10.6", "1.10", "2.2", "2.3", "2.4", "2.5", "2.6", "2.7", "2.8", "2.10", "2.11"] + tarantool-version: ["1.10", "2.8", "2.10", "2.11"] metrics-version: [""] cartridge-version: ["2.8.0"] - remove-merger: [false] include: - tarantool-version: "1.10" metrics-version: "1.0.0" cartridge-version: "2.8.0" - - tarantool-version: "2.7" - remove-merger: true - cartridge-version: "2.8.0" - tarantool-version: "2.10" metrics-version: "0.10.0" cartridge-version: "2.7.9" @@ -85,11 +79,6 @@ jobs: if: matrix.tarantool-version == 'master' run: echo "${GITHUB_WORKSPACE}/bin" >> $GITHUB_PATH - - name: Fix luarocks in Tarantool CE 1.10.6 - if: matrix.tarantool-version == '1.10.6' - run: | - sudo patch -p1 /usr/share/tarantool/luarocks/manif.lua luarocks.patch - - name: Install requirements for community run: | tarantool --version @@ -102,10 +91,6 @@ jobs: if: matrix.metrics-version != '' run: tt rocks install metrics ${{ matrix.metrics-version }} - - name: Remove external merger if needed - if: ${{ matrix.remove-merger }} - run: rm .rocks/lib/tarantool/tuple/merger.so - # This server starts and listen on 8084 port that is used for tests - name: Stop Mono server run: sudo kill -9 $(sudo lsof -t -i tcp:8084) || true diff --git a/CHANGELOG.md b/CHANGELOG.md index 47629ee1..3867831b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## Unreleased + +### Changed +* Drop testing on Tarantool 1.10.6 and Tarantool 2.x older than 2.8 (#365). +* Drop support of Tarantool 1.10.x older than 1.10.8 and + Tarantool 2.x older than 2.5.2 (#365). +* Unlock `tuple-keydef` and `tuple-merger` version dependencies (#365). +* Require `tuple-keydef` and `tuple-merger` for work (#365). + ## [1.4.0] - 16-10-23 ### Added diff --git a/CMakeLists.txt b/CMakeLists.txt index 3485b788..04bdfefa 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -95,13 +95,3 @@ install( DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/cartridge DESTINATION ${TARANTOOL_INSTALL_LUADIR} ) - -# Don't include to rockspec as some Tarantool versions (e.g. 2.2 and 2.3) -# don't have symbols required by "tuple-merger" and "tuple-keydef" modules. -execute_process( - COMMAND bash "-c" "tarantoolctl rocks install tuple-keydef 0.0.2" -) - -execute_process( - COMMAND bash "-c" "tarantoolctl rocks install tuple-merger 0.0.2" -) diff --git a/crud-scm-1.rockspec b/crud-scm-1.rockspec index 54b04fdf..87272455 100644 --- a/crud-scm-1.rockspec +++ b/crud-scm-1.rockspec @@ -10,6 +10,8 @@ dependencies = { 'checks >= 3.1.0-1', 'errors >= 2.2.1-1', 'vshard >= 0.1.18-1', + 'tuple-merger >= 0.0.2', + 'tuple-keydef >= 0.0.2', } build = { diff --git a/crud/common/stash.lua b/crud/common/stash.lua index bfc20639..da8bfe48 100644 --- a/crud/common/stash.lua +++ b/crud/common/stash.lua @@ -20,16 +20,12 @@ local stash = {} -- @tfield string stats_metrics_registry -- Stash for metrics rocks statistics registry. -- --- @tfield string select_module_compat_info --- Stash for select compatability version registry. --- stash.name = { cfg = '__crud_cfg', stats_internal = '__crud_stats_internal', stats_local_registry = '__crud_stats_local_registry', stats_metrics_registry = '__crud_stats_metrics_registry', ddl_triggers = '__crud_ddl_spaces_triggers', - select_module_compat_info = '__select_module_compat_info', storage_readview = '__crud_storage_readview', } diff --git a/crud/readview.lua b/crud/readview.lua index 19742175..a6202140 100644 --- a/crud/readview.lua +++ b/crud/readview.lua @@ -15,9 +15,6 @@ local stats = require('crud.stats') local ReadviewError = errors.new_class('ReadviewError', {capture_stack = false}) -local has_merger = (utils.tarantool_supports_external_merger() and - package.search('tuple.merger')) or utils.tarantool_has_builtin_merger() - local OPEN_FUNC_NAME = 'readview_open_on_storage' local CRUD_OPEN_FUNC_NAME = utils.get_storage_call(OPEN_FUNC_NAME) local SELECT_FUNC_NAME = 'select_readview_on_storage' @@ -25,13 +22,13 @@ local CLOSE_FUNC_NAME = 'readview_close_on_storage' local CRUD_CLOSE_FUNC_NAME = utils.get_storage_call(CLOSE_FUNC_NAME) if (not utils.tarantool_version_at_least(2, 11, 0)) -or (tarantool.package ~= 'Tarantool Enterprise') or (not has_merger) then +or (tarantool.package ~= 'Tarantool Enterprise') then return { new = function() return nil, ReadviewError:new("Tarantool does not support readview") end, init = function() return nil end} end -local select = require('crud.select.compat.select') +local select = require('crud.select.module') local readview = {} @@ -188,13 +185,6 @@ local function select_readview_on_storage(space_name, index_id, conditions, opts local result = {cursor, filtered_tuples} - local select_module_compat_info = stash.get(stash.name.select_module_compat_info) - if not select_module_compat_info.has_merger then - if opts.fetch_latest_metadata then - result[3] = cursor.storage_info.replica_schema_version - end - end - return unpack(result) end diff --git a/crud/select.lua b/crud/select.lua index e879cc0a..fa285aeb 100644 --- a/crud/select.lua +++ b/crud/select.lua @@ -1,6 +1,5 @@ local errors = require('errors') -local stash = require('crud.common.stash') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local select_executor = require('crud.select.executor') @@ -10,20 +9,9 @@ local schema = require('crud.common.schema') local SelectError = errors.new_class('SelectError') -local select_module - local SELECT_FUNC_NAME = 'select_on_storage' -local select_module_compat_info = stash.get(stash.name.select_module_compat_info) -local has_merger = (utils.tarantool_supports_external_merger() and - package.search('tuple.merger')) or utils.tarantool_has_builtin_merger() -if has_merger then - select_module = require('crud.select.compat.select') - select_module_compat_info.has_merger = true -else - select_module = require('crud.select.compat.select_old') - select_module_compat_info.has_merger = false -end +local select_module = require('crud.select.module') function checkers.vshard_call_mode(p) return p == 'write' or p == 'read' @@ -114,14 +102,6 @@ local function select_on_storage(space_name, index_id, conditions, opts) local filtered_tuples = schema.filter_tuples_fields(resp.tuples, opts.field_names) local result = {cursor, filtered_tuples} - - local select_module_compat_info = stash.get(stash.name.select_module_compat_info) - if not select_module_compat_info.has_merger then - if opts.fetch_latest_metadata then - result[3] = cursor.storage_info.replica_schema_version - end - end - return unpack(result) end diff --git a/crud/select/compat/common.lua b/crud/select/compat/common.lua deleted file mode 100644 index 6d98e490..00000000 --- a/crud/select/compat/common.lua +++ /dev/null @@ -1,30 +0,0 @@ -local ratelimit = require('crud.ratelimit') -local utils = require('crud.common.utils') -local check_select_safety_rl = ratelimit.new() - -local common = {} - -common.SELECT_FUNC_NAME = utils.get_storage_call('select_on_storage') -common.READVIEW_SELECT_FUNC_NAME = utils.get_storage_call('select_readview_on_storage') -common.DEFAULT_BATCH_SIZE = 100 - -common.check_select_safety = function(space_name, plan, opts) - if opts.fullscan == true then - return - end - - if opts.first ~= nil and math.abs(opts.first) <= 1000 then - return - end - - local iter = plan.tarantool_iter - if iter == box.index.EQ or iter == box.index.REQ then - return - end - - local rl = check_select_safety_rl - local traceback = debug.traceback() - rl:log_crit("Potentially long select from space '%s'\n %s", space_name, traceback) -end - -return common diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua deleted file mode 100644 index 930ba454..00000000 --- a/crud/select/compat/select_old.lua +++ /dev/null @@ -1,450 +0,0 @@ -local checks = require('checks') -local errors = require('errors') -local fun = require('fun') - -local call = require('crud.common.call') -local const = require('crud.common.const') -local utils = require('crud.common.utils') -local sharding = require('crud.common.sharding') -local dev_checks = require('crud.common.dev_checks') -local schema = require('crud.common.schema') -local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') -local stats = require('crud.stats') - -local compare_conditions = require('crud.compare.conditions') -local select_plan = require('crud.compare.plan') -local select_comparators = require('crud.compare.comparators') -local common = require('crud.select.compat.common') - -local Iterator = require('crud.select.iterator') - -local SelectError = errors.new_class('SelectError') - -local select_module = {} - -local function select_iteration(space_name, plan, opts) - dev_checks('string', '?table', { - after_tuple = '?table', - replicasets = 'table', - limit = 'number', - call_opts = 'table', - sharding_hash = 'table', - vshard_router = 'table', - yield_every = 'number', - }) - - local call_opts = opts.call_opts - - -- call select on storages - local storage_select_opts = { - scan_value = plan.scan_value, - after_tuple = opts.after_tuple, - tarantool_iter = plan.tarantool_iter, - limit = opts.limit, - scan_condition_num = plan.scan_condition_num, - field_names = plan.field_names, - sharding_func_hash = opts.sharding_hash.sharding_func_hash, - sharding_key_hash = opts.sharding_hash.sharding_key_hash, - skip_sharding_hash_check = opts.sharding_hash.skip_sharding_hash_check, - yield_every = opts.yield_every, - fetch_latest_metadata = true, - } - - local storage_select_args = { - space_name, plan.index_id, plan.conditions, storage_select_opts, - } - - local results, err, storages_info = call.map(opts.vshard_router, common.SELECT_FUNC_NAME, storage_select_args, { - replicasets = opts.replicasets, - timeout = call_opts.timeout, - mode = call_opts.mode or 'read', - prefer_replica = call_opts.prefer_replica, - balance = call_opts.balance, - }) - - if err ~= nil then - return nil, err, storages_info - end - - local tuples = {} - for replicaset_uuid, replicaset_results in pairs(results) do - -- Stats extracted with callback here and not passed - -- outside to wrapper because fetch for pairs can be - -- called even after pairs() return from generators. - local cursor = replicaset_results[1] - if cursor.stats ~= nil then - stats.update_fetch_stats(cursor.stats, space_name) - end - - tuples[replicaset_uuid] = replicaset_results[2] - end - - return tuples, nil, storages_info -end - --- returns result, err, need_reload --- need_reload indicates if reloading schema could help --- see crud.common.schema.wrap_func_reload() -local function build_select_iterator(vshard_router, space_name, user_conditions, opts) - dev_checks('table', 'string', '?table', { - after = '?table', - first = '?number', - batch_size = '?number', - bucket_id = '?number|cdata', - force_map_call = '?boolean', - field_names = '?table', - yield_every = '?number', - call_opts = 'table', - }) - - opts = opts or {} - - if opts.batch_size ~= nil and opts.batch_size < 1 then - return nil, SelectError:new("batch_size should be > 0") - end - - if opts.yield_every ~= nil and opts.yield_every < 1 then - return nil, SelectError:new("yield_every should be > 0") - end - - local yield_every = opts.yield_every or const.DEFAULT_YIELD_EVERY - - local batch_size = opts.batch_size or common.DEFAULT_BATCH_SIZE - - -- check conditions - local conditions, err = compare_conditions.parse(user_conditions) - if err ~= nil then - return nil, SelectError:new("Failed to parse conditions: %s", err) - end - - local space, err, netbox_schema_version = utils.get_space(space_name, vshard_router) - if err ~= nil then - return nil, SelectError:new("An error occurred during the operation: %s", err), const.NEED_SCHEMA_RELOAD - end - if space == nil then - return nil, SelectError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD - end - - local sharding_hash = {} - local sharding_key_as_index_obj = nil - -- We don't need sharding info if bucket_id specified. - if opts.bucket_id == nil then - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) - if err ~= nil then - return nil, err - end - - sharding_key_as_index_obj = sharding_key_data.value - sharding_hash.sharding_key_hash = sharding_key_data.hash - else - sharding_hash.skip_sharding_hash_check = true - end - - -- plan select - local plan, err = select_plan.new(space, conditions, { - first = opts.first, - after_tuple = opts.after, - field_names = opts.field_names, - force_map_call = opts.force_map_call, - sharding_key_as_index_obj = sharding_key_as_index_obj, - bucket_id = opts.bucket_id, - }) - - if err ~= nil then - return nil, SelectError:new("Failed to plan select: %s", err), const.NEED_SCHEMA_RELOAD - end - - -- set replicasets to select from - local replicasets_to_select, err = vshard_router:routeall() - if err ~= nil then - return nil, SelectError:new("Failed to get router replicasets: %s", err) - end - - -- See explanation of this logic in - -- crud/select/compat/select.lua. - local perform_map_reduce = opts.force_map_call == true or - (opts.bucket_id == nil and plan.sharding_key == nil) - if not perform_map_reduce then - local bucket_id_data, err = sharding.key_get_bucket_id(vshard_router, space_name, - plan.sharding_key, opts.bucket_id) - if err ~= nil then - return nil, err - end - - assert(bucket_id_data.bucket_id ~= nil) - - local err - replicasets_to_select, err = sharding.get_replicasets_by_bucket_id(vshard_router, bucket_id_data.bucket_id) - if err ~= nil then - return nil, err, const.NEED_SCHEMA_RELOAD - end - - sharding_hash.sharding_func_hash = bucket_id_data.sharding_func_hash - else - stats.update_map_reduces(space_name) - sharding_hash.skip_sharding_hash_check = true - end - - -- generate tuples comparator - local scan_index = space.index[plan.index_id] - local primary_index = space.index[0] - local cmp_key_parts = utils.merge_primary_key_parts(scan_index.parts, primary_index.parts) - local cmp_operator = select_comparators.get_cmp_operator(plan.tarantool_iter) - - -- generator of tuples comparator needs field_names and space_format - -- to update fieldno in each part in cmp_key_parts because storage result contains - -- fields in order specified by field_names - local tuples_comparator = select_comparators.gen_tuples_comparator( - cmp_operator, cmp_key_parts, plan.field_names, space:format() - ) - - local function comparator(node1, node2) - return not tuples_comparator(node1.obj, node2.obj) - end - - local iter = Iterator.new({ - space_name = space_name, - space = space, - netbox_schema_version = netbox_schema_version, - iteration_func = select_iteration, - comparator = comparator, - - plan = plan, - - batch_size = batch_size, - replicasets = replicasets_to_select, - - call_opts = opts.call_opts, - sharding_hash = sharding_hash, - vshard_router = vshard_router, - yield_every = yield_every, - }) - - return iter -end - -function select_module.pairs(space_name, user_conditions, opts) - checks('string', '?table', { - after = '?table', - first = '?number', - batch_size = '?number', - use_tomap = '?boolean', - bucket_id = '?number|cdata', - force_map_call = '?boolean', - fields = '?table', - fetch_latest_metadata = '?boolean', - - mode = '?vshard_call_mode', - prefer_replica = '?boolean', - balance = '?boolean', - timeout = '?number', - - vshard_router = '?string|table', - - yield_every = '?number', - }) - - opts = opts or {} - - if opts.first ~= nil and opts.first < 0 then - error(string.format("Negative first isn't allowed for pairs")) - end - - local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router) - if err ~= nil then - error(err) - end - - local iterator_opts = { - after = opts.after, - first = opts.first, - batch_size = opts.batch_size, - bucket_id = opts.bucket_id, - force_map_call = opts.force_map_call, - field_names = opts.fields, - yield_every = opts.yield_every, - call_opts = { - mode = opts.mode, - prefer_replica = opts.prefer_replica, - balance = opts.balance, - timeout = opts.timeout, - fetch_latest_metadata = opts.fetch_latest_metadata, - }, - } - - local iter, err = schema.wrap_func_reload( - vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts - ) - - if err ~= nil then - error(string.format("Failed to generate iterator: %s", err)) - end - - local tuples_limit = opts.first - if tuples_limit ~= nil then - tuples_limit = math.abs(tuples_limit) - end - - -- filter space format by plan.field_names (user defined fields + primary key + scan key) - -- to pass it user as metadata - local filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names) - if err ~= nil then - return nil, err - end - - local gen = function(_, iter) - local tuple, err = iter:get() - if err ~= nil then - if sharding.result_needs_sharding_reload(err) then - sharding_metadata_module.reload_sharding_cache(vshard_router, space_name) - end - - error(string.format("Failed to get next object: %s", err)) - end - - if tuple == nil then - return nil - end - - if opts.fetch_latest_metadata then - -- This option is temporary and is related to [1], [2]. - -- [1] https://github.com/tarantool/crud/issues/236 - -- [2] https://github.com/tarantool/crud/issues/361 - iter = utils.fetch_latest_metadata_for_select(space_name, vshard_router, opts, - iter.storages_info, iter) - filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names) - if err ~= nil then - return nil, err - end - end - - local result = tuple - if opts.use_tomap == true then - result, err = utils.unflatten(tuple, filtered_space_format) - if err ~= nil then - error(string.format("Failed to unflatten next object: %s", err)) - end - end - - return iter, result - end - - local gen, param, state = fun.iter(gen, nil, iter) - - if tuples_limit ~= nil then - gen, param, state = gen:take_n(tuples_limit) - end - - return gen, param, state -end - -local function select_module_call_xc(vshard_router, space_name, user_conditions, opts) - dev_checks('table', 'string', '?table', 'table') - - if opts.first ~= nil and opts.first < 0 then - if opts.after == nil then - return nil, SelectError:new("Negative first should be specified only with after option") - end - end - - local iterator_opts = { - after = opts.after, - first = opts.first, - batch_size = opts.batch_size, - bucket_id = opts.bucket_id, - force_map_call = opts.force_map_call, - field_names = opts.fields, - yield_every = opts.yield_every, - call_opts = { - mode = opts.mode, - prefer_replica = opts.prefer_replica, - balance = opts.balance, - timeout = opts.timeout, - fetch_latest_metadata = opts.fetch_latest_metadata, - }, - } - - local iter, err = schema.wrap_func_reload( - vshard_router, build_select_iterator, space_name, user_conditions, iterator_opts - ) - if err ~= nil then - return nil, err - end - common.check_select_safety(space_name, iter.plan, opts) - - local tuples = {} - - while iter:has_next() do - local tuple, err = iter:get() - if err ~= nil then - if sharding.result_needs_sharding_reload(err) then - return nil, err, const.NEED_SHARDING_RELOAD - end - - return nil, SelectError:new("Failed to get next object: %s", err) - end - - if tuple == nil then - break - end - - table.insert(tuples, tuple) - end - - if opts.first ~= nil and opts.first < 0 then - utils.reverse_inplace(tuples) - end - - if opts.fetch_latest_metadata then - -- This option is temporary and is related to [1], [2]. - -- [1] https://github.com/tarantool/crud/issues/236 - -- [2] https://github.com/tarantool/crud/issues/361 - iter = utils.fetch_latest_metadata_for_select(space_name, vshard_router, opts, - iter.storages_info, iter) - end - - -- filter space format by plan.field_names (user defined fields + primary key + scan key) - -- to pass it user as metadata - local filtered_space_format, err = utils.get_fields_format(iter.space:format(), iter.plan.field_names) - if err ~= nil then - return nil, err - end - - return { - metadata = table.copy(filtered_space_format), - rows = tuples, - } -end - -function select_module.call(space_name, user_conditions, opts) - checks('string', '?table', { - after = '?table', - first = '?number', - batch_size = '?number', - bucket_id = '?number|cdata', - force_map_call = '?boolean', - fields = '?table', - fullscan = '?boolean', - fetch_latest_metadata = '?boolean', - - mode = '?vshard_call_mode', - prefer_replica = '?boolean', - balance = '?boolean', - timeout = '?number', - - vshard_router = '?string|table', - - yield_every = '?number', - }) - - opts = opts or {} - - local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router) - if err ~= nil then - return nil, SelectError:new(err) - end - - return sharding.wrap_method(vshard_router, select_module_call_xc, space_name, user_conditions, opts) -end - -return select_module diff --git a/crud/select/executor.lua b/crud/select/executor.lua index d432c62e..de92ff54 100644 --- a/crud/select/executor.lua +++ b/crud/select/executor.lua @@ -2,16 +2,10 @@ local errors = require('errors') local fiber = require('fiber') local fun = require('fun') +local keydef_lib = require('tuple.keydef') + local dev_checks = require('crud.common.dev_checks') local select_comparators = require('crud.compare.comparators') -local compat = require('crud.common.compat') -local has_keydef = compat.exists('tuple.keydef', 'key_def') - -local keydef_lib -if has_keydef then - keydef_lib = compat.require('tuple.keydef', 'key_def') -end - local utils = require('crud.common.utils') local ExecuteSelectError = errors.new_class('ExecuteSelectError') @@ -46,31 +40,15 @@ local function scroll_to_after_tuple(gen, space, scan_index, tarantool_iter, aft end end -local generate_value - -if has_keydef then - generate_value = function(after_tuple, scan_value, index_parts, tarantool_iter) - local key_def = keydef_lib.new(index_parts) - if #scan_value == 0 and after_tuple ~= nil then - return key_def:extract_key(after_tuple) - end - local cmp_operator = select_comparators.get_cmp_operator(tarantool_iter) - local cmp = key_def:compare_with_key(after_tuple, scan_value) - if (cmp_operator == '<' and cmp < 0) or (cmp_operator == '>' and cmp > 0) then - return key_def:extract_key(after_tuple) - end +local function generate_value(after_tuple, scan_value, index_parts, tarantool_iter) + local key_def = keydef_lib.new(index_parts) + if #scan_value == 0 and after_tuple ~= nil then + return key_def:extract_key(after_tuple) end -else - generate_value = function(after_tuple, scan_value, index_parts, tarantool_iter) - local after_tuple_key = utils.extract_key(after_tuple, index_parts) - if #scan_value == 0 and after_tuple ~= nil then - return after_tuple_key - end - local cmp_operator = select_comparators.get_cmp_operator(tarantool_iter) - local scan_comparator = select_comparators.gen_tuples_comparator(cmp_operator, index_parts) - if scan_comparator(after_tuple_key, scan_value) then - return after_tuple_key - end + local cmp_operator = select_comparators.get_cmp_operator(tarantool_iter) + local cmp = key_def:compare_with_key(after_tuple, scan_value) + if (cmp_operator == '<' and cmp < 0) or (cmp_operator == '>' and cmp > 0) then + return key_def:extract_key(after_tuple) end end @@ -106,21 +84,11 @@ function executor.execute(space, index, filter_func, opts) end local is_eq = iter == box.index.EQ - local is_after_bigger - if has_keydef then - local key_def = keydef_lib.new(parts) - local cmp = key_def:compare_with_key(opts.after_tuple, value) - is_after_bigger = (is_eq and cmp > 0) or (not is_eq and cmp < 0) - else - local comparator - if is_eq then - comparator = select_comparators.gen_func('<=', parts) - else - comparator = select_comparators.gen_func('>=', parts) - end - local after_key = utils.extract_key(opts.after_tuple, parts) - is_after_bigger = not comparator(after_key, value) - end + + local key_def = keydef_lib.new(parts) + local cmp = key_def:compare_with_key(opts.after_tuple, value) + local is_after_bigger = (is_eq and cmp > 0) or (not is_eq and cmp < 0) + if is_after_bigger then -- it makes no sence to continue return resp diff --git a/crud/select/iterator.lua b/crud/select/iterator.lua deleted file mode 100644 index 3aeee9c8..00000000 --- a/crud/select/iterator.lua +++ /dev/null @@ -1,227 +0,0 @@ -local errors = require('errors') -local fiber = require('fiber') - -local dev_checks = require('crud.common.dev_checks') -local sharding = require('crud.common.sharding') -local utils = require('crud.common.utils') - -local UpdateTuplesError = errors.new_class('UpdateTuplesError') -local GetTupleError = errors.new_class('GetTupleError') - -local Heap = require('vshard.heap') - -local Iterator = {} -Iterator.__index = Iterator - -function Iterator.new(opts) - dev_checks({ - space_name = 'string', - space = 'table', - netbox_schema_version = '?number', - comparator = 'function', - iteration_func = 'function', - - plan = 'table', - field_names = '?table', - - batch_size = 'number', - replicasets = 'table', - - call_opts = 'table', - sharding_hash = 'table', - - vshard_router = 'table', - yield_every = '?number', - }) - - local iter = { - space_name = opts.space_name, - space = opts.space, - netbox_schema_version = opts.netbox_schema_version, - storages_info = {}, - iteration_func = opts.iteration_func, - - plan = opts.plan, - field_names = opts.field_names, - - call_opts = opts.call_opts, - - replicasets = table.copy(opts.replicasets), - replicasets_count = utils.table_count(opts.replicasets), - empty_replicasets = {}, - empty_replicasets_count = 0, - - batch_size = opts.batch_size, - - tuples_by_replicasets = {}, - next_tuple_indexes = {}, - - heap = Heap.new(opts.comparator), - tuples_count = 0, - - update_tuples_channel = fiber.channel(1), - wait_for_update = false, - - sharding_hash = opts.sharding_hash, - - vshard_router = opts.vshard_router, - yield_every = opts.yield_every, - } - - setmetatable(iter, Iterator) - - iter:_update_replicasets_tuples(iter.plan.after_tuple) - - return iter -end - -function Iterator:has_next() - if self.heap:count() == 0 and self.empty_replicasets_count >= self.replicasets_count then - return false - end - - if self.plan.total_tuples_count ~= nil and self.tuples_count >= self.plan.total_tuples_count then - return false - end - - return true -end - -local function get_next_replicaset_tuple(iter, replicaset_uuid) - local replicaset_tuples = iter.tuples_by_replicasets[replicaset_uuid] - local next_tuple = replicaset_tuples[iter.next_tuple_indexes[replicaset_uuid]] - - iter.next_tuple_indexes[replicaset_uuid] = iter.next_tuple_indexes[replicaset_uuid] + 1 - - return next_tuple -end - -local function update_replicasets_tuples(iter, after_tuple, replicaset_uuid) - local replicasets = {} - if replicaset_uuid == nil then - replicasets = iter.replicasets - else - replicasets[replicaset_uuid] = iter.replicasets[replicaset_uuid] - end - - local limit_per_storage_call = iter.batch_size - if iter.total_tuples_count ~= nil then - limit_per_storage_call = math.min(iter.batch_size, iter.total_tuples_count - iter.tuples_count) - end - - local results_map, err, storages_info = iter.iteration_func(iter.space_name, iter.plan, { - after_tuple = after_tuple, - replicasets = replicasets, - limit = limit_per_storage_call, - field_names = iter.field_names, - call_opts = iter.call_opts, - sharding_hash = iter.sharding_hash, - vshard_router = iter.vshard_router, - yield_every = iter.yield_every, - }) - iter.storages_info = storages_info - if err ~= nil then - if sharding.result_needs_sharding_reload(err) then - return false, err - end - - return false, UpdateTuplesError:new('Failed to select tuples from storages: %s', err) - end - - for replicaset_uuid, tuples in pairs(results_map) do - if #tuples == 0 or #tuples < limit_per_storage_call then - iter.empty_replicasets[replicaset_uuid] = true - iter.empty_replicasets_count = iter.empty_replicasets_count + 1 - end - - iter.tuples_by_replicasets[replicaset_uuid] = tuples - iter.next_tuple_indexes[replicaset_uuid] = 1 - - local next_tuple = get_next_replicaset_tuple(iter, replicaset_uuid) - - if next_tuple ~= nil then - iter.heap:push({ - obj = next_tuple, - meta = {replicaset_uuid = replicaset_uuid}, - }) - end - end - - return true -end - -function Iterator:_update_replicasets_tuples(after_tuple, replicaset_uuid) - self.wait_for_update = true - - local function _update_replicasets_tuples(channel, iter, after_tuple, replicaset_uuid) - local ok, err = update_replicasets_tuples(iter, after_tuple, replicaset_uuid) - channel:put({ - ok = ok, - err = err, - }) - end - - fiber.create(_update_replicasets_tuples, self.update_tuples_channel, self, after_tuple, replicaset_uuid) -end - -function Iterator:get() - - if self.wait_for_update then - -- wait for _update_replicasets_tuples - - self.wait_for_update = false - - local res = self.update_tuples_channel:get() - - if res == nil then - if self.update_tuples_channel:is_closed() then - return nil, GetTupleError:new("Channel is closed") - end - - return nil, GetTupleError:new("Timeout was reached") - end - - if not res.ok then - if sharding.result_needs_sharding_reload(res.err) then - return false, res.err - end - - return nil, GetTupleError:new("Failed to get tuples from storages: %s", res.err) - end - end - - local node = self.heap:pop() - - if node == nil then - return nil - end - - local tuple = node.obj - local last_tuple_replicaset_uuid = node.meta.replicaset_uuid - - self.tuples_count = self.tuples_count + 1 - - if self.plan.total_tuples_count == nil or self.tuples_count < self.plan.total_tuples_count then - local replicaset_tuples_count = #self.tuples_by_replicasets[last_tuple_replicaset_uuid] - local next_tuple_index = self.next_tuple_indexes[last_tuple_replicaset_uuid] - - if next_tuple_index <= replicaset_tuples_count then - local next_tuple = get_next_replicaset_tuple(self, last_tuple_replicaset_uuid) - - self.heap:push({ - obj = next_tuple, - meta = {replicaset_uuid = last_tuple_replicaset_uuid}, - }) - - elseif not self.empty_replicasets[last_tuple_replicaset_uuid] then - self:_update_replicasets_tuples( - tuple, - last_tuple_replicaset_uuid - ) - end - end - - return tuple -end - -return Iterator diff --git a/crud/select/merger.lua b/crud/select/merger.lua index b56e993e..e7a53102 100644 --- a/crud/select/merger.lua +++ b/crud/select/merger.lua @@ -7,8 +7,7 @@ local fiber = require('fiber') local sharding = require('crud.common.sharding') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') -local compat = require('crud.common.compat') -local merger_lib = compat.require('tuple.merger', 'merger') +local merger_lib = require('tuple.merger') local Keydef = require('crud.compare.keydef') local stats = require('crud.stats') diff --git a/crud/select/compat/select.lua b/crud/select/module.lua similarity index 93% rename from crud/select/compat/select.lua rename to crud/select/module.lua index cc309b70..69320bd9 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/module.lua @@ -6,10 +6,10 @@ local const = require('crud.common.const') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local dev_checks = require('crud.common.dev_checks') -local common = require('crud.select.compat.common') local schema = require('crud.common.schema') local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local stats = require('crud.stats') +local ratelimit = require('crud.ratelimit') local compare_conditions = require('crud.compare.conditions') local select_plan = require('crud.compare.plan') @@ -20,6 +20,31 @@ local SelectError = errors.new_class('SelectError') local select_module = {} +local check_select_safety_rl = ratelimit.new() + +local SELECT_FUNC_NAME = utils.get_storage_call('select_on_storage') +local READVIEW_SELECT_FUNC_NAME = utils.get_storage_call('select_readview_on_storage') +local DEFAULT_BATCH_SIZE = 100 + +local function check_select_safety(space_name, plan, opts) + if opts.fullscan == true then + return + end + + if opts.first ~= nil and math.abs(opts.first) <= 1000 then + return + end + + local iter = plan.tarantool_iter + if iter == box.index.EQ or iter == box.index.REQ then + return + end + + local rl = check_select_safety_rl + local traceback = debug.traceback() + rl:log_crit("Potentially long select from space '%s'\n %s", space_name, traceback) +end + local function build_select_iterator(vshard_router, space_name, user_conditions, opts) dev_checks('table', 'string', '?table', { after = '?table|cdata', @@ -153,10 +178,10 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, -- If opts.batch_size is missed we should specify it to min(tuples_limit, DEFAULT_BATCH_SIZE) local batch_size if opts.batch_size == nil then - if tuples_limit ~= nil and tuples_limit < common.DEFAULT_BATCH_SIZE then + if tuples_limit ~= nil and tuples_limit < DEFAULT_BATCH_SIZE then batch_size = tuples_limit else - batch_size = common.DEFAULT_BATCH_SIZE + batch_size = DEFAULT_BATCH_SIZE end else batch_size = opts.batch_size @@ -179,13 +204,13 @@ local function build_select_iterator(vshard_router, space_name, user_conditions, if opts.readview then merger = Merger.new_readview(vshard_router, replicasets_to_select, opts.readview_uuid, - space, plan.index_id, common.READVIEW_SELECT_FUNC_NAME, + space, plan.index_id, READVIEW_SELECT_FUNC_NAME, {space_name, plan.index_id, plan.conditions, select_opts}, {tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts} ) else merger = Merger.new(vshard_router, replicasets_to_select, space, plan.index_id, - common.SELECT_FUNC_NAME, + SELECT_FUNC_NAME, {space_name, plan.index_id, plan.conditions, select_opts}, {tarantool_iter = plan.tarantool_iter, field_names = plan.field_names, call_opts = opts.call_opts} ) @@ -389,7 +414,7 @@ local function select_module_call_xc(vshard_router, space_name, user_conditions, if err ~= nil then return nil, err end - common.check_select_safety(space_name, iter.plan, opts) + check_select_safety(space_name, iter.plan, opts) local tuples = {}