Review notes (text before the first "diff --git" line is ignored by git-apply and patch):
- The patch is re-flowed to one diff line per row; leading whitespace and blank context
  lines were reconstructed from the hunk line counts.
- Group.members (sync) now forwards use_consolidated_for_children to AsyncGroup.members;
  it previously accepted the argument and ignored it. Its docstring now describes the
  tuple of (name, Array | Group) pairs that it actually returns.
- _iter_members_deep now forwards use_consolidated_for_children when it recurses, so
  consolidated metadata is dropped at every depth rather than only for direct children.
- test_stale_child_metadata_ignored sorts the member names before comparing, because
  members() does not guarantee ordering.
- Hunk headers in src/zarr/core/group.py are renumbered for the added lines. The "index"
  blob ids are those of the original patch and are informational only.

diff --git a/changes/2921.bugfix.rst b/changes/2921.bugfix.rst
new file mode 100644
index 0000000000..65db48654f
--- /dev/null
+++ b/changes/2921.bugfix.rst
@@ -0,0 +1 @@
+Ignore stale child metadata when reconsolidating metadata.
diff --git a/src/zarr/api/asynchronous.py b/src/zarr/api/asynchronous.py
index 285d777258..374c568495 100644
--- a/src/zarr/api/asynchronous.py
+++ b/src/zarr/api/asynchronous.py
@@ -193,7 +193,10 @@ async def consolidate_metadata(
     group = await AsyncGroup.open(store_path, zarr_format=zarr_format, use_consolidated=False)
     group.store_path.store._check_writable()
 
-    members_metadata = {k: v.metadata async for k, v in group.members(max_depth=None)}
+    members_metadata = {
+        k: v.metadata
+        async for k, v in group.members(max_depth=None, use_consolidated_for_children=False)
+    }
     # While consolidating, we want to be explicit about when child groups
     # are empty by inserting an empty dict for consolidated_metadata.metadata
     for k, v in members_metadata.items():
diff --git a/src/zarr/core/group.py b/src/zarr/core/group.py
index da2aa5f754..8d5e9ac68f 100644
--- a/src/zarr/core/group.py
+++ b/src/zarr/core/group.py
@@ -1296,6 +1296,8 @@ async def nmembers(
     async def members(
         self,
         max_depth: int | None = 0,
+        *,
+        use_consolidated_for_children: bool = True,
     ) -> AsyncGenerator[
         tuple[str, AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | AsyncGroup],
         None,
@@ -1313,6 +1315,11 @@ async def members(
             default, (``max_depth=0``) only immediate children are included. Set
             ``max_depth=None`` to include all nodes, and some positive integer
             to consider children within that many levels of the root Group.
+        use_consolidated_for_children : bool, default True
+            Whether to use the consolidated metadata of child groups loaded
+            from the store. Note that this only affects groups loaded from the
+            store. If the current Group already has consolidated metadata, it
+            will always be used.
 
         Returns
         -------
@@ -1323,7 +1330,9 @@ async def members(
         """
         if max_depth is not None and max_depth < 0:
             raise ValueError(f"max_depth must be None or >= 0. Got '{max_depth}' instead")
-        async for item in self._members(max_depth=max_depth):
+        async for item in self._members(
+            max_depth=max_depth, use_consolidated_for_children=use_consolidated_for_children
+        ):
             yield item
 
     def _members_consolidated(
@@ -1353,7 +1362,7 @@ def _members_consolidated(
                     yield from obj._members_consolidated(new_depth, prefix=key)
 
     async def _members(
-        self, max_depth: int | None
+        self, max_depth: int | None, *, use_consolidated_for_children: bool = True
     ) -> AsyncGenerator[
         tuple[str, AsyncArray[ArrayV3Metadata] | AsyncArray[ArrayV2Metadata] | AsyncGroup], None
     ]:
@@ -1383,7 +1392,11 @@ async def _members(
         # enforce a concurrency limit by passing a semaphore to all the recursive functions
         semaphore = asyncio.Semaphore(config.get("async.concurrency"))
         async for member in _iter_members_deep(
-            self, max_depth=max_depth, skip_keys=skip_keys, semaphore=semaphore
+            self,
+            max_depth=max_depth,
+            skip_keys=skip_keys,
+            semaphore=semaphore,
+            use_consolidated_for_children=use_consolidated_for_children,
         ):
             yield member
 
@@ -2079,10 +2092,38 @@ def nmembers(self, max_depth: int | None = 0) -> int:
 
         return self._sync(self._async_group.nmembers(max_depth=max_depth))
 
-    def members(self, max_depth: int | None = 0) -> tuple[tuple[str, Array | Group], ...]:
+    def members(
+        self, max_depth: int | None = 0, *, use_consolidated_for_children: bool = True
+    ) -> tuple[tuple[str, Array | Group], ...]:
         """
-        Return the sub-arrays and sub-groups of this group as a tuple of (name, array | group)
-        pairs
+        Return the arrays and groups contained in this group as a tuple of (name, node) pairs.
+        This method requires that `store_path.store` supports directory listing.
+
+        The results are not guaranteed to be ordered.
+
+        Parameters
+        ----------
+        max_depth : int, default 0
+            The maximum number of levels of the hierarchy to include. By
+            default, (``max_depth=0``) only immediate children are included. Set
+            ``max_depth=None`` to include all nodes, and some positive integer
+            to consider children within that many levels of the root Group.
+        use_consolidated_for_children : bool, default True
+            Whether to use the consolidated metadata of child groups loaded
+            from the store. Note that this only affects groups loaded from the
+            store. If the current Group already has consolidated metadata, it
+            will always be used.
+
+        Returns
+        -------
+        path:
+            A string giving the path to the target, relative to the Group ``self``.
+        value: Array or Group
+            The Array or Group that is a child of ``self``.
         """
-        _members = self._sync_iter(self._async_group.members(max_depth=max_depth))
+        _members = self._sync_iter(
+            self._async_group.members(
+                max_depth=max_depth, use_consolidated_for_children=use_consolidated_for_children
+            )
+        )
 
@@ -3317,6 +3358,7 @@ async def _iter_members_deep(
     max_depth: int | None,
     skip_keys: tuple[str, ...],
     semaphore: asyncio.Semaphore | None = None,
+    use_consolidated_for_children: bool = True,
 ) -> AsyncGenerator[
     tuple[str, AsyncArray[ArrayV3Metadata] | AsyncArray[ArrayV2Metadata] | AsyncGroup], None
 ]:
@@ -3334,6 +3376,11 @@ async def _iter_members_deep(
         A tuple of keys to skip when iterating over the possible members of the group.
     semaphore : asyncio.Semaphore | None
         An optional semaphore to use for concurrency control.
+    use_consolidated_for_children : bool, default True
+        Whether to use the consolidated metadata of child groups loaded
+        from the store. Note that this only affects groups loaded from the
+        store. If the current Group already has consolidated metadata, it
+        will always be used.
 
     Yields
     ------
@@ -3348,8 +3395,23 @@ async def _iter_members_deep(
     else:
         new_depth = max_depth - 1
     async for name, node in _iter_members(group, skip_keys=skip_keys, semaphore=semaphore):
+        is_group = isinstance(node, AsyncGroup)
+        if (
+            is_group
+            and not use_consolidated_for_children
+            and node.metadata.consolidated_metadata is not None  # type: ignore [union-attr]
+        ):
+            node = cast("AsyncGroup", node)
+            # We've decided not to trust consolidated metadata at this point, because we're
+            # reconsolidating the metadata, for example.
+            node = replace(node, metadata=replace(node.metadata, consolidated_metadata=None))
         yield name, node
-        if isinstance(node, AsyncGroup) and do_recursion:
+        if is_group and do_recursion:
+            node = cast("AsyncGroup", node)
             to_recurse[name] = _iter_members_deep(
-                node, max_depth=new_depth, skip_keys=skip_keys, semaphore=semaphore
+                node,
+                max_depth=new_depth,
+                skip_keys=skip_keys,
+                semaphore=semaphore,
+                use_consolidated_for_children=use_consolidated_for_children,
             )
diff --git a/tests/test_metadata/test_consolidated.py b/tests/test_metadata/test_consolidated.py
index c1ff2e130a..42a14869fc 100644
--- a/tests/test_metadata/test_consolidated.py
+++ b/tests/test_metadata/test_consolidated.py
@@ -573,3 +573,47 @@ async def test_use_consolidated_false(
         assert len([x async for x in good.members()]) == 2
         assert good.metadata.consolidated_metadata
         assert sorted(good.metadata.consolidated_metadata.metadata) == ["a", "b"]
+
+    async def test_stale_child_metadata_ignored(self, memory_store: zarr.storage.MemoryStore):
+        # https://github.com/zarr-developers/zarr-python/issues/2921
+        # When consolidating metadata, we should ignore any (possibly stale) metadata
+        # from previous consolidations, *including at child nodes*.
+        root = await zarr.api.asynchronous.group(store=memory_store, zarr_format=3)
+        await root.create_group("foo")
+        await zarr.api.asynchronous.consolidate_metadata(memory_store, path="foo")
+        await root.create_group("foo/bar/spam")
+
+        await zarr.api.asynchronous.consolidate_metadata(memory_store)
+
+        reopened = await zarr.api.asynchronous.open_consolidated(store=memory_store, zarr_format=3)
+        result = sorted([x[0] async for x in reopened.members(max_depth=None)])
+        expected = ["foo", "foo/bar", "foo/bar/spam"]
+        assert result == expected
+
+    async def test_use_consolidated_for_children_members(
+        self, memory_store: zarr.storage.MemoryStore
+    ):
+        # A test that has *unconsolidated* metadata at the root group, but discovers
+        # a child group with consolidated metadata.
+
+        root = await zarr.api.asynchronous.create_group(store=memory_store)
+        await root.create_group("a/b")
+        # Consolidate metadata at "a/b"
+        await zarr.api.asynchronous.consolidate_metadata(memory_store, path="a/b")
+
+        # Add a new group a/b/c, that's not present in the CM at "a/b"
+        await root.create_group("a/b/c")
+
+        # Now according to the consolidated metadata, "a" has children ["b"]
+        # but according to the unconsolidated metadata, "a" has children ["b", "c"]
+        group = await zarr.api.asynchronous.open_group(store=memory_store, path="a")
+        with pytest.warns(UserWarning, match="Object at 'c' not found"):
+            result = sorted([x[0] async for x in group.members(max_depth=None)])
+        expected = ["b"]
+        assert result == expected
+
+        result = sorted(
+            [x[0] async for x in group.members(max_depth=None, use_consolidated_for_children=False)]
+        )
+        expected = ["b", "b/c"]
+        assert result == expected