Skip to content

Commit

Permalink
Issue 5: Connection pool is being redefined when using journal and sn…
Browse files Browse the repository at this point in the history
…apshot store
  • Loading branch information
dvriend committed Sep 12, 2014
1 parent a834aff commit 99a0c2a
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 59 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package akka.persistence.jdbc.extension

import akka.actor.{ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider}
import akka.persistence.jdbc.common.PluginConfig
import scalikejdbc.{AutoSession, ConnectionPool}

object ScalikeExtension extends ExtensionId[ScalikeExtensionImpl] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): ScalikeExtensionImpl = new ScalikeExtensionImpl(system)

override def lookup() = ScalikeExtension
}

class ScalikeExtensionImpl(system: ExtendedActorSystem) extends Extension {
implicit val session = AutoSession
val cfg = PluginConfig(system)
val poolName = "akka-persistence-jdbc"

Class.forName(cfg.driverClassName)
ConnectionPool.singleton(cfg.url, cfg.username, cfg.password)
}
19 changes: 13 additions & 6 deletions src/main/scala/akka/persistence/jdbc/journal/Journals.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package akka.persistence.jdbc.journal

class PostgresqlSyncWriteJournal extends JdbcSyncWriteJournal with PostgresqlStatements
import akka.persistence.jdbc.extension.ScalikeExtension
import scalikejdbc.DBSession

class MysqlSyncWriteJournal extends JdbcSyncWriteJournal with MySqlStatements
trait GenericJdbcSyncWriteJournal extends JdbcSyncWriteJournal with GenericStatements {
override implicit val session: DBSession = ScalikeExtension(system).session
}

class H2SyncWriteJournal extends JdbcSyncWriteJournal with H2Statements
class PostgresqlSyncWriteJournal extends GenericJdbcSyncWriteJournal with PostgresqlStatements

class OracleSyncWriteJournal extends JdbcSyncWriteJournal with OracleStatements
class MysqlSyncWriteJournal extends GenericJdbcSyncWriteJournal with MySqlStatements

class MSSqlServerSyncWriteJournal extends JdbcSyncWriteJournal with MSSqlServerStatements
class H2SyncWriteJournal extends GenericJdbcSyncWriteJournal with H2Statements

class InformixSyncWriteJournal extends JdbcSyncWriteJournal with InformixStatements
class OracleSyncWriteJournal extends GenericJdbcSyncWriteJournal with OracleStatements

class MSSqlServerSyncWriteJournal extends GenericJdbcSyncWriteJournal with MSSqlServerStatements

class InformixSyncWriteJournal extends GenericJdbcSyncWriteJournal with InformixStatements
9 changes: 5 additions & 4 deletions src/main/scala/akka/persistence/jdbc/journal/Statements.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package akka.persistence.jdbc.journal

import akka.persistence.PersistentRepr
import akka.persistence.jdbc.common.{PluginConfig, ScalikeConnection}
import akka.persistence.jdbc.common.PluginConfig
import akka.persistence.jdbc.util.{Base64, EncodeDecode}
import scalikejdbc._

Expand All @@ -23,9 +23,10 @@ trait JdbcStatements {
def selectMessagesFor(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long): List[PersistentRepr]
}

