Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 43 additions & 38 deletions cpp/arcticdb/version/local_versioned_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -615,11 +615,14 @@ VersionedItem LocalVersionedEngine::update_internal(
return VersionedItem(*update_info.previous_index_key_);
}

auto write_options = get_write_options();
auto de_dup_map = get_de_dup_map(stream_id, update_info.previous_index_key_, write_options);
auto versioned_item = update_impl(store(),
update_info,
query,
frame,
get_write_options(),
std::move(de_dup_map),
std::move(write_options),
dynamic_schema,
cfg().write_options().empty_types());
write_version_and_prune_previous(
Expand Down Expand Up @@ -1504,43 +1507,45 @@ std::vector<std::variant<VersionedItem, DataError>> LocalVersionedEngine::batch_
for (const auto&& [idx, stream_update_info_fut] : enumerate(stream_update_info_futures)) {
update_versions_futs.push_back(
std::move(stream_update_info_fut)
.thenValue([this, frame = std::move(frames[idx]), stream_id = stream_ids[idx], update_query = update_queries[idx], upsert](auto&& update_info) {
auto index_key_fut = folly::Future<AtomKey>::makeEmpty();
auto write_options = get_write_options();
if (update_info.previous_index_key_.has_value()) {
const bool dynamic_schema = cfg().write_options().dynamic_schema();
const bool empty_types = cfg().write_options().empty_types();
index_key_fut = async_update_impl(
store(),
update_info,
update_query,
std::move(frame),
std::move(write_options),
dynamic_schema,
empty_types);
} else {
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(
upsert,
"Cannot update non-existent symbol {}."
"Using \"upsert=True\" will create create the symbol instead of throwing this exception.",
stream_id);
index_key_fut = async_write_dataframe_impl(
store(),
0,
std::move(frame),
std::move(write_options),
std::make_shared<DeDupMap>(),
false,
true);
}
return std::move(index_key_fut).thenValueInline([update_info = std::move(update_info)](auto&& index_key) mutable {
return IndexKeyAndUpdateInfo{std::move(index_key), std::move(update_info)};
});
})
.thenValue([this, prune_previous_versions](auto&& index_key_and_update_info) {
auto&& [index_key, update_info] = index_key_and_update_info;
return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions);
})
.thenValue([this, frame = std::move(frames[idx]), stream_id = stream_ids[idx], update_query = update_queries[idx], upsert](auto&& update_info) {
auto index_key_fut = folly::Future<AtomKey>::makeEmpty();
auto write_options = get_write_options();
if (update_info.previous_index_key_.has_value()) {
const bool dynamic_schema = cfg().write_options().dynamic_schema();
const bool empty_types = cfg().write_options().empty_types();
auto de_dup_map = get_de_dup_map(stream_id, update_info.previous_index_key_, get_write_options());
index_key_fut = async_update_impl(
store(),
update_info,
update_query,
std::move(frame),
std::move(de_dup_map),
std::move(write_options),
dynamic_schema,
empty_types);
} else {
missing_data::check<ErrorCode::E_NO_SUCH_VERSION>(
upsert,
"Cannot update non-existent symbol {}."
"Using \"upsert=True\" will create create the symbol instead of throwing this exception.",
stream_id);
index_key_fut = async_write_dataframe_impl(
store(),
0,
std::move(frame),
std::move(write_options),
std::make_shared<DeDupMap>(),
false,
true);
}
return std::move(index_key_fut).thenValueInline([update_info = std::move(update_info)](auto&& index_key) mutable {
return IndexKeyAndUpdateInfo{std::move(index_key), std::move(update_info)};
});
})
.thenValue([this, prune_previous_versions](auto&& index_key_and_update_info) {
auto&& [index_key, update_info] = index_key_and_update_info;
return write_index_key_to_version_map_async(version_map(), std::move(index_key), std::move(update_info), prune_previous_versions);
})
);
}

