Skip to content

Commit 363ab01

Browse files
committed
Speed up parquet import
1 parent 2606897 commit 363ab01

6 files changed

Lines changed: 135 additions & 34 deletions

File tree

src/blosc2/cli/parquet_to_blosc2.py

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,11 @@
3131
import argparse
3232
import base64
3333
import contextlib
34+
import cProfile
3435
import gc
36+
import io
3537
import os
38+
import pstats
3639
import shutil
3740
import sys
3841
import time
@@ -142,7 +145,19 @@ def build_parser() -> argparse.ArgumentParser:
142145
default=None,
143146
help="Output path. Defaults depend on the mode and input path.",
144147
)
145-
parser.add_argument("--batch-size", type=int, default=DEFAULT_BATCH_SIZE)
148+
parser.add_argument("--parquet-batch-size", type=int, default=DEFAULT_BATCH_SIZE)
149+
parser.add_argument(
150+
"--batch-size",
151+
dest="parquet_batch_size",
152+
type=int,
153+
help=argparse.SUPPRESS,
154+
)
155+
parser.add_argument(
156+
"--blosc2-batch-size",
157+
type=int,
158+
default=DEFAULT_BATCH_SIZE,
159+
help="Rows grouped into each persisted BatchArray batch for imported Blosc2 varlen/list columns.",
160+
)
146161
parser.add_argument("--codec", type=str, default="ZSTD", choices=[c.name for c in blosc2.Codec])
147162
parser.add_argument("--clevel", type=int, default=5)
148163
parser.add_argument(
@@ -162,6 +177,11 @@ def build_parser() -> argparse.ArgumentParser:
162177
default=1,
163178
help="Print progress every N batches; the final batch is always reported.",
164179
)
180+
parser.add_argument(
181+
"--profile",
182+
action="store_true",
183+
help="Run the selected operation under cProfile and print cumulative timing stats.",
184+
)
165185
parser.add_argument("--overwrite", action="store_true")
166186
return parser
167187

