Skip to content

Commit 2295763

Browse files
committed
fixed distribution
1 parent 2f6087a commit 2295763

File tree

11 files changed

+53
-31
lines changed

11 files changed

+53
-31
lines changed

common/src/main/kotlin/streams/config/StreamsConfigExtensionFactory.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package streams.config
22

33
import org.neo4j.configuration.Config
4+
import org.neo4j.dbms.api.DatabaseManagementService
45
import org.neo4j.kernel.extension.ExtensionFactory
56
import org.neo4j.kernel.extension.ExtensionType
67
import org.neo4j.kernel.extension.context.ExtensionContext
@@ -10,7 +11,7 @@ import org.neo4j.logging.internal.LogService
1011
class StreamsConfigExtensionFactory: ExtensionFactory<StreamsConfigExtensionFactory.Dependencies>(ExtensionType.GLOBAL, StreamsConfig::class.java.simpleName) {
1112
interface Dependencies {
1213
fun log(): LogService
13-
fun config(): Config
14+
fun dbms(): DatabaseManagementService
1415
}
1516

1617
override fun newInstance(context: ExtensionContext, dependencies: Dependencies): Lifecycle {
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package streams.extensions
2+
3+
import org.neo4j.dbms.api.DatabaseManagementService
4+
import org.neo4j.kernel.internal.GraphDatabaseAPI
5+
import streams.utils.Neo4jUtils
6+
7+
fun DatabaseManagementService.getSystemDb() = this.database(Neo4jUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI

common/src/main/kotlin/streams/extensions/GraphDatabaseServerExtensions.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package streams.extensions
22

33
import org.neo4j.graphdb.GraphDatabaseService
44
import org.neo4j.graphdb.Result
5+
import streams.utils.Neo4jUtils
56

67
fun GraphDatabaseService.execute(cypher: String) = this.execute(cypher, emptyMap())
78
fun GraphDatabaseService.execute(cypher: String, params: Map<String, Any>) = this.executeTransactionally(cypher, params)
@@ -12,4 +13,6 @@ fun <T> GraphDatabaseService.execute(cypher: String, params: Map<String, Any>, l
1213
val ret = lambda(result)
1314
it.commit()
1415
ret
15-
}
16+
}
17+
18+
fun GraphDatabaseService.isSystemDb() = this.databaseName() == Neo4jUtils.SYSTEM_DATABASE_NAME

consumer/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
<groupId>org.neo4j</groupId>
3737
<artifactId>neo4j-streams-common</artifactId>
3838
<version>4.0.0</version>
39+
<scope>provided</scope>
3940
</dependency>
4041
<dependency>
4142
<groupId>org.apache.avro</groupId>

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package streams
22

3+
import org.neo4j.dbms.api.DatabaseManagementService
34
import org.neo4j.kernel.availability.AvailabilityGuard
45
import org.neo4j.kernel.availability.AvailabilityListener
5-
import org.neo4j.configuration.Config
6-
import org.neo4j.dbms.api.DatabaseManagementService
7-
import org.neo4j.kernel.extension.ExtensionType
86
import org.neo4j.kernel.extension.ExtensionFactory
7+
import org.neo4j.kernel.extension.ExtensionType
98
import org.neo4j.kernel.extension.context.ExtensionContext
109
import org.neo4j.kernel.internal.GraphDatabaseAPI
1110
import org.neo4j.kernel.lifecycle.Lifecycle
1211
import org.neo4j.kernel.lifecycle.LifecycleAdapter
1312
import org.neo4j.logging.internal.LogService
1413
import streams.config.StreamsConfig
14+
import streams.extensions.getSystemDb
15+
import streams.extensions.isSystemDb
1516
import streams.procedures.StreamsSinkProcedures
1617
import streams.service.TopicUtils
1718
import streams.utils.Neo4jUtils
@@ -41,7 +42,7 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
4142
private lateinit var eventSink: StreamsEventSink
4243

4344
override fun start() {
44-
if (db.databaseName() == Neo4jUtils.SYSTEM_DATABASE_NAME) {
45+
if (db.isSystemDb()) {
4546
return
4647
}
4748
dependencies.availabilityGuard().addListener(object: AvailabilityListener {
@@ -51,7 +52,7 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
5152
try {
5253
streamsLog.info("Initialising the Streams Sink module")
5354
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration.config)
54-
val streamsTopicService = StreamsTopicService(dbms.database(Neo4jUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI)
55+
val streamsTopicService = StreamsTopicService(dbms.getSystemDb())
5556
val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
5657
streamsSinkConfiguration.sourceIdStrategyConfig)
5758
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db,

consumer/src/main/kotlin/streams/StreamsTopicService.kt

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@ package streams
22

33
import org.neo4j.graphdb.Label
44
import org.neo4j.kernel.internal.GraphDatabaseAPI
5+
import streams.extensions.isSystemDb
56
import streams.serialization.JSONUtils
67
import streams.service.STREAMS_TOPIC_KEY
78
import streams.service.TopicType
89
import streams.service.Topics
910
import streams.utils.Neo4jUtils
1011
import java.lang.IllegalArgumentException
1112

12-
class StreamsTopicService(private val db: GraphDatabaseAPI) {
13+
class StreamsTopicService(private val systemDb: GraphDatabaseAPI) {
1314

1415
init {
15-
if (db.databaseName() != Neo4jUtils.SYSTEM_DATABASE_NAME) {
16+
if (!systemDb.isSystemDb()) {
1617
throw IllegalArgumentException("GraphDatabaseAPI must be an instance of ${Neo4jUtils.SYSTEM_DATABASE_NAME} database")
1718
}
1819
}
@@ -24,18 +25,18 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
2425
}
2526

2627
fun clearAll() { // TODO move to Neo4jUtils#executeInWriteableInstance
27-
if (!Neo4jUtils.isWriteableInstance(db)) {
28+
if (!Neo4jUtils.isWriteableInstance(systemDb)) {
2829
return
2930
}
30-
return db.beginTx().use {
31+
return systemDb.beginTx().use {
3132
it.findNodes(Label.label(STREAMS_TOPIC_KEY))
3233
.forEach { it.delete() }
3334
it.commit()
3435
}
3536
}
3637

37-
fun set(topicType: TopicType, data: Any) = Neo4jUtils.executeInWriteableInstance(db) {
38-
db.beginTx().use {
38+
fun set(topicType: TopicType, data: Any) = Neo4jUtils.executeInWriteableInstance(systemDb) {
39+
systemDb.beginTx().use {
3940
val topicTypeLabel = Label.label(topicType.key)
4041
val findNodes = it.findNodes(topicTypeLabel)
4142
val node = if (findNodes.hasNext()) {
@@ -65,8 +66,8 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
6566
}
6667
}
6768

68-
fun remove(topicType: TopicType, topic: String) = Neo4jUtils.executeInWriteableInstance(db) {
69-
db.beginTx().use {
69+
fun remove(topicType: TopicType, topic: String) = Neo4jUtils.executeInWriteableInstance(systemDb) {
70+
systemDb.beginTx().use {
7071
val topicTypeLabel = Label.label(topicType.key)
7172
val findNodes = it.findNodes(topicTypeLabel)
7273
val node = if (findNodes.hasNext()) {
@@ -93,8 +94,8 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
9394
}
9495
}
9596

96-
fun getTopicType(topic: String) = Neo4jUtils.executeInWriteableInstance(db) {
97-
db.beginTx().use { tx ->
97+
fun getTopicType(topic: String) = Neo4jUtils.executeInWriteableInstance(systemDb) {
98+
systemDb.beginTx().use { tx ->
9899
TopicType.values()
99100
.find {
100101
val topicTypeLabel = Label.label(it.key)
@@ -114,7 +115,7 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
114115
}
115116
}
116117

117-
fun getTopics() = db.beginTx().use { tx ->
118+
fun getTopics() = systemDb.beginTx().use { tx ->
118119
TopicType.values()
119120
.flatMap {
120121
val topicTypeLabel = Label.label(it.key)
@@ -138,8 +139,8 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
138139
}
139140
}
140141

141-
fun getCypherTemplate(topic: String) = db.beginTx().use {
142-
db.beginTx().use {
142+
fun getCypherTemplate(topic: String) = systemDb.beginTx().use {
143+
systemDb.beginTx().use {
143144
val topicTypeLabel = Label.label(TopicType.CYPHER.key)
144145
val findNodes = it.findNodes(topicTypeLabel)
145146
if (!findNodes.hasNext()) {
@@ -156,7 +157,7 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
156157
}
157158
}
158159

159-
fun getAll() = db.beginTx().use { tx ->
160+
fun getAll() = systemDb.beginTx().use { tx ->
160161
TopicType.values()
161162
.mapNotNull {
162163
val topicTypeLabel = Label.label(it.key)

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCommit.kt

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,19 @@ class KafkaEventSinkCommit : KafkaEventSinkBase() {
3232
val resp = kafkaProducer.send(producerRecord).get()
3333

3434
Assert.assertEventually(ThrowingSupplier<Boolean, Exception> {
35-
val query = "MATCH (n:Label) RETURN count(*) AS count"
36-
3735
val kafkaConsumer = createConsumer<String, ByteArray>(
3836
kafka = KafkaEventSinkSuiteIT.kafka,
3937
schemaRegistry = KafkaEventSinkSuiteIT.schemaRegistry)
4038
val offsetAndMetadata = kafkaConsumer.committed(TopicPartition(topic, partition))
4139
kafkaConsumer.close()
42-
43-
db.execute(query) {
44-
val result = it.columnAs<Long>("count")
45-
result.hasNext() && result.next() == 2L && !result.hasNext() && resp.offset() + 1 == offsetAndMetadata.offset()
40+
if (offsetAndMetadata == null) {
41+
false
42+
} else {
43+
val query = "MATCH (n:Label) RETURN count(*) AS count"
44+
db.execute(query) {
45+
val result = it.columnAs<Long>("count")
46+
result.hasNext() && result.next() == 2L && !result.hasNext() && resp.offset() + 1 == offsetAndMetadata.offset()
47+
}
4648
}
4749
}, Matchers.equalTo(true), 30, TimeUnit.SECONDS)
4850

distribution/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
<artifactId>neo4j-streams-consumer</artifactId>
2929
<version>${project.version}</version>
3030
</dependency>
31+
<dependency>
32+
<groupId>org.neo4j</groupId>
33+
<artifactId>neo4j-streams-common</artifactId>
34+
<version>4.0.0</version>
35+
</dependency>
3136
</dependencies>
3237

3338
<build>

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,7 @@
287287
<include>**/*Test.*</include>
288288
<include>**/*IT.*</include>
289289
</includes>
290+
<trimStackTrace>false</trimStackTrace>
290291
</configuration>
291292
</plugin>
292293
</plugins>

producer/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
<groupId>org.neo4j</groupId>
2323
<artifactId>neo4j-streams-common</artifactId>
2424
<version>4.0.0</version>
25+
<scope>provided</scope>
2526
</dependency>
2627

2728
</dependencies>

producer/src/main/kotlin/streams/StreamsExtensionFactory.kt

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package streams
22

3-
import org.neo4j.configuration.Config
43
import org.neo4j.dbms.api.DatabaseManagementService
54
import org.neo4j.kernel.availability.AvailabilityGuard
65
import org.neo4j.kernel.availability.AvailabilityListener
@@ -12,8 +11,8 @@ import org.neo4j.kernel.lifecycle.Lifecycle
1211
import org.neo4j.kernel.lifecycle.LifecycleAdapter
1312
import org.neo4j.logging.internal.LogService
1413
import streams.config.StreamsConfig
14+
import streams.extensions.isSystemDb
1515
import streams.procedures.StreamsProcedures
16-
import streams.utils.Neo4jUtils
1716
import streams.utils.StreamsUtils
1817

1918
class StreamsExtensionFactory : ExtensionFactory<StreamsExtensionFactory.Dependencies>(ExtensionType.DATABASE,"Streams.Producer") {
@@ -48,7 +47,7 @@ class StreamsEventRouterLifecycle(private val db: GraphDatabaseAPI,
4847

4948
override fun start() {
5049
try {
51-
if (db.databaseName() == Neo4jUtils.SYSTEM_DATABASE_NAME) {
50+
if (db.isSystemDb()) {
5251
return
5352
}
5453
streamsLog.info("Initialising the Streams Source module")
@@ -83,7 +82,7 @@ class StreamsEventRouterLifecycle(private val db: GraphDatabaseAPI,
8382
private fun unregisterTransactionEventHandler() {
8483
if (streamsEventRouterConfiguration.enabled) {
8584
StreamsUtils.ignoreExceptions({ streamsConstraintsService.close() }, UninitializedPropertyAccessException::class.java)
86-
databaseManagementService.unregisterTransactionEventListener(db.databaseName(), txHandler)
85+
StreamsUtils.ignoreExceptions({ databaseManagementService.unregisterTransactionEventListener(db.databaseName(), txHandler) }, UninitializedPropertyAccessException::class.java)
8786
}
8887
}
8988

0 commit comments

Comments
 (0)