Commit cbd8724

Support for fixed-length string/bytes cols during parquet imports
1 parent 0fc5419 commit cbd8724

3 files changed: 192 additions & 10 deletions

src/blosc2/cli/parquet_to_blosc2.py

Lines changed: 161 additions & 5 deletions
@@ -146,6 +146,24 @@ def build_parser() -> argparse.ArgumentParser:
         help="Output path. Defaults depend on the mode and input path.",
     )
     parser.add_argument("--parquet-batch-size", type=int, default=DEFAULT_BATCH_SIZE)
+    parser.add_argument(
+        "--fixed-str-maxlen",
+        type=int,
+        default=None,
+        help=(
+            "Pre-scan string columns and import columns whose maximum character length is at most "
+            "this value as fixed-width, indexable strings. Other string columns remain vlstring."
+        ),
+    )
+    parser.add_argument(
+        "--fixed-bytes-maxlen",
+        type=int,
+        default=None,
+        help=(
+            "Pre-scan binary columns and import columns whose maximum byte length is at most this value "
+            "as fixed-width, indexable bytes. Other binary columns remain vlbytes."
+        ),
+    )
     parser.add_argument(
         "--max-rows",
         type=int,
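
Usage sketch: the entry-point invocation below is assumed (it is not shown in this diff); only the two new flags come from this change. Thresholds are counted in characters for string columns and in bytes for binary columns:

    python -m blosc2.cli.parquet_to_blosc2 data.parquet --fixed-str-maxlen 32 --fixed-bytes-maxlen 16

Columns whose scanned maximum fits the threshold are imported as fixed-width, indexable columns; all others keep the variable-length path.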
@@ -217,12 +235,19 @@ def _release_arrow_temporaries(pa) -> None:
     pa.default_memory_pool().release_unused()


-def classify_columns(pa, schema):
+def classify_columns(
+    pa,
+    schema,
+    fixed_string_lengths: dict[str, int] | None = None,
+    fixed_bytes_lengths: dict[str, int] | None = None,
+):
     """Classify Parquet schema columns into importable categories."""
     fixed_cols: dict[str, object] = {}
     struct_wrap_cols: dict[str, object] = {}
     conversions: dict[str, dict[str, Any]] = {}
     nullable_scalars: list[str] = []
+    fixed_string_lengths = fixed_string_lengths or {}
+    fixed_bytes_lengths = fixed_bytes_lengths or {}

     for field in schema:
         t = field.type
@@ -251,11 +276,25 @@ def classify_columns(pa, schema):
             continue
         if pa.types.is_string(t) or pa.types.is_large_string(t):
             fixed_cols[field.name] = field
-            conversions[field.name] = {"conversion": "vlstring_nullable" if field.nullable else "vlstring"}
+            if field.name in fixed_string_lengths:
+                conversions[field.name] = {
+                    "conversion": "fixed_string_nullable" if field.nullable else "fixed_string",
+                    "max_length": fixed_string_lengths[field.name],
+                }
+            else:
+                conversions[field.name] = {
+                    "conversion": "vlstring_nullable" if field.nullable else "vlstring"
+                }
             continue
         if pa.types.is_binary(t) or pa.types.is_large_binary(t):
             fixed_cols[field.name] = field
-            conversions[field.name] = {"conversion": "vlbytes_nullable" if field.nullable else "vlbytes"}
+            if field.name in fixed_bytes_lengths:
+                conversions[field.name] = {
+                    "conversion": "fixed_bytes_nullable" if field.nullable else "fixed_bytes",
+                    "max_length": fixed_bytes_lengths[field.name],
+                }
+            else:
+                conversions[field.name] = {"conversion": "vlbytes_nullable" if field.nullable else "vlbytes"}
             continue
         conversions[field.name] = {"conversion": "skipped", "reason": f"unsupported: {t}"}

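As an illustration (hypothetical column names, not part of this diff): with --fixed-str-maxlen 16, a string column `city` whose longest value fits the threshold and an unbounded string column `comment` would classify as:

    conversions = {
        "city": {"conversion": "fixed_string_nullable", "max_length": 16},
        "comment": {"conversion": "vlstring_nullable"},
    }

Note that `max_length` records the CLI threshold rather than the observed maximum, so every qualifying column of a kind shares one width.
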
@@ -273,6 +312,101 @@ def build_import_schema(pa, original_schema, fixed_cols: dict, struct_wrap_cols:
     return pa.schema(fields)


+def candidate_fixed_scalar_columns(pa, schema, *, scan_strings: bool, scan_bytes: bool) -> list[str]:
+    columns = []
+    for field in schema:
+        if (scan_strings and (pa.types.is_string(field.type) or pa.types.is_large_string(field.type))) or (
+            scan_bytes and (pa.types.is_binary(field.type) or pa.types.is_large_binary(field.type))
+        ):
+            columns.append(field.name)
+    return columns
+
+
+def update_string_and_bytes_max_lengths(pa, pc, batch, max_lengths: dict[str, int]) -> None:
+    for field in batch.schema:
+        arr = batch.column(field.name)
+        if pa.types.is_string(field.type) or pa.types.is_large_string(field.type):
+            lengths = pc.utf8_length(arr)
+        else:
+            lengths = pc.binary_length(arr)
+        batch_max = pc.max(lengths).as_py()
+        if batch_max is not None:
+            max_lengths[field.name] = max(max_lengths[field.name], int(batch_max))
+
+
+def nullable_sentinel_adjusted_length(pa, field, max_length: int, null_policy) -> int:
+    if not field.nullable:
+        return max_length
+    null_value = null_policy.column_null_values.get(
+        field.name, null_policy.sentinel_for_arrow_type(pa, field.type)
+    )
+    return max(max_length, len(null_value)) if null_value is not None else max_length
+
+
+def fixed_string_and_bytes_lengths_from_scan(pa, schema, args, max_lengths: dict[str, int]):
+    from blosc2.ctable import get_null_policy
+
+    null_policy = get_null_policy()
+    fixed_string_lengths = {}
+    fixed_bytes_lengths = {}
+    for field in schema:
+        max_length = max_lengths.get(field.name)
+        if max_length is None:
+            continue
+        max_length = nullable_sentinel_adjusted_length(pa, field, max_length, null_policy)
+        if (
+            args.fixed_str_maxlen is not None
+            and (pa.types.is_string(field.type) or pa.types.is_large_string(field.type))
+            and max_length <= args.fixed_str_maxlen
+        ):
+            fixed_string_lengths[field.name] = args.fixed_str_maxlen
+        elif (
+            args.fixed_bytes_maxlen is not None
+            and (pa.types.is_binary(field.type) or pa.types.is_large_binary(field.type))
+            and max_length <= args.fixed_bytes_maxlen
+        ):
+            fixed_bytes_lengths[field.name] = args.fixed_bytes_maxlen
+    return fixed_string_lengths, fixed_bytes_lengths
+
+
+def scan_string_and_bytes_lengths(pa, pf, args, schema) -> tuple[dict[str, int], dict[str, int]]:
+    if args.fixed_str_maxlen is None and args.fixed_bytes_maxlen is None:
+        return {}, {}
+
+    import pyarrow.compute as pc
+
+    columns = candidate_fixed_scalar_columns(
+        pa,
+        schema,
+        scan_strings=args.fixed_str_maxlen is not None,
+        scan_bytes=args.fixed_bytes_maxlen is not None,
+    )
+    if not columns:
+        return {}, {}
+
+    print("Pre-scanning string/binary column lengths...")
+    rows_done = 0
+    total = pf.metadata.num_rows if args.max_rows is None else min(args.max_rows, pf.metadata.num_rows)
+    max_lengths = dict.fromkeys(columns, 0)
+    for batch in pf.iter_batches(batch_size=args.parquet_batch_size, columns=columns):
+        remaining = total - rows_done
+        if remaining <= 0:
+            break
+        if len(batch) > remaining:
+            batch = batch.slice(0, remaining)
+        update_string_and_bytes_max_lengths(pa, pc, batch, max_lengths)
+        rows_done += len(batch)
+
+    fixed_string_lengths, fixed_bytes_lengths = fixed_string_and_bytes_lengths_from_scan(
+        pa, schema, args, max_lengths
+    )
+    print(
+        f" fixed string columns: {len(fixed_string_lengths):,}; "
+        f"fixed bytes columns: {len(fixed_bytes_lengths):,}"
+    )
+    return fixed_string_lengths, fixed_bytes_lengths
+
+
 def transform_batch(pa, batch, selected_cols: list[str], struct_wrap_cols: dict):
     """Wrap struct-valued cells as singleton lists; pass everything else through."""
     if not struct_wrap_cols:
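
The scan leans on two PyArrow compute kernels; a minimal, self-contained illustration of the per-batch reduction performed by update_string_and_bytes_max_lengths:

    import pyarrow as pa
    import pyarrow.compute as pc

    batch = pa.RecordBatch.from_pydict({"name": ["ab", None, "abcd"]})
    lengths = pc.utf8_length(batch.column("name"))  # [2, null, 4]; binary_length for bytes columns
    print(pc.max(lengths).as_py())  # 4 -- nulls are skipped, so an all-null batch yields None

The `batch_max is not None` guard in the real code covers exactly that all-null (or empty) batch case.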
@@ -337,6 +471,12 @@ def print_import_plan(
     vlbytes_cols = [
         n for n, e in conversions.items() if e.get("conversion") in {"vlbytes", "vlbytes_nullable"}
     ]
+    fixed_string_cols = [
+        n for n, e in conversions.items() if e.get("conversion") in {"fixed_string", "fixed_string_nullable"}
+    ]
+    fixed_bytes_cols = [
+        n for n, e in conversions.items() if e.get("conversion") in {"fixed_bytes", "fixed_bytes_nullable"}
+    ]
     wrapped_structs = list(struct_wrap_cols)
     skipped = {n: e for n, e in conversions.items() if e.get("conversion") == "skipped"}
     print(f"Input: {input_path} ({input_path.stat().st_size / 1e6:.1f} MB)")
@@ -348,13 +488,19 @@ def print_import_plan(
     print(f"Parquet columns: {len(parquet_schema)}")
     print(f"Imported columns: {len(fixed_cols) + len(struct_wrap_cols)}")
     print(f" Fixed-width: {len(fixed_cols) - len(vlstring_cols) - len(vlbytes_cols)}")
+    print(f" Fixed strings: {len(fixed_string_cols)}")
+    print(f" Fixed bytes: {len(fixed_bytes_cols)}")
     print(f" vlstring: {len(vlstring_cols)}")
     print(f" vlbytes: {len(vlbytes_cols)}")
     print(f" Struct→list: {len(wrapped_structs)}")
     print(f" Nullable scalars: {len(nullable_scalars)}")
     print(f" Skipped unsupported: {len(skipped)}")
     for name, entry in skipped.items():
         print(f" - {name}: {entry['reason']}")
+    if args.fixed_str_maxlen is not None:
+        print(f"Fixed string maxlen: {args.fixed_str_maxlen:,} characters")
+    if args.fixed_bytes_maxlen is not None:
+        print(f"Fixed bytes maxlen: {args.fixed_bytes_maxlen:,} bytes")
     print(f"Parquet batch size: {args.parquet_batch_size:,}")
     print(f"Blosc2 batch size: {args.blosc2_batch_size:,}")
     print(f"Codec / level: {args.codec} / {args.clevel}")
@@ -401,6 +547,10 @@ def import_parquet_to_ctable(args, input_path: Path, output_path: Path):
         raise ValueError("--parquet-batch-size must be positive")
     if args.blosc2_batch_size <= 0:
         raise ValueError("--blosc2-batch-size must be positive")
+    if args.fixed_str_maxlen is not None and args.fixed_str_maxlen <= 0:
+        raise ValueError("--fixed-str-maxlen must be positive")
+    if args.fixed_bytes_maxlen is not None and args.fixed_bytes_maxlen <= 0:
+        raise ValueError("--fixed-bytes-maxlen must be positive")
     if args.max_rows is not None and args.max_rows < 0:
         raise ValueError("--max-rows must be non-negative")
     if args.mem_every <= 0:
@@ -419,11 +569,17 @@ def import_parquet_to_ctable(args, input_path: Path, output_path: Path):
     maybe_memory_report(args, "after ParquetFile open", pa)
     parquet_schema = pf.schema_arrow

-    fixed_cols, struct_wrap_cols, conversions, nullable_scalars = classify_columns(pa, parquet_schema)
+    fixed_string_lengths, fixed_bytes_lengths = scan_string_and_bytes_lengths(pa, pf, args, parquet_schema)
+    maybe_memory_report(args, "after string/binary length scan", pa)
+
+    fixed_cols, struct_wrap_cols, conversions, nullable_scalars = classify_columns(
+        pa, parquet_schema, fixed_string_lengths, fixed_bytes_lengths
+    )
     maybe_memory_report(args, "after column classification", pa)

     selected_cols = [f.name for f in parquet_schema if f.name in fixed_cols or f.name in struct_wrap_cols]
     import_schema = build_import_schema(pa, parquet_schema, fixed_cols, struct_wrap_cols)
+    fixed_scalar_lengths = {**fixed_string_lengths, **fixed_bytes_lengths} or None
     maybe_memory_report(args, "after import schema build", pa)

     print_import_plan(
@@ -450,7 +606,7 @@ def import_parquet_to_ctable(args, input_path: Path, output_path: Path):
         capacity_hint=(
            pf.metadata.num_rows if args.max_rows is None else min(args.max_rows, pf.metadata.num_rows)
         ),
-        string_max_length=None,
+        string_max_length=fixed_scalar_lengths,
         auto_null_sentinels=True,
         blosc2_batch_size=args.blosc2_batch_size,
     )
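
A small but load-bearing detail: `{**fixed_string_lengths, **fixed_bytes_lengths} or None` merges the two result dicts and collapses an empty merge to None, so a run with no qualifying columns (or with neither flag set) passes string_max_length=None and behaves exactly as before. In plain Python:

    {**{}, **{}} or None            # -> None
    {**{"city": 16}, **{}} or None  # -> {"city": 16}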

src/blosc2/ctable.py

Lines changed: 14 additions & 5 deletions
@@ -2950,6 +2950,12 @@ def _arrow_type_to_spec(  # noqa: C901
             "uint8/16/32/64, float32/64, bool, string, binary, and list."
         )

+    @staticmethod
+    def _string_max_length_for_column(string_max_length, name: str):
+        if isinstance(string_max_length, Mapping):
+            return string_max_length.get(name)
+        return string_max_length
+
     @classmethod
     def _compiled_columns_from_arrow(
         cls,
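
The resolution rule is small enough to restate standalone (illustrative copy; the real helper lives on the class above):

    from collections.abc import Mapping

    def resolve(string_max_length, name):
        # A Mapping is consulted per column; any other value applies globally.
        if isinstance(string_max_length, Mapping):
            return string_max_length.get(name)
        return string_max_length

    resolve(32, "city")            # -> 32: an int fixes every string/binary column
    resolve({"city": 16}, "city")  # -> 16
    resolve({"city": 16}, "note")  # -> None: unlisted columns stay variable-length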
@@ -2974,10 +2980,11 @@ def _compiled_columns_from_arrow(
             arrow_col = table_for_inference.column(name) if table_for_inference is not None else None
             field_is_list = pa.types.is_list(field.type) or pa.types.is_large_list(field.type)
             field_is_struct = pa.types.is_struct(field.type)
+            column_string_max_length = cls._string_max_length_for_column(string_max_length, name)
             field_is_varlen_scalar = (
                 not field_is_list
                 and not field_is_struct
-                and string_max_length is None
+                and column_string_max_length is None
                 and (
                     pa.types.is_string(field.type)
                     or pa.types.is_large_string(field.type)
@@ -3016,7 +3023,7 @@ def _compiled_columns_from_arrow(
                     pa,
                     field.type,
                     arrow_col,
-                    string_max_length=string_max_length,
+                    string_max_length=column_string_max_length,
                     null_value=null_value,
                     nullable=field.nullable,
                 )
@@ -3189,7 +3196,7 @@ def from_arrow(
         dparams=None,
         validate: bool = False,
         capacity_hint: int | None = None,
-        string_max_length: int | None = None,
+        string_max_length: int | Mapping[str, int] | None = None,
         auto_null_sentinels: bool = True,
         blosc2_batch_size: int | None = _BATCH_SIZE_DEFAULT,
     ) -> CTable:
@@ -3204,7 +3211,9 @@ def from_arrow(
         When *string_max_length* is set to a positive integer, scalar string
         and binary columns are imported as fixed-width
         :func:`~blosc2.string` / :func:`~blosc2.bytes` columns whose dtype is
-        sized to *string_max_length* characters/bytes.
+        sized to *string_max_length* characters/bytes. It may also be a mapping
+        from column name to max length; omitted string/binary columns remain
+        :func:`~blosc2.vlstring` / :func:`~blosc2.vlbytes` columns.

         ``blosc2_batch_size`` controls how many rows are buffered before
         BatchArray-backed imported columns (list columns and varlen scalar
32173226
batches = iter(batches)
32183227
first_batch = None
32193228
table_for_inference = None
3220-
if string_max_length is None:
3229+
if string_max_length is None or isinstance(string_max_length, Mapping):
32213230
first_batch = next(batches, None)
32223231
if first_batch is not None:
32233232
table_for_inference = pa.Table.from_batches([first_batch], schema=schema)

src/blosc2/schema_compiler.py

Lines changed: 17 additions & 0 deletions
@@ -10,6 +10,7 @@

 from __future__ import annotations

+import base64
 import copy
 import dataclasses
 import typing
@@ -348,12 +349,22 @@ def compile_schema(row_cls: type[Any]) -> CompiledSchema:
 # ---------------------------------------------------------------------------


+def _bytes_to_json(value: bytes) -> dict[str, Any]:
+    return {"__bytes__": True, "base64": base64.b64encode(value).decode("ascii")}
+
+
+def _json_to_bytes(value: dict[str, Any]) -> bytes:
+    return base64.b64decode(value["base64"])
+
+
 def _default_to_json(value: Any) -> Any:
     """Convert a field default to a JSON-compatible value."""
     if value is MISSING:
         return None
     if isinstance(value, complex):
         return {"__complex__": True, "real": value.real, "imag": value.imag}
+    if isinstance(value, bytes):
+        return _bytes_to_json(value)
     return value

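Round-tripping the new helpers (standalone copy for illustration):

    import base64

    def _bytes_to_json(value):
        return {"__bytes__": True, "base64": base64.b64encode(value).decode("ascii")}

    def _json_to_bytes(value):
        return base64.b64decode(value["base64"])

    payload = _bytes_to_json(b"\x00\xff")  # {'__bytes__': True, 'base64': 'AP8='}
    assert _json_to_bytes(payload) == b"\x00\xff"

Base64 keeps arbitrary byte defaults (including non-UTF-8 data) representable in the JSON metadata without loss.
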
@@ -363,13 +374,17 @@ def _default_from_json(value: Any) -> Any:
         return MISSING
     if isinstance(value, dict) and value.get("__complex__"):
         return complex(value["real"], value["imag"])
+    if isinstance(value, dict) and value.get("__bytes__"):
+        return _json_to_bytes(value)
     return value


 def spec_from_metadata_dict(data: dict[str, Any]) -> SchemaSpec:
     """Reconstruct one SchemaSpec from serialized metadata."""
     data = dict(data)
     kind = data.pop("kind")
+    if isinstance(data.get("null_value"), dict) and data["null_value"].get("__bytes__"):
+        data["null_value"] = _json_to_bytes(data["null_value"])
     if kind == "list":
         item_spec = spec_from_metadata_dict(data.pop("item"))
         return ListSpec(item_spec, **data)
@@ -404,6 +419,8 @@ def schema_to_dict(schema: CompiledSchema) -> dict[str, Any]:
     for col in schema.columns:
         entry: dict[str, Any] = {"name": col.name}
         entry.update(col.spec.to_metadata_dict())  # adds "kind" + constraints
+        if isinstance(entry.get("null_value"), bytes):
+            entry["null_value"] = _bytes_to_json(entry["null_value"])
         entry["default"] = _default_to_json(col.default)
         if col.config.cparams is not None:
             entry["cparams"] = col.config.cparams
