Skip to content

Commit d6ea6e6

Browse files
committed
created _add_default_options
1 parent ea6e07a commit d6ea6e6

File tree

2 files changed

+14
-9
lines changed

2 files changed

+14
-9
lines changed

metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,13 @@ def __init__(self, config: SQLCommonConfig, ctx: PipelineContext, platform: str)
352352
)
353353
self.report.sql_aggregator = self.aggregator.report
354354

355+
def _add_default_options(self, sql_config: SQLCommonConfig) -> None:
356+
"""Add default SQLAlchemy options. Can be overridden by subclasses to add additional defaults."""
357+
if sql_config.is_profiling_enabled():
358+
sql_config.options.setdefault(
359+
"max_overflow", sql_config.profiling.max_workers
360+
)
361+
355362
@classmethod
356363
def test_connection(cls, config_dict: dict) -> TestConnectionReport:
357364
test_report = TestConnectionReport()
@@ -521,10 +528,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit
521528

522529
# Extra default SQLAlchemy option for better connection pooling and threading.
523530
# https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
524-
if sql_config.is_profiling_enabled():
525-
sql_config.options.setdefault(
526-
"max_overflow", sql_config.profiling.max_workers
527-
)
531+
self._add_default_options(sql_config)
528532

529533
for inspector in self.get_inspectors():
530534
profiler = None

metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,12 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
679679
if self.config.stateful_ingestion:
680680
self.config.stateful_ingestion.remove_stale_metadata = False
681681

682+
def _add_default_options(self, sql_config: SQLCommonConfig) -> None:
683+
"""Add Teradata-specific default options"""
684+
# Teradata does not support max_overflow, instead we use QueuePool when profiling
685+
if sql_config.is_profiling_enabled():
686+
sql_config.options.setdefault("poolclass", QueuePool)
687+
682688
@classmethod
683689
def create(cls, config_dict, ctx):
684690
config = TeradataConfig.parse_obj(config_dict)
@@ -707,11 +713,6 @@ def get_inspectors(self):
707713
# run on multiple databases.
708714
url = self.config.get_sql_alchemy_url()
709715

710-
# Teradata does not support max_overflow, instead we use QueuePool when profiling data
711-
if "max_overflow" in self.config.options:
712-
self.config.options.pop("max_overflow")
713-
self.config.options["poolclass"] = QueuePool
714-
715716
logger.debug(f"sql_alchemy_url={url}")
716717
engine = create_engine(url, **self.config.options)
717718
with engine.connect() as conn:

0 commit comments

Comments
 (0)