Skip to content

Commit

Permalink
Merge branch 'mwkohout-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
dvriend committed Sep 16, 2014
2 parents 3e6e21f + 473d8ab commit 042ab7a
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 113 deletions.
26 changes: 20 additions & 6 deletions src/main/scala/akka/persistence/jdbc/snapshot/Statements.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package akka.persistence.jdbc.snapshot

import akka.persistence.{SelectedSnapshot, SnapshotSelectionCriteria, SnapshotMetadata}
import akka.persistence.jdbc.common.PluginConfig
import akka.persistence.jdbc.util.{EncodeDecode, Base64}
import akka.persistence.jdbc.util.{Base64, EncodeDecode}
import akka.persistence.serialization.Snapshot
import akka.persistence.{SelectedSnapshot, SnapshotMetadata, SnapshotSelectionCriteria}
import scalikejdbc._

import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -45,9 +45,9 @@ trait GenericStatements extends JdbcStatements with EncodeDecode {
SQL(s"SELECT * FROM $schema$table WHERE persistence_id = ? AND sequence_nr <= ? ORDER BY sequence_nr DESC")
.bind(persistenceId, criteria.maxSequenceNr)
.map { rs =>
SelectedSnapshot(SnapshotMetadata(rs.string("persistence_id"), rs.long("sequence_nr"), rs.long("created")),
SelectedSnapshot(SnapshotMetadata(rs.string("persistence_id"), rs.long("sequence_nr"), rs.long("created")),
Snapshot.fromBytes(Base64.decodeBinary(rs.string("snapshot"))).data)
}
}
.list()
.apply()
.filterNot(snap => snap.metadata.timestamp > criteria.maxTimestamp)
Expand All @@ -59,10 +59,24 @@ trait MySqlStatements extends GenericStatements

trait H2Statements extends GenericStatements

trait OracleStatements extends GenericStatements
trait OracleStatements extends GenericStatements {
override def writeSnapshot(metadata: SnapshotMetadata, snapshot: Snapshot): Unit = {
val snapshotData = Base64.encodeString(Snapshot.toBytes(snapshot))
import metadata._

SQL( s"""MERGE INTO $schema$table snapshot
USING (SELECT {persistenceId} AS persistence_id, {sequenceNr} AS seq_nr from DUAL) val
ON (snapshot.persistence_id = val.persistence_id and snapshot.sequence_nr = val.seq_nr)
WHEN MATCHED THEN
UPDATE SET snapshot={snap}
WHEN NOT MATCHED THEN
INSERT (PERSISTENCE_ID, SEQUENCE_NR, SNAPSHOT, CREATED) VALUES ({persistenceId}, {sequenceNr}, {snap}, {created})""")
.bindByName('persistenceId -> persistenceId, 'sequenceNr -> sequenceNr, 'created -> timestamp, 'snap -> snapshotData).execute().apply
}
}

trait MSSqlServerStatements extends GenericStatements

trait DB2Statements extends GenericStatements

trait InformixStatements extends GenericStatements
trait InformixStatements extends GenericStatements
4 changes: 4 additions & 0 deletions src/test/scala/akka/persistence/jdbc/actor/ActorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ trait ActorTest extends FlatSpecLike with BeforeAndAfterEach with BeforeAndAfter


override protected def beforeAll(): Unit = {
dropJournalTable()
createJournalTable()
dropSnapshotTable()
createSnapshotTable()
super.beforeAll()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ abstract class JdbcSyncJournalSpec extends LegacyJournalSpec with JdbcInit {
override def beforeAll() {
dropJournalTable()
createJournalTable()
dropSnapshotTable()
createSnapshotTable()
super.beforeAll()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ trait JdbcSyncSnapshotStoreSpec extends SnapshotStoreSpec with JdbcInit {
}

override def beforeAll() {
dropJournalTable()
createJournalTable()
dropSnapshotTable()
createSnapshotTable()
super.beforeAll()
Expand Down
248 changes: 141 additions & 107 deletions src/test/scala/akka/persistence/jdbc/util/JdbcInit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,136 +6,170 @@ import scalikejdbc._
import scala.util.Try

trait JdbcInit {
def createJournalTable(): Unit
def createJournalTable(): Try[Int]

def dropJournalTable(): Unit
def dropJournalTable(): Try[Int]

def clearJournalTable(): Unit
def clearJournalTable(): Try[Int]

def createSnapshotTable(): Unit
def createSnapshotTable(): Try[Int]

def dropSnapshotTable(): Unit
def dropSnapshotTable(): Try[Int]

def clearSnapshotTable(): Unit
def clearSnapshotTable(): Try[Int]
}

trait GenericJdbcInit extends JdbcInit {
val cfg: PluginConfig
implicit val session: DBSession

def createJournalTable(): Unit = SQL( s"""
CREATE TABLE IF NOT EXISTS ${cfg.journalSchemaName}${cfg.journalTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
marker VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number))""").update().apply

def dropJournalTable(): Unit = SQL(s"DROP TABLE IF EXISTS ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply

def clearJournalTable(): Unit = SQL(s"DELETE FROM ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply

def createSnapshotTable(): Unit = SQL( s"""
CREATE TABLE IF NOT EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr BIGINT NOT NULL,
snapshot TEXT NOT NULL,
created BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr))""").update().apply

def dropSnapshotTable(): Unit = SQL(s"DROP TABLE IF EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
def createJournalTable() = Try {
SQL( s"""CREATE TABLE IF NOT EXISTS ${cfg.journalSchemaName}${cfg.journalTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
marker VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number))""").update().apply
}

def clearSnapshotTable(): Unit = SQL(s"DELETE FROM ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
def dropJournalTable() = Try {
SQL(s"DROP TABLE IF EXISTS ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply
}

def clearJournalTable() = Try {
SQL(s"DELETE FROM ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply
}

def createSnapshotTable() = Try {
SQL(s"""CREATE TABLE IF NOT EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr BIGINT NOT NULL,
snapshot TEXT NOT NULL,
created BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr))""").update().apply
}

def dropSnapshotTable() = Try {
SQL(s"DROP TABLE IF EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
}

def clearSnapshotTable() = Try {
SQL(s"DELETE FROM ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
}
}

trait PostgresqlJdbcInit extends GenericJdbcInit

trait H2JdbcInit extends GenericJdbcInit

trait MysqlJdbcInit extends GenericJdbcInit {
override def createJournalTable(): Unit = SQL(s"""
CREATE TABLE IF NOT EXISTS ${cfg.journalSchemaName}${cfg.journalTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
marker VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number))""").update().apply

override def dropJournalTable(): Unit = SQL(s"DROP TABLE IF EXISTS ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply

override def clearJournalTable(): Unit = SQL(s"DELETE FROM ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply

override def createSnapshotTable(): Unit = SQL(s"""
CREATE TABLE IF NOT EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr BIGINT NOT NULL,
snapshot TEXT NOT NULL,
created BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr))""").update().apply

override def dropSnapshotTable(): Unit = SQL(s"DROP TABLE IF EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply

override def clearSnapshotTable(): Unit = SQL(s"DELETE FROM ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
override def createJournalTable() = Try {
SQL( s"""CREATE TABLE IF NOT EXISTS ${cfg.journalSchemaName}${cfg.journalTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
marker VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number))""").update().apply
}

override def dropJournalTable() = Try {
SQL(s"DROP TABLE IF EXISTS ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply
}

override def clearJournalTable() = Try {
SQL(s"DELETE FROM ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply
}

override def createSnapshotTable() = Try {
SQL( s"""CREATE TABLE IF NOT EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr BIGINT NOT NULL,
snapshot TEXT NOT NULL,
created BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr))""").update().apply
}

override def dropSnapshotTable() = Try {
SQL(s"DROP TABLE IF EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
}

override def clearSnapshotTable() = Try {
SQL(s"DELETE FROM ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
}
}

trait OracleJdbcInit extends GenericJdbcInit {
override def createJournalTable(): Unit = SQL(s"""
CREATE TABLE ${cfg.journalSchemaName}${cfg.journalTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_number NUMERIC NOT NULL,
marker VARCHAR(255) NOT NULL,
message CLOB NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number))""").update().apply

override def dropJournalTable(): Unit = SQL(s"DROP TABLE ${cfg.journalSchemaName}${cfg.journalTableName} CASCADE CONSTRAINT").update().apply

override def clearJournalTable(): Unit = SQL(s"DELETE FROM ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply

override def createSnapshotTable(): Unit = SQL(s"""
CREATE TABLE ${cfg.snapshotSchemaName}${cfg.snapshotTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr NUMERIC NOT NULL,
snapshot CLOB NOT NULL,
created NUMERIC NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr))""").update().apply

override def dropSnapshotTable(): Unit = SQL(s"DROP TABLE ${cfg.snapshotSchemaName}${cfg.snapshotTableName} CASCADE CONSTRAINT").update().apply

override def clearSnapshotTable(): Unit = SQL(s"DELETE FROM ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
override def createJournalTable() = Try {
SQL( s"""CREATE TABLE ${cfg.journalSchemaName}${cfg.journalTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_number NUMERIC NOT NULL,
marker VARCHAR(255) NOT NULL,
message CLOB NOT NULL,
created TIMESTAMP NOT NULL,
PRIMARY KEY(persistence_id, sequence_number))""").update().apply
}

override def dropJournalTable() = Try {
SQL(s"DROP TABLE ${cfg.journalSchemaName}${cfg.journalTableName} CASCADE CONSTRAINT").update().apply
}

override def clearJournalTable() = Try {
SQL(s"DELETE FROM ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply
}

override def createSnapshotTable() = Try {
SQL( s"""CREATE TABLE ${cfg.snapshotSchemaName}${cfg.snapshotTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr NUMERIC NOT NULL,
snapshot CLOB NOT NULL,
created NUMERIC NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr))""").update().apply
}

override def dropSnapshotTable() = Try {
SQL(s"DROP TABLE ${cfg.snapshotSchemaName}${cfg.snapshotTableName} CASCADE CONSTRAINT").update().apply
}

override def clearSnapshotTable() = Try {
SQL(s"DELETE FROM ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
}
}

trait InformixJdbcInit extends GenericJdbcInit {
override def createJournalTable(): Unit = SQL(s"""
CREATE TABLE IF NOT EXISTS ${cfg.journalSchemaName}${cfg.journalTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_number NUMERIC NOT NULL,
marker VARCHAR(255) NOT NULL,
message CLOB NOT NULL,
created DATETIME YEAR TO FRACTION(5) NOT NULL,
PRIMARY KEY(persistence_id, sequence_number))""").update().apply

override def dropJournalTable(): Unit =
Try(SQL(s"DROP TABLE ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply).recover {
case ex: Exception => createJournalTable()
}

override def clearJournalTable(): Unit = SQL(s"DELETE FROM ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply

override def createSnapshotTable(): Unit = SQL( s"""
CREATE TABLE IF NOT EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr NUMERIC NOT NULL,
snapshot CLOB NOT NULL,
created NUMERIC NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr))""").update().apply

override def dropSnapshotTable(): Unit =
Try(SQL(s"DROP TABLE ${cfg.snapshotSchemaName}${cfg.snapshotTableName}").update().apply).recover {
case ex: Exception => createSnapshotTable()
}

override def clearSnapshotTable(): Unit = SQL(s"DELETE FROM ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
override def createJournalTable() = Try {
SQL( s"""CREATE TABLE IF NOT EXISTS ${cfg.journalSchemaName}${cfg.journalTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_number NUMERIC NOT NULL,
marker VARCHAR(255) NOT NULL,
message CLOB NOT NULL,
created DATETIME YEAR TO FRACTION(5) NOT NULL,
PRIMARY KEY(persistence_id, sequence_number))""").update().apply
}

override def dropJournalTable() = Try {
SQL(s"DROP TABLE ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply
}

override def clearJournalTable() = Try {
SQL(s"DELETE FROM ${cfg.journalSchemaName}${cfg.journalTableName}").update().apply
}

override def createSnapshotTable() = Try {
SQL( s"""CREATE TABLE IF NOT EXISTS ${cfg.snapshotSchemaName}${cfg.snapshotTableName} (
persistence_id VARCHAR(255) NOT NULL,
sequence_nr NUMERIC NOT NULL,
snapshot CLOB NOT NULL,
created NUMERIC NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr))""").update().apply
}

override def dropSnapshotTable() = Try {
SQL(s"DROP TABLE ${cfg.snapshotSchemaName}${cfg.snapshotTableName}").update().apply
}

override def clearSnapshotTable() = Try {
SQL(s"DELETE FROM ${cfg.snapshotSchemaName}${cfg.snapshotTableName} ").update().apply
}
}

0 comments on commit 042ab7a

Please sign in to comment.