Skip to content

Commit

Permalink
Snowflake loader: make on_error continue when type of the incoming data is parquet (close #970)
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Jul 18, 2022
1 parent ba9913b commit b0a5a1a
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 35 deletions.
6 changes: 0 additions & 6 deletions config/snowflake.config.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@
# The S3 path used as stage location
"location": "s3://bucket/monitoring/"
}
# Optional, the default value is "CONTINUE".
# Specifies the strategy to use when an error occurs during the load operation.
# Possible values are CONTINUE, ABORT_STATEMENT.
# Detailed information can be found at the following link:
# https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#copy-options-copyoptions
"onError": "ABORT_STATEMENT"
# An optional host name that will take a priority over automatically derived
"jdbcHost": "acme.eu-central-1.snowflake.com"
# Optional, default method is 'NoCreds'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ object StorageTarget {
transformedStage: Option[Snowflake.Stage],
appName: String,
folderMonitoringStage: Option[Snowflake.Stage],
onError: Snowflake.OnError,
jdbcHost: Option[String],
loadAuthMethod: LoadAuthMethod) extends StorageTarget {

Expand Down Expand Up @@ -203,13 +202,6 @@ object StorageTarget {
}

object Snowflake {
// https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#copy-options-copyoptions
// As specified in the above link, CONTINUE and SKIP_FILE have the same behavior for semi-structured
// data files such as JSON, Parquet, Avro. Therefore, SKIP_FILE isn't specified separately.
sealed trait OnError
case object Continue extends OnError
case object AbortStatement extends OnError

case class Stage(name: String, location: Option[S3.Folder])
}

Expand Down Expand Up @@ -349,19 +341,6 @@ object StorageTarget {
// Circe decoder for ParameterStoreConfig, derived automatically from its case class fields.
implicit def parameterStoreConfigDecoder: Decoder[ParameterStoreConfig] =
deriveDecoder[ParameterStoreConfig]

// Circe decoder for the Snowflake ON_ERROR setting. Input is normalized
// (lowercased, underscores stripped) so that e.g. "ABORT_STATEMENT",
// "abort_statement" and "abortstatement" all decode to AbortStatement.
implicit def snowflakeOnErrorDecoder: Decoder[Snowflake.OnError] =
Decoder[String].emap { raw =>
  raw.toLowerCase.replace("_", "") match {
    case "abortstatement" => Right(Snowflake.AbortStatement)
    case "continue" => Right(Snowflake.Continue)
    case other => Left(s"$other cannot be used as onError type. Available choices: CONTINUE, ABORT_STATEMENT")
  }
}

// Circe encoder for the Snowflake ON_ERROR setting; renders each variant as
// the exact string Snowflake's COPY INTO statement expects.
implicit def snowflakeOnErrorEncoder: Encoder[Snowflake.OnError] =
Encoder[String].contramap { onError =>
  onError match {
    case Snowflake.Continue => "CONTINUE"
    case Snowflake.AbortStatement => "ABORT_STATEMENT"
  }
}

implicit def loadAuthMethodDecoder: Decoder[LoadAuthMethod] =
Decoder.instance { cur =>
val typeCur = cur.downField("type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ object ConfigSpec {
Some(exampleTransformedStage),
"Snowplow_OSS",
None,
StorageTarget.Snowflake.Continue,
None,
StorageTarget.LoadAuthMethod.NoCreds
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
"storage": {
"type": "snowflake",
"appName": "Snowplow_OSS",
"onError": "CONTINUE"
"loadAuthMethod": {
"type": "NoCreds"
"roleSessionName": "rdb_loader"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ object Snowflake {
val frPath = Fragment.const0(s"@${qualify(stage.name)}/$afterStage/output=good/")
val frCopy = Fragment.const0(s"${qualify(EventsTable.MainName)}(${columnsForCopy(columns)})")
val frSelectColumns = Fragment.const0(columnsForSelect(columns))
val frOnError = Fragment.const0(s"ON_ERROR = ${tgt.onError.asJson.noSpaces}")
val frOnError = buildErrorFragment(typesInfo)
val frFileFormat = buildFileFormatFragment(typesInfo)
sql"""|COPY INTO $frCopy
|FROM (
Expand All @@ -165,7 +165,7 @@ object Snowflake {
val frCopy = Fragment.const0(s"${qualify(s.table)}($TempTableColumn)")
val frPath = Fragment.const0(s.path)
val frCredentials = loadAuthMethodFragment(s.tempCreds)
val frOnError = Fragment.const0(s"ON_ERROR = ${tgt.onError.asJson.noSpaces}")
val frOnError = buildErrorFragment(s.typesInfo)
val frFileFormat = buildFileFormatFragment(s.typesInfo)
sql"""|COPY INTO $frCopy
|FROM '$frPath' $frCredentials
Expand Down Expand Up @@ -263,10 +263,27 @@ object Snowflake {
case TypesInfo.WideRow(JSON, _) =>
Fragment.const0("FILE_FORMAT = (TYPE = JSON TIMESTAMP_FORMAT = 'YYYY-MM-DD HH24:MI:SS.FF')")
case TypesInfo.WideRow(PARQUET, _) =>
Fragment.const0("FILE_FORMAT = (TYPE = PARQUET")
Fragment.const0("FILE_FORMAT = (TYPE = PARQUET)")
}
}

/**
 * Build the ON_ERROR fragment of the COPY INTO statement according to the file format.
 * If the file format is JSON, ON_ERROR will be ABORT_STATEMENT.
 * If the file format is PARQUET, ON_ERROR will be CONTINUE.
 * This is because loading Parquet transformed files with ABORT_STATEMENT fails
 * due to empty files in the transformed folder.
 */
private def buildErrorFragment(typesInfo: TypesInfo): Fragment =
  typesInfo match {
    case TypesInfo.Shredded(_) =>
      // Snowflake only loads wide-row output; shredded output is a programmer error here.
      throw new IllegalStateException("Shredded type is not supported for Snowflake")
    case TypesInfo.WideRow(JSON, _) =>
      // No interpolation needed, so plain string literals (the original used a redundant s"" prefix).
      Fragment.const0("ON_ERROR = ABORT_STATEMENT")
    case TypesInfo.WideRow(PARQUET, _) =>
      Fragment.const0("ON_ERROR = CONTINUE")
  }

private def loadAuthMethodFragment(loadAuthMethod: LoadAuthMethod): Fragment =
loadAuthMethod match {
case LoadAuthMethod.NoCreds =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ class ConfigSpec extends Specification {
val storage = exampleSnowflake
.copy(password = StorageTarget.PasswordConfig.EncryptedKey(StorageTarget.EncryptedConfig(StorageTarget.ParameterStoreConfig("snowplow.snowflake.password"))))
.copy(jdbcHost = Some("acme.eu-central-1.snowflake.com"))
.copy(onError = StorageTarget.Snowflake.AbortStatement)
.copy(folderMonitoringStage = Some(StorageTarget.Snowflake.Stage("snowplow_folders_stage", Some(S3.Folder.coerce("s3://bucket/monitoring/")))))
.copy(transformedStage = Some(StorageTarget.Snowflake.Stage("snowplow_stage", Some(S3.Folder.coerce("s3://bucket/transformed/")))))
val result = getConfig("/snowflake.config.reference.hocon", Config.fromString[IO])
Expand Down Expand Up @@ -58,7 +57,10 @@ class ConfigSpec extends Specification {
defaultMonitoring,
exampleQueueName,
None,
exampleSnowflake.copy(transformedStage = Some(StorageTarget.Snowflake.Stage("snowplow_stage", Some(S3.Folder.coerce("s3://bucket/transformed/"))))),
exampleSnowflake
.copy(
transformedStage = Some(StorageTarget.Snowflake.Stage("snowplow_stage", Some(S3.Folder.coerce("s3://bucket/transformed/"))))
),
emptySchedules,
exampleTimeouts,
exampleRetries.copy(cumulativeBound = None),
Expand All @@ -81,7 +83,6 @@ class ConfigSpec extends Specification {
transformedStage = None,
appName = "Snowplow_OSS",
folderMonitoringStage = None,
onError = StorageTarget.Snowflake.Continue,
jdbcHost = None,
StorageTarget.LoadAuthMethod.NoCreds)
exampleSnowflake.host must beRight("acme.snowflakecomputing.com")
Expand Down

0 comments on commit b0a5a1a

Please sign in to comment.