Commit 4dcdca8
Add more template options for the command line sink.
1 parent a522924 commit 4dcdca8

3 files changed: +110 −15

README.md

Lines changed: 9 additions & 1 deletion
@@ -991,7 +991,15 @@ The pipeline operation for this sink could look like this:
   tables = [
     {
       input.metastore.table = metastore_table
-      output.cmd.line = "/my_apps/cmd_line_tool --path @dataPath --date @infoDate"
+      # Supported substitutions:
+      # - @dataPath - the path to generated data or to the original metastore table
+      # - @partitionPath - the path to the partition corresponding to the information date being processed
+      # - @bucket - the bucket of the table location if the output is on S3
+      # - @prefix - the prefix on the bucket for tables located on S3
+      # - @partitionPrefix - the prefix to the data for the information date currently being processed
+      # - @infoDate - the information date in yyyy-MM-dd format
+      # - @infoMonth - the information month in yyyy-MM format
+      output.cmd.line = "/my_apps/cmd_line_tool --path @dataPath --partition-path @partitionPath --date @infoDate"
 
       ## All following settings are OPTIONAL
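For illustration only (not part of the commit): for a hypothetical table stored at s3a://my-bucket/data/table1 with information date 2023-12-30 and a partition column named info_date, the new substitutions would resolve roughly as follows, given the prefix logic introduced below that strips the leading slash:

  @dataPath        -> s3a://my-bucket/data/table1
  @partitionPath   -> s3a://my-bucket/data/table1/info_date=2023-12-30
  @bucket          -> my-bucket
  @prefix          -> data/table1
  @partitionPrefix -> data/table1/info_date=2023-12-30
  @infoDate        -> 2023-12-30
  @infoMonth       -> 2023-12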

pramen/core/src/main/scala/za/co/absa/pramen/core/sink/CmdLineSink.scala

Lines changed: 71 additions & 8 deletions
@@ -20,11 +20,11 @@ import com.typesafe.config.Config
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
-import za.co.absa.pramen.api.{ExternalChannelFactory, MetastoreReader, Sink, SinkResult}
+import za.co.absa.pramen.api.{DataFormat, ExternalChannelFactory, MetaTableDef, MetastoreReader, Query, Sink, SinkResult}
 import za.co.absa.pramen.core.exceptions.CmdFailedException
 import za.co.absa.pramen.core.process.{ProcessRunner, ProcessRunnerImpl}
 import za.co.absa.pramen.core.sink.CmdLineSink.{CMD_LINE_KEY, CmdLineDataParams}
-import za.co.absa.pramen.core.utils.{ConfigUtils, FsUtils}
+import za.co.absa.pramen.core.utils.{ConfigUtils, FsUtils, SparkUtils}
 
 import java.time.LocalDate
 import java.time.format.DateTimeFormatter
@@ -51,7 +51,7 @@ import scala.util.control.NonFatal
  *
  * Otherwise, the data can be accessed by the command line tool directly from the metastore.
  *
- * Example sink definition:
+ * ==Example sink definition:==
  * {{{
  * {
  *   name = "cmd_line"
@@ -73,6 +73,7 @@ import scala.util.control.NonFatal
 * Here is an example of a sink definition in a pipeline. As for any other operation you can specify
 * dependencies, transformations, filters and columns to select.
 *
+ * ==Example operation:==
 * {{{
 * {
 *   name = "Command Line sink"
@@ -154,15 +155,18 @@ class CmdLineSink(sinkConfig: Config,
 
           log.info(s"$count records saved to $tempPath.")
 
-          val cmdLine = getCmdLine(cmdLineTemplate, Option(tempPath), infoDate)
+          val cmdLine = getCmdLine(cmdLineTemplate, Option(tempPath), Option(tempPath), infoDate)
 
           runCmd(cmdLine)
 
           log.info(s"$count records sent to the cmd line sink ($cmdLine).")
         }
         SinkResult(count)
       case None =>
-        val cmdLine = getCmdLine(cmdLineTemplate, None, infoDate)
+        val metaTable = metastore.getTableDef(tableName)
+        val (dataPath, partitionPath) = getPaths(metaTable, infoDate)
+
+        val cmdLine = getCmdLine(cmdLineTemplate, dataPath, partitionPath, infoDate)
 
         runCmd(cmdLine)
 
@@ -173,21 +177,80 @@ class CmdLineSink(sinkConfig: Config,
     }
   }
 
+  private[core] def getPaths(metaTable: MetaTableDef, infoDate: LocalDate): (Option[Path], Option[Path]) = {
+    val basePathOpt = metaTable.format match {
+      case DataFormat.Parquet(path, _) =>
+        Option(path)
+      case DataFormat.Delta(query, _) =>
+        query match {
+          case Query.Path(path) =>
+            Option(path)
+          case _ => None
+        }
+      case _ =>
+        None
+    }
+
+    basePathOpt match {
+      case Some(basePath) =>
+        (Option(new Path(basePath)), Option(SparkUtils.getPartitionPath(infoDate, metaTable.infoDateColumn, metaTable.infoDateFormat, basePath)))
+      case None =>
+        (None, None)
+    }
+  }
+
   private[core] def getCmdLine(cmdLineTemplate: String,
                                dataPath: Option[Path],
+                               partitionPath: Option[Path],
                                infoDate: LocalDate): String = {
     log.info(s"CmdLine template: $cmdLineTemplate")
 
     val cmdWithDates = cmdLineTemplate.replace("@infoDate", infoDate.toString)
       .replace("@infoMonth", infoDate.format(DateTimeFormatter.ofPattern("yyyy-MM")))
 
-    dataPath match {
+    val cmdWithDataPath = dataPath match {
       case Some(path) =>
-        cmdWithDates.replace("@dataPath", path.toString)
-          .replace("@dataUri", path.toUri.toString)
+        if (Option(path.toUri.getAuthority).isDefined) {
+          val bucket = path.toUri.getAuthority
+          val prefixOrg = path.toUri.getPath
+          val prefix = if (prefixOrg.startsWith("/", 0))
+            prefixOrg.substring(1)
+          else
+            prefixOrg
+
+          cmdWithDates
+            .replace("@bucket", bucket)
+            .replace("@prefix", prefix)
+            .replace("@dataPath", path.toString)
+            .replace("@dataUri", path.toUri.toString)
+        } else {
+          cmdWithDates.replace("@dataPath", path.toString)
+            .replace("@dataUri", path.toUri.toString)
+        }
      case None =>
        cmdWithDates
    }
+
+    partitionPath match {
+      case Some(path) =>
+        if (Option(path.toUri.getAuthority).isDefined) {
+          val bucket = path.toUri.getAuthority
+          val prefixOrg = path.toUri.getPath
+          val prefix = if (prefixOrg.startsWith("/", 0))
+            prefixOrg.substring(1)
+          else
+            prefixOrg
+
+          cmdWithDataPath
+            .replace("@bucket", bucket)
+            .replace("@partitionPrefix", prefix)
+            .replace("@partitionPath", path.toString)
+        } else {
+          cmdWithDataPath.replace("@partitionPath", path.toString)
+        }
+      case None =>
+        cmdWithDataPath
+    }
   }
 
   private[core] def runCmd(cmdLine: String): Unit = {
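A minimal standalone sketch of the @bucket/@prefix derivation used in getCmdLine above (not part of the commit; assumes hadoop-common on the classpath). For an S3 location, the URI authority carries the bucket name and the prefix is the URI path without its leading slash:

import org.apache.hadoop.fs.Path

object BucketPrefixSketch extends App {
  val path = new Path("s3a://my_bucket1/dummy/path")
  val uri  = path.toUri

  // The URI authority is the bucket name for s3/s3a paths;
  // it is null for plain filesystem paths like /dummy/path.
  val bucket = Option(uri.getAuthority).getOrElse("")

  // The prefix is the URI path with the leading "/" stripped,
  // mirroring the substring(1) logic in getCmdLine.
  val prefix = uri.getPath.stripPrefix("/")

  println(s"bucket=$bucket prefix=$prefix") // bucket=my_bucket1 prefix=dummy/path
}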

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/sink/CmdLineSinkSuite.scala

Lines changed: 30 additions & 6 deletions
@@ -19,7 +19,10 @@ package za.co.absa.pramen.core.mocks.sink
 import com.typesafe.config.ConfigFactory
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.DataFrame
+import org.mockito.Mockito.{mock, when}
 import org.scalatest.wordspec.AnyWordSpec
+import za.co.absa.pramen.api.{DataFormat, MetastoreReader}
+import za.co.absa.pramen.core.MetaTableDefFactory
 import za.co.absa.pramen.core.base.SparkTestBase
 import za.co.absa.pramen.core.exceptions.CmdFailedException
 import za.co.absa.pramen.core.fixtures.TempDirFixture
@@ -93,22 +96,43 @@ class CmdLineSinkSuite extends AnyWordSpec with SparkTestBase with TempDirFixtur
     }
 
     "work without a temporary path" in {
-      val (sink, _) = getUseCase(null, recordCountToReturn = Some(5))
+      withTempDirectory("cmd_sink") { tempDir =>
+        val metastoreReader = mock(classOf[MetastoreReader])
+        val metatable = MetaTableDefFactory.getDummyMetaTableDef(name = "table1",
+          format = DataFormat.Parquet(tempDir, None)
+        )
+        when(metastoreReader.getTableDef("table1")).thenReturn(metatable)
+
+        val (sink, _) = getUseCase(null, recordCountToReturn = Some(5))
 
-      val sinkResult = sink.send(exampleDf, "table1", null, infoDate, Map[String, String]("cmd.line" -> "dummy @infoDate"))
+        val sinkResult = sink.send(exampleDf, "table1", metastoreReader, infoDate, Map[String, String]("cmd.line" -> "dummy @infoDate @partitionPath"))
 
-      assert(sinkResult.recordsSent == 5)
+        assert(sinkResult.recordsSent == 5)
+      }
     }
   }
 
   "getCmdLine()" should {
     "replace variables with actual values" in {
       val (sink, _) = getUseCase()
+      val dataPath = Some(new Path("/dummy/path"))
+      val partitionPath = Some(new Path(s"/dummy/path/date=$infoDate"))
+
+      val cmdTemplate = "--data-path @dataPath --data-uri @dataUri --partition-path @partitionPath --info-date @infoDate --infoMonth @infoMonth"
+
+      assert(sink.getCmdLine(cmdTemplate, dataPath, partitionPath, infoDate) ==
+        "--data-path /dummy/path --data-uri /dummy/path --partition-path /dummy/path/date=2021-12-28 --info-date 2021-12-28 --infoMonth 2021-12")
+    }
+
+    "replace s3 variables with actual values" in {
+      val (sink, _) = getUseCase()
+      val dataPath = Some(new Path("s3a://my_bucket1/dummy/path"))
+      val partitionPath = Some(new Path("s3a://my_bucket2/dummy/path/enceladus_info_date=2023-12-30"))
 
-      val cmdTemplate = "--data-path @dataPath --data-uri @dataUri --info-date @infoDate --infoMonth @infoMonth"
+      val cmdTemplate = "--bucket @bucket --prefix @prefix --partition-prefix @partitionPrefix"
 
-      assert(sink.getCmdLine(cmdTemplate, Some(new Path("/dummy/path")), infoDate) ==
-        "--data-path /dummy/path --data-uri /dummy/path --info-date 2021-12-28 --infoMonth 2021-12")
+      assert(sink.getCmdLine(cmdTemplate, dataPath, partitionPath, infoDate) ==
+        "--bucket my_bucket1 --prefix dummy/path --partition-prefix dummy/path/enceladus_info_date=2023-12-30")
     }
   }
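The partition paths asserted above follow the usual Hive-style column=value layout. As a hedged sketch (an assumption about what SparkUtils.getPartitionPath produces, not its actual implementation), such a path can be built from the table's info date column and date format like this:

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.hadoop.fs.Path

// Builds a Hive-style partition path, e.g. /dummy/path/date=2021-12-28.
// Hypothetical helper for illustration; the real SparkUtils.getPartitionPath
// may differ in signature and edge-case handling.
def partitionPath(basePath: String,
                  infoDateColumn: String,
                  infoDateFormat: String,
                  infoDate: LocalDate): Path = {
  val dateStr = infoDate.format(DateTimeFormatter.ofPattern(infoDateFormat))
  new Path(basePath, s"$infoDateColumn=$dateStr")
}

partitionPath("/dummy/path", "date", "yyyy-MM-dd", LocalDate.of(2021, 12, 28))
// -> /dummy/path/date=2021-12-28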
