Skip to content

Ignore stale children when reconsolidating metadata #2980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/2921.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ignore stale child metadata when reconsolidating metadata.
5 changes: 4 additions & 1 deletion src/zarr/api/asynchronous.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ async def consolidate_metadata(
group = await AsyncGroup.open(store_path, zarr_format=zarr_format, use_consolidated=False)
group.store_path.store._check_writable()

members_metadata = {k: v.metadata async for k, v in group.members(max_depth=None)}
members_metadata = {
k: v.metadata
async for k, v in group.members(max_depth=None, use_consolidated_for_children=False)
}
# While consolidating, we want to be explicit about when child groups
# are empty by inserting an empty dict for consolidated_metadata.metadata
for k, v in members_metadata.items():
Expand Down
68 changes: 61 additions & 7 deletions src/zarr/core/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -1296,6 +1296,8 @@ async def nmembers(
async def members(
self,
max_depth: int | None = 0,
*,
use_consolidated_for_children: bool = True,
) -> AsyncGenerator[
tuple[str, AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | AsyncGroup],
None,
Expand All @@ -1313,6 +1315,11 @@ async def members(
default, (``max_depth=0``) only immediate children are included. Set
``max_depth=None`` to include all nodes, and some positive integer
to consider children within that many levels of the root Group.
use_consolidated_for_children : bool, default True
Whether to use the consolidated metadata of child groups loaded
from the store. Note that this only affects groups loaded from the
store. If the current Group already has consolidated metadata, it
will always be used.

Returns
-------
Expand All @@ -1323,7 +1330,9 @@ async def members(
"""
if max_depth is not None and max_depth < 0:
raise ValueError(f"max_depth must be None or >= 0. Got '{max_depth}' instead")
async for item in self._members(max_depth=max_depth):
async for item in self._members(
max_depth=max_depth, use_consolidated_for_children=use_consolidated_for_children
):
yield item

def _members_consolidated(
Expand Down Expand Up @@ -1353,7 +1362,7 @@ def _members_consolidated(
yield from obj._members_consolidated(new_depth, prefix=key)

async def _members(
self, max_depth: int | None
self, max_depth: int | None, *, use_consolidated_for_children: bool = True
) -> AsyncGenerator[
tuple[str, AsyncArray[ArrayV3Metadata] | AsyncArray[ArrayV2Metadata] | AsyncGroup], None
]:
Expand Down Expand Up @@ -1383,7 +1392,11 @@ async def _members(
# enforce a concurrency limit by passing a semaphore to all the recursive functions
semaphore = asyncio.Semaphore(config.get("async.concurrency"))
async for member in _iter_members_deep(
self, max_depth=max_depth, skip_keys=skip_keys, semaphore=semaphore
self,
max_depth=max_depth,
skip_keys=skip_keys,
semaphore=semaphore,
use_consolidated_for_children=use_consolidated_for_children,
):
yield member

Expand Down Expand Up @@ -2079,10 +2092,34 @@ def nmembers(self, max_depth: int | None = 0) -> int:

return self._sync(self._async_group.nmembers(max_depth=max_depth))

def members(self, max_depth: int | None = 0) -> tuple[tuple[str, Array | Group], ...]:
def members(
self, max_depth: int | None = 0, *, use_consolidated_for_children: bool = True
) -> tuple[tuple[str, Array | Group], ...]:
"""
Return the sub-arrays and sub-groups of this group as a tuple of (name, array | group)
pairs
Returns an AsyncGenerator over the arrays and groups contained in this group.
This method requires that `store_path.store` supports directory listing.

The results are not guaranteed to be ordered.

Parameters
----------
max_depth : int, default 0
The maximum number of levels of the hierarchy to include. By
default, (``max_depth=0``) only immediate children are included. Set
``max_depth=None`` to include all nodes, and some positive integer
to consider children within that many levels of the root Group.
use_consolidated_for_children : bool, default True
Whether to use the consolidated metadata of child groups loaded
from the store. Note that this only affects groups loaded from the
store. If the current Group already has consolidated metadata, it
will always be used.

Returns
-------
path:
A string giving the path to the target, relative to the Group ``self``.
value: AsyncArray or AsyncGroup
The AsyncArray or AsyncGroup that is a child of ``self``.
"""
_members = self._sync_iter(self._async_group.members(max_depth=max_depth))

Expand Down Expand Up @@ -3317,6 +3354,7 @@ async def _iter_members_deep(
max_depth: int | None,
skip_keys: tuple[str, ...],
semaphore: asyncio.Semaphore | None = None,
use_consolidated_for_children: bool = True,
) -> AsyncGenerator[
tuple[str, AsyncArray[ArrayV3Metadata] | AsyncArray[ArrayV2Metadata] | AsyncGroup], None
]:
Expand All @@ -3334,6 +3372,11 @@ async def _iter_members_deep(
A tuple of keys to skip when iterating over the possible members of the group.
semaphore : asyncio.Semaphore | None
An optional semaphore to use for concurrency control.
use_consolidated_for_children : bool, default True
Whether to use the consolidated metadata of child groups loaded
from the store. Note that this only affects groups loaded from the
store. If the current Group already has consolidated metadata, it
will always be used.

Yields
------
Expand All @@ -3348,8 +3391,19 @@ async def _iter_members_deep(
else:
new_depth = max_depth - 1
async for name, node in _iter_members(group, skip_keys=skip_keys, semaphore=semaphore):
is_group = isinstance(node, AsyncGroup)
if (
is_group
and not use_consolidated_for_children
and node.metadata.consolidated_metadata is not None # type: ignore [union-attr]
):
node = cast("AsyncGroup", node)
# We've decided not to trust consolidated metadata at this point, because we're
# reconsolidating the metadata, for example.
node = replace(node, metadata=replace(node.metadata, consolidated_metadata=None))
yield name, node
if isinstance(node, AsyncGroup) and do_recursion:
if is_group and do_recursion:
node = cast("AsyncGroup", node)
to_recurse[name] = _iter_members_deep(
node, max_depth=new_depth, skip_keys=skip_keys, semaphore=semaphore
)
Expand Down
44 changes: 44 additions & 0 deletions tests/test_metadata/test_consolidated.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,3 +573,47 @@ async def test_use_consolidated_false(
assert len([x async for x in good.members()]) == 2
assert good.metadata.consolidated_metadata
assert sorted(good.metadata.consolidated_metadata.metadata) == ["a", "b"]

async def test_stale_child_metadata_ignored(self, memory_store: zarr.storage.MemoryStore):
# https://github.com/zarr-developers/zarr-python/issues/2921
# When consolidating metadata, we should ignore any (possibly stale) metadata
# from previous consolidations, *including at child nodes*.
root = await zarr.api.asynchronous.group(store=memory_store, zarr_format=3)
await root.create_group("foo")
await zarr.api.asynchronous.consolidate_metadata(memory_store, path="foo")
await root.create_group("foo/bar/spam")

await zarr.api.asynchronous.consolidate_metadata(memory_store)

reopened = await zarr.api.asynchronous.open_consolidated(store=memory_store, zarr_format=3)
result = [x[0] async for x in reopened.members(max_depth=None)]
expected = ["foo", "foo/bar", "foo/bar/spam"]
assert result == expected

async def test_use_consolidated_for_children_members(
self, memory_store: zarr.storage.MemoryStore
):
# A test that has *unconsolidated* metadata at the root group, but discovers
# a child group with consolidated metadata.

root = await zarr.api.asynchronous.create_group(store=memory_store)
await root.create_group("a/b")
# Consolidate metadata at "a/b"
await zarr.api.asynchronous.consolidate_metadata(memory_store, path="a/b")

# Add a new group a/b/c, that's not present in the CM at "a/b"
await root.create_group("a/b/c")

# Now according to the consolidated metadata, "a" has children ["b"]
# but according to the unconsolidated metadata, "a" has children ["b", "c"]
group = await zarr.api.asynchronous.open_group(store=memory_store, path="a")
with pytest.warns(UserWarning, match="Object at 'c' not found"):
result = sorted([x[0] async for x in group.members(max_depth=None)])
expected = ["b"]
assert result == expected

result = sorted(
[x[0] async for x in group.members(max_depth=None, use_consolidated_for_children=False)]
)
expected = ["b", "b/c"]
assert result == expected