diff --git a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala index d845709e8..d6e2df3f0 100644 --- a/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala +++ b/modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala @@ -197,13 +197,15 @@ object Databricks { OPTIMIZE ${Fragment.const0(qualify(Manifest.Name))} ZORDER BY base""" case Statement.CreateDbSchema => - val schema = tgt.catalog.map(c => s"$c.${tgt.schema}").getOrElse(tgt.schema) - sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(schema)}""" + sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(qualifySchemaName)}""" } - private def qualify(tableName: String): String = tgt.catalog match { - case Some(catalog) => s"${catalog}.${tgt.schema}.$tableName" - case None => s"${tgt.schema}.$tableName" + private def qualify(tableName: String): String = + s"$qualifySchemaName.$tableName" + + private def qualifySchemaName: String = tgt.catalog match { + case Some(c) => s"`$c`.${tgt.schema}" + case None => s"${tgt.schema}" } } Right(result) diff --git a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala index 7d58e825a..fbaf51a78 100644 --- a/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala +++ b/modules/databricks-loader/src/test/scala/com/snowplowanalytics/snowplow/loader/databricks/DatabricksSpec.scala @@ -154,6 +154,37 @@ class DatabricksSpec extends Specification { ) } } + + "surround catalog name with backquotes" in { + val toCopy = ColumnsToCopy( + List( + ColumnName("app_id"), + ColumnName("unstruct_event_com_acme_aaa_1"), + ColumnName("contexts_com_acme_xxx_1") + ) + ) + val toSkip = ColumnsToSkip(List()) + val statement = + Statement.EventsCopy( + baseFolder, + Compression.Gzip, + toCopy, + toSkip, + TypesInfo.WideRow(PARQUET, List.empty), + LoadAuthMethod.NoCreds, + () + ) + + val testTarget = Databricks + .build(targetConfig.copy(storage = targetConfig.storage.copy(catalog = Some("test_catalog")))) + .right + .get + testTarget.toFragment(statement).toString must beLike { case sql => + sql must contain( + "COPY INTO `test_catalog`.snowplow.events" + ) + } + } } } @@ -162,38 +193,38 @@ object DatabricksSpec { val baseFolder: BlobStorage.Folder = BlobStorage.Folder.coerce("s3://somewhere/path") + val targetConfig: Config[StorageTarget.Databricks] = Config( + StorageTarget.Databricks( + "host", + None, + "snowplow", + 443, + "some/path", + StorageTarget.PasswordConfig.PlainText("xxx"), + None, + "useragent", + StorageTarget.LoadAuthMethod.NoCreds, + 2.days, + logLevel = 3 + ), + Config.Cloud.AWS( + Region("eu-central-1"), + Config.Cloud.AWS.SQS("my-queue.fifo", Some(Region("eu-central-1"))) + ), + None, + Config.Monitoring(None, None, Config.Metrics(None, None, 1.minute), None, None, None), + None, + Config.Schedules(Nil), + Config.Timeouts(1.minute, 1.minute, 1.minute, 1.minute, 30.seconds), + Config.Retries(Config.Strategy.Constant, None, 1.minute, None), + Config.Retries(Config.Strategy.Constant, None, 1.minute, None), + Config.Retries(Config.Strategy.Constant, None, 1.minute, None), + Config.FeatureFlags(addLoadTstampColumn = true), + exampleTelemetry + ) + val target: Target[Unit] = Databricks - .build( - Config( - StorageTarget.Databricks( - "host", - None, - "snowplow", - 443, - "some/path", - StorageTarget.PasswordConfig.PlainText("xxx"), - None, - "useragent", - StorageTarget.LoadAuthMethod.NoCreds, - 2.days, - logLevel = 3 - ), - Config.Cloud.AWS( - Region("eu-central-1"), - Config.Cloud.AWS.SQS("my-queue.fifo", Some(Region("eu-central-1"))) - ), - None, - Config.Monitoring(None, None, Config.Metrics(None, None, 1.minute), None, None, None), - None, - Config.Schedules(Nil), - Config.Timeouts(1.minute, 1.minute, 1.minute, 1.minute, 30.seconds), - Config.Retries(Config.Strategy.Constant, None, 1.minute, None), - Config.Retries(Config.Strategy.Constant, None, 1.minute, None), - Config.Retries(Config.Strategy.Constant, None, 1.minute, None), - Config.FeatureFlags(addLoadTstampColumn = true), - exampleTelemetry - ) - ) + .build(targetConfig) .right .get