Skip to content

Commit e17f0f9

Browse files
author
XiaoHongbo
authored
[python] Fix update_by_row_id conflict when file rolling splits (#7574)
When calling `update_by_arrow_with_row_id` or `upsert_by_arrow_with_key` on large files, a `RuntimeError: For Data Evolution table, multiple 'MERGE INTO' and 'COMPACT' operations have encountered conflicts` is raised. This PR fixes the issue by disabling file rolling (so exactly one file is written per group) when `update_by_arrow_with_row_id` is called.
1 parent 779c841 commit e17f0f9

File tree

3 files changed

+74
-1
lines changed

3 files changed

+74
-1
lines changed

paimon-python/pypaimon/tests/table_update_test.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,73 @@ def update_worker(thread_index, update_spec):
10521052
self.assertEqual(40, ages[3], "Row 3 should remain unchanged")
10531053
self.assertEqual(45, ages[4], "Row 4 should remain unchanged")
10541054

1055+
def test_update_with_large_file(self):
1056+
import uuid
1057+
import random
1058+
import string
1059+
1060+
table_name = f'test_row_id_split_{uuid.uuid4().hex[:8]}'
1061+
schema = Schema.from_pyarrow_schema(
1062+
pa.schema([('id', pa.int64()), ('name', pa.string())]),
1063+
options={
1064+
'row-tracking.enabled': 'true',
1065+
'data-evolution.enabled': 'true',
1066+
'write-only': 'true',
1067+
}
1068+
)
1069+
self.catalog.create_table(
1070+
f'default.{table_name}', schema, False)
1071+
table = self.catalog.get_table(f'default.{table_name}')
1072+
1073+
N = 5000
1074+
data = pa.table({
1075+
'id': list(range(N)),
1076+
'name': [
1077+
''.join(random.choices(
1078+
string.ascii_letters, k=200))
1079+
for _ in range(N)],
1080+
})
1081+
wb = table.new_batch_write_builder()
1082+
tw = wb.new_write()
1083+
tc = wb.new_commit()
1084+
tw.write_arrow(data)
1085+
tc.commit(tw.prepare_commit())
1086+
tw.close()
1087+
tc.close()
1088+
1089+
from pypaimon.schema.schema_change import SetOption
1090+
self.catalog.alter_table(
1091+
f'default.{table_name}',
1092+
[SetOption('target-file-size', '10kb')])
1093+
table = self.catalog.get_table(f'default.{table_name}')
1094+
1095+
wb = table.new_batch_write_builder()
1096+
updator = wb.new_update()
1097+
updator.with_update_type(['name'])
1098+
update_data = pa.table({
1099+
'_ROW_ID': pa.array(
1100+
list(range(N)), type=pa.int64()),
1101+
'name': [
1102+
''.join(random.choices(
1103+
string.ascii_letters, k=200))
1104+
for _ in range(N)],
1105+
})
1106+
msgs = updator.update_by_arrow_with_row_id(update_data)
1107+
1108+
all_files = []
1109+
for msg in msgs:
1110+
all_files.extend(msg.new_files)
1111+
1112+
self.assertEqual(
1113+
len(all_files), 1,
1114+
"Update should produce exactly one file per group")
1115+
self.assertEqual(all_files[0].first_row_id, 0)
1116+
self.assertEqual(all_files[0].row_count, N)
1117+
1118+
tc = wb.new_commit()
1119+
tc.commit(msgs)
1120+
tc.close()
1121+
10551122

10561123
if __name__ == '__main__':
10571124
unittest.main()

paimon-python/pypaimon/write/file_store_write.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ def __init__(self, table, commit_user):
4646
(f"{self.options.data_file_prefix()}-u-{commit_user}"
4747
f"-s-{random.randint(0, 2 ** 31 - 2)}-w-"))
4848

49+
def disable_rolling(self):
50+
"""Disable file rolling by setting target_file_size to max."""
51+
self.options.set(
52+
CoreOptions.TARGET_FILE_SIZE, str(2 ** 63 - 1))
53+
4954
def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
5055
key = (partition, bucket)
5156
if key not in self.data_writers:

paimon-python/pypaimon/write/table_update_by_row_id.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,7 +293,9 @@ def _write_group(self, partition: GenericRow, first_row_id: int,
293293
merged_data = self._merge_update_with_original(original_data, data, column_names, first_row_id)
294294

295295
# Create a file store write for this partition
296+
# Disable rolling to ensure one output file per first_row_id group.
296297
file_store_write = FileStoreWrite(self.table, self.commit_user)
298+
file_store_write.disable_rolling()
297299

298300
# Set write columns to only update specific columns
299301
write_cols = column_names
@@ -313,7 +315,6 @@ def _write_group(self, partition: GenericRow, first_row_id: int,
313315
for msg in commit_messages:
314316
msg.check_from_snapshot = self.snapshot_id
315317
for file in msg.new_files:
316-
# Assign the same first_row_id as the original file
317318
file.first_row_id = first_row_id
318319
file.write_cols = write_cols
319320

0 commit comments

Comments
 (0)