Skip to content

Commit 2606897

Browse files
committed
New parquet-to-blosc2 cli utility
1 parent 6c6afde commit 2606897

3 files changed

Lines changed: 72 additions & 76 deletions

File tree

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,12 @@ blosc2 = "blosc2"
4949
homepage = "https://github.com/Blosc/python-blosc2"
5050
documentation = "https://www.blosc.org/python-blosc2/python-blosc2.html"
5151

52+
[project.optional-dependencies]
53+
parquet = ["pyarrow"]
54+
55+
[project.scripts]
56+
parquet-to-blosc2 = "blosc2.cli.parquet_to_blosc2:main"
57+
5258
[dependency-groups]
5359
dev = [
5460
"dask",

src/blosc2/cli/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Command-line utilities for blosc2."""
src/blosc2/cli/parquet_to_blosc2.py

Lines changed: 65 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,29 @@
1-
#!/usr/bin/env python3
21
#######################################################################
32
# Copyright (c) 2019-present, Blosc Development Team <blosc@blosc.org>
43
# All rights reserved.
54
#
65
# SPDX-License-Identifier: BSD-3-Clause
76
#######################################################################
87

9-
"""Import/export Parquet datasets through a CTable store.
8+
"""Import/export Parquet datasets through a Blosc2 CTable store.
109
11-
Default mode imports parquet -> .b2z/.b2d using CTable.from_arrow().
12-
The output extension selects the storage layout: .b2z is compact/zip-backed,
13-
.b2d is sparse directory-backed. Additional modes:
10+
The installed ``parquet-to-blosc2`` utility supports three modes:
1411
15-
* --export: export an existing .b2z/.b2d -> parquet.
16-
* --roundtrip: import parquet -> .b2z/.b2d -> parquet and assess differences.
12+
* default import: parquet -> ``.b2z`` / ``.b2d``
13+
* ``--export``: existing ``.b2z`` / ``.b2d`` -> parquet
14+
* ``--roundtrip``: parquet -> ``.b2z`` / ``.b2d`` -> parquet and compare
1715
18-
Scalar string columns are stored as vlstring (variable-length, no length
19-
limit). Scalar binary columns are stored as vlbytes. Nullable string/binary
20-
columns are represented with native None — no sentinel value is needed.
16+
The output extension selects the storage layout: ``.b2z`` is compact/zip-backed,
17+
while ``.b2d`` is sparse directory-backed.
2118
22-
Struct-valued columns are wrapped as list<struct> (one-element lists) so they
23-
round-trip through the list column machinery. True list columns pass through
24-
unchanged. Unsupported types (nested lists, timestamps, etc.) are skipped.
19+
Scalar string columns are stored as ``vlstring`` (variable-length, no length
20+
limit). Scalar binary columns are stored as ``vlbytes``. Nullable string/binary
21+
columns are represented with native ``None`` — no sentinel value is needed.
22+
23+
Struct-valued columns are wrapped as ``list<struct>`` (one-element lists) so
24+
that they round-trip through the list-column machinery. True list columns pass
25+
through unchanged. Unsupported types (nested lists, timestamps, etc.) are
26+
skipped.
2527
"""
2628

2729
from __future__ import annotations
@@ -35,13 +37,11 @@
3537
import sys
3638
import time
3739
from pathlib import Path
40+
from typing import Any
3841

3942
import blosc2
4043
from blosc2.schema_compiler import schema_to_dict
4144

42-
DEFAULT_INPUT = Path(__file__).with_name("off-1pct.parquet")
43-
DEFAULT_B2Z = Path(__file__).with_name("off-1pct-gpt.b2z")
44-
DEFAULT_ROUNDTRIP_PARQUET = Path(__file__).with_name("off-1pct-gpt-roundtrip.parquet")
4545
DEFAULT_BATCH_SIZE = 2048
4646

4747

@@ -51,11 +51,23 @@ def require_pyarrow():
5151
import pyarrow.parquet as pq
5252
except ImportError as exc:
5353
raise ImportError(
54-
"parquet-to-b2z.py requires pyarrow; install it with: pip install pyarrow"
54+
"parquet-to-blosc2 requires pyarrow; install it with: pip install 'blosc2[parquet]'"
5555
) from exc
5656
return pa, pq
5757

5858

59+
def _default_import_output(input_path: Path) -> Path:
60+
return input_path.with_suffix(".b2z")
61+
62+
63+
def _default_export_output(input_path: Path) -> Path:
64+
return input_path.with_suffix(".parquet")
65+
66+
67+
def _default_roundtrip_output(input_path: Path) -> Path:
68+
return input_path.with_name(f"{input_path.stem}-roundtrip.parquet")
69+
70+
5971
def _format_bytes(n: int | None) -> str:
6072
if n is None:
6173
return "n/a"
@@ -112,14 +124,24 @@ def maybe_memory_report(args, label: str, pa=None) -> None:
112124

113125
def build_parser() -> argparse.ArgumentParser:
114126
parser = argparse.ArgumentParser(
115-
description="Import/export Parquet datasets via CTable (.b2z compact or .b2d sparse).",
127+
description="Import/export Parquet datasets via Blosc2 CTable (.b2z compact or .b2d sparse).",
116128
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
117129
)
118130
mode = parser.add_mutually_exclusive_group()
119-
mode.add_argument("--export", action="store_true", help="Export input .b2z to output parquet.")
120-
mode.add_argument("--roundtrip", action="store_true", help="Run parquet -> .b2z -> parquet and compare.")
121-
parser.add_argument("input_path", nargs="?", type=Path, default=DEFAULT_INPUT)
122-
parser.add_argument("output_path", nargs="?", type=Path, default=None)
131+
mode.add_argument("--export", action="store_true", help="Export input .b2z/.b2d to output parquet.")
132+
mode.add_argument(
133+
"--roundtrip", action="store_true", help="Run parquet -> .b2z/.b2d -> parquet and compare."
134+
)
135+
parser.add_argument(
136+
"input_path", type=Path, help="Input parquet file or Blosc2 store, depending on mode."
137+
)
138+
parser.add_argument(
139+
"output_path",
140+
nargs="?",
141+
type=Path,
142+
default=None,
143+
help="Output path. Defaults depend on the mode and input path.",
144+
)
123145
parser.add_argument("--batch-size", type=int, default=DEFAULT_BATCH_SIZE)
124146
parser.add_argument("--codec", type=str, default="ZSTD", choices=[c.name for c in blosc2.Codec])
125147
parser.add_argument("--clevel", type=int, default=5)
@@ -170,22 +192,10 @@ def _release_arrow_temporaries(pa) -> None:
170192

171193

172194
def classify_columns(pa, schema):
173-
"""Classify Parquet schema columns into importable categories.
174-
175-
Returns
176-
-------
177-
fixed_cols : dict[str, field]
178-
Scalar numeric/bool columns and list columns passed through unchanged.
179-
struct_wrap_cols : dict[str, pa.DataType]
180-
Struct columns wrapped as list<struct> for import.
181-
conversions : dict[str, dict]
182-
Metadata describing what happened to each column (for round-trip export).
183-
nullable_scalars : list[str]
184-
Non-string scalar columns that are nullable (need sentinels).
185-
"""
195+
"""Classify Parquet schema columns into importable categories."""
186196
fixed_cols: dict[str, object] = {}
187197
struct_wrap_cols: dict[str, object] = {}
188-
conversions: dict[str, dict] = {}
198+
conversions: dict[str, dict[str, Any]] = {}
189199
nullable_scalars: list[str] = []
190200

191201
for field in schema:
@@ -214,33 +224,20 @@ def classify_columns(pa, schema):
214224
conversions[field.name] = {"conversion": "nullable_scalar_sentinel"}
215225
continue
216226
if pa.types.is_string(t) or pa.types.is_large_string(t):
217-
# Scalar strings → vlstring (variable-length, handles any length, nullable natively)
218227
fixed_cols[field.name] = field
219-
if field.nullable:
220-
conversions[field.name] = {"conversion": "vlstring_nullable"}
221-
else:
222-
conversions[field.name] = {"conversion": "vlstring"}
228+
conversions[field.name] = {"conversion": "vlstring_nullable" if field.nullable else "vlstring"}
223229
continue
224230
if pa.types.is_binary(t) or pa.types.is_large_binary(t):
225-
# Scalar bytes → vlbytes (variable-length, handles any length, nullable natively)
226231
fixed_cols[field.name] = field
227-
if field.nullable:
228-
conversions[field.name] = {"conversion": "vlbytes_nullable"}
229-
else:
230-
conversions[field.name] = {"conversion": "vlbytes"}
232+
conversions[field.name] = {"conversion": "vlbytes_nullable" if field.nullable else "vlbytes"}
231233
continue
232234
conversions[field.name] = {"conversion": "skipped", "reason": f"unsupported: {t}"}
233235

234236
return fixed_cols, struct_wrap_cols, conversions, nullable_scalars
235237

236238

237239
def build_import_schema(pa, original_schema, fixed_cols: dict, struct_wrap_cols: dict):
238-
"""Build the Arrow schema passed to CTable.from_arrow().
239-
240-
Struct columns become list<struct>; all other importable columns keep their
241-
original Arrow type unchanged (vlstring/vlbytes conversion happens inside
242-
CTable.from_arrow when string_max_length=None).
243-
"""
240+
"""Build the Arrow schema passed to CTable.from_arrow()."""
244241
fields = []
245242
for field in original_schema:
246243
if field.name in struct_wrap_cols:
@@ -372,7 +369,7 @@ def import_parquet_to_ctable(args, input_path: Path, output_path: Path):
372369
raise ValueError("--mem-every must be positive")
373370
if args.batch_report_every <= 0:
374371
raise ValueError("--batch-report-every must be positive")
375-
if args.output_path is not None and output_path.suffix not in {".b2z", ".b2d"}:
372+
if output_path.suffix not in {".b2z", ".b2d"}:
376373
raise ValueError("output_path must use the .b2z (compact) or .b2d (sparse) extension")
377374
if not input_path.exists():
378375
raise FileNotFoundError(f"Input file not found: {input_path}")
@@ -406,7 +403,6 @@ def import_parquet_to_ctable(args, input_path: Path, output_path: Path):
406403
t0 = time.perf_counter()
407404
maybe_memory_report(args, "before CTable import", pa)
408405

409-
# string_max_length=None → scalar string/binary columns become vlstring/vlbytes automatically
410406
ct = blosc2.CTable.from_arrow(
411407
import_schema,
412408
progress_batches(pa, pf, args, selected_cols, struct_wrap_cols),
@@ -471,10 +467,8 @@ def export_ctable_to_parquet(input_path: Path, output_path: Path, *, batch_size:
471467
else ct._arrow_schema_for_columns(export_names)
472468
)
473469

474-
# Conversions that stored data differently and need unwrapping on export.
475-
_SINGLETON_LIST_CONVERSIONS = {
470+
singleton_list_conversions = {
476471
"struct_wrapped_as_singleton_list",
477-
# Legacy reasons kept for reading older CTable stores:
478472
"nullable_scalar_wrapped_as_singleton_list",
479473
"long_nullable_scalar_wrapped_as_singleton_list",
480474
"scalar_string_promoted_after_overflow",
@@ -489,15 +483,9 @@ def export_ctable_to_parquet(input_path: Path, output_path: Path, *, batch_size:
489483
meta = fields_meta.get(name, {})
490484
field = export_schema.field(name)
491485
conversion = meta.get("conversion", "")
492-
if conversion in _SINGLETON_LIST_CONVERSIONS:
486+
if conversion in singleton_list_conversions:
493487
arr = unwrap_singleton_list(pa, arr, field.type)
494-
elif conversion in {"vlstring", "vlstring_nullable"}:
495-
# vlstring columns export as pa.string() — already correct from iter_arrow_batches.
496-
# Cast to original Arrow type in case it was large_string in the source.
497-
if str(arr.type) != str(field.type):
498-
arr = arr.cast(field.type)
499-
elif conversion in {"vlbytes", "vlbytes_nullable"}:
500-
# vlbytes columns export as pa.large_binary() — cast to original if needed.
488+
elif conversion in {"vlstring", "vlstring_nullable", "vlbytes", "vlbytes_nullable"}:
501489
if str(arr.type) != str(field.type):
502490
arr = arr.cast(field.type)
503491
elif str(arr.type) != str(field.type):
@@ -561,29 +549,30 @@ def assess_parquet_difference(original_path: Path, roundtrip_path: Path, exporte
561549
print(f" Roundtrip size: {roundtrip_path.stat().st_size / 1e6:.1f} MB")
562550

563551

564-
def main() -> None:
565-
args = build_parser().parse_args()
552+
def main(argv: list[str] | None = None) -> int:
553+
args = build_parser().parse_args(argv)
566554
if args.export:
567555
input_path = args.input_path
568-
output_path = args.output_path or input_path.with_suffix(".parquet")
556+
output_path = args.output_path or _default_export_output(input_path)
569557
export_ctable_to_parquet(
570558
input_path, output_path, batch_size=args.batch_size, overwrite=args.overwrite
571559
)
572-
return
560+
return 0
573561
if args.roundtrip:
574562
input_path = args.input_path
575-
b2z_path = args.output_path or DEFAULT_B2Z
576-
roundtrip_path = DEFAULT_ROUNDTRIP_PARQUET
577-
selected = import_parquet_to_ctable(args, input_path, b2z_path)
563+
b2_path = args.output_path or _default_import_output(input_path)
564+
roundtrip_path = _default_roundtrip_output(input_path)
565+
selected = import_parquet_to_ctable(args, input_path, b2_path)
578566
exported = export_ctable_to_parquet(
579-
b2z_path, roundtrip_path, batch_size=args.batch_size, overwrite=True
567+
b2_path, roundtrip_path, batch_size=args.batch_size, overwrite=True
580568
)
581569
assess_parquet_difference(input_path, roundtrip_path, exported or selected)
582-
return
570+
return 0
583571

584-
output_path = args.output_path or DEFAULT_B2Z
572+
output_path = args.output_path or _default_import_output(args.input_path)
585573
import_parquet_to_ctable(args, args.input_path, output_path)
574+
return 0
586575

587576

588577
if __name__ == "__main__":
589-
main()
578+
raise SystemExit(main())

0 commit comments

Comments (0)