Skip to content

Commit

Permalink
Databricks Loader: allow any character in catalog name (close #1288)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Jul 11, 2023
1 parent 3182e8c commit 1c3c7c5
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,15 @@ object Databricks {
OPTIMIZE ${Fragment.const0(qualify(Manifest.Name))}
ZORDER BY base"""
case Statement.CreateDbSchema =>
val schema = tgt.catalog.map(c => s"$c.${tgt.schema}").getOrElse(tgt.schema)
sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(schema)}"""
sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(qualifySchemaName)}"""
}

private def qualify(tableName: String): String = tgt.catalog match {
case Some(catalog) => s"${catalog}.${tgt.schema}.$tableName"
case None => s"${tgt.schema}.$tableName"
/**
 * Builds the fully qualified name of a table.
 *
 * @param tableName unqualified table name
 * @return the qualified schema name and `tableName`, joined by a dot
 */
private def qualify(tableName: String): String = {
  val schemaPrefix = qualifySchemaName
  s"${schemaPrefix}.${tableName}"
}

/**
 * Schema name, prefixed with the catalog when one is configured.
 *
 * The catalog is emitted as a backquote-delimited identifier so it may contain characters
 * that are not allowed in a bare identifier. A backquote inside the catalog name is doubled,
 * otherwise it would terminate the delimited identifier early and produce malformed SQL.
 * The schema part is emitted as-is.
 *
 * @return `` `catalog`.schema `` when a catalog is set, otherwise just the schema
 */
private def qualifySchemaName: String = tgt.catalog match {
  case Some(c) =>
    // Escape embedded backquotes by doubling them before wrapping the name.
    val escapedCatalog = c.replace("`", "``")
    s"`$escapedCatalog`.${tgt.schema}"
  case None => tgt.schema
}
}
Right(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,37 @@ class DatabricksSpec extends Specification {
)
}
}

"surround catalog name with backquotes" in {
val toCopy = ColumnsToCopy(
List(
ColumnName("app_id"),
ColumnName("unstruct_event_com_acme_aaa_1"),
ColumnName("contexts_com_acme_xxx_1")
)
)
val toSkip = ColumnsToSkip(List())
val statement =
Statement.EventsCopy(
baseFolder,
Compression.Gzip,
toCopy,
toSkip,
TypesInfo.WideRow(PARQUET, List.empty),
LoadAuthMethod.NoCreds,
()
)

val testTarget = Databricks
.build(targetConfig.copy(storage = targetConfig.storage.copy(catalog = Some("test_catalog"))))
.right
.get
testTarget.toFragment(statement).toString must beLike { case sql =>
sql must contain(
"COPY INTO `test_catalog`.snowplow.events"
)
}
}
}
}

Expand All @@ -162,38 +193,38 @@ object DatabricksSpec {
val baseFolder: BlobStorage.Folder =
BlobStorage.Folder.coerce("s3://somewhere/path")

/** Configuration the spec builds its Databricks targets from; examples may tweak it via `copy`. */
val targetConfig: Config[StorageTarget.Databricks] = {
  val storage = StorageTarget.Databricks(
    "host",
    None,
    "snowplow",
    443,
    "some/path",
    StorageTarget.PasswordConfig.PlainText("xxx"),
    None,
    "useragent",
    StorageTarget.LoadAuthMethod.NoCreds,
    2.days,
    logLevel = 3
  )
  val cloud = Config.Cloud.AWS(
    Region("eu-central-1"),
    Config.Cloud.AWS.SQS("my-queue.fifo", Some(Region("eu-central-1")))
  )
  val monitoring =
    Config.Monitoring(None, None, Config.Metrics(None, None, 1.minute), None, None, None)
  val timeouts = Config.Timeouts(1.minute, 1.minute, 1.minute, 1.minute, 30.seconds)
  // The same constant retry policy is passed for all three retry settings.
  val constantRetries = Config.Retries(Config.Strategy.Constant, None, 1.minute, None)

  Config(
    storage,
    cloud,
    None,
    monitoring,
    None,
    Config.Schedules(Nil),
    timeouts,
    constantRetries,
    constantRetries,
    constantRetries,
    Config.FeatureFlags(addLoadTstampColumn = true),
    exampleTelemetry
  )
}

val target: Target[Unit] = Databricks
.build(
Config(
StorageTarget.Databricks(
"host",
None,
"snowplow",
443,
"some/path",
StorageTarget.PasswordConfig.PlainText("xxx"),
None,
"useragent",
StorageTarget.LoadAuthMethod.NoCreds,
2.days,
logLevel = 3
),
Config.Cloud.AWS(
Region("eu-central-1"),
Config.Cloud.AWS.SQS("my-queue.fifo", Some(Region("eu-central-1")))
),
None,
Config.Monitoring(None, None, Config.Metrics(None, None, 1.minute), None, None, None),
None,
Config.Schedules(Nil),
Config.Timeouts(1.minute, 1.minute, 1.minute, 1.minute, 30.seconds),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.Retries(Config.Strategy.Constant, None, 1.minute, None),
Config.FeatureFlags(addLoadTstampColumn = true),
exampleTelemetry
)
)
.build(targetConfig)
.right
.get

Expand Down

0 comments on commit 1c3c7c5

Please sign in to comment.