
Conversation

@fredwang1012 commented Jun 27, 2025

  • Description of PR changes above includes a link to an existing GitHub issue
  • PR title is prefixed with one of: [BUGFIX], [FEATURE], [DOCS], [MAINTENANCE], [CONTRIB], [MINORBUMP]
  • Code is linted - run invoke lint (uses ruff format + ruff check)
  • Appropriate tests and docs have been updated

For more information about contributing, visit our community resources.

After you submit your PR, keep the page open and monitor the statuses of the various checks made by our continuous integration process at the bottom of the page. Please fix any issues that come up and reach out on Slack if you need help. Thanks for contributing!

netlify bot commented Jun 27, 2025

‼️ Deploy request for niobium-lead-7998 rejected.

🔨 Latest commit: 0d60767

Comment on lines 335 to 350
class DatabricksIdentifierPreparer(IdentifierPreparer):
    """Custom identifier preparer for Databricks that uses backticks."""

    def __init__(
        self,
        dialect,
        initial_quote="`",
        final_quote="`",
        escape_quote="``",
        quote_case_sensitive_collations=True,
        omit_schema=False,
    ):
        super().__init__(
            dialect,
            initial_quote=initial_quote,
            final_quote=final_quote,
            escape_quote=escape_quote,
            quote_case_sensitive_collations=quote_case_sensitive_collations,
            omit_schema=omit_schema,
        )

# Replace the dialect's preparer with our custom one
self.engine.dialect.identifier_preparer = DatabricksIdentifierPreparer(self.engine.dialect)
logger.debug("Successfully patched Databricks dialect to use backticks for identifier quoting")
Contributor

This doesn't seem correct.

The databricks dialect we use already overrides the preparer. See here.

Furthermore, identifier_preparer is a memoized_property, which I believe is meant to be read-only; it's derived from the dialect's preparer class attribute.
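
For context, a minimal sketch of the mechanism this comment refers to (class names here are illustrative, not the actual databricks dialect code): SQLAlchemy derives dialect.identifier_preparer from the dialect's preparer class attribute, so the supported way to install a custom preparer is to override that attribute on the dialect class, not to assign to identifier_preparer on a live engine.

from sqlalchemy.engine.default import DefaultDialect
from sqlalchemy.sql.compiler import IdentifierPreparer


class BacktickPreparer(IdentifierPreparer):
    """Quote identifiers with backticks instead of double quotes."""

    def __init__(self, dialect):
        super().__init__(dialect, initial_quote="`", escape_quote="``")


class BacktickDialect(DefaultDialect):
    # SQLAlchemy instantiates this class attribute to build
    # dialect.identifier_preparer, so this is the override point.
    preparer = BacktickPreparer


dialect = BacktickDialect()
print(dialect.identifier_preparer.quote("weird column"))  # -> `weird column`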

NathanFarmer changed the title from "Bug fix for Databricks SQL batch replacement" to "[BUGFIX] Databricks SQL tables with special characters" on Jul 7, 2025
codecov bot commented Jul 7, 2025

❌ 62 Tests Failed:

Tests completed    Failed    Passed    Skipped
17935              62        17873     2437
View the full list of 3 ❄️ flaky tests
tests.integration.data_sources_and_expectations.expectations.test_unexpected_rows_expectation::test_unexpected_rows_expectation_batch_keyword_failure[SELECT * FROM {batch} WHERE quantity > 0-redshift]

Flake rate in main: 29.39% (Passed 418 times, Failed 174 times)

Stack Traces | 0.469s run time
self = <sqlalchemy.engine.base.Connection object at 0x7fce59d80080>
engine = Engine(redshift+psycopg2://oss:***@oss-test-redshift-cluster.crz6wpzfh5px.us-east-1.redshift.amazonaws.com:5439/dev?sslmode=prefer)

.../sqlalchemy/engine/base.py:145: in __init__
    self._dbapi_connection = engine.raw_connection()
.../sqlalchemy/engine/base.py:3297: in raw_connection
    return self.pool.connect()
.../sqlalchemy/pool/base.py:449: in connect
    return _ConnectionFairy._checkout(self)
.../sqlalchemy/pool/base.py:1264: in _checkout
    fairy = _ConnectionRecord.checkout(pool)
.../sqlalchemy/pool/base.py:713: in checkout
    rec = pool._do_get()
.../sqlalchemy/pool/impl.py:177: in _do_get
    return self._create_connection()
.../sqlalchemy/pool/base.py:390: in _create_connection
    return _ConnectionRecord(self)
.../sqlalchemy/pool/base.py:897: in __connect
    self.dbapi_connection = connection = pool._invoke_creator(self)
.../sqlalchemy/engine/create.py:646: in connect
    return dialect.connect(*cargs, **cparams)
.../sqlalchemy/engine/default.py:625: in connect
    return self.loaded_dbapi.connect(*cargs, **cparams)
.../site-packages/psycopg2/__init__.py:122: in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
E       psycopg2.OperationalError: connection to server at "oss-test-redshift-cluster.crz6wpzfh5px.us-east-1.redshift.amazonaws.com" (44.198.206.69), port 5439 failed: FATAL:  connection limit "500" exceeded for non-bootstrap users

The above exception was the direct cause of the following exception (SQLAlchemy re-raises it as sqlalchemy.exc.OperationalError; background at https://sqlalche..../e/20/e3q8):

self = RedshiftDatasource(type='redshift', name='kyyzbgemmm', id=None, assets=[], connection_string=RedshiftDsn('redshift+psy...naws.com:5439/dev?sslmode=prefer', scheme='redshift+psycopg2', host_type='domain'), create_temp_table=False, kwargs={})
test_assets = True

tests/integration/conftest.py:222: in batch_for_datasource
    yield _batch_setup_for_datasource.make_batch()
.../test_utils/data_source_config/sql.py:99: in make_batch
    self.make_asset()
.../test_utils/data_source_config/redshift.py:100: in make_asset
    return self.context.data_sources.add_redshift(
.../datasource/fluent/sources.py:476: in add_datasource
    datasource.test_connection()
.../datasource/fluent/sql_datasource.py:1275: in test_connection
    engine.connect()
E       great_expectations.datasource.fluent.interfaces.TestConnectionError: Attempt to connect to datasource failed: due to OperationalError('(psycopg2.OperationalError) connection to server at "oss-test-redshift-cluster.crz6wpzfh5px.us-east-1.redshift.amazonaws.com" (44.198.206.69), port 5439 failed: FATAL:  connection limit "500" exceeded for non-bootstrap users\n')

.../datasource/fluent/sql_datasource.py:1277: TestConnectionError
tests.integration.data_sources_and_expectations.expectations.test_expect_query_results_to_match_comparison::test_expect_query_results_to_match_comparison_error[redshift->redshift]

Flake rate in main: 31.03% (Passed 349 times, Failed 157 times)

Stack Traces | 0.48s run time
(Identical failure to the first trace: psycopg2.OperationalError, connection limit "500" exceeded for non-bootstrap users, surfacing through datasource.test_connection() as TestConnectionError. The only differing frame is the entry point, tests/integration/conftest.py:255: in multi_source_batch, which creates the secondary Redshift asset.)
tests.integration.data_sources_and_expectations.expectations.test_expect_query_results_to_match_comparison::test_expect_query_results_to_match_comparison_failure[redshift->sqlite-column_value_mismatch]

Flake rate in main: 30.83% (Passed 350 times, Failed 156 times)

Stack Traces | 0.483s run time
(Identical failure again: psycopg2.OperationalError, connection limit "500" exceeded for non-bootstrap users, re-raised through datasource.test_connection() as TestConnectionError.)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
tests/integration/conftest.py:255: in multi_source_batch
    secondary_asset = secondary_batch_setup.make_asset()
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.../test_utils/data_source_config/redshift.py:100: in make_asset
    return self.context.data_sources.add_redshift(
.../datasource/fluent/sources.py:476: in add_datasource
    datasource.test_connection()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = RedshiftDatasource(type='redshift', name='zqlwjvbfzh', id=None, assets=[], connection_string=RedshiftDsn('redshift+psy...naws.com:5439/dev?sslmode=prefer', scheme='redshift+psycopg2', host_type='domain'), create_temp_table=False, kwargs={})
test_assets = True

    @override
    def test_connection(self, test_assets: bool = True) -> None:
        """Test the connection for the SQLDatasource.
    
        Args:
            test_assets: If assets have been passed to the SQLDatasource, whether to test them as well.
    
        Raises:
            TestConnectionError: If the connection test fails.
        """  # noqa: E501 # FIXME CoP
        try:
            engine: sqlalchemy.Engine = self.get_engine()
            engine.connect()
        except Exception as e:
>           raise TestConnectionError(cause=e) from e
E           great_expectations.datasource.fluent.interfaces.TestConnectionError: Attempt to connect to datasource failed: due to OperationalError('(psycopg2.OperationalError) connection to server at "oss-test-redshift-cluster.crz6wpzfh5px.us-east-1.redshift.amazonaws.com" (44.198.206.69), port 5439 failed: FATAL:  connection limit "500" exceeded for non-bootstrap users\n')

.../datasource/fluent/sql_datasource.py:1277: TestConnectionError


Contributor

@NathanFarmer NathanFarmer left a comment

Thanks for this contribution @fredwang1012! This "quoted identifiers" bug is one we have been tackling piece by piece for a long time. There are already some existing tests for similar cases in this file that may or may not start failing once you move the changes into DatabricksTableAsset.

I also noticed that the linter is failing. You can run the linter at any time by running invoke lint. Also, if you run pre-commit install, the linter will run automatically before each commit.

@fredwang1012 fredwang1012 force-pushed the databricks-quote-bug branch 2 times, most recently from 0c6cfc4 to 85a82be Compare July 10, 2025 20:28
…acters

- Enhanced _resolve_quoted_name validator to automatically detect and quote table names with special characters (digits, spaces, hyphens, dots, symbols)
- Added _resolve_quoted_schema_name validator to handle schema names with special characters
- Moved test file to correct directory structure
- Fixes issue with table/schema names starting with digits or containing special characters
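The validators this commit message describes can be pictured roughly as follows. This is a minimal sketch based on the behaviors listed above, not the PR's actual code; the helper name and the character test are assumptions.

import re

from sqlalchemy.sql.elements import quoted_name


def _needs_databricks_backticks(name: str) -> bool:
    """Return True when a Databricks identifier must be backtick-quoted."""
    if name.startswith("`") and name.endswith("`"):
        return False  # caller already quoted it
    # A leading digit, or any character outside [A-Za-z0-9_], forces quoting.
    return name[0].isdigit() or not re.fullmatch(r"[A-Za-z0-9_]+", name)


def _resolve_quoted_name(table_name: str) -> str:
    if table_name.startswith("`") and table_name.endswith("`"):
        # Strip caller-supplied backticks and mark the bare name as quoted.
        return quoted_name(table_name[1:-1], quote=True)
    if _needs_databricks_backticks(table_name):
        # quote=True makes SQLAlchemy always quote this identifier with the
        # dialect's quote character (a backtick on Databricks).
        return quoted_name(table_name, quote=True)
    return table_name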
@fredwang1012 fredwang1012 force-pushed the databricks-quote-bug branch from 85a82be to 6b5261e Compare July 11, 2025 03:24
Contributor

@NathanFarmer NathanFarmer left a comment

This is looking pretty good! Running pre-commit install or invoke lint will get you past the linting errors so the rest of the tests will run.

fredwang1012 and others added 11 commits July 12, 2025 00:31
- Fix QueryMetricProvider to use dialect-specific identifier quoting
- Update _get_substituted_batch_subquery_from_query_and_batch_selectable to handle TableClause objects
- Use dialect's identifier preparer to ensure correct quote characters (backticks for Databricks)
- Add test for Custom SQL Expectations with special table names
- Fixes PARSE_SYNTAX_ERROR when using table names starting with digits in Custom SQL

This completes the fix for Databricks table identifier quoting across all expectation types
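In practice, "use the dialect's identifier preparer" means letting SQLAlchemy pick the quote character instead of hard-coding one. A hedged sketch follows; the connection URL is a placeholder and requires the Databricks dialect package, while quote and quote_schema are standard IdentifierPreparer methods:

import sqlalchemy as sa

engine = sa.create_engine("databricks://token:<token>@<host>?http_path=<path>")
preparer = engine.dialect.identifier_preparer

# The preparer only quotes when the identifier requires it:
print(preparer.quote("normal_table"))             # normal_table
print(preparer.quote("247_asset_class_returns"))  # `247_asset_class_returns`
print(preparer.quote_schema("my-schema"))         # `my-schema`

# A metric provider can then assemble the substituted batch subquery from
# preparer output so the quoting is correct on every backend.
table_sql = f"{preparer.quote_schema('my-schema')}.{preparer.quote('247_table')}"
query = f"SELECT * FROM {table_sql}"
print(query)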
@fredwang1012
Author

@NathanFarmer I've addressed the linting issues by moving the imports to the top of the file. Ready for re-review and workflow approval. Thanks!

Contributor

@NathanFarmer NathanFarmer left a comment

According to the static analysis stage, the type ignore comments on these lines need to be removed. You can run the type checker by running invoke types. Once you get past that stage we should be able to see if the databricks tests pass.

ensures the Databricks dialect uses backticks for special identifiers.
"""
# If it's already a quoted_name object, return as-is
if hasattr(sqlalchemy, "quoted_name") and hasattr(table_name, "__class__"): # type: ignore[truthy-function]
Contributor

This type ignore comment needs to be removed
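One way to drop the ignore entirely is to test the value's type rather than probing attributes; a minimal sketch, with an illustrative function name:

from sqlalchemy.sql.elements import quoted_name


def _resolve(table_name: str) -> str:
    # isinstance replaces the hasattr probing and needs no type-ignore comment.
    if isinstance(table_name, quoted_name):
        return table_name  # already resolved, return as-is
    ...  # quoting logic for plain strings goes here
    return table_name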

return schema_name

# If it's already a quoted_name object, return as-is
if hasattr(sqlalchemy, "quoted_name") and hasattr(schema_name, "__class__"): # type: ignore[truthy-function]
Contributor

This type ignore comment needs to be removed

Contributor

@NathanFarmer NathanFarmer left a comment

We are getting closer, but we definitely need some end-to-end integration tests. You were pretty close before you completely changed the tests/integration/data_sources_and_expectations/data_sources/test_databricks_special_table_names.py file. Commit 8f9ab2b5b59241824afb391353158a229ee49034 was the last time I reviewed, and the file was close then; you were just attempting to use fixtures that didn't exist. You should remove the context and databricks_datasource fixtures from those tests and then use DatabricksBatchTestSetup with DatabricksDatasourceTestConfig. You should be able to pass the special table names to either of those classes. Follow the established patterns in tests/integration/data_sources_and_expectations/data_sources/test_postgresql.py, except that you pass the table_name.
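For orientation, here is a minimal sketch of the scenario such an end-to-end test ultimately exercises, through the public fluent API; the connection string is a placeholder, and this illustrates the scenario rather than the test-harness code asked for above.

import great_expectations as gx

context = gx.get_context()

datasource = context.data_sources.add_databricks_sql(
    name="databricks",
    connection_string=(
        "databricks://token:<token>@<host>:443/<schema>"
        "?http_path=<http_path>&catalog=<catalog>"
    ),
)

# The fix under review means SQL generated for this asset must render the name
# as `247_asset_class_returns` (backticks), never "247_asset_class_returns".
asset = datasource.add_table_asset(
    name="special_name_asset",
    table_name="247_asset_class_returns",
)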

Comment on lines 123 to 132
# Integration tests - NO @pytest.mark.databricks decorator
# These get their markers from fixture parametrization
def test_table_asset_integration_digit_start(self, request, data_context):
"""Integration test: table names starting with digits work end-to-end."""
# Skip ephemeral backend as it doesn't support these special table name features
if hasattr(request, "param") and "ephemeral" in str(request.param):
pytest.skip("Ephemeral backend doesn't support special table names")

# Skip if no real Databricks connection is available
pytest.skip("Integration test requires real Databricks connection")
Contributor

As far as I can tell, this test doesn't assert anything

Comment on lines 134 to 141
def test_table_asset_integration_special_chars(self, request, data_context):
"""Integration test: table names with special characters work end-to-end."""
# Skip ephemeral backend as it doesn't support these special table name features
if hasattr(request, "param") and "ephemeral" in str(request.param):
pytest.skip("Ephemeral backend doesn't support special table names")

# Skip if no real Databricks connection is available
pytest.skip("Integration test requires real Databricks connection")
Contributor

This test does nothing

Comment on lines 143 to 150
def test_table_asset_integration_schema_and_table_special(self, request, data_context):
"""Integration test: both schema and table names with special characters work."""
# Skip ephemeral backend as it doesn't support these special table name features
if hasattr(request, "param") and "ephemeral" in str(request.param):
pytest.skip("Ephemeral backend doesn't support special table names")

# Skip if no real Databricks connection is available
pytest.skip("Integration test requires real Databricks connection")
Contributor

This test does nothing

Comment on lines 18 to 121
@pytest.mark.databricks
def test_needs_databricks_backticks_digit_start(self):
"""Test that table names starting with digits require backticks."""
assert DatabricksTableAsset._needs_databricks_backticks("247_asset_class_returns")
assert DatabricksTableAsset._needs_databricks_backticks("123_test_table")
assert DatabricksTableAsset._needs_databricks_backticks("9_column_table")

@pytest.mark.databricks
def test_needs_databricks_backticks_special_chars(self):
"""Test that table names with special characters require backticks."""
assert DatabricksTableAsset._needs_databricks_backticks("table with spaces")
assert DatabricksTableAsset._needs_databricks_backticks("table-with-hyphens")
assert DatabricksTableAsset._needs_databricks_backticks("table.with.dots")
assert DatabricksTableAsset._needs_databricks_backticks("table#with#hash")
assert DatabricksTableAsset._needs_databricks_backticks("table@with@at")

@pytest.mark.databricks
def test_needs_databricks_backticks_normal_names(self):
"""Test that normal table names don't require backticks."""
assert not DatabricksTableAsset._needs_databricks_backticks("normal_table")
assert not DatabricksTableAsset._needs_databricks_backticks("table_name")
assert not DatabricksTableAsset._needs_databricks_backticks("TableName")
assert not DatabricksTableAsset._needs_databricks_backticks("test_table_123")

@pytest.mark.databricks
def test_needs_databricks_backticks_already_quoted(self):
"""Test that already quoted names don't require additional backticks."""
assert not DatabricksTableAsset._needs_databricks_backticks("`247_asset_class_returns`")
assert not DatabricksTableAsset._needs_databricks_backticks("`table with spaces`")

@pytest.mark.databricks
def test_is_bracketed_by_quotes_true(self):
"""Test that names with backticks are detected correctly."""
assert DatabricksTableAsset._is_bracketed_by_quotes("`table_name`")
assert DatabricksTableAsset._is_bracketed_by_quotes("`247_test_table`")
assert DatabricksTableAsset._is_bracketed_by_quotes("`table with spaces`")

@pytest.mark.databricks
def test_is_bracketed_by_quotes_false(self):
"""Test that names without backticks are detected correctly."""
assert not DatabricksTableAsset._is_bracketed_by_quotes("table_name")
assert not DatabricksTableAsset._is_bracketed_by_quotes("247_test_table")
assert not DatabricksTableAsset._is_bracketed_by_quotes('"table_name"')
assert not DatabricksTableAsset._is_bracketed_by_quotes("'table_name'")

@pytest.mark.databricks
def test_resolve_quoted_name_digit_start(self):
"""Test that table names starting with digits get quoted_name objects."""
result = DatabricksTableAsset._resolve_quoted_name("247_asset_class_returns")
assert isinstance(result, quoted_name)
assert str(result) == "247_asset_class_returns"

@pytest.mark.databricks
def test_resolve_quoted_name_special_chars(self):
"""Test that table names with special chars get quoted_name objects."""
result = DatabricksTableAsset._resolve_quoted_name("table with spaces")
assert isinstance(result, quoted_name)
assert str(result) == "table with spaces"

result = DatabricksTableAsset._resolve_quoted_name("table-with-hyphens")
assert isinstance(result, quoted_name)
assert str(result) == "table-with-hyphens"

@pytest.mark.databricks
def test_resolve_quoted_name_normal_names(self):
"""Test that normal table names remain as strings."""
result = DatabricksTableAsset._resolve_quoted_name("normal_table")
assert isinstance(result, str)
assert result == "normal_table"

@pytest.mark.databricks
def test_resolve_quoted_name_already_quoted(self):
"""Test that already quoted names get their quotes stripped and become
quoted_name objects."""
result = DatabricksTableAsset._resolve_quoted_name("`247_asset_class_returns`")
assert isinstance(result, quoted_name)
assert str(result) == "247_asset_class_returns"

@pytest.mark.databricks
def test_resolve_quoted_schema_name_digit_start(self):
"""Test that schema names starting with digits get quoted_name objects."""
result = DatabricksTableAsset._resolve_quoted_schema_name("123_schema")
assert isinstance(result, quoted_name)
assert str(result) == "123_schema"

@pytest.mark.databricks
def test_resolve_quoted_schema_name_special_chars(self):
"""Test that schema names with special chars get quoted_name objects."""
result = DatabricksTableAsset._resolve_quoted_schema_name("schema with spaces")
assert isinstance(result, quoted_name)
assert str(result) == "schema with spaces"

@pytest.mark.databricks
def test_resolve_quoted_schema_name_normal_names(self):
"""Test that normal schema names remain as strings."""
result = DatabricksTableAsset._resolve_quoted_schema_name("normal_schema")
assert isinstance(result, str)
assert result == "normal_schema"

@pytest.mark.databricks
def test_resolve_quoted_schema_name_none(self):
"""Test that None schema names remain None."""
result = DatabricksTableAsset._resolve_quoted_schema_name(None)
assert result is None
Contributor

These are unit tests since they only test the inputs and outputs of this one method (you don't need a databricks connection to test that). They should be marked with @pytest.mark.unit and moved into a new file tests/datasource/fluent/data_asset/databricks_table_asset.py.
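A sketch of that relocation; the import path for DatabricksTableAsset is an assumption here.

# tests/datasource/fluent/data_asset/databricks_table_asset.py
import pytest

from great_expectations.datasource.fluent import DatabricksTableAsset  # path assumed


@pytest.mark.unit
def test_needs_databricks_backticks_digit_start():
    assert DatabricksTableAsset._needs_databricks_backticks("247_asset_class_returns")


@pytest.mark.unit
def test_needs_databricks_backticks_normal_names():
    assert not DatabricksTableAsset._needs_databricks_backticks("normal_table")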

@fredwang1012
Author

I mistakenly thought the tests were supposed to be for the changes made in the PR, not the end-to-end integration tests. I will change them back. Sorry about that.
