diff --git a/cpp/arcticdb/version/local_versioned_engine.cpp b/cpp/arcticdb/version/local_versioned_engine.cpp index ffffcf71df..da15fdf6d3 100644 --- a/cpp/arcticdb/version/local_versioned_engine.cpp +++ b/cpp/arcticdb/version/local_versioned_engine.cpp @@ -615,11 +615,14 @@ VersionedItem LocalVersionedEngine::update_internal( return VersionedItem(*update_info.previous_index_key_); } + auto write_options = get_write_options(); + auto de_dup_map = get_de_dup_map(stream_id, update_info.previous_index_key_, write_options); auto versioned_item = update_impl(store(), update_info, query, frame, - get_write_options(), + std::move(de_dup_map), + std::move(write_options), dynamic_schema, cfg().write_options().empty_types()); write_version_and_prune_previous( @@ -1504,43 +1507,45 @@ std::vector> LocalVersionedEngine::batch_ for (const auto&& [idx, stream_update_info_fut] : enumerate(stream_update_info_futures)) { update_versions_futs.push_back( std::move(stream_update_info_fut) - .thenValue([this, frame = std::move(frames[idx]), stream_id = stream_ids[idx], update_query = update_queries[idx], upsert](auto&& update_info) { - auto index_key_fut = folly::Future::makeEmpty(); - auto write_options = get_write_options(); - if (update_info.previous_index_key_.has_value()) { - const bool dynamic_schema = cfg().write_options().dynamic_schema(); - const bool empty_types = cfg().write_options().empty_types(); - index_key_fut = async_update_impl( - store(), - update_info, - update_query, - std::move(frame), - std::move(write_options), - dynamic_schema, - empty_types); - } else { - missing_data::check( - upsert, - "Cannot update non-existent symbol {}." 
- "Using \"upsert=True\" will create create the symbol instead of throwing this exception.", - stream_id); - index_key_fut = async_write_dataframe_impl( - store(), - 0, - std::move(frame), - std::move(write_options), - std::make_shared(), - false, - true); - } - return std::move(index_key_fut).thenValueInline([update_info = std::move(update_info)](auto&& index_key) mutable { - return IndexKeyAndUpdateInfo{std::move(index_key), std::move(update_info)}; - }); - }) - .thenValue([this, prune_previous_versions](auto&& index_key_and_update_info) { - auto&& [index_key, update_info] = index_key_and_update_info; - return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions); - }) + .thenValue([this, frame = std::move(frames[idx]), stream_id = stream_ids[idx], update_query = update_queries[idx], upsert](auto&& update_info) { + auto index_key_fut = folly::Future::makeEmpty(); + auto write_options = get_write_options(); + if (update_info.previous_index_key_.has_value()) { + const bool dynamic_schema = cfg().write_options().dynamic_schema(); + const bool empty_types = cfg().write_options().empty_types(); + auto de_dup_map = get_de_dup_map(stream_id, update_info.previous_index_key_, get_write_options()); + index_key_fut = async_update_impl( + store(), + update_info, + update_query, + std::move(frame), + std::move(de_dup_map), + std::move(write_options), + dynamic_schema, + empty_types); + } else { + missing_data::check( + upsert, + "Cannot update non-existent symbol {}." 
+ "Using \"upsert=True\" will create the symbol instead of throwing this exception.", + stream_id); + index_key_fut = async_write_dataframe_impl( + store(), + 0, + std::move(frame), + std::move(write_options), + std::make_shared(), + false, + true); + } + return std::move(index_key_fut).thenValueInline([update_info = std::move(update_info)](auto&& index_key) mutable { + return IndexKeyAndUpdateInfo{std::move(index_key), std::move(update_info)}; + }); + }) + .thenValue([this, prune_previous_versions](auto&& index_key_and_update_info) { + auto&& [index_key, update_info] = index_key_and_update_info; + return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions); + }) ); } diff --git a/cpp/arcticdb/version/version_core.cpp b/cpp/arcticdb/version/version_core.cpp index 0ea101eee4..ca6ab05fb5 100644 --- a/cpp/arcticdb/version/version_core.cpp +++ b/cpp/arcticdb/version/version_core.cpp @@ -431,6 +431,7 @@ folly::Future async_update_impl( const UpdateInfo& update_info, const UpdateQuery& query, const std::shared_ptr& frame, + const std::shared_ptr& de_dup_map, WriteOptions&& options, bool dynamic_schema, bool empty_types) { @@ -439,6 +440,7 @@ folly::Future async_update_impl( update_info, query, frame, + de_dup_map, options=std::move(options), dynamic_schema, empty_types @@ -446,12 +448,13 @@ folly::Future async_update_impl( check_can_update(*frame, index_segment_reader, update_info, dynamic_schema, empty_types); ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", frame->desc.id(), update_info.previous_index_key_->version_id()); frame->set_bucketize_dynamic(index_segment_reader.bucketize_dynamic()); - return slice_and_write(frame, get_slicing_policy(options, *frame), IndexPartialKey{frame->desc.id(), update_info.next_version_id_} , store + return slice_and_write(frame, get_slicing_policy(options, *frame), IndexPartialKey{frame->desc.id(), 
update_info.next_version_id_} , store, de_dup_map ).via(&async::cpu_executor()).thenValue([ store, update_info, query, frame, + de_dup_map, dynamic_schema, index_segment_reader=std::move(index_segment_reader) ](std::vector&& new_slice_and_keys) mutable { @@ -501,10 +504,11 @@ VersionedItem update_impl( const UpdateInfo& update_info, const UpdateQuery& query, const std::shared_ptr& frame, + const std::shared_ptr& de_dup_map, WriteOptions&& options, bool dynamic_schema, bool empty_types) { - auto versioned_item = VersionedItem(async_update_impl(store, update_info, query, frame, std::move(options), dynamic_schema, empty_types).get()); + auto versioned_item = VersionedItem(async_update_impl(store, update_info, query, frame, de_dup_map, std::move(options), dynamic_schema, empty_types).get()); ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", frame->desc.id(), update_info.next_version_id_); return versioned_item; } diff --git a/cpp/arcticdb/version/version_core.hpp b/cpp/arcticdb/version/version_core.hpp index cf367c42d8..9252d29675 100644 --- a/cpp/arcticdb/version/version_core.hpp +++ b/cpp/arcticdb/version/version_core.hpp @@ -117,6 +117,7 @@ VersionedItem update_impl( const UpdateInfo& update_info, const UpdateQuery & query, const std::shared_ptr& frame, + const std::shared_ptr& de_dup_map, WriteOptions&& options, bool dynamic_schema, bool empty_types); @@ -126,6 +127,7 @@ folly::Future async_update_impl( const UpdateInfo& update_info, const UpdateQuery& query, const std::shared_ptr& frame, + const std::shared_ptr& de_dup_map, WriteOptions&& options, bool dynamic_schema, bool empty_types); diff --git a/python/tests/integration/arcticdb/version_store/test_dedup.py b/python/tests/integration/arcticdb/version_store/test_dedup.py index 42170c15e1..b760dda410 100644 --- a/python/tests/integration/arcticdb/version_store/test_dedup.py +++ b/python/tests/integration/arcticdb/version_store/test_dedup.py @@ -62,25 +62,35 @@ def 
test_basic_de_dup(basic_store_factory): @pytest.mark.storage -def test_de_dup_same_value_written(basic_store_factory): +@pytest.mark.parametrize("op_type", ["write", "update"]) +def test_de_dup_same_value_written(basic_store_factory, op_type): lib = basic_store_factory(column_group_size=2, segment_row_size=2, de_duplication=True) symbol = "test_de_dup_same_value_written" # This will insert 50 data keys d1 = {"x": np.arange(0, 100, dtype=np.int64)} df1 = pd.DataFrame(data=d1) - lib.write(symbol, df1) + if op_type == "write": + lib.write(symbol, df1) + else: + lib.update(symbol, df1) vit = lib.read(symbol) assert_frame_equal(vit.data, df1) num_keys = len(get_data_keys(lib, symbol)) - lib.write(symbol, df1) + if op_type == "write": + lib.write(symbol, df1) + else: + lib.update(symbol, df1) assert len(lib.list_versions(symbol)) == 2 assert len(get_data_keys(lib, symbol)) == num_keys - lib.write(symbol, df1, prune_previous_version=True) + if op_type == "write": + lib.write(symbol, df1, prune_previous_version=True) + else: + lib.update(symbol, df1, prune_previous_version=True) assert len(lib.list_versions(symbol)) == 1 assert len(get_data_keys(lib, symbol)) == num_keys