Expand Down
8 changes: 6 additions & 2 deletions cpp/arcticdb/version/version_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ folly::Future<AtomKey> async_update_impl(
const UpdateInfo& update_info,
const UpdateQuery& query,
const std::shared_ptr<InputTensorFrame>& frame,
const std::shared_ptr<DeDupMap>& de_dup_map,
WriteOptions&& options,
bool dynamic_schema,
bool empty_types) {
Expand All @@ -439,19 +440,21 @@ folly::Future<AtomKey> async_update_impl(
update_info,
query,
frame,
de_dup_map,
options=std::move(options),
dynamic_schema,
empty_types
](index::IndexSegmentReader&& index_segment_reader) {
check_can_update(*frame, index_segment_reader, update_info, dynamic_schema, empty_types);
ARCTICDB_DEBUG(log::version(), "Update versioned dataframe for stream_id: {} , version_id = {}", frame->desc.id(), update_info.previous_index_key_->version_id());
frame->set_bucketize_dynamic(index_segment_reader.bucketize_dynamic());
return slice_and_write(frame, get_slicing_policy(options, *frame), IndexPartialKey{frame->desc.id(), update_info.next_version_id_} , store
return slice_and_write(frame, get_slicing_policy(options, *frame), IndexPartialKey{frame->desc.id(), update_info.next_version_id_} , store, de_dup_map
).via(&async::cpu_executor()).thenValue([
store,
update_info,
query,
frame,
de_dup_map,
dynamic_schema,
index_segment_reader=std::move(index_segment_reader)
](std::vector<SliceAndKey>&& new_slice_and_keys) mutable {
Expand Down Expand Up @@ -501,10 +504,11 @@ VersionedItem update_impl(
const UpdateInfo& update_info,
const UpdateQuery& query,
const std::shared_ptr<InputTensorFrame>& frame,
const std::shared_ptr<DeDupMap>& de_dup_map,
WriteOptions&& options,
bool dynamic_schema,
bool empty_types) {
auto versioned_item = VersionedItem(async_update_impl(store, update_info, query, frame, std::move(options), dynamic_schema, empty_types).get());
auto versioned_item = VersionedItem(async_update_impl(store, update_info, query, frame, de_dup_map, std::move(options), dynamic_schema, empty_types).get());
ARCTICDB_DEBUG(log::version(), "updated stream_id: {} , version_id: {}", frame->desc.id(), update_info.next_version_id_);
return versioned_item;
}
Expand Down
2 changes: 2 additions & 0 deletions cpp/arcticdb/version/version_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ VersionedItem update_impl(
const UpdateInfo& update_info,
const UpdateQuery & query,
const std::shared_ptr<InputTensorFrame>& frame,
const std::shared_ptr<DeDupMap>& de_dup_map,
WriteOptions&& options,
bool dynamic_schema,
bool empty_types);
Expand All @@ -126,6 +127,7 @@ folly::Future<AtomKey> async_update_impl(
const UpdateInfo& update_info,
const UpdateQuery& query,
const std::shared_ptr<InputTensorFrame>& frame,
const std::shared_ptr<DeDupMap>& de_dup_map,
WriteOptions&& options,
bool dynamic_schema,
bool empty_types);
Expand Down
18 changes: 14 additions & 4 deletions python/tests/integration/arcticdb/version_store/test_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,25 +62,35 @@ def test_basic_de_dup(basic_store_factory):


@pytest.mark.storage
def test_de_dup_same_value_written(basic_store_factory):
@pytest.mark.parametrize("op_type", ["write", "update"])
def test_de_dup_same_value_written(basic_store_factory, op_type):
lib = basic_store_factory(column_group_size=2, segment_row_size=2, de_duplication=True)
symbol = "test_de_dup_same_value_written"

# This will insert 50 data keys
d1 = {"x": np.arange(0, 100, dtype=np.int64)}
df1 = pd.DataFrame(data=d1)
lib.write(symbol, df1)
if op_type == "write":
lib.write(symbol, df1)
else:
lib.update(symbol, df1)
vit = lib.read(symbol)
assert_frame_equal(vit.data, df1)

num_keys = len(get_data_keys(lib, symbol))

lib.write(symbol, df1)
if op_type == "write":
lib.write(symbol, df1)
else:
lib.update(symbol, df1)

assert len(lib.list_versions(symbol)) == 2
assert len(get_data_keys(lib, symbol)) == num_keys

lib.write(symbol, df1, prune_previous_version=True)
if op_type == "write":
lib.write(symbol, df1, prune_previous_version=True)
else:
lib.update(symbol, df1, prune_previous_version=True)
assert len(lib.list_versions(symbol)) == 1
assert len(get_data_keys(lib, symbol)) == num_keys

Expand Down
Loading