trait GenericStatements extends JdbcStatements with ScalikeConnection with EncodeDecode {
implicit def executionContext: ExecutionContext
def cfg: PluginConfig
trait GenericStatements extends JdbcStatements with EncodeDecode {
implicit val executionContext: ExecutionContext
implicit val session: DBSession
val cfg: PluginConfig

val schema = cfg.journalSchemaName
val table = cfg.journalTableName
Expand Down
19 changes: 13 additions & 6 deletions src/main/scala/akka/persistence/jdbc/snapshot/SnapshotStores.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package akka.persistence.jdbc.snapshot

class PostgresqlSyncSnapshotStore extends JdbcSyncSnapshotStore with PostgresqlStatements
import akka.persistence.jdbc.extension.ScalikeExtension
import scalikejdbc.DBSession

class MysqlSyncSnapshotStore extends JdbcSyncSnapshotStore with MySqlStatements
trait GenericSyncSnapshotStore extends JdbcSyncSnapshotStore with GenericStatements {
override implicit val session: DBSession = ScalikeExtension(system).session
}

class H2SyncSnapshotStore extends JdbcSyncSnapshotStore with H2Statements
class PostgresqlSyncSnapshotStore extends GenericSyncSnapshotStore with PostgresqlStatements

class OracleSyncSnapshotStore extends JdbcSyncSnapshotStore with OracleStatements
class MysqlSyncSnapshotStore extends GenericSyncSnapshotStore with MySqlStatements

class MSSqlServerSyncSnapshotStore extends JdbcSyncSnapshotStore with MSSqlServerStatements
class H2SyncSnapshotStore extends GenericSyncSnapshotStore with H2Statements

class InformixSyncSnapshotStore extends JdbcSyncSnapshotStore with InformixStatements
class OracleSyncSnapshotStore extends GenericSyncSnapshotStore with OracleStatements

class MSSqlServerSyncSnapshotStore extends GenericSyncSnapshotStore with MSSqlServerStatements

class InformixSyncSnapshotStore extends GenericSyncSnapshotStore with InformixStatements
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package akka.persistence.jdbc.snapshot

import akka.persistence.{SelectedSnapshot, SnapshotSelectionCriteria, SnapshotMetadata}
import akka.persistence.jdbc.common.{PluginConfig, ScalikeConnection}
import akka.persistence.jdbc.common.PluginConfig
import akka.persistence.jdbc.util.{EncodeDecode, Base64}
import akka.persistence.serialization.Snapshot
import scalikejdbc._
Expand All @@ -17,9 +17,10 @@ trait JdbcStatements {
def selectSnapshotsFor(persistenceId: String, criteria: SnapshotSelectionCriteria): List[SelectedSnapshot]
}

trait GenericStatements extends JdbcStatements with ScalikeConnection with EncodeDecode {
implicit def executionContext: ExecutionContext
def cfg: PluginConfig
trait GenericStatements extends JdbcStatements with EncodeDecode {
implicit val executionContext: ExecutionContext
implicit val session: DBSession
val cfg: PluginConfig

val schema = cfg.snapshotSchemaName
val table = cfg.snapshotTableName
Expand Down
23 changes: 14 additions & 9 deletions src/test/scala/akka/persistence/jdbc/actor/ActorTest.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package akka.persistence.jdbc.actor

import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props}
import akka.persistence.jdbc.common.{PluginConfig, ScalikeConnection}
import akka.persistence.jdbc.common.PluginConfig
import akka.persistence.jdbc.extension.ScalikeExtension
import akka.persistence.jdbc.util._
import akka.persistence.{SnapshotOffer, PersistentActor, SaveSnapshotFailure, SaveSnapshotSuccess}
import akka.testkit.{TestKit, TestProbe}
import com.typesafe.config.ConfigFactory
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FlatSpecLike}

import scalikejdbc.DBSession

object TestActor {
case object Snap
Expand Down Expand Up @@ -52,10 +53,10 @@ class TestActor(testProbe: ActorRef) extends PersistentActor with ActorLogging {
}
}

trait ActorTest extends FlatSpecLike with BeforeAndAfterEach with BeforeAndAfterAll with ScalikeConnection with JdbcInit {
trait ActorTest extends FlatSpecLike with BeforeAndAfterEach with BeforeAndAfterAll with JdbcInit {
import TestActor._
implicit val system: ActorSystem
override def cfg: PluginConfig = PluginConfig(system)
val cfg = PluginConfig(system)

val testProbe = TestProbe()

Expand Down Expand Up @@ -125,12 +126,16 @@ trait ActorTest extends FlatSpecLike with BeforeAndAfterEach with BeforeAndAfter
}
}

class PostgresActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("postgres-application.conf"))) with ActorTest with PostgresqlJdbcInit
trait GenericActorTest extends ActorTest with GenericJdbcInit {
override implicit val session: DBSession = ScalikeExtension(system).session
}

class PostgresActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("postgres-application.conf"))) with GenericActorTest with PostgresqlJdbcInit

class OracleActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("oracle-application.conf"))) with ActorTest with OracleJdbcInit
class OracleActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("oracle-application.conf"))) with GenericActorTest with OracleJdbcInit

class MySqlActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("mysql-application.conf"))) with ActorTest with MysqlJdbcInit
class MySqlActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("mysql-application.conf"))) with GenericActorTest with MysqlJdbcInit

class H2ActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("h2-application.conf"))) with ActorTest with H2JdbcInit
class H2ActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("h2-application.conf"))) with GenericActorTest with H2JdbcInit

class InformixActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("informix-application.conf"))) with ActorTest with InformixJdbcInit
class InformixActorTest extends TestKit(ActorSystem("TestCluster", ConfigFactory.load("informix-application.conf"))) with GenericActorTest with InformixJdbcInit
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package akka.persistence.jdbc.journal

import akka.persistence.jdbc.common.{PluginConfig, ScalikeConnection}
import akka.persistence.jdbc.common.PluginConfig
import akka.persistence.jdbc.extension.ScalikeExtension
import akka.persistence.jdbc.util._
import akka.persistence.journal.LegacyJournalSpec
import com.typesafe.config.ConfigFactory
import scalikejdbc.DBSession

