diff --git a/icechunk-python/pyproject.toml b/icechunk-python/pyproject.toml index b7abf2782..cdf7d7c42 100644 --- a/icechunk-python/pyproject.toml +++ b/icechunk-python/pyproject.toml @@ -131,13 +131,13 @@ cache-keys = [ [tool.pytest.ini_options] asyncio_mode = "auto" -asyncio_default_fixture_loop_scope = "function" minversion = "7" testpaths = ["tests", "integration_tests"] log_cli_level = "INFO" xfail_strict = true addopts = ["-ra", "--strict-config", "--strict-markers"] markers = [ + "asyncio", "gpu", # need this to run the zarr tests ] filterwarnings = [ @@ -209,3 +209,7 @@ ban-relative-imports = "all" [project.scripts] icechunk = "icechunk._icechunk_python:cli_entrypoint" + +[project.entry-points."zarr.stores"] +icechunk = "icechunk.zarr_adapter:IcechunkStoreAdapter" +ic = "icechunk.zarr_adapter:ICStoreAdapter" diff --git a/icechunk-python/python/icechunk/__init__.py b/icechunk-python/python/icechunk/__init__.py index 331898307..48826b77d 100644 --- a/icechunk-python/python/icechunk/__init__.py +++ b/icechunk-python/python/icechunk/__init__.py @@ -85,6 +85,11 @@ tigris_storage, ) from icechunk.store import IcechunkStore +from icechunk.zarr_adapter import ( + IcechunkPathSpec, + create_readonly_session_from_path, + parse_icechunk_path_spec, +) __all__ = [ "AnyAzureCredential", @@ -113,6 +118,7 @@ "GcsCredentials", "GcsStaticCredentials", "IcechunkError", + "IcechunkPathSpec", "IcechunkStore", "ManifestConfig", "ManifestFileInfo", @@ -143,6 +149,7 @@ "azure_static_credentials", "azure_storage", "containers_credentials", + "create_readonly_session_from_path", "gcs_credentials", "gcs_from_env_credentials", "gcs_refreshable_credentials", @@ -154,6 +161,7 @@ "initialize_logs", "local_filesystem_storage", "local_filesystem_store", + "parse_icechunk_path_spec", "print_debug_info", "r2_storage", "s3_anonymous_credentials", diff --git a/icechunk-python/python/icechunk/zarr_adapter.py b/icechunk-python/python/icechunk/zarr_adapter.py new file mode 100644 index 000000000..0a19f556f --- /dev/null +++ b/icechunk-python/python/icechunk/zarr_adapter.py @@ -0,0 +1,406 @@ +""" +Icechunk store adapter for ZEP 8 URL syntax support. + +This module provides integration between Icechunk and zarr-python's ZEP 8 URL syntax, +enabling Icechunk stores to be used in URL chains. It also includes shared utilities +for parsing ZEP 8 URL path specifications and creating readonly sessions. +""" + +from __future__ import annotations + +import re +from typing import TYPE_CHECKING, Any, Optional +from urllib.parse import urlparse + +from icechunk import ( + IcechunkStore, + Repository, + Storage, + gcs_storage, + in_memory_storage, + local_filesystem_storage, + s3_storage, +) +from zarr.abc.store_adapter import StoreAdapter + +if TYPE_CHECKING: + from zarr.abc.store_adapter import URLSegment + +# Re-export shared utilities for external use +__all__ = [ + "ICStoreAdapter", + "IcechunkPathSpec", + "IcechunkStoreAdapter", + "create_readonly_session_from_path", + "parse_icechunk_path_spec", +] + +# ============================================================================= +# Shared Session Utilities +# ============================================================================= + + +class IcechunkPathSpec: + """Parsed icechunk path specification from ZEP 8 URL syntax. + + Represents the parsed components of a path specification like: + - @branch.main/data/array -> branch='main', path='data/array' + - @tag.v1.0 -> tag='v1.0', path='' + - @abc123def456/nested -> snapshot_id='abc123def456', path='nested' + - data/array -> branch='main' (default), path='data/array' + """ + + def __init__( + self, + branch: Optional[str] = None, + tag: Optional[str] = None, + snapshot_id: Optional[str] = None, + path: str = "", + ): + """Initialize path specification. + + Args: + branch: Branch name (defaults to 'main' if no other ref specified) + tag: Tag name + snapshot_id: Snapshot ID + path: Path within the repository after version specification + """ + # Ensure only one reference type is specified + ref_count = sum(1 for ref in [branch, tag, snapshot_id] if ref is not None) + if ref_count > 1: + raise ValueError("Only one of branch, tag, or snapshot_id can be specified") + + # Default to main branch if no reference specified + if ref_count == 0: + branch = "main" + + self.branch = branch + self.tag = tag + self.snapshot_id = snapshot_id + self.path = path + + @property + def reference_type(self) -> str: + """Get the type of reference specified.""" + if self.branch is not None: + return "branch" + elif self.tag is not None: + return "tag" + elif self.snapshot_id is not None: + return "snapshot" + else: + return "branch" # Default + + @property + def reference_value(self) -> str: + """Get the reference value.""" + if self.branch is not None: + return self.branch + elif self.tag is not None: + return self.tag + elif self.snapshot_id is not None: + return self.snapshot_id + else: + return "main" # Default + + def __repr__(self) -> str: + return ( + f"IcechunkPathSpec(branch={self.branch!r}, tag={self.tag!r}, " + f"snapshot_id={self.snapshot_id!r}, path={self.path!r})" + ) + + +def parse_icechunk_path_spec(segment_path: str) -> IcechunkPathSpec: + """Parse ZEP 8 URL path specification into structured components. + + Args: + segment_path: Path specification using ZEP 8 format: + - @branch.main/path -> branch 'main' + - @tag.v123/path -> tag 'v123' + - @SNAPSHOT_ID/path -> snapshot 'SNAPSHOT_ID' + - empty/path -> default to main branch + - just/path -> default to main branch, path='just/path' + + Returns: + IcechunkPathSpec with parsed components + + Examples: + >>> parse_icechunk_path_spec("@branch.main/data/temp") + IcechunkPathSpec(branch='main', tag=None, snapshot_id=None, path='data/temp') + + >>> parse_icechunk_path_spec("@tag.v1.0") + IcechunkPathSpec(branch=None, tag='v1.0', snapshot_id=None, path='') + + >>> parse_icechunk_path_spec("data/array") + IcechunkPathSpec(branch='main', tag=None, snapshot_id=None, path='data/array') + """ + if not segment_path: + # Empty path -> default to main branch + return IcechunkPathSpec(branch="main", path="") + + if not segment_path.startswith("@"): + # No @ prefix -> treat entire string as path, default to main branch + return IcechunkPathSpec(branch="main", path=segment_path) + + # Remove @ prefix and split on first / + ref_spec = segment_path[1:] + if "/" in ref_spec: + ref_part, path_part = ref_spec.split("/", 1) + else: + ref_part = ref_spec + path_part = "" + + # Parse reference specification + if ref_part.startswith("branch."): + branch_name = ref_part[7:] # Remove 'branch.' prefix + if not branch_name: + raise ValueError("Branch name cannot be empty in @branch.name format") + return IcechunkPathSpec(branch=branch_name, path=path_part) + elif ref_part.startswith("tag."): + tag_name = ref_part[4:] # Remove 'tag.' prefix + if not tag_name: + raise ValueError("Tag name cannot be empty in @tag.name format") + return IcechunkPathSpec(tag=tag_name, path=path_part) + else: + # Assume it's a snapshot ID (no prefix, must be a valid ID format) + if not re.match(r"^[a-fA-F0-9]{12,}$", ref_part): + # If it doesn't look like a snapshot ID, treat as invalid + raise ValueError( + f"Invalid reference specification: '{ref_part}'. " + "Expected @branch.name, @tag.name, or @SNAPSHOT_ID format" + ) + return IcechunkPathSpec(snapshot_id=ref_part, path=path_part) + + +async def create_readonly_session_from_path( + repo: Repository, path_spec: IcechunkPathSpec +) -> IcechunkStore: + """Create readonly Icechunk session from parsed path specification. + + Args: + repo: Icechunk repository instance + path_spec: Parsed path specification + + Returns: + IcechunkStore instance for the specified version + + Raises: + ValueError: If branch, tag, or snapshot doesn't exist + """ + try: + if path_spec.branch is not None: + session = await repo.readonly_session_async(branch=path_spec.branch) + elif path_spec.tag is not None: + session = await repo.readonly_session_async(tag=path_spec.tag) + elif path_spec.snapshot_id is not None: + session = await repo.readonly_session_async(snapshot_id=path_spec.snapshot_id) + else: + # Fallback to main branch (should not happen due to IcechunkPathSpec validation) + session = await repo.readonly_session_async(branch="main") + + return session.store + except Exception as e: + ref_desc = f"{path_spec.reference_type} '{path_spec.reference_value}'" + raise ValueError(f"Could not create readonly session for {ref_desc}: {e}") from e + + +# ============================================================================= +# Storage and URL Parsing Utilities +# ============================================================================= + + +def _parse_s3_url(url: str) -> tuple[str, str]: + """Parse s3:// URL into bucket and prefix.""" + parsed = urlparse(url) + if parsed.scheme != "s3": + raise ValueError(f"Expected s3:// URL, got: {url}") + + bucket = parsed.netloc + prefix = parsed.path.lstrip("/") + return bucket, prefix + + +def _parse_gcs_url(url: str) -> tuple[str, str]: + """Parse gcs:// or gs:// URL into bucket and prefix.""" + parsed = urlparse(url) + if parsed.scheme not in ("gcs", "gs"): + raise ValueError(f"Expected gcs:// or gs:// URL, got: {url}") + + bucket = parsed.netloc + prefix = parsed.path.lstrip("/") + return bucket, prefix + + +def _create_icechunk_storage(preceding_url: str) -> Storage: + """Create appropriate Icechunk storage from URL.""" + if preceding_url == "memory:": + return in_memory_storage() + elif preceding_url.startswith("file:"): + path = preceding_url[5:] # Remove 'file:' prefix + return local_filesystem_storage(path) + elif preceding_url.startswith("s3://"): + bucket, prefix = _parse_s3_url(preceding_url) + return s3_storage( + bucket=bucket, + prefix=prefix, + from_env=True, # Use environment variables for credentials + ) + elif preceding_url.startswith(("gcs://", "gs://")): + bucket, prefix = _parse_gcs_url(preceding_url) + return gcs_storage( + bucket=bucket, + prefix=prefix, + from_env=True, # Use environment variables for credentials + ) + else: + raise ValueError(f"Unsupported storage URL for icechunk: {preceding_url}") + + +async def _create_icechunk_store(repo: Repository, segment_path: str) -> IcechunkStore: + """Create appropriate Icechunk session based on path specification. + + Uses consolidated session utilities for consistent parsing and session creation. + + Expected path formats: + - @branch.main/path -> branch 'main' + - @tag.v123/path -> tag 'v123' + - @SNAPSHOT_ID/path -> snapshot 'SNAPSHOT_ID' + - empty/path -> default to main branch + """ + # Use consolidated utilities for parsing and session creation + path_spec = parse_icechunk_path_spec(segment_path) + return await create_readonly_session_from_path(repo, path_spec) + + +# ============================================================================= +# Store Adapter Implementation +# ============================================================================= + + +class IcechunkStoreAdapter(StoreAdapter): + """Store adapter for Icechunk repositories in ZEP 8 URL chains.""" + + adapter_name = "icechunk" + + @classmethod + async def from_url_segment( + cls, + segment: URLSegment, + preceding_url: str, + **kwargs: Any, + ) -> IcechunkStore: + """ + Create an IcechunkStore from a URL segment and preceding URL. + + Parameters + ---------- + segment : URLSegment + The URL segment with adapter='icechunk' and optional path. + preceding_url : str + The full URL before this adapter segment (e.g., 's3://bucket/repo', 'file:/path'). + This is used to determine the appropriate Icechunk storage backend. + **kwargs : Any + Additional arguments including storage_options. + + Returns + ------- + Store + A configured IcechunkStore instance. + + Raises + ------ + ValueError + If write mode is requested or repository cannot be opened. + + Examples + -------- + For URL "s3://mybucket/repo|icechunk:@branch.main": + - segment.adapter = "icechunk" + - segment.path = "@branch.main" + - preceding_url = "s3://mybucket/repo" + - Uses icechunk.s3_storage(bucket="mybucket", prefix="repo") + + For URL "file:/tmp/repo|icechunk:@tag.v1.0/data": + - segment.adapter = "icechunk" + - segment.path = "@tag.v1.0/data" + - Opens tag "v1.0" and accesses path "data" + """ + # Icechunk adapter is read-only via ZEP 8 URLs + # For writing, use the native Icechunk API directly + mode = kwargs.get("mode", "r") + if mode in ("w", "w-", "a"): + raise ValueError( + f"Write mode '{mode}' not supported for icechunk: URLs. " + "Use the native Icechunk API to create and write data, " + "then read it back using ZEP 8 URL syntax." + ) + + # Check storage_options for read_only setting + storage_options = kwargs.get("storage_options", {}) + if not storage_options.get("read_only", True): + raise ValueError( + "Icechunk adapter only supports read-only access via ZEP 8 URLs. " + "Set storage_options={'read_only': True} or omit for default." + ) + + # Check for unsupported memory: URLs + if preceding_url == "memory:": + raise ValueError( + "memory:|icechunk: URLs are not supported. In-memory Icechunk repositories " + "cannot be shared across different URL resolution calls. Each call creates " + "a new empty repository instance. Use file-based repositories instead: " + "file:/path/to/repo|icechunk:" + ) + + # Create appropriate Icechunk storage from preceding URL + try: + storage = _create_icechunk_storage(preceding_url) + except Exception as e: + raise ValueError( + f"Could not create Icechunk storage from URL '{preceding_url}': {e}" + ) from e + + # Open existing repository (read-only) + try: + repo = await Repository.open_async(storage) + except Exception: + raise ValueError( + f"Could not open existing Icechunk repository at '{preceding_url}'. " + "Create the repository first using the native Icechunk API." + ) from None + + # Create appropriate session / store based on segment path + store = await _create_icechunk_store(repo, segment.path) + + return store + + @classmethod + def can_handle_scheme(cls, scheme: str) -> bool: + return scheme in ("icechunk", "ic") + + @classmethod + def get_supported_schemes(cls) -> list[str]: + return ["icechunk", "ic"] + + @classmethod + def _extract_zarr_path_from_segment(cls, segment_path: str) -> str: + """Extract the zarr path component from an icechunk segment path. + + Args: + segment_path: Path like "@branch.main/data/array" or "@tag.v1.0" + + Returns: + The zarr path component, e.g. "data/array" or "" + """ + try: + path_spec = parse_icechunk_path_spec(segment_path) + return path_spec.path + except Exception: + return "" + + +# Additional short alias +class ICStoreAdapter(IcechunkStoreAdapter): + """Short alias for IcechunkStoreAdapter.""" + + adapter_name = "ic" diff --git a/icechunk-python/tests/test_zarr_adapter.py b/icechunk-python/tests/test_zarr_adapter.py new file mode 100644 index 000000000..e02ba7eea --- /dev/null +++ b/icechunk-python/tests/test_zarr_adapter.py @@ -0,0 +1,712 @@ +""" +Tests for Icechunk ZEP 8 URL adapter integration and session utilities. + +This module tests that the Icechunk store adapter works correctly with +zarr-python's ZEP 8 URL syntax for chained store access, and includes +tests for the consolidated session utilities. +""" + +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import numpy as np +import pytest + +import icechunk + +# Register the adapters for testing +import icechunk.zarr_adapter +import zarr + +# Import session utilities for testing +from icechunk.zarr_adapter import ( + IcechunkPathSpec, + create_readonly_session_from_path, + parse_icechunk_path_spec, +) +from zarr.registry import register_store_adapter + +register_store_adapter(icechunk.zarr_adapter.IcechunkStoreAdapter) +register_store_adapter(icechunk.zarr_adapter.ICStoreAdapter) + + +# ============================================================================= +# Basic URL Validation Tests +# ============================================================================= + + +def test_memory_url_rejected(): + """Test that memory:|icechunk: URLs are properly rejected.""" + with pytest.raises(ValueError, match="memory:\\|icechunk: URLs are not supported"): + zarr.open_group("memory:|icechunk:", mode="r") + + +def test_write_mode_rejected(): + """Test that write mode is rejected for icechunk: URLs.""" + with tempfile.TemporaryDirectory() as tmpdir: + url = f"file:{tmpdir}/test|icechunk:" + with pytest.raises( + ValueError, match="Write mode '.*' not supported for icechunk: URLs" + ): + zarr.open_group(url, mode="w") + + +def test_nonexistent_repository_rejected(): + """Test that non-existent repositories are properly rejected.""" + with tempfile.TemporaryDirectory() as tmpdir: + url = f"file:{tmpdir}/nonexistent|icechunk:" + with pytest.raises( + ValueError, match="Could not open existing Icechunk repository" + ): + zarr.open_group(url, mode="r") + + +def test_error_messages_are_helpful(): + """Test that error messages provide helpful guidance.""" + # Test memory URL error message + with pytest.raises(ValueError) as exc_info: + zarr.open_group("memory:|icechunk:", mode="r") + + error_msg = str(exc_info.value) + assert "not supported" in error_msg + assert "file:/path/to/repo|icechunk:" in error_msg + assert "shared across different URL resolution calls" in error_msg + + # Test write mode error message + with tempfile.TemporaryDirectory() as tmpdir: + with pytest.raises(ValueError) as exc_info: + zarr.open_group(f"file:{tmpdir}/test|icechunk:", mode="w") + + error_msg = str(exc_info.value) + assert "not supported for icechunk: URLs" in error_msg + assert "native Icechunk API" in error_msg + + +# ============================================================================= +# Adapter Name Variant Tests +# ============================================================================= + + +def test_adapter_name_variants(): + """Test that both 'icechunk' and 'ic' adapter names work.""" + with tempfile.TemporaryDirectory() as tmpdir: + repo_path = Path(tmpdir) / "test_adapter_names" + + # Create repository + storage = icechunk.local_filesystem_storage(str(repo_path)) + repo = icechunk.Repository.create(storage) + session = repo.writable_session("main") + store = session.store + + root = zarr.open_group(store, mode="w") + arr = root.create_array("data", shape=(10,), dtype="i4") + arr[:] = range(10) + session.commit("Test data") + + del session, store, root, arr + + # Test both adapter names + for adapter_name in ["icechunk", "ic"]: + url = f"file:{repo_path}|{adapter_name}:" + root_read = zarr.open_group(url, mode="r") + arr_read = root_read["data"] + np.testing.assert_array_equal(arr_read[:], list(range(10))) + + +# ============================================================================= +# File-based Repository Integration Tests +# ============================================================================= + + +def test_file_based_repository_roundtrip(): + """Test creating repository with Icechunk API and reading via ZEP 8 URL.""" + with tempfile.TemporaryDirectory() as tmpdir: + repo_path = Path(tmpdir) / "test_repo" + + # Step 1: Create repository using native Icechunk API + storage = icechunk.local_filesystem_storage(str(repo_path)) + repo = icechunk.Repository.create(storage) + session = repo.writable_session("main") + store = session.store + + # Create test data + root = zarr.open_group(store, mode="w") + arr = root.create_array("test_data", shape=(100,), dtype="f4", chunks=(10,)) + arr[:] = np.random.random(100) + arr.attrs.update({"units": "meters", "description": "Test measurement data"}) + + # Create nested structure + subgroup = root.create_group("measurements") + temp = subgroup.create_array("temperature", shape=(50,), dtype="f4") + temp[:] = np.random.normal(20, 5, 50) + temp.attrs["units"] = "celsius" + + root.attrs["experiment"] = "ZEP8 Integration Test" + root.attrs["version"] = "1.0" + + # Commit changes + session.commit("Initial test data") + + # Clean up references to ensure data is persisted + original_data = arr[:] # Save for comparison + original_temp = temp[:] + del session, store, root, arr, temp, subgroup + + # Step 2: Read via ZEP 8 URL syntax + url = f"file:{repo_path}|icechunk:" + root_read = zarr.open_group(url, mode="r") + + # Verify data integrity + assert "test_data" in root_read.array_keys() + assert "measurements" in root_read.group_keys() + + arr_read = root_read["test_data"] + assert arr_read.shape == (100,) + assert arr_read.dtype == np.dtype("f4") + assert arr_read.attrs["units"] == "meters" + assert arr_read.attrs["description"] == "Test measurement data" + np.testing.assert_array_equal(arr_read[:], original_data) + + # Verify nested structure + temp_read = root_read["measurements/temperature"] + assert temp_read.shape == (50,) + assert temp_read.attrs["units"] == "celsius" + np.testing.assert_array_equal(temp_read[:], original_temp) + + # Verify root attributes + assert root_read.attrs["experiment"] == "ZEP8 Integration Test" + assert root_read.attrs["version"] == "1.0" + + +def test_complex_data_structures(): + """Test ZEP 8 URL access with complex nested data structures.""" + with tempfile.TemporaryDirectory() as tmpdir: + repo_path = Path(tmpdir) / "complex_data" + + # Create repository with complex structure + storage = icechunk.local_filesystem_storage(str(repo_path)) + repo = icechunk.Repository.create(storage) + session = repo.writable_session("main") + store = session.store + + root = zarr.open_group(store, mode="w") + + # Create multi-level hierarchy + experiments = root.create_group("experiments") + exp1 = experiments.create_group("exp_001") + exp2 = experiments.create_group("exp_002") + + # Exp1: Time series data + exp1_data = exp1.create_array( + "timeseries", shape=(100, 3), dtype="f8", chunks=(10, 3) + ) + exp1_data[:] = np.random.random((100, 3)) + exp1_data.attrs.update( + {"columns": ["x", "y", "z"], "sampling_rate": 1000, "units": "volts"} + ) + + # Exp2: Image data + exp2_images = exp2.create_array( + "images", shape=(10, 64, 64), dtype="uint8", chunks=(1, 64, 64) + ) + exp2_images[:] = np.random.randint(0, 256, (10, 64, 64), dtype=np.uint8) + exp2_images.attrs.update( + {"format": "grayscale", "resolution": "64x64", "count": 10} + ) + + # Metadata group + metadata = root.create_group("metadata") + metadata.attrs["created_by"] = "test_suite" + metadata.attrs["experiment_date"] = "2024-08-09" + + session.commit("Complex data structure") + + # Save data for verification + exp1_original = exp1_data[:] + exp2_original = exp2_images[:] + + del ( + session, + store, + root, + experiments, + exp1, + exp2, + exp1_data, + exp2_images, + metadata, + ) + + # Read via ZEP 8 URL and verify everything + url = f"file:{repo_path}|icechunk:" + root_read = zarr.open_group(url, mode="r") + + # Verify structure + assert "experiments" in root_read.group_keys() + assert "metadata" in root_read.group_keys() + + exp1_read = root_read["experiments/exp_001"] + exp2_read = root_read["experiments/exp_002"] + + # Verify exp1 data + ts_read = exp1_read["timeseries"] + assert ts_read.shape == (100, 3) + assert ts_read.attrs["sampling_rate"] == 1000 + assert ts_read.attrs["units"] == "volts" + np.testing.assert_array_equal(ts_read[:], exp1_original) + + # Verify exp2 data + img_read = exp2_read["images"] + assert img_read.shape == (10, 64, 64) + assert img_read.attrs["format"] == "grayscale" + assert img_read.attrs["count"] == 10 + np.testing.assert_array_equal(img_read[:], exp2_original) + + # Verify metadata + meta_read = root_read["metadata"] + assert meta_read.attrs["created_by"] == "test_suite" + assert meta_read.attrs["experiment_date"] == "2024-08-09" + + +# ============================================================================= +# Branch-specific Access Tests +# ============================================================================= + + +def test_branch_specific_access(): + """Test accessing specific branches via ZEP 8 URL syntax.""" + with tempfile.TemporaryDirectory() as tmpdir: + repo_path = Path(tmpdir) / "branched_repo" + + # Create repository with main branch + storage = icechunk.local_filesystem_storage(str(repo_path)) + repo = icechunk.Repository.create(storage) + session = repo.writable_session("main") + store = session.store + + # Create initial data on main + root = zarr.open_group(store, mode="w") + data = root.create_array("experiment_data", shape=(20,), dtype="i4") + data[:] = np.arange(20) + data.attrs["version"] = "main" + + session.commit("Initial data on main") + + # Clean up to persist changes + main_data = data[:] + del session, store, root, data + + # Read from main branch via ZEP 8 URL + main_url = f"file:{repo_path}|icechunk:@branch.main" + root_main = zarr.open_group(main_url, mode="r") + + data_main = root_main["experiment_data"] + assert data_main.attrs["version"] == "main" + np.testing.assert_array_equal(data_main[:], main_data) + + +def test_icechunk_metadata_paths_ignored(): + """Test that icechunk metadata paths don't interfere with zarr paths.""" + with tempfile.TemporaryDirectory() as tmpdir: + repo_path = Path(tmpdir) / "metadata_test" + + # Create repository + storage = icechunk.local_filesystem_storage(str(repo_path)) + repo = icechunk.Repository.create(storage) + session = repo.writable_session("main") + store = session.store + + root = zarr.open_group(store, mode="w") + arr = root.create_array("root_data", shape=(5,), dtype="i4") + arr[:] = [1, 2, 3, 4, 5] + session.commit("Test data") + + del session, store, root, arr + + # URLs with icechunk metadata should access repository root, not treat + # metadata as zarr paths + metadata_urls = [ + f"file:{repo_path}|icechunk:@branch.main", + f"file:{repo_path}|icechunk:@tag.v1.0", # Would fail if tag doesn't exist, but path should be empty + f"file:{repo_path}|icechunk:@abc123", # Would fail if snapshot doesn't exist, but path should be empty + ] + + # Only test the branch URL since others would fail due to non-existent refs, + # but the path extraction logic should be correct + root_read = zarr.open_group(metadata_urls[0], mode="r") + assert "root_data" in root_read.array_keys() + arr_read = root_read["root_data"] + np.testing.assert_array_equal(arr_read[:], [1, 2, 3, 4, 5]) + + +# ============================================================================= +# Storage Backend Tests +# ============================================================================= + + +def test_file_storage_backend_used(): + """Test that file: URLs result in local_filesystem_storage usage.""" + with tempfile.TemporaryDirectory() as tmpdir: + repo_path = Path(tmpdir) / "storage_test" + + # Create repo + storage = icechunk.local_filesystem_storage(str(repo_path)) + repo = icechunk.Repository.create(storage) + session = repo.writable_session("main") + store = session.store + + root = zarr.open_group(store, mode="w") + arr = root.create_array("test", shape=(10,), dtype="i4") + arr[:] = range(10) + session.commit("Test") + + del session, store, root, arr + + # Access via ZEP 8 URL - this should use icechunk.local_filesystem_storage + # internally (we can't directly test this without inspecting internals, + # but we can verify it works correctly) + url = f"file:{repo_path}|icechunk:" + root_read = zarr.open_group(url, mode="r") + arr_read = root_read["test"] + np.testing.assert_array_equal(arr_read[:], list(range(10))) + + +def test_read_only_enforcement(): + """Test that ZEP 8 URL stores are properly read-only.""" + with tempfile.TemporaryDirectory() as tmpdir: + repo_path = Path(tmpdir) / "readonly_test" + + # Create repo + storage = icechunk.local_filesystem_storage(str(repo_path)) + repo = icechunk.Repository.create(storage) + session = repo.writable_session("main") + store = session.store + + root = zarr.open_group(store, mode="w") + arr = root.create_array("test", shape=(10,), dtype="i4") + arr[:] = range(10) + session.commit("Test") + + del session, store, root, arr + + # Access via ZEP 8 URL in read mode + url = f"file:{repo_path}|icechunk:" + root_read = zarr.open_group(url, mode="r") + + # Should be able to read + arr_read = root_read["test"] + assert len(arr_read[:]) == 10 + + # Should not be able to modify (this is enforced by zarr's read-only mode) + # The exact error depends on zarr version, but it should fail + with pytest.raises((PermissionError, ValueError, RuntimeError, Exception)): + arr_read[0] = 999 + + +# ============================================================================= +# Session Utilities Tests +# ============================================================================= + + +def test_icechunk_path_spec_branch(): + """Test IcechunkPathSpec with branch specification.""" + spec = IcechunkPathSpec(branch="main", path="data/temp") + assert spec.branch == "main" + assert spec.tag is None + assert spec.snapshot_id is None + assert spec.path == "data/temp" + assert spec.reference_type == "branch" + assert spec.reference_value == "main" + + +def test_icechunk_path_spec_tag(): + """Test IcechunkPathSpec with tag specification.""" + spec = IcechunkPathSpec(tag="v1.0", path="results") + assert spec.branch is None + assert spec.tag == "v1.0" + assert spec.snapshot_id is None + assert spec.path == "results" + assert spec.reference_type == "tag" + assert spec.reference_value == "v1.0" + + +def test_icechunk_path_spec_snapshot(): + """Test IcechunkPathSpec with snapshot specification.""" + spec = IcechunkPathSpec(snapshot_id="abc123def456", path="") + assert spec.branch is None + assert spec.tag is None + assert spec.snapshot_id == "abc123def456" + assert spec.path == "" + assert spec.reference_type == "snapshot" + assert spec.reference_value == "abc123def456" + + +def test_icechunk_path_spec_default(): + """Test IcechunkPathSpec with no reference (defaults to main).""" + spec = IcechunkPathSpec(path="data") + assert spec.branch == "main" + assert spec.tag is None + assert spec.snapshot_id is None + assert spec.path == "data" + assert spec.reference_type == "branch" + assert spec.reference_value == "main" + + +def test_icechunk_path_spec_multiple_refs_error(): + """Test that specifying multiple reference types raises error.""" + with pytest.raises(ValueError, match="Only one of branch, tag, or snapshot_id"): + IcechunkPathSpec(branch="main", tag="v1.0") + + with pytest.raises(ValueError, match="Only one of branch, tag, or snapshot_id"): + IcechunkPathSpec(branch="main", snapshot_id="abc123") + + with pytest.raises(ValueError, match="Only one of branch, tag, or snapshot_id"): + IcechunkPathSpec(tag="v1.0", snapshot_id="abc123") + + +def test_icechunk_path_spec_repr(): + """Test string representation of IcechunkPathSpec.""" + spec = IcechunkPathSpec(branch="main", path="data/temp") + repr_str = repr(spec) + assert "IcechunkPathSpec" in repr_str + assert "branch='main'" in repr_str + assert "path='data/temp'" in repr_str + + +def test_parse_empty_path(): + """Test parsing empty path specification.""" + spec = parse_icechunk_path_spec("") + assert spec.branch == "main" + assert spec.tag is None + assert spec.snapshot_id is None + assert spec.path == "" + + +def test_parse_path_only(): + """Test parsing path without version specification.""" + spec = parse_icechunk_path_spec("data/temperature") + assert spec.branch == "main" + assert spec.tag is None + assert spec.snapshot_id is None + assert spec.path == "data/temperature" + + +def test_parse_branch_specification(): + """Test parsing branch specifications.""" + # Branch without path + spec = parse_icechunk_path_spec("@branch.main") + assert spec.branch == "main" + assert spec.tag is None + assert spec.snapshot_id is None + assert spec.path == "" + + # Branch with path + spec = parse_icechunk_path_spec("@branch.development/data/temp") + assert spec.branch == "development" + assert spec.tag is None + assert spec.snapshot_id is None + assert spec.path == "data/temp" + + +def test_parse_tag_specification(): + """Test parsing tag specifications.""" + # Tag without path + spec = parse_icechunk_path_spec("@tag.v1.0") + assert spec.branch is None + assert spec.tag == "v1.0" + assert spec.snapshot_id is None + assert spec.path == "" + + # Tag with path + spec = parse_icechunk_path_spec("@tag.v2.1/results/final") + assert spec.branch is None + assert spec.tag == "v2.1" + assert spec.snapshot_id is None + assert spec.path == "results/final" + + +def test_parse_snapshot_specification(): + """Test parsing snapshot ID specifications.""" + # Snapshot without path + spec = parse_icechunk_path_spec("@abc123def456") + assert spec.branch is None + assert spec.tag is None + assert spec.snapshot_id == "abc123def456" + assert spec.path == "" + + # Snapshot with path + spec = parse_icechunk_path_spec("@abc123def456789/analysis/output") + assert spec.branch is None + assert spec.tag is None + assert spec.snapshot_id == "abc123def456789" + assert spec.path == "analysis/output" + + +def test_parse_invalid_branch_format(): + """Test error handling for invalid branch format.""" + with pytest.raises(ValueError, match="Branch name cannot be empty"): + parse_icechunk_path_spec("@branch.") + + with pytest.raises(ValueError, match="Branch name cannot be empty"): + parse_icechunk_path_spec("@branch./data") + + +def test_parse_invalid_tag_format(): + """Test error handling for invalid tag format.""" + with pytest.raises(ValueError, match="Tag name cannot be empty"): + parse_icechunk_path_spec("@tag.") + + with pytest.raises(ValueError, match="Tag name cannot be empty"): + parse_icechunk_path_spec("@tag./data") + + +def test_parse_invalid_snapshot_format(): + """Test error handling for invalid snapshot ID format.""" + # Too short + with pytest.raises(ValueError, match="Invalid reference specification"): + parse_icechunk_path_spec("@abc123") + + # Invalid characters + with pytest.raises(ValueError, match="Invalid reference specification"): + parse_icechunk_path_spec("@xyz123invalid456") + + # Invalid format + with pytest.raises(ValueError, match="Invalid reference specification"): + parse_icechunk_path_spec("@invalid-ref-format") + + +def test_parse_edge_cases(): + """Test edge cases in path parsing.""" + # Multiple slashes + spec = parse_icechunk_path_spec("@branch.main/data//nested///array") + assert spec.branch == "main" + assert spec.path == "data//nested///array" + + # Branch name with special characters (allowed) + spec = parse_icechunk_path_spec("@branch.feature-123_dev") + assert spec.branch == "feature-123_dev" + + # Tag name with special characters (allowed) + spec = parse_icechunk_path_spec("@tag.v1.0-rc.1") + assert spec.tag == "v1.0-rc.1" + + +@pytest.mark.asyncio +async def test_create_readonly_session_branch(): + """Test creating readonly session from branch specification.""" + # Mock repository and session + mock_repo = AsyncMock() + mock_session = AsyncMock() + mock_store = MagicMock() + + mock_repo.readonly_session_async.return_value = mock_session + mock_session.store = mock_store + + # Test branch session creation + path_spec = IcechunkPathSpec(branch="development", path="data") + store = await create_readonly_session_from_path(mock_repo, path_spec) + + mock_repo.readonly_session_async.assert_called_once_with(branch="development") + assert store == mock_store + + +@pytest.mark.asyncio +async def test_create_readonly_session_tag(): + """Test creating readonly session from tag specification.""" + mock_repo = AsyncMock() + mock_session = AsyncMock() + mock_store = MagicMock() + + mock_repo.readonly_session_async.return_value = mock_session + mock_session.store = mock_store + + # Test tag session creation + path_spec = IcechunkPathSpec(tag="v2.0", path="results") + store = await create_readonly_session_from_path(mock_repo, path_spec) + + mock_repo.readonly_session_async.assert_called_once_with(tag="v2.0") + assert store == mock_store + + +@pytest.mark.asyncio +async def test_create_readonly_session_snapshot(): + """Test creating readonly session from snapshot specification.""" + mock_repo = AsyncMock() + mock_session = AsyncMock() + mock_store = MagicMock() + + mock_repo.readonly_session_async.return_value = mock_session + mock_session.store = mock_store + + # Test snapshot session creation + path_spec = IcechunkPathSpec(snapshot_id="abc123def456", path="") + store = await create_readonly_session_from_path(mock_repo, path_spec) + + mock_repo.readonly_session_async.assert_called_once_with(snapshot_id="abc123def456") + assert store == mock_store + + +@pytest.mark.asyncio +async def test_create_readonly_session_default(): + """Test creating readonly session with default (main branch).""" + mock_repo = AsyncMock() + mock_session = AsyncMock() + mock_store = MagicMock() + + mock_repo.readonly_session_async.return_value = mock_session + mock_session.store = mock_store + + # Test default session creation + path_spec = IcechunkPathSpec(path="data") # Defaults to main branch + store = await create_readonly_session_from_path(mock_repo, path_spec) + + mock_repo.readonly_session_async.assert_called_once_with(branch="main") + assert store == mock_store + + +@pytest.mark.asyncio +async def test_create_readonly_session_error(): + """Test error handling in session creation.""" + mock_repo = AsyncMock() + mock_repo.readonly_session_async.side_effect = Exception("Branch not found") + + path_spec = IcechunkPathSpec(branch="nonexistent") + + with pytest.raises( + ValueError, match="Could not create readonly session for branch 'nonexistent'" + ): + await create_readonly_session_from_path(mock_repo, path_spec) + + +def test_parsing_integration_examples(): + """Test parsing with realistic examples.""" + examples = [ + ("", "main", None, None, ""), + ("data/temperature", "main", None, None, "data/temperature"), + ("@branch.main", "main", None, None, ""), + ("@branch.development/experiments", "development", None, None, "experiments"), + ("@tag.v1.0", None, "v1.0", None, ""), + ("@tag.v2.1-rc.1/results/final", None, "v2.1-rc.1", None, "results/final"), + ("@abc123def456789", None, None, "abc123def456789", ""), + ( + "@abc123def456789abcdef/analysis", + None, + None, + "abc123def456789abcdef", + "analysis", + ), + ] + + for ( + input_path, + expected_branch, + expected_tag, + expected_snapshot, + expected_path, + ) in examples: + spec = parse_icechunk_path_spec(input_path) + assert spec.branch == expected_branch, f"Failed for input: {input_path}" + assert spec.tag == expected_tag, f"Failed for input: {input_path}" + assert spec.snapshot_id == expected_snapshot, f"Failed for input: {input_path}" + assert spec.path == expected_path, f"Failed for input: {input_path}"