diff --git a/docs/content/pypaimon/fuse-support.md b/docs/content/pypaimon/fuse-support.md new file mode 100644 index 000000000000..2dcfc82aa776 --- /dev/null +++ b/docs/content/pypaimon/fuse-support.md @@ -0,0 +1,137 @@ +--- +title: "FUSE Support" +weight: 7 +type: docs +aliases: + - /pypaimon/fuse-support.html +--- + +# FUSE Support + +When using PyPaimon REST Catalog to access remote object storage (such as OSS, S3, or HDFS), data access typically goes through remote storage SDKs. However, in scenarios where remote storage paths are mounted locally via FUSE (Filesystem in Userspace), users can access data directly through local filesystem paths for better performance. + +This feature enables PyPaimon to use local file access when FUSE mount is available, bypassing remote storage SDKs. + +## Configuration + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `fuse.local-path.enabled` | Boolean | `false` | Whether to enable FUSE local path mapping | +| `fuse.local-path.root` | String | (none) | FUSE mounted local root path, e.g., `/mnt/fuse/warehouse` | +| `fuse.local-path.mode` | String | `pvfs` | FUSE path mode: `pvfs` uses database/table logical names, `raw` uses URI path segments directly | +| `fuse.local-path.validation-mode` | String | `strict` | Validation mode: `strict`, `warn`, or `none` | + +## Usage + +```python +from pypaimon import CatalogFactory + +# PVFS mode (default): uses database/table logical names +catalog_options = { + 'metastore': 'rest', + 'uri': 'http://rest-server:8080', + 'warehouse': 'oss://my-catalog/', + 'token.provider': 'xxx', + + # FUSE local path configuration + 'fuse.local-path.enabled': 'true', + 'fuse.local-path.root': '/mnt/fuse/warehouse', + 'fuse.local-path.mode': 'pvfs', + 'fuse.local-path.validation-mode': 'strict' +} + +catalog = CatalogFactory.create(catalog_options) +``` + +## Path Modes + +The `fuse.local-path.mode` option controls how remote storage paths are mapped to local FUSE paths: + +| Mode | Path Mapping | Use Case | +|------|-------------|----------| +| `pvfs` | Uses database/table logical names from Identifier: `//` | PVFS catalog-level FUSE mount, where remote paths use UUIDs (e.g., `oss:////`) but FUSE exposes logical names | +| `raw` | Uses URI path segments directly: `//` | Standard FUSE mount where local directory structure mirrors remote storage layout | + +## Validation Modes + +Validation is performed on first data access to verify FUSE mount correctness. The `validation-mode` controls behavior when the local path does not exist: + +| Mode | Behavior | Use Case | +|------|----------|----------| +| `strict` | Throw exception, block operation | Production, safety first | +| `warn` | Log warning, fallback to default FileIO | Testing, compatibility first | +| `none` | Skip validation, use directly | Trusted environment, performance first | + +**Note**: Configuration errors (e.g., `fuse.local-path.enabled=true` but `fuse.local-path.root` not configured) will throw exceptions directly, regardless of validation mode. + +## How It Works + +1. When `fuse.local-path.enabled=true`, PyPaimon attempts to use local file access +2. On first data access, validation is triggered (unless mode is `none`) +3. Validation fetches the `default` database location and converts it to local path (always using raw URI parsing) +4. If local path exists, subsequent data access uses `FuseLocalFileIO` +5. Path translation depends on `fuse.local-path.mode`: + - `pvfs`: remote path `oss:////` → local path `//` + - `raw`: remote path `oss:////` → local path `//` +6. If validation fails, behavior depends on `validation-mode` + +## Example Scenario + +### PVFS Mode + +Assume you have: +- Remote storage paths use UUIDs: `oss://clg-paimon-xxx/db-xxx/tbl-xxx` +- FUSE mount: `/mnt/fuse/warehouse` (mounted to `pvfs://demo_catalog`) +- FUSE exposes logical names: `/mnt/fuse/warehouse/my_db/my_table` + +```python +from pypaimon import CatalogFactory + +catalog = CatalogFactory.create({ + 'metastore': 'rest', + 'uri': 'http://rest-server:8080', + 'warehouse': 'oss://my-catalog/', + 'fuse.local-path.enabled': 'true', + 'fuse.local-path.root': '/mnt/fuse/warehouse', + 'fuse.local-path.mode': 'pvfs', + 'fuse.local-path.validation-mode': 'none' +}) + +# When reading table 'my_db.my_table', PyPaimon will: +# 1. Convert "oss://clg-paimon-xxx/db-xxx/tbl-xxx" to "/mnt/fuse/warehouse/my_db/my_table" +# 2. Use FuseLocalFileIO to read from local path +table = catalog.get_table('my_db.my_table') +reader = table.new_read_builder().new_read() +``` + +### Raw Mode + +Assume you have: +- Remote storage: `oss://my-catalog/` +- FUSE mount: `/mnt/fuse/warehouse` (mounted to `oss://my-catalog/`) +- Local directory structure mirrors remote: `/mnt/fuse/warehouse/db/table` + +```python +from pypaimon import CatalogFactory + +catalog = CatalogFactory.create({ + 'metastore': 'rest', + 'uri': 'http://rest-server:8080', + 'warehouse': 'oss://my-catalog/', + 'fuse.local-path.enabled': 'true', + 'fuse.local-path.root': '/mnt/fuse/warehouse', + 'fuse.local-path.mode': 'raw' +}) + +# When reading table data, PyPaimon will: +# 1. Convert "oss://my-catalog/db/table" to "/mnt/fuse/warehouse/db/table" +# 2. Use FuseLocalFileIO to read from local path +table = catalog.get_table('db.table') +reader = table.new_read_builder().new_read() +``` + +## Limitations + +- Only catalog-level FUSE mount is supported (single `fuse.local-path.root` configuration) +- Validation only checks if local path exists, not data consistency +- If FUSE mount becomes unavailable after validation, file operations may fail diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index 943d88840e2a..002dad3bf50b 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -15,7 +15,9 @@ See the License for the specific language governing permissions and limitations under the License. """ +import logging from typing import Any, Callable, Dict, List, Optional, Union +from urllib.parse import urlparse from pypaimon.api.api_response import GetTableResponse, PagedList from pypaimon.api.rest_api import RESTApi @@ -32,10 +34,11 @@ from pypaimon.catalog.rest.property_change import PropertyChange from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO from pypaimon.catalog.rest.table_metadata import TableMetadata -from pypaimon.common.options.config import CatalogOptions +from pypaimon.common.options.config import CatalogOptions, FuseOptions from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.file_io import FileIO from pypaimon.common.identifier import Identifier +from pypaimon.filesystem.local_file_io import LocalFileIO, FuseLocalFileIO from pypaimon.schema.schema import Schema from pypaimon.schema.schema_change import SchemaChange from pypaimon.schema.table_schema import TableSchema @@ -45,6 +48,8 @@ from pypaimon.table.format.format_table import FormatTable, Format from pypaimon.table.iceberg.iceberg_table import IcebergTable +logger = logging.getLogger(__name__) + FORMAT_TABLE_TYPE = "format-table" ICEBERG_TABLE_TYPE = "iceberg-table" @@ -57,6 +62,17 @@ def __init__(self, context: CatalogContext, config_required: Optional[bool] = Tr context.prefer_io_loader, context.fallback_io_loader) self.data_token_enabled = self.rest_api.options.get(CatalogOptions.DATA_TOKEN_ENABLED) + # FUSE local path configuration + self.fuse_local_path_enabled = self.context.options.get( + FuseOptions.FUSE_LOCAL_PATH_ENABLED, False) + self.fuse_local_path_root = self.context.options.get( + FuseOptions.FUSE_LOCAL_PATH_ROOT) + self.fuse_validation_mode = self.context.options.get( + FuseOptions.FUSE_LOCAL_PATH_VALIDATION_MODE, "strict") + self.fuse_local_path_mode = self.context.options.get( + FuseOptions.FUSE_LOCAL_PATH_MODE, "pvfs") + self._fuse_validation_state = None # None=not validated, True=passed, False=failed + def catalog_loader(self): """ Create and return a RESTCatalogLoader for this catalog. @@ -338,9 +354,125 @@ def file_io_from_options(self, table_path: str) -> FileIO: return FileIO.get(table_path, self.context.options) def file_io_for_data(self, table_path: str, identifier: Identifier): + """ + Get FileIO for data access, supporting FUSE local path mapping. + """ + # Try to use FUSE local path + if self.fuse_local_path_enabled: + # Configuration error raises exception directly + local_path = self._resolve_fuse_local_path(table_path, identifier) + + # Perform validation (only once) + if self._fuse_validation_state is None: + self._validate_fuse_path() + + # Validation passed, return FUSE-aware local FileIO + if self._fuse_validation_state: + return FuseLocalFileIO( + path=table_path.rstrip('/'), + fuse_path=local_path.rstrip('/'), + catalog_options=self.context.options, + ) + + # warn mode validation failed, fallback to default FileIO + return RESTTokenFileIO(identifier, table_path, self.context.options) \ + if self.data_token_enabled else self.file_io_from_options(table_path) + + # Fallback to original logic return RESTTokenFileIO(identifier, table_path, self.context.options) \ if self.data_token_enabled else self.file_io_from_options(table_path) + def _resolve_fuse_local_path(self, original_path: str, identifier: Optional[Identifier] = None) -> str: + """ + Resolve FUSE local path. + + In 'pvfs' mode, use database/table logical names from identifier to build the path. + If identifier has no object name, returns database-level path (used for validation). + In 'raw' mode, use URI path segments directly. + + Returns: + Local path + + Raises: + ValueError: If fuse.local-path.root is not configured or pvfs mode missing identifier + """ + if not self.fuse_local_path_root: + raise ValueError( + "FUSE local path is enabled but fuse.local-path.root is not configured" + ) + + root = self.fuse_local_path_root.rstrip('/') + + if self.fuse_local_path_mode == "pvfs": + if identifier is None: + raise ValueError( + "FUSE path mode 'pvfs' requires an Identifier to resolve " + "the local path, but identifier is None." + ) + db = identifier.get_database_name() + obj = identifier.get_object_name() + if obj: + return f"{root}/{db}/{obj}" + return f"{root}/{db}" + elif self.fuse_local_path_mode == "raw": + # raw mode: use URI path segments directly + uri = urlparse(original_path) + path_part = uri.path.lstrip('/') + if not uri.scheme: + # No scheme means path like "catalog/db/table", + # skip the first segment (catalog name) to align with scheme-based paths + segments = path_part.split('/') + if len(segments) > 1: + path_part = '/'.join(segments[1:]) + return f"{root}/{path_part}" + else: + raise ValueError( + f"Invalid fuse.local-path.mode: '{self.fuse_local_path_mode}'. " + f"Supported modes are 'pvfs' and 'raw'." + ) + + def _validate_fuse_path(self) -> None: + """ + Validate FUSE local path is correctly mounted. + + Get default database's location, convert to local path and check if it exists. + """ + if self.fuse_validation_mode == "none": + self._fuse_validation_state = True + return + + # Get default database details, API call failure raises exception directly + db = self.rest_api.get_database("default") + remote_location = db.location + + if not remote_location: + logger.info("Default database has no location, skipping FUSE validation") + self._fuse_validation_state = True + return + + expected_local = self._resolve_fuse_local_path(remote_location, Identifier.create("default", None)) + local_file_io = LocalFileIO(expected_local, self.context.options) + + # Only validate if local path exists, handle based on validation mode + if not local_file_io.exists(expected_local): + error_msg = ( + f"FUSE local path validation failed: " + f"local path '{expected_local}' does not exist " + f"for default database location '{remote_location}'" + ) + self._handle_validation_error(error_msg) + else: + self._fuse_validation_state = True + logger.info("FUSE local path validation passed") + + def _handle_validation_error(self, error_msg: str) -> None: + """Handle validation error based on validation mode.""" + if self.fuse_validation_mode == "strict": + raise ValueError(error_msg) + elif self.fuse_validation_mode == "warn": + logger.warning(f"{error_msg}. Falling back to default FileIO.") + self._fuse_validation_state = False # Mark validation failed, fallback to default FileIO + def load_table(self, identifier: Identifier, internal_file_io: Callable[[str], Any], diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py index fb6a46446ea0..ee8d622947b4 100644 --- a/paimon-python/pypaimon/common/options/config.py +++ b/paimon-python/pypaimon/common/options/config.py @@ -83,3 +83,38 @@ class CatalogOptions: HTTP_USER_AGENT_HEADER = ConfigOptions.key( "header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP User Agent header") BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1 + + +class FuseOptions: + """FUSE local path configuration options.""" + + FUSE_LOCAL_PATH_ENABLED = ( + ConfigOptions.key("fuse.local-path.enabled") + .boolean_type() + .default_value(False) + .with_description("Whether to enable FUSE local path mapping") + ) + + FUSE_LOCAL_PATH_ROOT = ( + ConfigOptions.key("fuse.local-path.root") + .string_type() + .no_default_value() + .with_description("FUSE mounted local root path, e.g., /mnt/fuse/warehouse") + ) + + FUSE_LOCAL_PATH_VALIDATION_MODE = ( + ConfigOptions.key("fuse.local-path.validation-mode") + .string_type() + .default_value("strict") + .with_description("Validation mode: strict, warn, or none") + ) + + FUSE_LOCAL_PATH_MODE = ( + ConfigOptions.key("fuse.local-path.mode") + .string_type() + .default_value("pvfs") + .with_description( + "FUSE path mode: 'pvfs' uses database/table logical names, " + "'raw' uses URI path segments directly" + ) + ) diff --git a/paimon-python/pypaimon/filesystem/local_file_io.py b/paimon-python/pypaimon/filesystem/local_file_io.py index cf9399309f61..44dadaaf6680 100644 --- a/paimon-python/pypaimon/filesystem/local_file_io.py +++ b/paimon-python/pypaimon/filesystem/local_file_io.py @@ -453,3 +453,27 @@ def write_blob(self, path: str, data: pyarrow.Table, **kwargs): except Exception as e: self.delete_quietly(path) raise RuntimeError(f"Failed to write blob file {path}: {e}") from e + + +class FuseLocalFileIO(LocalFileIO): + """LocalFileIO that translates remote OSS paths to FUSE-mounted local paths. + + All file operations receive paths like: + oss://clg-paimon-xxx/db-xxx/tbl-xxx/manifest/manifest-xxx + This class replaces the path prefix with fuse_path so the actual + I/O goes through the FUSE mount point. + """ + + def __init__(self, path: str, fuse_path: str, + catalog_options: Optional[Options] = None): + super().__init__(path=fuse_path, catalog_options=catalog_options) + self.path = path + self.fuse_path = fuse_path + + def _to_file(self, path: str) -> Path: + return super()._to_file(self._translate(path)) + + def _translate(self, path: str) -> str: + if path == self.path or path.startswith(self.path + "/"): + return self.fuse_path + path[len(self.path):] + return path diff --git a/paimon-python/pypaimon/tests/rest/test_fuse_local_path.py b/paimon-python/pypaimon/tests/rest/test_fuse_local_path.py new file mode 100644 index 000000000000..8bd9f9209fca --- /dev/null +++ b/paimon-python/pypaimon/tests/rest/test_fuse_local_path.py @@ -0,0 +1,330 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import unittest +from unittest.mock import MagicMock, patch + +from pypaimon.catalog.rest.rest_catalog import RESTCatalog +from pypaimon.common.options import Options +from pypaimon.common.options.config import FuseOptions + + +class TestFuseLocalPath(unittest.TestCase): + """Test cases for FUSE local path functionality.""" + + def _create_catalog_with_fuse( + self, + enabled: bool = True, + root: str = "/mnt/fuse/warehouse", + validation_mode: str = "strict", + mode: str = "pvfs" + ) -> RESTCatalog: + """Helper to create a mock RESTCatalog with FUSE configuration.""" + options = Options({ + "uri": "http://localhost:8080", + "warehouse": "oss://catalog/warehouse", + FuseOptions.FUSE_LOCAL_PATH_ENABLED.key(): str(enabled).lower(), + FuseOptions.FUSE_LOCAL_PATH_ROOT.key(): root, + FuseOptions.FUSE_LOCAL_PATH_VALIDATION_MODE.key(): validation_mode, + FuseOptions.FUSE_LOCAL_PATH_MODE.key(): mode, + }) + + # Create a mock catalog directly without going through __init__ + catalog = MagicMock(spec=RESTCatalog) + catalog.fuse_local_path_enabled = enabled + catalog.fuse_local_path_root = root + catalog.fuse_validation_mode = validation_mode + catalog.fuse_local_path_mode = mode + catalog._fuse_validation_state = None + catalog.data_token_enabled = False + catalog.rest_api = MagicMock() + catalog.context = MagicMock() + catalog.context.options = options + + # Bind actual methods to the mock + catalog._resolve_fuse_local_path = RESTCatalog._resolve_fuse_local_path.__get__(catalog) + catalog._validate_fuse_path = RESTCatalog._validate_fuse_path.__get__(catalog) + catalog._handle_validation_error = RESTCatalog._handle_validation_error.__get__(catalog) + catalog.file_io_for_data = RESTCatalog.file_io_for_data.__get__(catalog) + catalog.file_io_from_options = MagicMock(return_value=MagicMock()) + + return catalog + + # ========== _resolve_fuse_local_path Tests ========== + + # --- pvfs mode tests --- + + def test_resolve_pvfs_mode_with_identifier(self): + """Test pvfs mode uses identifier logical names.""" + from pypaimon.common.identifier import Identifier + catalog = self._create_catalog_with_fuse(mode="pvfs") + identifier = Identifier.create("my_db", "my_table") + + result = catalog._resolve_fuse_local_path( + "oss://clg-paimon-xxx/db-xxx/tbl-xxx", identifier + ) + self.assertEqual(result, "/mnt/fuse/warehouse/my_db/my_table") + + def test_resolve_pvfs_mode_with_trailing_slash(self): + """Test pvfs mode with trailing slash on root.""" + from pypaimon.common.identifier import Identifier + catalog = self._create_catalog_with_fuse(mode="pvfs", root="/mnt/fuse/warehouse/") + identifier = Identifier.create("my_db", "my_table") + + result = catalog._resolve_fuse_local_path( + "oss://clg-paimon-xxx/db-xxx/tbl-xxx", identifier + ) + self.assertEqual(result, "/mnt/fuse/warehouse/my_db/my_table") + + def test_resolve_pvfs_mode_without_identifier_raises(self): + """Test pvfs mode raises ValueError when identifier is None.""" + catalog = self._create_catalog_with_fuse(mode="pvfs") + + with self.assertRaises(ValueError) as context: + catalog._resolve_fuse_local_path("oss://clg-paimon-xxx/db-xxx/tbl-xxx") + + self.assertIn("identifier is None", str(context.exception)) + + # --- raw mode tests --- + + def test_resolve_raw_mode_basic(self): + """Test raw mode basic path conversion.""" + catalog = self._create_catalog_with_fuse(mode="raw") + + result = catalog._resolve_fuse_local_path("oss://catalog/db1/table1") + self.assertEqual(result, "/mnt/fuse/warehouse/db1/table1") + + def test_resolve_raw_mode_with_trailing_slash(self): + """Test raw mode with trailing slash on root.""" + catalog = self._create_catalog_with_fuse(mode="raw", root="/mnt/fuse/warehouse/") + + result = catalog._resolve_fuse_local_path("oss://catalog/db1/table1") + self.assertEqual(result, "/mnt/fuse/warehouse/db1/table1") + + def test_resolve_raw_mode_deep_path(self): + """Test raw mode with deep path.""" + catalog = self._create_catalog_with_fuse(mode="raw") + + result = catalog._resolve_fuse_local_path( + "oss://catalog/db1/table1/partition1/file.parquet" + ) + self.assertEqual( + result, + "/mnt/fuse/warehouse/db1/table1/partition1/file.parquet" + ) + + def test_resolve_raw_mode_without_scheme(self): + """Test raw mode path without scheme skips first segment.""" + catalog = self._create_catalog_with_fuse(mode="raw") + + result = catalog._resolve_fuse_local_path("catalog/db1/table1") + self.assertEqual(result, "/mnt/fuse/warehouse/db1/table1") + + def test_resolve_raw_mode_ignores_identifier(self): + """Test raw mode uses URI path even when identifier is provided.""" + from pypaimon.common.identifier import Identifier + catalog = self._create_catalog_with_fuse(mode="raw") + identifier = Identifier.create("my_db", "my_table") + + result = catalog._resolve_fuse_local_path( + "oss://catalog/db-uuid/tbl-uuid", identifier + ) + self.assertEqual(result, "/mnt/fuse/warehouse/db-uuid/tbl-uuid") + + # --- common tests --- + + def test_resolve_fuse_local_path_missing_root(self): + """Test error when root is not configured.""" + catalog = self._create_catalog_with_fuse(root=None) + + with self.assertRaises(ValueError) as context: + catalog._resolve_fuse_local_path("oss://catalog/db1/table1") + + self.assertIn("fuse.local-path.root is not configured", str(context.exception)) + + # ========== Validation Tests ========== + + def test_validation_mode_none_skips_validation(self): + """Test none mode skips validation.""" + catalog = self._create_catalog_with_fuse(validation_mode="none") + + catalog._validate_fuse_path() + + self.assertTrue(catalog._fuse_validation_state) + + def test_validation_mode_strict_raises_on_failure(self): + """Test strict mode raises exception on validation failure.""" + catalog = self._create_catalog_with_fuse(validation_mode="strict") + + # Mock default database with location + mock_db = MagicMock() + mock_db.location = "oss://catalog/default" + catalog.rest_api.get_database.return_value = mock_db + + # Mock LocalFileIO to return False for exists + with patch('pypaimon.catalog.rest.rest_catalog.LocalFileIO') as mock_local_io: + mock_instance = MagicMock() + mock_instance.exists.return_value = False + mock_local_io.return_value = mock_instance + + with self.assertRaises(ValueError) as context: + catalog._validate_fuse_path() + + self.assertIn("FUSE local path validation failed", str(context.exception)) + + def test_validation_mode_warn_fallback_on_failure(self): + """Test warn mode falls back to default FileIO on validation failure.""" + catalog = self._create_catalog_with_fuse(validation_mode="warn") + + # Mock default database with location + mock_db = MagicMock() + mock_db.location = "oss://catalog/default" + catalog.rest_api.get_database.return_value = mock_db + + # Mock LocalFileIO to return False for exists + with patch('pypaimon.catalog.rest.rest_catalog.LocalFileIO') as mock_local_io: + mock_instance = MagicMock() + mock_instance.exists.return_value = False + mock_local_io.return_value = mock_instance + + # Should not raise, just set state to False + catalog._validate_fuse_path() + + self.assertFalse(catalog._fuse_validation_state) + + def test_validation_passes_when_local_exists(self): + """Test validation passes when local path exists.""" + catalog = self._create_catalog_with_fuse(validation_mode="strict") + + # Mock default database with location + mock_db = MagicMock() + mock_db.location = "oss://catalog/default" + catalog.rest_api.get_database.return_value = mock_db + + # Mock LocalFileIO to return True for exists + with patch('pypaimon.catalog.rest.rest_catalog.LocalFileIO') as mock_local_io: + mock_instance = MagicMock() + mock_instance.exists.return_value = True + mock_local_io.return_value = mock_instance + + catalog._validate_fuse_path() + + self.assertTrue(catalog._fuse_validation_state) + + def test_validation_skips_when_no_location(self): + """Test validation skips when default database has no location.""" + catalog = self._create_catalog_with_fuse(validation_mode="strict") + + # Mock default database without location + mock_db = MagicMock() + mock_db.location = None + catalog.rest_api.get_database.return_value = mock_db + + catalog._validate_fuse_path() + + self.assertTrue(catalog._fuse_validation_state) + + # ========== file_io_for_data Tests ========== + + def test_file_io_for_data_disabled_fuse(self): + """Test that disabled FUSE uses default FileIO.""" + catalog = self._create_catalog_with_fuse(enabled=False) + catalog.data_token_enabled = False + + from pypaimon.common.identifier import Identifier + identifier = Identifier.create("db1", "table1") + + _ = catalog.file_io_for_data("oss://catalog/db1/table1", identifier) + catalog.file_io_from_options.assert_called_once() + + def test_file_io_for_data_uses_local_when_validated(self): + """Test that validated FUSE uses FuseLocalFileIO.""" + catalog = self._create_catalog_with_fuse(enabled=True, validation_mode="none") + catalog._fuse_validation_state = True # Already validated + + from pypaimon.common.identifier import Identifier + identifier = Identifier.create("db1", "table1") + + with patch('pypaimon.catalog.rest.rest_catalog.FuseLocalFileIO') as mock_fuse_io: + mock_fuse_io.return_value = MagicMock() + _ = catalog.file_io_for_data("oss://catalog/db1/table1", identifier) + mock_fuse_io.assert_called_once() + + def test_file_io_for_data_fallback_when_validation_failed(self): + """Test that failed validation falls back to default FileIO.""" + catalog = self._create_catalog_with_fuse(enabled=True, validation_mode="warn") + catalog._fuse_validation_state = False # Validation failed + catalog.data_token_enabled = False + + from pypaimon.common.identifier import Identifier + identifier = Identifier.create("db1", "table1") + + _ = catalog.file_io_for_data("oss://catalog/db1/table1", identifier) + catalog.file_io_from_options.assert_called_once() + + # ========== Invalid Mode Tests ========== + + def test_resolve_invalid_mode_raises(self): + """Test that an invalid mode raises ValueError.""" + catalog = self._create_catalog_with_fuse(mode="invalid") + + with self.assertRaises(ValueError) as context: + catalog._resolve_fuse_local_path("oss://catalog/db1/table1") + + self.assertIn("Invalid fuse.local-path.mode", str(context.exception)) + self.assertIn("invalid", str(context.exception)) + + # ========== Raw Mode Validation Tests ========== + + def test_validation_raw_mode_strict_raises_on_failure(self): + """Test strict validation in raw mode raises exception on failure.""" + catalog = self._create_catalog_with_fuse(validation_mode="strict", mode="raw") + + mock_db = MagicMock() + mock_db.location = "oss://catalog/default" + catalog.rest_api.get_database.return_value = mock_db + + with patch('pypaimon.catalog.rest.rest_catalog.LocalFileIO') as mock_local_io: + mock_instance = MagicMock() + mock_instance.exists.return_value = False + mock_local_io.return_value = mock_instance + + with self.assertRaises(ValueError) as context: + catalog._validate_fuse_path() + + self.assertIn("FUSE local path validation failed", str(context.exception)) + + def test_validation_raw_mode_passes_when_local_exists(self): + """Test validation passes in raw mode when local path exists.""" + catalog = self._create_catalog_with_fuse(validation_mode="strict", mode="raw") + + mock_db = MagicMock() + mock_db.location = "oss://catalog/default" + catalog.rest_api.get_database.return_value = mock_db + + with patch('pypaimon.catalog.rest.rest_catalog.LocalFileIO') as mock_local_io: + mock_instance = MagicMock() + mock_instance.exists.return_value = True + mock_local_io.return_value = mock_instance + + catalog._validate_fuse_path() + + self.assertTrue(catalog._fuse_validation_state) + + +if __name__ == '__main__': + unittest.main()