Commit e0b1b6b

New blosc2.struct() for declaring columns (useful for Arrow/Parquet struct<...> type)
1 parent a71f1dc commit e0b1b6b

7 files changed: 153 additions & 45 deletions

doc/reference/ctable.rst

Lines changed: 21 additions & 0 deletions
@@ -549,14 +549,35 @@ Text & binary
    bytes
    vlstring
    vlbytes
+   struct
    list
 
 .. autoclass:: string
 .. autoclass:: bytes
 .. autofunction:: vlstring
 .. autofunction:: vlbytes
+.. autofunction:: struct
 .. autofunction:: list
 
+Struct columns
+--------------
+
+Struct columns are declared with :func:`blosc2.struct` and store one dictionary
+(or ``None`` when nullable) per row in batched variable-length storage. They are
+also used when importing top-level Arrow/Parquet ``struct<...>`` columns::
+
+    from dataclasses import dataclass
+    import blosc2 as b2
+
+    @dataclass
+    class Product:
+        properties: dict = b2.field(
+            b2.struct({"code": b2.int32(), "label": b2.vlstring()}, nullable=True)
+        )
+
+    table.append([{"code": 1, "label": "fresh"}])
+    table.append([None])
+
 List columns
 ------------

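Read together with the hunk above, a minimal end-to-end sketch of the new column type. The ``b2.CTable(Product)`` construction and the read-back line are assumptions for illustration; the schema declaration and ``append`` calls come from the documented example::

    from dataclasses import dataclass

    import blosc2 as b2


    @dataclass
    class Product:
        # A nullable struct column with two typed fields.
        properties: dict = b2.field(
            b2.struct({"code": b2.int32(), "label": b2.vlstring()}, nullable=True)
        )


    table = b2.CTable(Product)  # hypothetical constructor, not shown in this commit
    table.append([{"code": 1, "label": "fresh"}])  # one dict per row
    table.append([None])  # nullable struct rows accept None
    print(table["properties"][0])  # assumed read-back; expected {"code": 1, "label": "fresh"}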
src/blosc2/ctable.py

Lines changed: 34 additions & 19 deletions
@@ -535,7 +535,7 @@ def is_list(self) -> bool:
     def is_varlen_scalar(self) -> bool:
         """True if this column holds variable-length scalar strings or bytes."""
         col = self._table._schema.columns_by_name.get(self._col_name)
-        return col is not None and isinstance(col.spec, (VLStringSpec, VLBytesSpec))
+        return col is not None and isinstance(col.spec, (VLStringSpec, VLBytesSpec, StructSpec))
 
     @property
     def _valid_rows(self):
@@ -1581,7 +1581,7 @@ def _is_list_column(col: CompiledColumn) -> bool:
 
     @staticmethod
     def _is_varlen_scalar_column(col: CompiledColumn) -> bool:
-        return isinstance(col.spec, (VLStringSpec, VLBytesSpec))
+        return isinstance(col.spec, (VLStringSpec, VLBytesSpec, StructSpec))
 
     @staticmethod
     def _is_list_spec(spec: SchemaSpec) -> bool:
@@ -1644,7 +1644,7 @@ def _resolve_nullable_specs(
         for col in schema.columns:
             spec = col.spec
             if (
-                isinstance(spec, (ListSpec, VLStringSpec, VLBytesSpec))
+                isinstance(spec, (ListSpec, VLStringSpec, VLBytesSpec, StructSpec))
                 or getattr(spec, "null_value", None) is not None
             ):
                 continue
@@ -2913,14 +2913,21 @@ def _arrow_type_to_spec(  # noqa: C901
             for field in pa_type:
                 child_col = None
                 if arrow_col is not None:
-                    child_col = arrow_col.field(field.name)
+                    combined = (
+                        arrow_col.combine_chunks() if hasattr(arrow_col, "combine_chunks") else arrow_col
+                    )
+                    child_col = combined.field(field.name)
                 child_string_max_length = string_max_length
                 if field.type in (pa.string(), pa.large_string(), pa.utf8(), pa.large_utf8()):
                     child_string_max_length = max(string_max_length or 1, 1_000_000)
                 fields[field.name] = CTable._arrow_type_to_spec(
-                    pa, field.type, child_col, string_max_length=child_string_max_length
+                    pa,
+                    field.type,
+                    child_col,
+                    string_max_length=child_string_max_length,
+                    nullable=field.nullable,
                 )
-            return b2s.struct(fields)
+            return b2s.struct(fields, nullable=nullable)
 
         if pa_type in (pa.string(), pa.large_string(), pa.utf8(), pa.large_utf8()):
             if string_max_length is None:
@@ -2938,7 +2945,7 @@ def _arrow_type_to_spec(  # noqa: C901
 
         raise TypeError(
             f"No blosc2 spec for Arrow type {pa_type!r}. Supported: int8/16/32/64, "
-            "uint8/16/32/64, float32/64, bool, string, binary, and list."
+            "uint8/16/32/64, float32/64, bool, string, binary, list, and struct."
         )
 
     @staticmethod
@@ -3197,8 +3204,11 @@ def from_arrow(
         When *string_max_length* is ``None`` (the default), scalar Arrow
         ``string`` / ``large_string`` columns are imported as
         :func:`~blosc2.vlstring` columns and ``binary`` / ``large_binary``
-        columns are imported as :func:`~blosc2.vlbytes` columns. Null values
-        are represented as native ``None`` with no sentinel needed.
+        columns are imported as :func:`~blosc2.vlbytes` columns. Arrow
+        ``struct`` columns are imported as :func:`~blosc2.struct` columns backed
+        by batched variable-length storage. Null values for these
+        variable-length scalar columns are represented as native ``None``
+        with no sentinel needed.
 
         When *string_max_length* is set to a positive integer, scalar string
         and binary columns are imported as fixed-width
@@ -3208,9 +3218,10 @@ def from_arrow(
         :func:`~blosc2.vlstring` / :func:`~blosc2.vlbytes` columns.
 
         ``blosc2_batch_size`` controls how many rows are buffered before
-        BatchArray-backed imported columns (list columns and varlen scalar
-        columns) are flushed to their backend. Set it to ``None`` to keep
-        those columns pending until the final flush.
+        BatchArray-backed imported columns (list columns and variable-length
+        scalar columns such as ``vlstring``, ``vlbytes``, and ``struct``) are
+        flushed to their backend. Set it to ``None`` to keep those columns
+        pending until the final flush.
         """
         pa = cls._require_pyarrow("from_arrow()")
         if blosc2_batch_size is not None and blosc2_batch_size <= 0:
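A sketch of the import path this docstring describes, assuming the call shape below; the pyarrow calls are standard pyarrow API, and ``blosc2_batch_size`` is the keyword documented above::

    import pyarrow as pa

    import blosc2 as b2

    struct_type = pa.struct([("code", pa.int32()), ("label", pa.string())])
    arrow_table = pa.table(
        # A top-level struct<...> column with one null row.
        {"properties": pa.array([{"code": 1, "label": "fresh"}, None], type=struct_type)}
    )

    # The struct<...> column becomes a blosc2.struct() column; nulls come back as None.
    ctable = b2.CTable.from_arrow(arrow_table, blosc2_batch_size=2048)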
@@ -3314,7 +3325,9 @@ def from_parquet(
 
         This method delegates the actual table construction to
         :meth:`CTable.from_arrow`, so Arrow schema handling, nullable-column support,
-        and Blosc2 write tuning follow the same rules as that method.
+        and Blosc2 write tuning follow the same rules as that method. Top-level
+        Arrow ``struct<...>`` columns are imported as :func:`~blosc2.struct`
+        columns backed by batched variable-length storage.
 
         Parameters
         ----------
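The same struct handling through the Parquet entry point, sketched with standard ``pyarrow.parquet`` calls; any ``from_parquet`` argument beyond the path is assumed::

    import pyarrow as pa
    import pyarrow.parquet as pq

    import blosc2 as b2

    struct_type = pa.struct([("code", pa.int32()), ("label", pa.string())])
    pq.write_table(
        pa.table({"properties": pa.array([{"code": 2, "label": "aged"}], type=struct_type)}),
        "products.parquet",
    )

    # Delegates to from_arrow, so the struct<...> column follows the rules above.
    ctable = b2.CTable.from_parquet("products.parquet")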
@@ -3900,7 +3913,7 @@ def _fetch_col_at_positions(self, name: str, positions: np.ndarray):
         )
         col = self._cols[name]
         spec = self._schema.columns_by_name[name].spec
-        if self._is_list_spec(spec) or isinstance(spec, (VLStringSpec, VLBytesSpec)):
+        if self._is_list_spec(spec) or isinstance(spec, (VLStringSpec, VLBytesSpec, StructSpec)):
             return col[positions]
         return col[positions]
 
@@ -4473,7 +4486,7 @@ def _normalise_sort_keys(
         dtype = self._col_dtype(name)
         if dtype is None:
             cc = self._schema.columns_by_name.get(name)
-            if cc is not None and isinstance(cc.spec, (VLStringSpec, VLBytesSpec)):
+            if cc is not None and isinstance(cc.spec, (VLStringSpec, VLBytesSpec, StructSpec)):
                 raise TypeError(
                     f"Column {name!r} is a varlen scalar column and does not support sort ordering."
                 )
@@ -5364,10 +5377,10 @@ def create_index(  # noqa: C901
         col_arr = self._cols[col_name]
         if isinstance(self._schema.columns_by_name[col_name].spec, ListSpec):
             raise ValueError(f"Cannot create an index on list column {col_name!r} in V1.")
-        if isinstance(self._schema.columns_by_name[col_name].spec, (VLStringSpec, VLBytesSpec)):
+        if isinstance(self._schema.columns_by_name[col_name].spec, (VLStringSpec, VLBytesSpec, StructSpec)):
             raise NotImplementedError(
-                f"Cannot create an index on varlen scalar column {col_name!r}: "
-                "indexing for vlstring/vlbytes columns is not supported yet."
+                f"Cannot create an index on variable-length scalar column {col_name!r}: "
+                "indexing for vlstring/vlbytes/struct columns is not supported yet."
             )
         is_persistent = self._storage.index_anchor_path(col_name) is not None
 
@@ -5756,6 +5769,8 @@ def _dtype_info_label(dtype: np.dtype | None, spec: SchemaSpec | None = None) ->
         return "vlstring"
     if isinstance(spec, VLBytesSpec):
         return "vlbytes"
+    if isinstance(spec, StructSpec):
+        return spec.display_label()
     if isinstance(spec, ListSpec):
         return spec.display_label()
     if dtype is None:
@@ -6010,7 +6025,7 @@ def _guard_varlen_scalar_expression(self, expr: str) -> None:
                 rf"(?<!\w){re.escape(col.name)}(?!\w)", expr
             ):
                 raise NotImplementedError(
-                    f"Column {col.name!r} is a vlstring/vlbytes column; "
+                    f"Column {col.name!r} is a variable-length scalar column (vlstring/vlbytes/struct); "
                     "lazy expressions are not supported yet."
                 )
src/blosc2/scalar_array.py

Lines changed: 33 additions & 14 deletions
@@ -9,8 +9,8 @@
 """Internal variable-length scalar column adapter over BatchArray.
 
 This module is *not* part of the public API. It provides row-wise scalar
-semantics (one str or bytes value per row) backed by batched msgpack storage
-via :class:`blosc2.BatchArray`.
+semantics (one str, bytes, or struct dict value per row) backed by batched
+msgpack storage via :class:`blosc2.BatchArray`.
 
 Physical layout: each chunk in the backing BatchArray stores a list of
 scalar values, e.g. ``["foo", None, "bar", "baz"]``. Nulls are represented
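An illustration of that layout with struct rows included. Plain ``msgpack`` mimics the shape of one stored batch; ``BatchArray``'s actual framing and compression are internal::

    import msgpack

    # One backing chunk = one list of row values; None marks a null row.
    chunk_rows = [{"code": 1, "label": "fresh"}, None, {"code": 2, "label": "aged"}]
    packed = msgpack.packb(chunk_rows)
    assert msgpack.unpackb(packed) == chunk_rows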
@@ -19,6 +19,7 @@
 
 from __future__ import annotations
 
+import os
 from bisect import bisect_right
 from collections import defaultdict
 from typing import TYPE_CHECKING, Any
@@ -35,9 +36,15 @@
 
 def _role_metadata_for_spec(spec) -> dict[str, Any]:
     """Return the fixed metadata role tag for a CTable varlen scalar backend."""
+    if spec.python_type is str:
+        py_type = "str"
+    elif spec.python_type is bytes:
+        py_type = "bytes"
+    else:
+        py_type = "struct"
     return {
         "version": 1,
-        "py_type": "str" if spec.python_type is str else "bytes",
+        "py_type": py_type,
         "nullable": bool(getattr(spec, "nullable", False)),
         "batch_rows": getattr(spec, "batch_rows", 2048),
     }
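For a struct-backed column, the role tag produced by this helper now comes out as below (values shown for a hypothetical nullable spec using the default batch size)::

    {
        "version": 1,
        "py_type": "struct",  # previously only "str" or "bytes" were possible
        "nullable": True,
        "batch_rows": 2048,
    }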
@@ -61,7 +68,12 @@ def _validate_role_metadata(backend: BatchArray, spec) -> None:
         # still identifies the logical role, so keep reopen tolerant.
         return
     role = meta[_CTABLE_VARLEN_SCALAR_META_KEY]
-    expected_py_type = "str" if spec.python_type is str else "bytes"
+    if spec.python_type is str:
+        expected_py_type = "str"
+    elif spec.python_type is bytes:
+        expected_py_type = "bytes"
+    else:
+        expected_py_type = "struct"
     if role.get("py_type") != expected_py_type:
         raise ValueError(
             f"Varlen scalar backend type mismatch: expected {expected_py_type!r}, "
@@ -81,6 +93,7 @@ def _make_backend(spec) -> BatchArray:
 
 def _make_persistent_backend(spec, urlpath: str, mode: str, *, cparams=None, dparams=None) -> BatchArray:
     """Create or open a persistent BatchArray for a varlen scalar spec."""
+    os.makedirs(os.path.dirname(urlpath), exist_ok=True)
     kwargs: dict[str, Any] = {}
     if cparams is not None:
         kwargs["cparams"] = cparams
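The new ``os.makedirs`` guard means a nested ``urlpath`` no longer needs its parent directory created up front. The path below is hypothetical; note that ``os.path.dirname`` of a bare filename is the empty string, which ``os.makedirs`` rejects, so the guard assumes a nested path::

    import os

    urlpath = "store/cols/properties.b2"  # hypothetical nested backend path
    os.makedirs(os.path.dirname(urlpath), exist_ok=True)  # no-op when already present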
@@ -115,20 +128,23 @@ class _ScalarVarLenArray:
     Parameters
     ----------
     spec:
-        A :class:`~blosc2.schema.VLStringSpec` or
-        :class:`~blosc2.schema.VLBytesSpec` describing this column.
+        A :class:`~blosc2.schema.VLStringSpec`,
+        :class:`~blosc2.schema.VLBytesSpec`, or
+        :class:`~blosc2.schema.StructSpec` describing this column.
     backend:
         Pre-constructed :class:`~blosc2.BatchArray`. If ``None``, a fresh
         in-memory backend is created from *spec*.
     """
 
     def __init__(self, spec, backend: BatchArray | None = None) -> None:
-        from blosc2.schema import VLBytesSpec, VLStringSpec
+        from blosc2.schema import StructSpec, VLBytesSpec, VLStringSpec
 
-        if not isinstance(spec, (VLStringSpec, VLBytesSpec)):
-            raise TypeError(f"_ScalarVarLenArray requires a VLStringSpec or VLBytesSpec, got {type(spec)!r}")
+        if not isinstance(spec, (VLStringSpec, VLBytesSpec, StructSpec)):
+            raise TypeError(
+                f"_ScalarVarLenArray requires a VLStringSpec, VLBytesSpec, or StructSpec, got {type(spec)!r}"
+            )
         self._spec = spec
-        self._py_type: type = spec.python_type  # str or bytes
+        self._py_type: type = spec.python_type  # str, bytes, or dict
         self._nullable: bool = getattr(spec, "nullable", False)
         self._batch_rows: int = int(getattr(spec, "batch_rows", 2048) or 2048)
 
@@ -190,10 +206,13 @@ def _coerce(self, value: Any) -> Any:
             if isinstance(value, str):
                 return value
             raise TypeError(f"Expected str for vlstring column, got {type(value).__name__!r}.")
-        # bytes
-        if isinstance(value, (bytes, bytearray, memoryview)):
-            return bytes(value)
-        raise TypeError(f"Expected bytes for vlbytes column, got {type(value).__name__!r}.")
+        if self._py_type is bytes:
+            if isinstance(value, (bytes, bytearray, memoryview)):
+                return bytes(value)
+            raise TypeError(f"Expected bytes for vlbytes column, got {type(value).__name__!r}.")
+        from blosc2.list_array import _coerce_struct_item
+
+        return _coerce_struct_item(self._spec, value)
 
     def _flush_full_batches(self) -> None:
         """Flush as many full batches as possible from _pending."""

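A standalone sketch of the dispatch order above: str specs accept only ``str``, bytes specs normalise bytes-likes, and everything else now falls through to the struct path. ``_coerce_struct_item`` is internal, so the struct branch here is simplified to a plain type check::

    from typing import Any


    def coerce(py_type: type, value: Any) -> Any:
        if py_type is str:
            if isinstance(value, str):
                return value
            raise TypeError("Expected str for vlstring column")
        if py_type is bytes:
            if isinstance(value, (bytes, bytearray, memoryview)):
                return bytes(value)
            raise TypeError("Expected bytes for vlbytes column")
        # Struct path: the real helper validates the dict against the spec's fields.
        if isinstance(value, dict):
            return value
        raise TypeError("Expected dict for struct column")


    assert coerce(bytes, bytearray(b"ab")) == b"ab"
    assert coerce(dict, {"code": 1}) == {"code": 1}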
src/blosc2/schema.py

Lines changed: 17 additions & 6 deletions
@@ -363,12 +363,17 @@ def to_metadata_dict(self) -> dict[str, Any]:
 
 
 class StructSpec(SchemaSpec):
-    """Logical schema descriptor for dict-like structured values."""
+    """Logical schema descriptor for dict-like structured values.
+
+    Top-level CTable struct columns are stored as row-wise dictionaries in a
+    batched variable-length backend. Struct specs can also be used as
+    :func:`list` item specs for Arrow ``list<struct<...>>`` columns.
+    """
 
     python_type = dict
     dtype = None
 
-    def __init__(self, fields: dict[str, SchemaSpec]):
+    def __init__(self, fields: dict[str, SchemaSpec], *, nullable: bool = False):
         if not isinstance(fields, dict) or not fields:
             raise TypeError("StructSpec fields must be a non-empty dict")
         for name, spec in fields.items():
@@ -377,6 +382,7 @@ def __init__(self, fields: dict[str, SchemaSpec]):
             if not isinstance(spec, SchemaSpec):
                 raise TypeError("StructSpec field values must be SchemaSpec instances")
         self.fields = dict(fields)
+        self.nullable = nullable
 
     def to_pydantic_kwargs(self) -> dict[str, Any]:
         return {}
@@ -385,6 +391,7 @@ def to_metadata_dict(self) -> dict[str, Any]:
         return {
             "kind": "struct",
             "fields": [{"name": name, **spec.to_metadata_dict()} for name, spec in self.fields.items()],
+            "nullable": self.nullable,
         }
 
     def display_label(self) -> str:
@@ -399,7 +406,7 @@ def from_metadata_dict(cls, data: dict[str, Any]) -> StructSpec:
             field = dict(field)
             name = field.pop("name")
             fields[name] = spec_from_metadata_dict(field)
-        return cls(fields)
+        return cls(fields, nullable=data.get("nullable", False))
 
 
 class ListSpec(SchemaSpec):
@@ -646,9 +653,13 @@ def vlbytes(
     )
 
 
-def struct(fields: dict[str, SchemaSpec]) -> StructSpec:
-    """Build a structured schema descriptor for nested CTable values."""
-    return StructSpec(fields)
+def struct(fields: dict[str, SchemaSpec], *, nullable: bool = False) -> StructSpec:
+    """Build a structured schema descriptor for dict-like CTable values.
+
+    Top-level struct columns store one dictionary (or ``None`` when nullable)
+    per row. Struct specs may also be nested as list item specs.
+    """
+    return StructSpec(fields, nullable=nullable)
 
 
 def list(
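The new keyword in use, with field specs borrowed from the docs example::

    import blosc2 as b2

    plain = b2.struct({"code": b2.int32(), "label": b2.vlstring()})
    optional = b2.struct({"code": b2.int32(), "label": b2.vlstring()}, nullable=True)
    assert optional.nullable and not plain.nullable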

src/blosc2/schema_compiler.py

Lines changed: 2 additions & 2 deletions
@@ -98,7 +98,7 @@ def compute_display_width(spec: SchemaSpec) -> int:
     """Return a reasonable terminal display width for *spec*'s column."""
     if isinstance(spec, (VLStringSpec, VLBytesSpec)):
         return 40
-    if isinstance(spec, ListSpec):
+    if isinstance(spec, (ListSpec, StructSpec)):
         return max(40, len(spec.display_label()) + 4)
     dtype = spec.dtype
     if dtype is None:
@@ -389,7 +389,7 @@ def spec_from_metadata_dict(data: dict[str, Any]) -> SchemaSpec:
         item_spec = spec_from_metadata_dict(data.pop("item"))
         return ListSpec(item_spec, **data)
     if kind == "struct":
-        return StructSpec.from_metadata_dict({"fields": data.pop("fields")})
+        return StructSpec.from_metadata_dict({"fields": data.pop("fields"), **data})
     spec_cls = _KIND_TO_SPEC.get(kind)
     if spec_cls is None:
         raise ValueError(f"Unknown column kind {kind!r}")
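Why the ``**data`` spread matters: without it, the ``nullable`` flag written by ``to_metadata_dict`` was dropped on reload. A round-trip sketch, assuming ``spec_from_metadata_dict`` can be imported from this internal module::

    import blosc2 as b2
    from blosc2.schema_compiler import spec_from_metadata_dict

    spec = b2.struct({"code": b2.int32()}, nullable=True)
    meta = spec.to_metadata_dict()  # {"kind": "struct", "fields": [...], "nullable": True}
    restored = spec_from_metadata_dict(dict(meta))
    assert restored.nullable is True  # preserved by the **data spread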