@@ -327,7 +347,8 @@ def print_import_plan(
327347
print(f" Skipped unsupported: {len(skipped)}")
328348
for name, entry in skipped.items():
329349
print(f" - {name}: {entry['reason']}")
330-
print(f"Batch size: {args.batch_size:,}")
350+
print(f"Parquet batch size: {args.parquet_batch_size:,}")
351+
print(f"Blosc2 batch size: {args.blosc2_batch_size:,}")
331352
print(f"Codec / level: {args.codec} / {args.clevel}")
332353
print()
333354

@@ -337,7 +358,7 @@ def progress_batches(pa, pf, args, selected_cols, struct_wrap_cols):
337358
t0 = time.perf_counter()
338359
total = pf.metadata.num_rows
339360
for batch_n, raw_batch in enumerate(
340-
pf.iter_batches(batch_size=args.batch_size, columns=selected_cols), start=1
361+
pf.iter_batches(batch_size=args.parquet_batch_size, columns=selected_cols), start=1
341362
):
342363
report_batch_mem = args.mem_report and batch_n % args.mem_every == 0
343364
if report_batch_mem:
@@ -363,8 +384,10 @@ def progress_batches(pa, pf, args, selected_cols, struct_wrap_cols):
363384

364385

365386
def import_parquet_to_ctable(args, input_path: Path, output_path: Path):
366-
if args.batch_size <= 0:
367-
raise ValueError("--batch-size must be positive")
387+
if args.parquet_batch_size <= 0:
388+
raise ValueError("--parquet-batch-size must be positive")
389+
if args.blosc2_batch_size <= 0:
390+
raise ValueError("--blosc2-batch-size must be positive")
368391
if args.mem_every <= 0:
369392
raise ValueError("--mem-every must be positive")
370393
if args.batch_report_every <= 0:
@@ -412,6 +435,7 @@ def import_parquet_to_ctable(args, input_path: Path, output_path: Path):
412435
capacity_hint=pf.metadata.num_rows,
413436
string_max_length=None,
414437
auto_null_sentinels=True,
438+
blosc2_batch_size=args.blosc2_batch_size,
415439
)
416440
maybe_memory_report(args, "after CTable import", pa)
417441
store_original_arrow_metadata(ct, parquet_schema, import_schema, conversions)
@@ -451,7 +475,7 @@ def unwrap_singleton_list(pa, arr, arrow_type):
451475
def export_ctable_to_parquet(input_path: Path, output_path: Path, *, batch_size: int, overwrite: bool):
452476
pa, pq = require_pyarrow()
453477
if batch_size <= 0:
454-
raise ValueError("--batch-size must be positive")
478+
raise ValueError("--parquet-batch-size must be positive")
455479
prepare_output(output_path, overwrite)
456480
ct = blosc2.CTable.open(str(input_path))
457481
original_schema = original_schema_from_ctable(pa, ct)
@@ -549,13 +573,12 @@ def assess_parquet_difference(original_path: Path, roundtrip_path: Path, exporte
549573
print(f" Roundtrip size: {roundtrip_path.stat().st_size / 1e6:.1f} MB")
550574

551575

552-
def main(argv: list[str] | None = None) -> int:
553-
args = build_parser().parse_args(argv)
576+
def _run_command(args) -> int:
554577
if args.export:
555578
input_path = args.input_path
556579
output_path = args.output_path or _default_export_output(input_path)
557580
export_ctable_to_parquet(
558-
input_path, output_path, batch_size=args.batch_size, overwrite=args.overwrite
581+
input_path, output_path, batch_size=args.parquet_batch_size, overwrite=args.overwrite
559582
)
560583
return 0
561584
if args.roundtrip:
@@ -564,7 +587,7 @@ def main(argv: list[str] | None = None) -> int:
564587
roundtrip_path = _default_roundtrip_output(input_path)
565588
selected = import_parquet_to_ctable(args, input_path, b2_path)
566589
exported = export_ctable_to_parquet(
567-
b2_path, roundtrip_path, batch_size=args.batch_size, overwrite=True
590+
b2_path, roundtrip_path, batch_size=args.parquet_batch_size, overwrite=True
568591
)
569592
assess_parquet_difference(input_path, roundtrip_path, exported or selected)
570593
return 0
@@ -574,5 +597,41 @@ def main(argv: list[str] | None = None) -> int:
574597
return 0
575598

576599

600+
def _run_profiled(args) -> int:
601+
profiler = cProfile.Profile()
602+
profiler.enable()
603+
try:
604+
return _run_command(args)
605+
finally:
606+
profiler.disable()
607+
stream = io.StringIO()
608+
stats = pstats.Stats(profiler, stream=stream).sort_stats("cumulative")
609+
stats.print_stats(50)
610+
print("\n[cProfile] Top cumulative-time functions\n")
611+
print(stream.getvalue().rstrip())
612+
613+
614+
def _option_present(argv: list[str], option: str) -> bool:
615+
return any(arg == option or arg.startswith(option + "=") for arg in argv)
616+
617+
618+
def main(argv: list[str] | None = None) -> int:
619+
argv = sys.argv[1:] if argv is None else list(argv)
620+
args = build_parser().parse_args(argv)
621+
622+
parquet_specified = _option_present(argv, "--parquet-batch-size") or _option_present(
623+
argv, "--batch-size"
624+
)
625+
blosc2_specified = _option_present(argv, "--blosc2-batch-size")
626+
if parquet_specified and not blosc2_specified:
627+
args.blosc2_batch_size = args.parquet_batch_size
628+
elif blosc2_specified and not parquet_specified:
629+
args.parquet_batch_size = args.blosc2_batch_size
630+
631+
if args.profile:
632+
return _run_profiled(args)
633+
return _run_command(args)
634+
635+
577636
if __name__ == "__main__":
578637
raise SystemExit(main())

src/blosc2/ctable.py

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1549,6 +1549,7 @@ def _fmt_bytes(n: int) -> str:
15491549

15501550

15511551
_EXPECTED_SIZE_DEFAULT = 1_048_576
1552+
_BATCH_SIZE_DEFAULT = 2048
15521553

15531554
# ---------------------------------------------------------------------------
15541555
# Computed-column definition (virtual columns backed by a LazyExpr)
@@ -2804,7 +2805,7 @@ def iter_arrow_batches(
28042805
self,
28052806
*,
28062807
columns: list[str] | None = None,
2807-
batch_size: int = 65_536,
2808+
batch_size: int = _BATCH_SIZE_DEFAULT,
28082809
include_computed: bool = True,
28092810
):
28102811
"""Yield live rows as bounded-size :class:`pyarrow.RecordBatch` objects."""
@@ -3123,7 +3124,11 @@ def _write_arrow_batch(cls, batch, columns, new_cols, new_valid, pos: int) -> in
31233124
return pos
31243125
for col in columns:
31253126
arrow_col = batch.column(batch.schema.get_field_index(col.name))
3126-
if cls._is_list_column(col) or cls._is_varlen_scalar_column(col):
3127+
if cls._is_list_column(col):
3128+
# Trusted Arrow-import fast path: schema has already been inferred,
3129+
# so avoid Python-level per-item coercion/validation here.
3130+
new_cols[col.name].extend(arrow_col.to_pylist(), validate=False)
3131+
elif cls._is_varlen_scalar_column(col):
31273132
new_cols[col.name].extend(arrow_col.to_pylist())
31283133
else:
31293134
new_cols[col.name][pos : pos + m] = cls._arrow_column_to_numpy(arrow_col, col)
@@ -3181,7 +3186,7 @@ def from_arrow(
31813186
capacity_hint: int | None = None,
31823187
string_max_length: int | None = None,
31833188
auto_null_sentinels: bool = True,
3184-
list_batch_rows: int | None = 2048,
3189+
blosc2_batch_size: int | None = _BATCH_SIZE_DEFAULT,
31853190
) -> CTable:
31863191
"""Build a :class:`CTable` from an Arrow schema and iterable of record batches.
31873192
@@ -3196,13 +3201,14 @@ def from_arrow(
31963201
:func:`~blosc2.string` / :func:`~blosc2.bytes` columns whose dtype is
31973202
sized to *string_max_length* characters/bytes.
31983203
3199-
``list_batch_rows`` controls how many rows are buffered before
3200-
list-valued columns are flushed to their backend. Set it to ``None``
3201-
to keep list columns pending until the final flush.
3204+
``blosc2_batch_size`` controls how many rows are buffered before
3205+
BatchArray-backed imported columns (list columns and varlen scalar
3206+
columns) are flushed to their backend. Set it to ``None`` to keep
3207+
those columns pending until the final flush.
32023208
"""
32033209
pa = cls._require_pyarrow("from_arrow()")
3204-
if list_batch_rows is not None and list_batch_rows <= 0:
3205-
raise ValueError("list_batch_rows must be a positive integer or None")
3210+
if blosc2_batch_size is not None and blosc2_batch_size <= 0:
3211+
raise ValueError("blosc2_batch_size must be a positive integer or None")
32063212
batches = iter(batches)
32073213
first_batch = None
32083214
table_for_inference = None
@@ -3217,10 +3223,12 @@ def from_arrow(
32173223
string_max_length,
32183224
auto_null_sentinels=auto_null_sentinels,
32193225
)
3220-
if list_batch_rows is not None:
3226+
if blosc2_batch_size is not None:
32213227
for col in columns:
3222-
if cls._is_list_column(col) and getattr(col.spec, "storage", None) == "batch":
3223-
col.spec.batch_rows = list_batch_rows
3228+
if (
3229+
cls._is_list_column(col) and getattr(col.spec, "storage", None) == "batch"
3230+
) or cls._is_varlen_scalar_column(col):
3231+
col.spec.batch_rows = blosc2_batch_size
32243232
compiled = CompiledSchema(
32253233
row_cls=None,
32263234
columns=columns,
@@ -3253,7 +3261,7 @@ def to_parquet(
32533261
path,
32543262
*,
32553263
columns: list[str] | None = None,
3256-
batch_size: int = 65_536,
3264+
batch_size: int = _BATCH_SIZE_DEFAULT,
32573265
compression: str | None = "zstd",
32583266
row_group_size: int | None = None,
32593267
include_computed: bool = True,
@@ -3277,14 +3285,14 @@ def from_parquet(
32773285
path,
32783286
*,
32793287
columns: list[str] | None = None,
3280-
batch_size: int = 65_536,
3288+
batch_size: int = _BATCH_SIZE_DEFAULT,
32813289
urlpath: str | None = None,
32823290
mode: str = "w",
32833291
cparams=None,
32843292
dparams=None,
32853293
validate: bool = False,
32863294
auto_null_sentinels: bool = True,
3287-
list_batch_rows: int | None = 2048,
3295+
blosc2_batch_size: int | None = _BATCH_SIZE_DEFAULT,
32883296
**kwargs,
32893297
) -> CTable:
32903298
"""Read a Parquet file into a :class:`CTable` batch-wise using pyarrow."""
@@ -3311,7 +3319,7 @@ def from_parquet(
33113319
capacity_hint=pf.metadata.num_rows if pf.metadata is not None else None,
33123320
string_max_length=string_max_length,
33133321
auto_null_sentinels=auto_null_sentinels,
3314-
list_batch_rows=list_batch_rows,
3322+
blosc2_batch_size=blosc2_batch_size,
33153323
)
33163324

33173325
# ------------------------------------------------------------------

src/blosc2/list_array.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,9 @@ def extend(self, values: Iterable[Any], *, validate: bool = True) -> None:
354354
if validate:
355355
cells = [coerce_list_cell(self.spec, v) for v in values]
356356
else:
357-
cells = [v if v is not None else [] for v in values]
357+
# Trusted fast path used by Arrow/Parquet import and internal row reordering.
358+
# Preserve nullable list cells as native None and skip all per-item coercion.
359+
cells = list(values)
358360
if self.spec.storage == "vl":
359361
self._backend.extend(iter(cells))
360362
self._persisted_row_count = len(self._backend)
@@ -662,7 +664,7 @@ def from_arrow(
662664
items_per_block=items_per_block,
663665
**kwargs,
664666
)
665-
arr.extend(arrow_array.to_pylist())
667+
arr.extend(arrow_array.to_pylist(), validate=False)
666668
return arr
667669

668670
def __enter__(self) -> ListArray:

tests/ctable/test_arrow_interop.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,32 @@ def test_from_arrow_string_fixed_width_with_max_length():
230230
assert t["name"][:].tolist() == ["hi", "hello world", "!"]
231231

232232

233+
def test_from_arrow_list_struct_nullable_values_roundtrip():
234+
nutrient_type = pa.struct(
235+
[
236+
pa.field("name", pa.string()),
237+
pa.field("value", pa.float64()),
238+
]
239+
)
240+
at = pa.table(
241+
{
242+
"id": pa.array([1, 2, 3], type=pa.int64()),
243+
"nutriments": pa.array(
244+
[
245+
[{"name": "fat", "value": 1.5}, {"name": "salt", "value": 0.2}],
246+
None,
247+
[{"name": "energy", "value": 42.0}],
248+
],
249+
type=pa.list_(nutrient_type),
250+
),
251+
}
252+
)
253+
t = CTable.from_arrow(at.schema, at.to_batches())
254+
assert t[0].nutriments == [{"name": "fat", "value": 1.5}, {"name": "salt", "value": 0.2}]
255+
assert t[1].nutriments is None
256+
assert t[2].nutriments == [{"name": "energy", "value": 42.0}]
257+
258+
233259
def test_from_arrow_unsupported_type_raises():
234260
at = pa.table({"ts": pa.array([1, 2, 3], type=pa.timestamp("s"))})
235261
with pytest.raises(TypeError, match="No blosc2 spec"):

tests/ctable/test_parquet_interop.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -342,25 +342,25 @@ def test_interop_write_with_pyarrow(self, tmp_path):
342342
assert len(t) == 3
343343
assert t.col_names == ["x", "y"]
344344

345-
def test_from_arrow_list_batch_rows_default(self):
345+
def test_from_arrow_blosc2_batch_size_default(self):
346346
at = pa.table({"vals": pa.array([[1], [2, 3]], type=pa.list_(pa.int64()))})
347347
t = CTable.from_arrow(at.schema, at.to_batches())
348348
assert t._schema.columns_by_name["vals"].spec.batch_rows == 2048
349349
assert t["vals"][0] == [1]
350350
assert t["vals"][1] == [2, 3]
351351

352-
def test_from_arrow_list_batch_rows_override_and_none(self):
352+
def test_from_arrow_blosc2_batch_size_override_and_none(self):
353353
at = pa.table({"vals": pa.array([[1], [2], [3]], type=pa.list_(pa.int64()))})
354-
t = CTable.from_arrow(at.schema, at.to_batches(max_chunksize=1), list_batch_rows=2)
354+
t = CTable.from_arrow(at.schema, at.to_batches(max_chunksize=1), blosc2_batch_size=2)
355355
assert t._schema.columns_by_name["vals"].spec.batch_rows == 2
356356

357-
t2 = CTable.from_arrow(at.schema, at.to_batches(max_chunksize=1), list_batch_rows=None)
357+
t2 = CTable.from_arrow(at.schema, at.to_batches(max_chunksize=1), blosc2_batch_size=None)
358358
assert t2._schema.columns_by_name["vals"].spec.batch_rows is None
359359

360-
def test_from_arrow_invalid_list_batch_rows_raises(self):
360+
def test_from_arrow_invalid_blosc2_batch_size_raises(self):
361361
at = pa.table({"vals": pa.array([[1]], type=pa.list_(pa.int64()))})
362-
with pytest.raises(ValueError, match="list_batch_rows"):
363-
CTable.from_arrow(at.schema, at.to_batches(), list_batch_rows=0)
362+
with pytest.raises(ValueError, match="blosc2_batch_size"):
363+
CTable.from_arrow(at.schema, at.to_batches(), blosc2_batch_size=0)
364364

365365
def test_vlstring_arrow_roundtrip_no_singleton_list(self):
366366
"""Scalar string columns import as vlstring (not list<string>) without singleton wrapping."""

tests/test_list_array.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,3 +82,9 @@ def test_listarray_arrow_roundtrip():
8282
arr = blosc2.ListArray.from_arrow(values, item_spec=blosc2.string(), nullable=True)
8383
assert arr[:] == [["a"], None, ["b", "c"]]
8484
assert arr.to_arrow().to_pylist() == [["a"], None, ["b", "c"]]
85+
86+
87+
def test_listarray_extend_validate_false_preserves_none():
88+
arr = blosc2.ListArray(item_spec=blosc2.int32(), nullable=True, storage="batch", batch_rows=2)
89+
arr.extend([[1], None, [2, 3]], validate=False)
90+
assert arr[:] == [[1], None, [2, 3]]

0 commit comments

Comments (0)