Skip to content

Commit 098ef8e

Browse files
author
XiaoHongbo
authored
[python] Fix BINARY(N) type mapping to use variable-length binary (#7518)
1 parent e109ccd commit 098ef8e

File tree

5 files changed

+86
-30
lines changed

5 files changed

+86
-30
lines changed

paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public void testJavaWriteReadPkTable() throws Exception {
155155
.column("ts", DataTypes.TIMESTAMP())
156156
.column("ts_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
157157
.column("t", DataTypes.TIME())
158+
.column("bin_data", DataTypes.BINARY(20))
158159
.column(
159160
"metadata",
160161
DataTypes.ROW(
@@ -186,8 +187,18 @@ public void testJavaWriteReadPkTable() throws Exception {
186187

187188
write.write(
188189
createRow7Cols(
189-
1, "Apple", "Fruit", 1.5, 1000000L, 2000000L, 1000, "store1", 1001L,
190-
"Beijing", "China"));
190+
1,
191+
"Apple",
192+
"Fruit",
193+
1.5,
194+
1000000L,
195+
2000000L,
196+
1000,
197+
"apple_bin_data".getBytes(),
198+
"store1",
199+
1001L,
200+
"Beijing",
201+
"China"));
191202
write.write(
192203
createRow7Cols(
193204
2,
@@ -197,6 +208,7 @@ public void testJavaWriteReadPkTable() throws Exception {
197208
1000001L,
198209
2000001L,
199210
2000,
211+
"banana_bin".getBytes(),
200212
"store1",
201213
1002L,
202214
"Shanghai",
@@ -210,6 +222,7 @@ public void testJavaWriteReadPkTable() throws Exception {
210222
1000002L,
211223
2000002L,
212224
3000,
225+
"carrot".getBytes(),
213226
"store2",
214227
1003L,
215228
"Tokyo",
@@ -223,18 +236,39 @@ public void testJavaWriteReadPkTable() throws Exception {
223236
1000003L,
224237
2000003L,
225238
4000,
239+
"broccoli_binary_data".getBytes(),
226240
"store2",
227241
1004L,
228242
"Seoul",
229243
"Korea"));
230244
write.write(
231245
createRow7Cols(
232-
5, "Chicken", "Meat", 5.0, 1000004L, 2000004L, 5000, "store3",
233-
1005L, "NewYork", "USA"));
246+
5,
247+
"Chicken",
248+
"Meat",
249+
5.0,
250+
1000004L,
251+
2000004L,
252+
5000,
253+
"chicken".getBytes(),
254+
"store3",
255+
1005L,
256+
"NewYork",
257+
"USA"));
234258
write.write(
235259
createRow7Cols(
236-
6, "Beef", "Meat", 8.0, 1000005L, 2000005L, 6000, "store3", 1006L,
237-
"London", "UK"));
260+
6,
261+
"Beef",
262+
"Meat",
263+
8.0,
264+
1000005L,
265+
2000005L,
266+
6000,
267+
"beef_data".getBytes(),
268+
"store3",
269+
1006L,
270+
"London",
271+
"UK"));
238272
// Row with null partition value -> __DEFAULT_PARTITION__
239273
write.write(
240274
GenericRow.of(
@@ -245,6 +279,7 @@ public void testJavaWriteReadPkTable() throws Exception {
245279
org.apache.paimon.data.Timestamp.fromEpochMillis(1000006L),
246280
org.apache.paimon.data.Timestamp.fromEpochMillis(2000006L),
247281
7000,
282+
"tofu".getBytes(),
248283
GenericRow.of(
249284
BinaryString.fromString("store4"),
250285
1007L,
@@ -262,13 +297,13 @@ public void testJavaWriteReadPkTable() throws Exception {
262297
getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType()));
263298
assertThat(res)
264299
.containsExactlyInAnyOrder(
265-
"+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, 1000, (store1, 1001, (Beijing, China))]",
266-
"+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, 2000, (store1, 1002, (Shanghai, China))]",
267-
"+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, 3000, (store2, 1003, (Tokyo, Japan))]",
268-
"+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, 4000, (store2, 1004, (Seoul, Korea))]",
269-
"+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, 5000, (store3, 1005, (NewYork, USA))]",
270-
"+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, 6000, (store3, 1006, (London, UK))]",
271-
"+I[7, Tofu, NULL, 3.0, 1970-01-01T00:16:40.006, 1970-01-01T00:33:20.006, 7000, (store4, 1007, (Paris, France))]");
300+
"+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, 1000, [97, 112, 112, 108, 101, 95, 98, 105, 110, 95, 100, 97, 116, 97], (store1, 1001, (Beijing, China))]",
301+
"+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, 2000, [98, 97, 110, 97, 110, 97, 95, 98, 105, 110], (store1, 1002, (Shanghai, China))]",
302+
"+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, 3000, [99, 97, 114, 114, 111, 116], (store2, 1003, (Tokyo, Japan))]",
303+
"+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, 4000, [98, 114, 111, 99, 99, 111, 108, 105, 95, 98, 105, 110, 97, 114, 121, 95, 100, 97, 116, 97], (store2, 1004, (Seoul, Korea))]",
304+
"+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, 5000, [99, 104, 105, 99, 107, 101, 110], (store3, 1005, (NewYork, USA))]",
305+
"+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, 6000, [98, 101, 101, 102, 95, 100, 97, 116, 97], (store3, 1006, (London, UK))]",
306+
"+I[7, Tofu, NULL, 3.0, 1970-01-01T00:16:40.006, 1970-01-01T00:33:20.006, 7000, [116, 111, 102, 117], (store4, 1007, (Paris, France))]");
272307
}
273308
}
274309

@@ -792,6 +827,7 @@ private static InternalRow createRow7Cols(
792827
long ts,
793828
long tsLtz,
794829
int timeMillis,
830+
byte[] binData,
795831
String metadataSource,
796832
long metadataCreatedAt,
797833
String city,
@@ -809,6 +845,7 @@ private static InternalRow createRow7Cols(
809845
org.apache.paimon.data.Timestamp.fromEpochMillis(ts),
810846
org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz),
811847
timeMillis,
848+
binData,
812849
metadataRow);
813850
}
814851

paimon-python/pypaimon/schema/data_types.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -477,18 +477,10 @@ def from_paimon_type(data_type: DataType) -> pyarrow.DataType:
477477
return pyarrow.bool_()
478478
elif type_name == 'STRING' or type_name.startswith('CHAR') or type_name.startswith('VARCHAR'):
479479
return pyarrow.string()
480-
elif type_name == 'BYTES' or type_name.startswith('VARBINARY'):
480+
elif type_name == 'BYTES' or type_name.startswith('VARBINARY') or type_name.startswith('BINARY'):
481481
return pyarrow.binary()
482482
elif type_name == 'BLOB':
483483
return pyarrow.large_binary()
484-
elif type_name.startswith('BINARY'):
485-
if type_name == 'BINARY':
486-
return pyarrow.binary(1)
487-
match = re.fullmatch(r'BINARY\((\d+)\)', type_name)
488-
if match:
489-
length = int(match.group(1))
490-
if length > 0:
491-
return pyarrow.binary(length)
492484
elif type_name.startswith('DECIMAL'):
493485
if type_name == 'DECIMAL':
494486
return pyarrow.decimal128(10, 0) # default to 10, 0

paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,10 @@ def test_read_pk_table(self, file_format):
240240
self.assertEqual(table.fields[4].type.type, "TIMESTAMP(6)")
241241
self.assertEqual(table.fields[5].type.type, "TIMESTAMP(6) WITH LOCAL TIME ZONE")
242242
self.assertEqual(table.fields[6].type.type, "TIME(0)")
243+
self.assertEqual(table.fields[7].type.type, "BINARY(20)")
243244
from pypaimon.schema.data_types import RowType
244-
self.assertIsInstance(table.fields[7].type, RowType)
245-
metadata_fields = table.fields[7].type.fields
245+
self.assertIsInstance(table.fields[8].type, RowType)
246+
metadata_fields = table.fields[8].type.fields
246247
self.assertEqual(len(metadata_fields), 3)
247248
self.assertEqual(metadata_fields[0].name, 'source')
248249
self.assertEqual(metadata_fields[1].name, 'created_at')
@@ -259,6 +260,12 @@ def test_read_pk_table(self, file_format):
259260
self.assertEqual(len(tofu_row), 1)
260261
self.assertTrue(pd.isna(tofu_row['category'].iloc[0]))
261262

263+
if file_format != "lance" and 'bin_data' in res.columns:
264+
apple_row = res[res['name'] == 'Apple']
265+
self.assertEqual(apple_row['bin_data'].iloc[0], b'apple_bin_data')
266+
carrot_row = res[res['name'] == 'Carrot']
267+
self.assertEqual(carrot_row['bin_data'].iloc[0], b'carrot')
268+
262269
# Verify metadata column can be read and contains nested structures
263270
if 'metadata' in res.columns:
264271
self.assertFalse(res['metadata'].isnull().all())

paimon-python/pypaimon/tests/reader_base_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ def test_types(self):
645645
DataField(5, "f5", AtomicType('DOUBLE'), 'desc'),
646646
DataField(6, "f6", AtomicType('BOOLEAN'), 'desc'),
647647
DataField(7, "f7", AtomicType('STRING'), 'desc'),
648-
DataField(8, "f8", AtomicType('BINARY(12)'), 'desc'),
648+
DataField(8, "f8", AtomicType('BYTES'), 'desc'),
649649
DataField(9, "f9", AtomicType('DECIMAL(10, 6)'), 'desc'),
650650
DataField(10, "f10", AtomicType('BYTES'), 'desc'),
651651
DataField(11, "f11", AtomicType('DATE'), 'desc'),

paimon-python/pypaimon/write/table_write.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,31 @@ def close(self):
102102
self.file_store_write.close()
103103

104104
def _validate_pyarrow_schema(self, data_schema: pa.Schema):
105-
if data_schema != self.table_pyarrow_schema and data_schema.names != self.file_store_write.write_cols:
106-
raise ValueError(f"Input schema isn't consistent with table schema and write cols. "
107-
f"Input schema is: {data_schema} "
108-
f"Table schema is: {self.table_pyarrow_schema} "
109-
f"Write cols is: {self.file_store_write.write_cols}")
105+
if data_schema == self.table_pyarrow_schema:
106+
return
107+
if data_schema.names == self.file_store_write.write_cols:
108+
return
109+
# Allow compatible binary types: binary, fixed_size_binary[N] are interchangeable
110+
if data_schema.names == self.table_pyarrow_schema.names:
111+
compatible = True
112+
for i in range(len(data_schema)):
113+
input_type = data_schema.field(i).type
114+
table_type = self.table_pyarrow_schema.field(i).type
115+
if input_type != table_type:
116+
if self._is_binary_family(input_type) and self._is_binary_family(table_type):
117+
continue
118+
compatible = False
119+
break
120+
if compatible:
121+
return
122+
raise ValueError(f"Input schema isn't consistent with table schema and write cols. "
123+
f"Input schema is: {data_schema} "
124+
f"Table schema is: {self.table_pyarrow_schema} "
125+
f"Write cols is: {self.file_store_write.write_cols}")
126+
127+
@staticmethod
128+
def _is_binary_family(arrow_type) -> bool:
129+
return pa.types.is_binary(arrow_type) or pa.types.is_fixed_size_binary(arrow_type)
110130

111131

112132
class BatchTableWrite(TableWrite):

0 commit comments

Comments (0)