Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java API in PersistenceQuery - add new methods that don't require the unused class param #1768

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/src/main/paradox/persistence-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ Java
: @@snip [PersistenceQueryDocTest.java](/docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java) { #basic-usage }

Journal implementers are encouraged to put this identifier in a variable known to the user, such that one can access it via
@scala[@scaladoc[readJournalFor[NoopJournal](NoopJournal.identifier)](pekko.persistence.query.PersistenceQuery#readJournalFor[T%3C:org.apache.pekko.persistence.query.scaladsl.ReadJournal](readJournalPluginId:String):T)]@java[@javadoc[getJournalFor(NoopJournal.class, NoopJournal.identifier)](pekko.persistence.query.PersistenceQuery#getReadJournalFor(java.lang.Class,java.lang.String))], however this is not enforced.
@scala[@scaladoc[readJournalFor[NoopJournal](NoopJournal.identifier)](pekko.persistence.query.PersistenceQuery#readJournalFor[T%3C:org.apache.pekko.persistence.query.scaladsl.ReadJournal](readJournalPluginId:String):T)]@java[@javadoc[getJournalFor(NoopJournal.class, NoopJournal.identifier)](pekko.persistence.query.PersistenceQuery#getReadJournalFor(java.lang.String))], however this is not enforced.

### Predefined queries

Expand Down
24 changes: 8 additions & 16 deletions docs/src/test/java/jdocs/persistence/PersistenceQueryDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ void demonstrateBasicUsage() {
// obtain read journal by plugin id
final MyJavadslReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(
MyJavadslReadJournal.class, "pekko.persistence.query.my-read-journal");
.getReadJournalFor("pekko.persistence.query.my-read-journal");

// issue query to journal
Source<EventEnvelope, NotUsed> source =
Expand All @@ -230,8 +229,7 @@ void demonstrateBasicUsage() {
void demonstrateAllPersistenceIdsLive() {
final MyJavadslReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(
MyJavadslReadJournal.class, "pekko.persistence.query.my-read-journal");
.getReadJournalFor("pekko.persistence.query.my-read-journal");

// #all-persistence-ids-live
readJournal.persistenceIds();
Expand All @@ -243,8 +241,7 @@ void demonstrateNoRefresh() {

final MyJavadslReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(
MyJavadslReadJournal.class, "pekko.persistence.query.my-read-journal");
.getReadJournalFor("pekko.persistence.query.my-read-journal");

// #all-persistence-ids-snap
readJournal.currentPersistenceIds();
Expand All @@ -256,8 +253,7 @@ void demonstrateRefresh() {

final MyJavadslReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(
MyJavadslReadJournal.class, "pekko.persistence.query.my-read-journal");
.getReadJournalFor("pekko.persistence.query.my-read-journal");

// #events-by-persistent-id
readJournal.eventsByPersistenceId("user-us-1337", 0L, Long.MAX_VALUE);
Expand All @@ -269,8 +265,7 @@ void demonstrateEventsByTag() {

final MyJavadslReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(
MyJavadslReadJournal.class, "pekko.persistence.query.my-read-journal");
.getReadJournalFor("pekko.persistence.query.my-read-journal");

// #events-by-tag
// assuming journal is able to work with numeric offsets we can:
Expand Down Expand Up @@ -302,8 +297,7 @@ void demonstrateMaterializedQueryValues() {

final MyJavadslReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(
MyJavadslReadJournal.class, "pekko.persistence.query.my-read-journal");
.getReadJournalFor("pekko.persistence.query.my-read-journal");

// #advanced-journal-query-usage

Expand Down Expand Up @@ -347,8 +341,7 @@ void demonstrateWritingIntoDifferentStore() {

final MyJavadslReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(
MyJavadslReadJournal.class, "pekko.persistence.query.my-read-journal");
.getReadJournalFor("pekko.persistence.query.my-read-journal");

// #projection-into-different-store-rs
final ReactiveStreamsCompatibleDBDriver driver = new ReactiveStreamsCompatibleDBDriver();
Expand Down Expand Up @@ -379,8 +372,7 @@ void demonstrateWritingIntoDifferentStoreWithMapAsync() {

final MyJavadslReadJournal readJournal =
PersistenceQuery.get(system)
.getReadJournalFor(
MyJavadslReadJournal.class, "pekko.persistence.query.my-read-journal");
.getReadJournalFor("pekko.persistence.query.my-read-journal");

// #projection-into-different-store-simple
final ExampleStore store = new ExampleStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public void demonstrateReadJournal() {
// #get-read-journal
LeveldbReadJournal queries =
PersistenceQuery.get(system)
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
.getReadJournalFor(LeveldbReadJournal.Identifier());
// #get-read-journal
}

public void demonstrateEventsByPersistenceId() {
// #EventsByPersistenceId
LeveldbReadJournal queries =
PersistenceQuery.get(system)
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
.getReadJournalFor(LeveldbReadJournal.Identifier());

Source<EventEnvelope, NotUsed> source =
queries.eventsByPersistenceId("some-persistence-id", 0, Long.MAX_VALUE);
Expand All @@ -53,7 +53,7 @@ public void demonstrateAllPersistenceIds() {
// #AllPersistenceIds
LeveldbReadJournal queries =
PersistenceQuery.get(system)
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
.getReadJournalFor(LeveldbReadJournal.Identifier());

Source<String, NotUsed> source = queries.persistenceIds();
// #AllPersistenceIds
Expand All @@ -63,7 +63,7 @@ public void demonstrateEventsByTag() {
// #EventsByTag
LeveldbReadJournal queries =
PersistenceQuery.get(system)
.getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
.getReadJournalFor(LeveldbReadJournal.Identifier());

Source<EventEnvelope, NotUsed> source = queries.eventsByTag("green", new Sequence(0L));
// #EventsByTag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,37 @@ class PersistenceQuery(system: ExtendedActorSystem)
/**
* Java API: Returns the [[pekko.persistence.query.javadsl.ReadJournal]] specified by the given
* read journal configuration entry.
* @since 1.2.0
*/
final def getReadJournalFor[T <: javadsl.ReadJournal](
readJournalPluginId: String,
readJournalPluginConfig: Config): T =
pluginFor(readJournalPluginId, readJournalPluginConfig).javadslPlugin.asInstanceOf[T]

/**
* Java API: Returns the [[pekko.persistence.query.javadsl.ReadJournal]] specified by the given
* read journal configuration entry.
* @since 1.2.0
*/
final def getReadJournalFor[T <: javadsl.ReadJournal](readJournalPluginId: String): T =
getReadJournalFor[T](readJournalPluginId, ConfigFactory.empty())

/**
* Java API: Returns the [[pekko.persistence.query.javadsl.ReadJournal]] specified by the given
* read journal configuration entry.
*/
@deprecated("Use getReadJournalFor without passing the class param instead", "1.2.0")
final def getReadJournalFor[T <: javadsl.ReadJournal](
@unused clazz: Class[T],
readJournalPluginId: String,
readJournalPluginConfig: Config): T =
pluginFor(readJournalPluginId, readJournalPluginConfig).javadslPlugin.asInstanceOf[T]

/**
* Java API: Returns the [[pekko.persistence.query.javadsl.ReadJournal]] specified by the given
* read journal configuration entry.
*/
@deprecated("Use getReadJournalFor without passing the class param instead", "1.2.0")
final def getReadJournalFor[T <: javadsl.ReadJournal](clazz: Class[T], readJournalPluginId: String): T =
getReadJournalFor[T](clazz, readJournalPluginId, ConfigFactory.empty())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ package org.apache.pekko.persistence.query.javadsl
* Usage:
* {{{
* SomeCoolReadJournal journal =
* PersistenceQuery.get(system).getReadJournalFor(SomeCoolReadJournal.class, queryPluginConfigPath);
* PersistenceQuery.get(system).getReadJournalFor(queryPluginConfigPath);
* Source<EventEnvolope, Unit> events = journal.eventsByTag("mytag", 0L);
* }}}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import pekko.stream.javadsl.Source
* It is retrieved with:
* {{{
* LeveldbReadJournal queries =
* PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.class, LeveldbReadJournal.Identifier());
* PersistenceQuery.get(system).getReadJournalFor(LeveldbReadJournal.Identifier());
* }}}
*
* Corresponding Scala API is in [[pekko.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal]].
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class PersistenceQueryTest {
@SuppressWarnings("unused")
public void shouldExposeJavaDSLFriendlyQueryJournal() throws Exception {
final DummyJavaReadJournal readJournal =
PersistenceQuery.get(system).getReadJournalFor(DummyJavaReadJournal.class, "noop-journal");
PersistenceQuery.get(system).getReadJournalFor("noop-journal");
final org.apache.pekko.stream.javadsl.Source<String, NotUsed> ids =
readJournal.persistenceIds();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import pekko.persistence.query.javadsl.{
ReadJournal
}
import pekko.persistence.query.typed
import pekko.persistence.query.typed.javadsl.CurrentEventsBySliceQuery
import pekko.stream.javadsl.Source
import pekko.persistence.query.typed.javadsl.{ CurrentEventsBySliceQuery, EventsBySliceQuery }
import pekko.persistence.testkit.query.scaladsl
import pekko.stream.javadsl.Source

object PersistenceTestKitReadJournal {
val Identifier = "pekko.persistence.testkit.query"
Expand All @@ -40,7 +40,8 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR
with CurrentEventsByPersistenceIdQuery
with CurrentEventsByTagQuery
with CurrentEventsBySliceQuery
with EventsByTagQuery {
with EventsByTagQuery
with EventsBySliceQuery {

override def eventsByPersistenceId(
persistenceId: String,
Expand All @@ -67,6 +68,13 @@ final class PersistenceTestKitReadJournal(delegate: scaladsl.PersistenceTestKitR
override def eventsByTag(tag: String, offset: Offset): Source[EventEnvelope, NotUsed] =
delegate.eventsByTag(tag, offset).asJava

override def eventsBySlices[Event](
entityType: String,
minSlice: Int,
maxSlice: Int,
offset: Offset): Source[typed.EventEnvelope[Event], NotUsed] =
delegate.eventsBySlices(entityType, minSlice, maxSlice, offset).asJava

override def sliceForPersistenceId(persistenceId: String): Int =
delegate.sliceForPersistenceId(persistenceId)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,19 @@

package org.apache.pekko.persistence.testkit.query

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.persistence.Persistence
import pekko.persistence.query.NoOffset
import pekko.persistence.testkit.internal.InMemStorageExtension
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorRef
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.Persistence
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.NoOffset
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.testkit.PersistenceTestKitPlugin
import pekko.persistence.testkit.internal.InMemStorageExtension
import pekko.persistence.testkit.query.javadsl.{ PersistenceTestKitReadJournal => JavaPersistenceTestKitReadJournal }
import pekko.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.persistence.typed.scaladsl.Effect
Expand All @@ -39,6 +38,9 @@ import pekko.stream.testkit.scaladsl.TestSink
import org.scalatest.BeforeAndAfterEach
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.immutable.Seq
import scala.concurrent.duration._

object EventsBySliceSpec {
val config = PersistenceTestKitPlugin.config.withFallback(
ConfigFactory.parseString("""
Expand Down Expand Up @@ -81,8 +83,15 @@ class EventsBySliceSpec

implicit val classic: pekko.actor.ActorSystem = system.classicSystem

val queries =
PersistenceQuery(system).readJournalFor[PersistenceTestKitReadJournal](PersistenceTestKitReadJournal.Identifier)
private val persistenceQuery = PersistenceQuery(system)

private val queries =
persistenceQuery.readJournalFor[PersistenceTestKitReadJournal](
PersistenceTestKitReadJournal.Identifier)

private val queriesJava =
persistenceQuery.getReadJournalFor[JavaPersistenceTestKitReadJournal](
JavaPersistenceTestKitReadJournal.Identifier)

def setup(persistenceId: String): ActorRef[Command] = {
val probe = createTestProbe[Done]()
Expand Down Expand Up @@ -131,6 +140,18 @@ class EventsBySliceSpec
probe.expectNext("c-4")
}

"find new events (Java DSL)" in {
val ackProbe = createTestProbe[Done]()
val ref = setup("c")
val src = queriesJava.eventsBySlices[String]("Test", 0, numberOfSlices - 1, NoOffset).asScala
val probe = src.map(_.event).runWith(TestSink.probe[String]).request(5).expectNext("c-1", "c-2", "c-3")

ref ! Command("c-4", ackProbe.ref)
ackProbe.expectMessage(Done)

probe.expectNext("c-4")
}

"find new events after batched setup" in {
val ackProbe = createTestProbe[Done]()
val ref = setupBatched("d")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ class EventsByTagSpec
PersistenceTestKitReadJournal.Identifier)

private val queriesJava =
persistenceQuery.getReadJournalFor(
classOf[JavaPersistenceTestKitReadJournal],
persistenceQuery.getReadJournalFor[JavaPersistenceTestKitReadJournal](
JavaPersistenceTestKitReadJournal.Identifier)

def setup(persistenceId: String, tags: Set[String]): ActorRef[Command] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public class EventSourcedBehaviorJavaDslTest extends JUnitSuite {

private PersistenceTestKitReadJournal queries =
PersistenceQuery.get(Adapter.toClassic(testKit.system()))
.getReadJournalFor(
PersistenceTestKitReadJournal.class, PersistenceTestKitReadJournal.Identifier());
.getReadJournalFor(PersistenceTestKitReadJournal.Identifier());

interface Command extends CborSerializable {}

Expand Down
Loading