abstract class JdbcSyncJournalSpec extends LegacyJournalSpec with ScalikeConnection with JdbcInit {
override def cfg: PluginConfig = PluginConfig(system)
abstract class JdbcSyncJournalSpec extends LegacyJournalSpec with JdbcInit {
val cfg = PluginConfig(system)
lazy val config = ConfigFactory.load("application.conf")

override def beforeAll() {
Expand All @@ -20,22 +22,26 @@ abstract class JdbcSyncJournalSpec extends LegacyJournalSpec with ScalikeConnect
}
}

class H2SyncJournalSpec extends JdbcSyncJournalSpec with H2JdbcInit {
trait GenericJdbcJournalSpec extends JdbcSyncJournalSpec {
implicit val session: DBSession = ScalikeExtension(system).session
}

class H2SyncJournalSpec extends GenericJdbcJournalSpec with H2JdbcInit {
override lazy val config = ConfigFactory.load("h2-application.conf")
}

class PostgresqlSyncJournalSpec extends JdbcSyncJournalSpec with PostgresqlJdbcInit {
class PostgresqlSyncJournalSpec extends GenericJdbcJournalSpec with PostgresqlJdbcInit {
override lazy val config = ConfigFactory.load("postgres-application.conf")
}

class MysqlSyncJournalSpec extends JdbcSyncJournalSpec with MysqlJdbcInit {
class MysqlSyncJournalSpec extends GenericJdbcJournalSpec with MysqlJdbcInit {
override lazy val config = ConfigFactory.load("mysql-application.conf")
}

class OracleSyncJournalSpec extends JdbcSyncJournalSpec with OracleJdbcInit {
class OracleSyncJournalSpec extends GenericJdbcJournalSpec with OracleJdbcInit {
override lazy val config = ConfigFactory.load("oracle-application.conf")
}

class InformixSyncJournalSpec extends JdbcSyncJournalSpec with InformixJdbcInit {
class InformixSyncJournalSpec extends GenericJdbcJournalSpec with InformixJdbcInit {
override lazy val config = ConfigFactory.load("informix-application.conf")
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package akka.persistence.jdbc.snapshot

import akka.persistence.jdbc.extension.ScalikeExtension
import akka.persistence.{SaveSnapshotSuccess, SnapshotMetadata}
import akka.persistence.SnapshotProtocol.SaveSnapshot
import akka.persistence.jdbc.common.{PluginConfig, ScalikeConnection}
import akka.persistence.jdbc.common.PluginConfig
import akka.persistence.jdbc.util._
import akka.persistence.snapshot.SnapshotStoreSpec
import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory
import scalikejdbc.DBSession

trait JdbcSyncSnapshotStoreSpec extends SnapshotStoreSpec with ScalikeConnection with JdbcInit {
override def cfg: PluginConfig = PluginConfig(system)
trait JdbcSyncSnapshotStoreSpec extends SnapshotStoreSpec with JdbcInit {
val cfg = PluginConfig(system)
lazy val config = ConfigFactory.load("application.conf")

"The snapshot store must also" must {
Expand All @@ -34,22 +36,26 @@ trait JdbcSyncSnapshotStoreSpec extends SnapshotStoreSpec with ScalikeConnection
}
}

class H2JdbcSyncSnapshotStoreSpec extends JdbcSyncSnapshotStoreSpec with H2JdbcInit {
trait GenericSyncSnapshotStoreSpec extends JdbcSyncSnapshotStoreSpec {
implicit val session: DBSession = ScalikeExtension(system).session
}

class H2JdbcSyncSnapshotStoreSpec extends GenericSyncSnapshotStoreSpec with H2JdbcInit {
override lazy val config = ConfigFactory.load("h2-application.conf")
}

class PostgresqlJdbcSyncSnapshotStoreSpec extends JdbcSyncSnapshotStoreSpec with PostgresqlJdbcInit {
class PostgresqlJdbcSyncSnapshotStoreSpec extends GenericSyncSnapshotStoreSpec with PostgresqlJdbcInit {
override lazy val config = ConfigFactory.load("postgres-application.conf")
}

class MysqlSyncSnapshotStoreSpec extends JdbcSyncSnapshotStoreSpec with MysqlJdbcInit {
class MysqlSyncSnapshotStoreSpec extends GenericSyncSnapshotStoreSpec with MysqlJdbcInit {
override lazy val config = ConfigFactory.load("mysql-application.conf")
}

class OracleSyncSnapshotStoreSpec extends JdbcSyncSnapshotStoreSpec with OracleJdbcInit {
class OracleSyncSnapshotStoreSpec extends GenericSyncSnapshotStoreSpec with OracleJdbcInit {
override lazy val config = ConfigFactory.load("oracle-application.conf")
}

class InformixSyncSnapshotStoreSpec extends JdbcSyncSnapshotStoreSpec with InformixJdbcInit {
class InformixSyncSnapshotStoreSpec extends GenericSyncSnapshotStoreSpec with InformixJdbcInit {
override lazy val config = ConfigFactory.load("informix-application.conf")
}
5 changes: 2 additions & 3 deletions src/test/scala/akka/persistence/jdbc/util/JdbcInit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import scalikejdbc._
import scala.util.Try

trait JdbcInit {
implicit def session: DBSession

def createJournalTable(): Unit

def dropJournalTable(): Unit
Expand All @@ -22,7 +20,8 @@ trait JdbcInit {
}

trait GenericJdbcInit extends JdbcInit {
def cfg: PluginConfig
val cfg: PluginConfig
implicit val session: DBSession

def createJournalTable(): Unit = SQL( s"""
CREATE TABLE IF NOT EXISTS ${cfg.journalSchemaName}${cfg.journalTableName} (
Expand Down

0 comments on commit 99a0c2a

Please sign in to comment.