feat(teradata): enhance column extraction with prepared statement fallback #16246

Draft
brock-acryl wants to merge 1 commit into master from feat-teradata-views

@brock-acryl
Contributor

Adds a fallback for column metadata. When column metadata cannot be obtained via QVCI (views), DBC system tables, or HELP COLUMN, ingestion can use prepared statements to extract column data types for both tables and views. The fallback applies only when use_prepared_statement_metadata or metadata_extraction_fallback is enabled.
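
The ordering described above can be pictured as a chain of extractors tried in turn. The following is a minimal sketch of that pattern only, not the code in this PR: `first_successful`, `dbc_denied`, and `prepared_statement` are hypothetical names, and the real extractors take a connection, schema, and table name.

```python
from typing import Callable, List, Optional, Tuple

# An extractor returns (columns, error_message_or_None).
Extractor = Callable[[], Tuple[List[dict], Optional[str]]]


def first_successful(extractors: List[Tuple[str, Extractor]]) -> List[dict]:
    """Run extractors in order and return the first non-empty column list."""
    errors: List[str] = []
    for name, extract in extractors:
        columns, error = extract()
        if error:
            # Remember why this method failed, then move on to the next one.
            errors.append(f"{name}: {error}")
        if columns:
            return columns
    raise RuntimeError("All extraction methods failed: " + "; ".join(errors))


# Hypothetical stand-ins for the real extraction paths.
def dbc_denied() -> Tuple[List[dict], Optional[str]]:
    return [], "no SELECT access on DBC.ColumnsV"


def prepared_statement() -> Tuple[List[dict], Optional[str]]:
    return [{"name": "id", "type": "INTEGER"}], None


columns = first_successful([("dbc", dbc_denied), ("prepared", prepared_statement)])
print(columns)
```

Collecting the per-method errors before raising keeps the final message actionable when every method fails.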

@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Feb 17, 2026
# Quote identifiers to prevent SQL injection
escaped_schema = schema.replace('"', '""')
escaped_table = table_name.replace('"', '""')
query_str = f'SELECT * FROM "{escaped_schema}"."{escaped_table}" WHERE 1=0'

Potential SQL injection via string-based query concatenation - critical severity
SQL injection might be possible in these locations, especially if the strings being concatenated are controlled via user input.

Remediation: If possible, rebuild the query to use prepared statements or an ORM. If that is not possible, make sure the user input is verified or sanitized. As an added layer of protection, we also recommend installing a WAF that blocks SQL injection attacks.
View details in Aikido Security
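
For context on the flagged lines: DB-API bind parameters generally stand in for values, not for schema or table names, so identifiers usually have to be quoted into the statement text. Below is a minimal sketch of the quoting approach the diff already uses, pulled into a helper. `quote_identifier` and `build_probe_query` are hypothetical names, and the empty-name and NUL checks are assumptions added for illustration, not rules taken from Teradata documentation.

```python
def quote_identifier(name: str) -> str:
    """Return name as a double-quoted SQL identifier.

    Embedded double quotes are doubled so the name cannot close the
    quoted identifier early.
    """
    if not name or "\x00" in name:
        # Assumed sanity check, not a documented Teradata rule.
        raise ValueError("identifier must be non-empty and contain no NUL")
    return '"' + name.replace('"', '""') + '"'


def build_probe_query(schema: str, table: str) -> str:
    """Build a query that returns no rows but still exposes column metadata."""
    return (
        f"SELECT * FROM {quote_identifier(schema)}."
        f"{quote_identifier(table)} WHERE 1=0"
    )


print(build_probe_query("sales", 'odd"name'))
```

Keeping the escaping in one helper makes it easier to unit test against hostile names than inline `replace` calls spread across the module.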

@codecov

codecov bot commented Feb 17, 2026

❌ 1 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
| --- | --- | --- | --- |
| 14536 | 1 | 14535 | 156 |
View the top 1 failed test(s) by shortest run time
tests.unit.test_teradata_performance.TestQueryOptimizations::test_qvci_optimization
Stack Traces | 0.009s run time
self = <tests.unit.test_teradata_performance.TestQueryOptimizations object at 0x7f256c4cbdf0>

    def test_qvci_optimization(self):
        """Test QVCI optimization for column information."""
        from datahub.ingestion.source.sql.teradata import optimized_get_columns
    
        # Create table in cache
        test_table = TeradataTable(
            database="test_schema",
            name="test_table",
            description="Test table",
            object_type="Table",
            create_timestamp=datetime.now(),
            last_alter_name=None,
            last_alter_timestamp=None,
            request_text=None,
        )
    
        tables_cache = {"test_schema": [test_table]}
    
        # Mock self object
        mock_self = MagicMock()
        mock_self.get_schema_columns.return_value = {"test_table": []}
    
        mock_connection = MagicMock()
    
        # Test with QVCI enabled
>       optimized_get_columns(
            mock_self,
            mock_connection,
            "test_table",
            schema="test_schema",
            tables_cache=tables_cache,
            use_qvci=True,
        )

tests/unit/test_teradata_performance.py:577: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <MagicMock id='139796691583232'>
connection = <MagicMock id='139796692894768'>, table_name = 'test_table'
schema = 'test_schema'
tables_cache = {'test_schema': [TeradataTable(database='test_schema', name='test_table', description='Test table', object_type='Table...atetime.datetime(2026, 2, 17, 22, 1, 13, 398918), last_alter_name=None, last_alter_timestamp=None, request_text=None)]}
use_qvci = True, kw = {}
td_table = TeradataTable(database='test_schema', name='test_table', description='Test table', object_type='Table', create_timestamp=datetime.datetime(2026, 2, 17, 22, 1, 13, 398918), last_alter_name=None, last_alter_timestamp=None, request_text=None)
t = TeradataTable(database='test_schema', name='test_table', description='Test table', object_type='Table', create_timestamp=datetime.datetime(2026, 2, 17, 22, 1, 13, 398918), last_alter_name=None, last_alter_timestamp=None, request_text=None)
config = <MagicMock name='mock.config' id='139800780970480'>
use_fallback = <MagicMock name='mock.config.metadata_extraction_fallback' id='139796691469888'>
use_prepared = <MagicMock name='mock.config.use_prepared_statement_metadata' id='139796579153280'>
res = [], extraction_errors = []

    def optimized_get_columns(  # noqa: C901
        self: Any,
        connection: Connection,
        table_name: str,
        schema: Optional[str] = None,
        tables_cache: Optional[MutableMapping[str, List[TeradataTable]]] = None,
        use_qvci: bool = False,
        **kw: Dict[str, Any],
    ) -> List[Dict]:
        tables_cache = tables_cache or {}
        if schema is None:
            schema = self.default_schema_name
    
        # Using 'help schema.table.*' statements has been considered.
        # The DBC.ColumnsV provides the default value which is not available
        # with the 'help column' commands result.
    
        td_table: Optional[TeradataTable] = None
        for t in tables_cache[schema]:
            if t.name == table_name:
                td_table = t
                break
    
        if td_table is None:
            logger.warning(
                f"Table {table_name} not found in cache for schema {schema}, not getting columns"
            )
            return []
    
        config = getattr(self, "config", None)
        use_fallback = config and getattr(config, "metadata_extraction_fallback", False)
        use_prepared = config and getattr(config, "use_prepared_statement_metadata", False)
    
        res: List[Any] = []
        extraction_errors: List[str] = []
    
        # Try QVCI first if enabled and view
        if td_table.object_type == "View" and use_qvci:
            res, error = _try_qvci_extraction(
                self, connection, schema, table_name, td_table
            )
            if error:
                extraction_errors.append(error)
                logger.warning(f"QVCI extraction failed for {schema}.{table_name}: {error}")
            if res:
                return _process_columns(res, td_table, self)
    
        # Try HELP COLUMN for views (existing behavior when QVCI not enabled)
        if td_table.object_type == "View" and not use_qvci and not res:
            res, error = _try_help_extraction(self, connection, schema, table_name)
            if error:
                extraction_errors.append(error)
                if not use_fallback:
                    raise Exception(error)
            if res:
                return _process_columns(res, td_table, self)
    
        # Try DBC system tables
        if not res:
            res, error = _try_dbc_extraction(self, connection, schema, table_name, use_qvci)
            if error:
                extraction_errors.append(error)
                logger.warning(f"DBC extraction failed for {schema}.{table_name}: {error}")
                if not (use_fallback or use_prepared):
                    raise Exception(error)
            if res:
                return _process_columns(res, td_table, self)
    
        # Fallback to prepared statement method if enabled
        if (use_fallback or use_prepared) and not res:
            res, error = _try_prepared_statement_extraction(
                self, connection, schema, table_name
            )
            if error:
                extraction_errors.append(error)
                logger.error(
                    f"Prepared statement extraction failed for {schema}.{table_name}: {error}"
                )
            if res:
                return _process_columns(res, td_table, self)
    
        # Final fallback to HELP COLUMN if all else failed
        if use_fallback and not res:
            res, error = _try_help_extraction(self, connection, schema, table_name)
            if error:
                extraction_errors.append(error)
            else:
                if hasattr(self, "report"):
                    self.report.num_tables_using_help_fallback += 1
                    self.report.tables_using_help_fallback.append(f"{schema}.{table_name}")
            if res:
                return _process_columns(res, td_table, self)
    
        # All methods failed
        if not res:
            error_msg = f"All metadata extraction methods failed for {schema}.{table_name}: {'; '.join(extraction_errors)}"
            logger.error(error_msg)
>           raise Exception(error_msg)
E           Exception: All metadata extraction methods failed for test_schema.test_table:

.../source/sql/teradata.py:440: Exception
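
One detail visible in the locals above is that `use_fallback` and `use_prepared` are `MagicMock` objects, not booleans. The sketch below reproduces just that behavior with the standard library. It assumes nothing about the Teradata source beyond the two `getattr` lines shown in the trace, and `SimpleNamespace` is used here only as a stand-in for a real config object.

```python
from types import SimpleNamespace
from unittest.mock import MagicMock

# A bare MagicMock auto-creates every attribute, including nested ones.
mock_self = MagicMock()
config = getattr(mock_self, "config", None)
use_fallback = config and getattr(config, "metadata_extraction_fallback", False)

# The getattr default is never used, and a MagicMock is truthy.
auto_flag_is_truthy = bool(use_fallback)

# Pinning the config to explicit values makes the flags real booleans.
mock_self.config = SimpleNamespace(
    metadata_extraction_fallback=False,
    use_prepared_statement_metadata=False,
)
config = getattr(mock_self, "config", None)
explicit_flag = config and getattr(config, "metadata_extraction_fallback", False)

print(auto_flag_is_truthy, explicit_flag)
```

Pinning the flags alone may not make this test pass: the trace shows `res = []` with no recorded errors, and the final `if not res` branch raises for an empty column list regardless of which paths ran.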


@alwaysmeticulous

alwaysmeticulous bot commented Feb 17, 2026

✅ Meticulous spotted 0 visual differences across 1009 screens tested: view results.

Meticulous evaluated ~8 hours of user flows against your PR.

Last updated for commit c7ba25b. This comment will update as new commits are pushed.

@codecov

codecov bot commented Feb 17, 2026

Bundle Report

Bundle size has no change ✅
