Commit ea21b3f

Create sink only once (#528)
* Create sink only once
* Create sources only once as well
1 parent: fe94be8 · commit: ea21b3f

1 file changed: 4 additions, 22 deletions

pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala

Lines changed: 4 additions & 22 deletions
@@ -56,13 +56,9 @@ class OperationSplitter(conf: Config,
                         sourceTables: Seq[SourceTable])(implicit spark: SparkSession): Seq[Job] = {
     val specialCharacters = conf.getString(SPECIAL_CHARACTERS_IN_COLUMN_NAMES)
     val temporaryDirectory = ConfigUtils.getOptionString(conf, TEMPORARY_DIRECTORY_KEY)
-    val sourceBase = SourceManager.getSourceByName(sourceName, conf, None)
 
     sourceTables.map(sourceTable => {
-      val source = sourceTable.overrideConf match {
-        case Some(confOverride) => SourceManager.getSourceByName(sourceName, conf, Some(confOverride))
-        case None => sourceBase
-      }
+      val source = SourceManager.getSourceByName(sourceName, conf, sourceTable.overrideConf)
 
       val disableCountQuery = ConfigUtils.getOptionBoolean(source.config, DISABLE_COUNT_QUERY).getOrElse(false)
       val outputTable = metastore.getTableDef(sourceTable.metaTableName)
@@ -85,19 +81,10 @@ class OperationSplitter(conf: Config,
                         tables: Seq[TransferTable])(implicit spark: SparkSession): Seq[Job] = {
     val specialCharacters = conf.getString(SPECIAL_CHARACTERS_IN_COLUMN_NAMES)
     val temporaryDirectory = ConfigUtils.getOptionString(conf, TEMPORARY_DIRECTORY_KEY)
-    val sourceBase = SourceManager.getSourceByName(sourceName, conf, None)
-    val sinkBase = SinkManager.getSinkByName(sinkName, conf, None)
 
     tables.map(transferTable => {
-      val source = transferTable.sourceOverrideConf match {
-        case Some(confOverride) => SourceManager.getSourceByName(sourceName, conf, Some(confOverride))
-        case None => sourceBase
-      }
-
-      val sink = transferTable.sinkOverrideConf match {
-        case Some(confOverride) => SinkManager.getSinkByName(sinkName, conf, Some(confOverride))
-        case None => sinkBase
-      }
+      val source = SourceManager.getSourceByName(sourceName, conf, transferTable.sourceOverrideConf)
+      val sink = SinkManager.getSinkByName(sinkName, conf, transferTable.sinkOverrideConf)
 
       val disableCountQuery = ConfigUtils.getOptionBoolean(source.config, DISABLE_COUNT_QUERY).getOrElse(false)
       val outputTable = TransferTableParser.getMetaTable(transferTable)
@@ -146,15 +133,10 @@ class OperationSplitter(conf: Config,
                         sinkName: String,
                         sinkTables: Seq[SinkTable])
                        (implicit spark: SparkSession): Seq[Job] = {
-    val sinkBase = SinkManager.getSinkByName(sinkName, conf, None)
-
     sinkTables.map(sinkTable => {
       val inputTable = metastore.getTableDef(sinkTable.metaTableName)
 
-      val sink = sinkTable.overrideConf match {
-        case Some(confOverride) => SinkManager.getSinkByName(sinkName, conf, Some(confOverride))
-        case None => sinkBase
-      }
+      val sink = SinkManager.getSinkByName(sinkName, conf, sinkTable.overrideConf)
 
       val outputTableName = sinkTable.outputTableName.getOrElse(s"${sinkTable.metaTableName}->$sinkName")
 
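The same simplification appears in all three methods: rather than building a base source or sink with None up front and pattern matching on each table's override, the Option[Config] override is now passed straight to SourceManager.getSourceByName and SinkManager.getSinkByName. Below is a minimal, self-contained Scala sketch of that before/after pattern; Config, Source, and SourceFactory here are simplified stand-ins chosen for illustration, not the actual Pramen classes.

// Illustrative sketch only: Config, Source, and SourceFactory are hypothetical
// stand-ins for this example, not the real Pramen APIs.
object OverridePassingSketch {
  final case class Config(settings: Map[String, String])
  final case class Source(name: String, config: Config)

  object SourceFactory {
    // Accepting Option[Config] keeps the Some/None handling in one place.
    def getSourceByName(name: String, conf: Config, overrideConf: Option[Config]): Source =
      Source(name, overrideConf.getOrElse(conf))
  }

  def main(args: Array[String]): Unit = {
    val conf = Config(Map("url" -> "jdbc:default"))
    val overrides: Seq[Option[Config]] =
      Seq(None, Some(Config(Map("url" -> "jdbc:override"))))

    // Before this commit: a base source built with None, plus a match per table.
    val sourceBase = SourceFactory.getSourceByName("my_source", conf, None)
    val before = overrides.map {
      case Some(confOverride) => SourceFactory.getSourceByName("my_source", conf, Some(confOverride))
      case None               => sourceBase
    }

    // After this commit: the per-table Option is passed straight through.
    val after = overrides.map(o => SourceFactory.getSourceByName("my_source", conf, o))

    assert(before == after) // both approaches yield equivalent sources in this sketch
    after.foreach(println)
  }
}

Dropping the sourceBase/sinkBase values and the three match blocks in favor of the direct calls accounts for the 22 deleted and 4 added lines in this commit.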