Skip to content

Commit 3914d7a

Browse files
committed
Loader: Create the database schema on startup (close #1266)
1 parent 5e88175 commit 3914d7a

File tree

6 files changed

+57
-6
lines changed

6 files changed

+57
-6
lines changed

modules/databricks-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/databricks/Databricks.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ object Databricks {
196196
case Statement.VacuumManifest => sql"""
197197
OPTIMIZE ${Fragment.const0(qualify(Manifest.Name))}
198198
ZORDER BY base"""
199+
case Statement.CreateDbSchema =>
200+
val schema = tgt.catalog.map(c => s"$c.${tgt.schema}").getOrElse(tgt.schema)
201+
sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(schema)}"""
199202
}
200203

201204
private def qualify(tableName: String): String = tgt.catalog match {

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/Loader.scala

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
*/
1313
package com.snowplowanalytics.snowplow.rdbloader
1414

15+
import java.sql.SQLException
1516
import scala.concurrent.duration._
1617
import cats.{Apply, Monad, MonadThrow}
1718
import cats.implicits._
@@ -112,9 +113,15 @@ object Loader {
112113
val noOperationPrepare = NoOperation.prepare(config.schedules.noOperation, control.makePaused) *>
113114
Logging[F].info("No operation prepare step is completed")
114115

115-
val eventsTableInit = initRetry(createEventsTable[F, C](target)).onError { case t: Throwable =>
116-
Monitoring[F].alert(Alert.FailedToCreateEventsTable(t))
117-
} *> Logging[F].info("Events table initialization is completed")
116+
val dbSchemaInit = createDbSchema[F, C]
117+
.onError { case t: Throwable =>
118+
Monitoring[F].alert(Alert.FailedToCreateDatabaseSchema(t))
119+
} *> Logging[F].info("Database schema initialization is completed")
120+
121+
val eventsTableInit = createEventsTable[F, C](target)
122+
.onError { case t: Throwable =>
123+
Monitoring[F].alert(Alert.FailedToCreateEventsTable(t))
124+
} *> Logging[F].info("Events table initialization is completed")
118125

119126
val manifestInit = initRetry(Manifest.initialize[F, C, I](config.storage, target)).onError { case t: Throwable =>
120127
Monitoring[F].alert(Alert.FailedToCreateManifestTable(t))
@@ -123,7 +130,10 @@ object Loader {
123130
val addLoadTstamp = addLoadTstampColumn[F, C](config.featureFlags.addLoadTstampColumn, config.storage) *>
124131
Logging[F].info("Adding load_tstamp column is completed")
125132

126-
val init: F[I] = blockUntilReady *> noOperationPrepare *> eventsTableInit *> manifestInit *> addLoadTstamp *> initQuery[F, C, I](target)
133+
val init: F[I] =
134+
blockUntilReady *> noOperationPrepare *> dbSchemaInit *> eventsTableInit *> manifestInit *> addLoadTstamp *> initQuery[F, C, I](
135+
target
136+
)
127137

128138
val process = Stream.eval(init).flatMap { initQueryResult =>
129139
loading(initQueryResult)
@@ -281,8 +291,37 @@ object Loader {
281291
Monitoring[F].alert(Alert.FailedInitialConnection(t))
282292
}
283293

284-
private def createEventsTable[F[_]: Transaction[*[_], C], C[_]: DAO: Monad](target: Target[_]): F[Unit] =
285-
Transaction[F, C].transact(DAO[C].executeUpdate(target.getEventTable, DAO.Purpose.NonLoading).void)
294+
private def isSQLPermissionError(t: Throwable): Boolean =
295+
t match {
296+
case s: SQLException =>
297+
Option(s.getSQLState) match {
298+
case Some("42501") => true // Sql state for insufficient permissions for Databricks, Snowflake and Redshift
299+
case _ => false
300+
}
301+
case _ => false
302+
}
303+
304+
private def createEventsTable[F[_]: Transaction[*[_], C], C[_]: DAO: MonadThrow: Logging](target: Target[_]): F[Unit] =
305+
Transaction[F, C].transact {
306+
DAO[C]
307+
.executeUpdate(target.getEventTable, DAO.Purpose.NonLoading)
308+
.void
309+
.recoverWith {
310+
case t: Throwable if isSQLPermissionError(t) =>
311+
Logging[C].warning(s"Failed to create events table due to permission error: ${getErrorMessage(t)}")
312+
}
313+
}
314+
315+
private def createDbSchema[F[_]: Transaction[*[_], C], C[_]: DAO: MonadThrow: Logging]: F[Unit] =
316+
Transaction[F, C].transact {
317+
DAO[C]
318+
.executeUpdate(Statement.CreateDbSchema, DAO.Purpose.NonLoading)
319+
.void
320+
.recoverWith {
321+
case t: Throwable if isSQLPermissionError(t) =>
322+
Logging[C].warning(s"Failed to create database schema due to permission error: ${getErrorMessage(t)}")
323+
}
324+
}
286325

287326
/**
288327
* Last level of failure handling, called when non-loading stream fail. Called on an application

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/db/Statement.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ object Statement {
119119
case class CreateTable(ddl: Fragment) extends Statement
120120
case class AlterTable(ddl: Fragment) extends Statement
121121
case class DdlFile(ddl: Fragment) extends Statement
122+
case object CreateDbSchema extends Statement
122123

123124
// Optimize (housekeeping i.e. vacuum in redshift, optimize in databricks)
124125
case object VacuumManifest extends Statement

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/rdbloader/dsl/Alert.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ object Alert {
3636
case class FailedInitialConnection(cause: Throwable) extends Alert
3737
case class FailedToCreateEventsTable(cause: Throwable) extends Alert
3838
case class FailedToCreateManifestTable(cause: Throwable) extends Alert
39+
case class FailedToCreateDatabaseSchema(cause: Throwable) extends Alert
3940
case class FailedHealthCheck(cause: Throwable) extends Alert
4041
case class FailedFolderMonitoring(cause: Throwable, numFailures: Int) extends Alert
4142

@@ -52,6 +53,7 @@ object Alert {
5253
case FailedInitialConnection(_) => None
5354
case FailedToCreateEventsTable(_) => None
5455
case FailedToCreateManifestTable(_) => None
56+
case FailedToCreateDatabaseSchema(_) => None
5557
case FailedHealthCheck(_) => None
5658
case FailedFolderMonitoring(_, _) => None
5759
}
@@ -74,6 +76,7 @@ object Alert {
7476
case FailedInitialConnection(_) => Severity.Error
7577
case FailedToCreateEventsTable(_) => Severity.Error
7678
case FailedToCreateManifestTable(_) => Severity.Error
79+
case FailedToCreateDatabaseSchema(_) => Severity.Error
7780
}
7881

7982
def getMessage(am: Alert): String = {
@@ -88,6 +91,7 @@ object Alert {
8891
case FailedInitialConnection(t) => show"Failed to get connection at startup: $t"
8992
case FailedToCreateEventsTable(t) => show"Failed to create events table: $t"
9093
case FailedToCreateManifestTable(t) => show"Failed to create manifest table: $t"
94+
case FailedToCreateDatabaseSchema(t) => show"Failed to create database schema: $t"
9195
case FailedHealthCheck(t) => show"DB failed health check: $t"
9296
case FailedFolderMonitoring(t, numFailures) => show"Folder monitoring failed $numFailures times in a row: $t"
9397
}

modules/redshift-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/redshift/Redshift.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,8 @@ object Redshift {
257257
case Statement.AddLoadTstampColumn =>
258258
sql"""ALTER TABLE ${Fragment.const0(EventsTable.withSchema(schema))}
259259
ADD COLUMN load_tstamp TIMESTAMP DEFAULT GETDATE() NULL"""
260+
case Statement.CreateDbSchema =>
261+
sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(schema)}"""
260262

261263
case Statement.CreateTable(ddl) =>
262264
ddl

modules/snowflake-loader/src/main/scala/com/snowplowanalytics/snowplow/loader/snowflake/Snowflake.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ object Snowflake {
283283
case Statement.AddLoadTstampColumn =>
284284
sql"""ALTER TABLE ${Fragment.const0(EventsTable.withSchema(schema))}
285285
ADD COLUMN load_tstamp TIMESTAMP NULL"""
286+
case Statement.CreateDbSchema =>
287+
sql"""CREATE SCHEMA IF NOT EXISTS ${Fragment.const0(schema)}"""
286288

287289
case Statement.CreateTable(ddl) =>
288290
ddl

0 commit comments

Comments
 (0)