Skip to content

Commit 3dd0c50

Browse files
committed
fixed tests
1 parent 776d912 commit 3dd0c50

27 files changed

+308
-242
lines changed

common/src/main/kotlin/streams/config/StreamsConfig.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package streams.config
22

3-
import org.neo4j.configuration.Config
43
import org.neo4j.kernel.lifecycle.LifecycleAdapter
54
import org.neo4j.logging.internal.LogService
65
import java.io.FileInputStream
@@ -10,7 +9,7 @@ import java.util.concurrent.ConcurrentHashMap
109
import java.util.regex.Matcher
1110
import java.util.regex.Pattern
1211

13-
class StreamsConfig(private val neo4jConfig: Config, logService: LogService) : LifecycleAdapter() {
12+
class StreamsConfig(logService: LogService) : LifecycleAdapter() {
1413

1514
val config = ConcurrentHashMap<String, String>()
1615

@@ -28,10 +27,15 @@ class StreamsConfig(private val neo4jConfig: Config, logService: LogService) : L
2827
}
2928

3029
override fun init() {
31-
log.debug("Init StreamsConfig")
30+
if (log.isDebugEnabled) {
31+
log.debug("Init StreamsConfig...")
32+
}
3233
loadConfiguration()
3334
afterInitListeners.forEach { it(config) }
34-
println("Neo4j Streams configuration initialised: $config")
35+
}
36+
37+
override fun stop() {
38+
afterInitListeners.clear()
3539
}
3640

3741
private fun loadConfiguration() {
@@ -60,6 +64,7 @@ class StreamsConfig(private val neo4jConfig: Config, logService: LogService) : L
6064
config.putAll(filteredValues)
6165
}
6266

67+
// Taken from ApocConfig.java
6368
private fun determineNeo4jConfFolder(): String? { // sun.java.command=com.neo4j.server.enterprise.CommercialEntryPoint --home-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02 --config-dir=/home/myid/neo4j-enterprise-4.0.0-alpha09mr02/conf
6469
val command = System.getProperty(SUN_JAVA_COMMAND)
6570
val matcher: Matcher = CONF_DIR_PATTERN.matcher(command)

common/src/main/kotlin/streams/config/StreamsConfigExtensionFactory.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package streams.config
22

33
import org.neo4j.configuration.Config
4-
import org.neo4j.dbms.api.DatabaseManagementService
54
import org.neo4j.kernel.extension.ExtensionFactory
65
import org.neo4j.kernel.extension.ExtensionType
76
import org.neo4j.kernel.extension.context.ExtensionContext
@@ -15,6 +14,6 @@ class StreamsConfigExtensionFactory: ExtensionFactory<StreamsConfigExtensionFact
1514
}
1615

1716
override fun newInstance(context: ExtensionContext, dependencies: Dependencies): Lifecycle {
18-
return StreamsConfig(dependencies.config(), dependencies.log())
17+
return StreamsConfig(dependencies.log())
1918
}
2019
}

common/src/main/kotlin/streams/extensions/GraphDatabaseServerExtensions.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ fun GraphDatabaseService.execute(cypher: String, params: Map<String, Any>) = thi
88

99
fun <T> GraphDatabaseService.execute(cypher: String, lambda: ((Result) -> T)) = this.execute(cypher, emptyMap(), lambda)
1010
fun <T> GraphDatabaseService.execute(cypher: String, params: Map<String, Any>, lambda: ((Result) -> T)) = this.beginTx().use {
11-
val resp = it.execute(cypher, params)
11+
val result = it.execute(cypher, params)
12+
val ret = lambda(result)
1213
it.commit()
13-
lambda.let { it(resp) }
14+
ret
1415
}

consumer/src/main/kotlin/streams/StreamsEventSink.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ object StreamsEventSinkFactory {
2929
fun getStreamsEventSink(config: Map<String, String>, streamsQueryExecution: StreamsEventSinkQueryExecution,
3030
streamsTopicService: StreamsTopicService, log: Log, db: GraphDatabaseAPI): StreamsEventSink {
3131
return Class.forName(config.getOrDefault("streams.sink", "streams.kafka.KafkaEventSink"))
32-
.getConstructor(Config::class.java,
32+
.getConstructor(Map::class.java,
3333
StreamsEventSinkQueryExecution::class.java,
3434
StreamsTopicService::class.java,
3535
Log::class.java,

consumer/src/main/kotlin/streams/StreamsEventSinkExtensionFactory.kt

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package streams
33
import org.neo4j.kernel.availability.AvailabilityGuard
44
import org.neo4j.kernel.availability.AvailabilityListener
55
import org.neo4j.configuration.Config
6+
import org.neo4j.dbms.api.DatabaseManagementService
67
import org.neo4j.kernel.extension.ExtensionType
78
import org.neo4j.kernel.extension.ExtensionFactory
89
import org.neo4j.kernel.extension.context.ExtensionContext
@@ -24,60 +25,65 @@ class StreamsEventSinkExtensionFactory : ExtensionFactory<StreamsEventSinkExtens
2425

2526
interface Dependencies {
2627
fun graphdatabaseAPI(): GraphDatabaseAPI
28+
fun dbms(): DatabaseManagementService
2729
fun log(): LogService
2830
fun streamsConfig(): StreamsConfig
2931
fun availabilityGuard(): AvailabilityGuard
3032
}
3133

3234
class StreamsEventLifecycle(private val dependencies: Dependencies): LifecycleAdapter() {
3335
private val db = dependencies.graphdatabaseAPI()
36+
private val dbms = dependencies.dbms()
3437
private val logService = dependencies.log()
3538
private val configuration = dependencies.streamsConfig()
3639
private var streamsLog = logService.getUserLog(StreamsEventLifecycle::class.java)
3740

3841
private lateinit var eventSink: StreamsEventSink
3942

4043
override fun start() {
41-
dependencies.availabilityGuard().addListener(object: AvailabilityListener {
42-
override fun unavailable() {}
43-
44-
override fun available() {
45-
try {
46-
streamsLog.info("Initialising the Streams Sink module")
47-
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration.config)
48-
val streamsTopicService = StreamsTopicService(db)
49-
val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
50-
streamsSinkConfiguration.sourceIdStrategyConfig)
51-
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db,
52-
logService.getUserLog(StreamsEventSinkQueryExecution::class.java),
53-
strategyMap)
44+
if (db.databaseName() == Neo4jUtils.SYSTEM_DATABASE_NAME) {
45+
return
46+
}
47+
dependencies.availabilityGuard().addListener(object: AvailabilityListener {
48+
override fun unavailable() {}
5449

55-
// Create the Sink
56-
val log = logService.getUserLog(StreamsEventSinkFactory::class.java)
57-
eventSink = StreamsEventSinkFactory
58-
.getStreamsEventSink(configuration.config,
59-
streamsQueryExecution,
60-
streamsTopicService,
61-
log,
62-
db)
63-
// start the Sink
64-
if (Neo4jUtils.isCluster(db)) {
65-
log.info("The Sink module is running in a cluster, checking for the ${Neo4jUtils.LEADER}")
66-
Neo4jUtils.waitForTheLeader(db, log) { initSinkModule(streamsTopicService, streamsSinkConfiguration) }
67-
} else {
68-
// check if is writeable instance
69-
Neo4jUtils.executeInWriteableInstance(db) { initSinkModule(streamsTopicService, streamsSinkConfiguration) }
70-
}
50+
override fun available() {
51+
try {
52+
streamsLog.info("Initialising the Streams Sink module")
53+
val streamsSinkConfiguration = StreamsSinkConfiguration.from(configuration.config)
54+
val streamsTopicService = StreamsTopicService(dbms.database(Neo4jUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI)
55+
val strategyMap = TopicUtils.toStrategyMap(streamsSinkConfiguration.topics,
56+
streamsSinkConfiguration.sourceIdStrategyConfig)
57+
val streamsQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db,
58+
logService.getUserLog(StreamsEventSinkQueryExecution::class.java),
59+
strategyMap)
7160

72-
// Register required services for the Procedures
73-
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
74-
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
75-
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
76-
} catch (e: Exception) {
77-
streamsLog.error("Error initializing the streaming sink:", e)
61+
// Create the Sink
62+
val log = logService.getUserLog(StreamsEventSinkFactory::class.java)
63+
eventSink = StreamsEventSinkFactory
64+
.getStreamsEventSink(configuration.config,
65+
streamsQueryExecution,
66+
streamsTopicService,
67+
log,
68+
db)
69+
// start the Sink
70+
if (Neo4jUtils.isCluster(db)) {
71+
log.info("The Sink module is running in a cluster, checking for the ${Neo4jUtils.LEADER}")
72+
Neo4jUtils.waitForTheLeader(db, log) { initSinkModule(streamsTopicService, streamsSinkConfiguration) }
73+
} else {
74+
// check if is writeable instance
75+
Neo4jUtils.executeInWriteableInstance(db) { initSinkModule(streamsTopicService, streamsSinkConfiguration) }
7876
}
77+
78+
// Register required services for the Procedures
79+
StreamsSinkProcedures.registerStreamsSinkConfiguration(streamsSinkConfiguration)
80+
StreamsSinkProcedures.registerStreamsEventConsumerFactory(eventSink.getEventConsumerFactory())
81+
StreamsSinkProcedures.registerStreamsEventSinkConfigMapper(eventSink.getEventSinkConfigMapper())
82+
} catch (e: Exception) {
83+
streamsLog.error("Error initializing the streaming sink:", e)
7984
}
80-
})
85+
}
86+
})
8187
}
8288

8389
private fun initSinkModule(streamsTopicService: StreamsTopicService, streamsSinkConfiguration: StreamsSinkConfiguration) {

consumer/src/main/kotlin/streams/StreamsEventSinkQueryExecution.kt

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,9 @@ class StreamsEventSinkQueryExecution(private val streamsTopicService: StreamsTop
2525

2626
override fun write(query: String, params: Collection<Any>) {
2727
if (Neo4jUtils.isWriteableInstance(db)) {
28-
db.execute(query, mapOf("events" to params)) { result ->
29-
if (log.isDebugEnabled) {
30-
log.debug("Query statistics:\n${result.queryStatistics}")
31-
}
32-
result.close()
28+
val queryStatistics = db.execute(query, mapOf("events" to params)) { it.queryStatistics }
29+
if (log.isDebugEnabled) {
30+
log.debug("Query statistics:\n$queryStatistics")
3331
}
3432
} else {
3533
if (log.isDebugEnabled) {

consumer/src/main/kotlin/streams/StreamsTopicService.kt

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,20 @@ import streams.utils.Neo4jUtils
1010

1111
class StreamsTopicService(private val db: GraphDatabaseAPI) {
1212

13+
private fun isEmpty(data: Any, excOnError: Exception) = when (data) {
14+
is Map<*, *> -> data.isEmpty()
15+
is Collection<*> -> data.isEmpty()
16+
else -> throw excOnError
17+
}
18+
1319
fun clearAll() { // TODO move to Neo4jUtils#executeInWriteableInstance
1420
if (!Neo4jUtils.isWriteableInstance(db)) {
1521
return
1622
}
1723
return db.beginTx().use {
1824
it.findNodes(Label.label(STREAMS_TOPIC_KEY))
1925
.forEach { it.delete() }
26+
it.commit()
2027
}
2128
}
2229

@@ -29,13 +36,25 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
2936
} else {
3037
it.createNode(Label.label(STREAMS_TOPIC_KEY), topicTypeLabel)
3138
}
32-
val oldData = JSONUtils.readValue<Any>(node.getProperty("data"))
39+
val runtimeException = RuntimeException("Unsupported data $data for topic type $topicType")
40+
val oldData = if (node.hasProperty("data")) {
41+
JSONUtils.readValue<Any>(node.getProperty("data"))
42+
} else {
43+
when (data) {
44+
is Map<*, *> -> emptyMap<String, Any?>()
45+
is Collection<*> -> emptyList<String>()
46+
else -> throw runtimeException
47+
}
48+
}
3349
val newData = when (oldData) {
3450
is Map<*, *> -> oldData + (data as Map<String, Any?>)
3551
is Collection<*> -> oldData + (data as Collection<String>)
36-
else -> throw RuntimeException("Unsupported data $data for topic type $topicType")
52+
else -> throw runtimeException
53+
}
54+
if (!isEmpty(newData, runtimeException)) {
55+
node.setProperty("data", JSONUtils.writeValueAsString(newData))
56+
it.commit()
3757
}
38-
node.setProperty("data", newData)
3958
}
4059
}
4160

@@ -48,22 +67,22 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
4867
} else {
4968
return@executeInWriteableInstance
5069
}
70+
if (!node.hasProperty("data")) {
71+
return@executeInWriteableInstance
72+
}
5173
val topicData = JSONUtils.readValue<Any>(node.getProperty("data"))
74+
val runtimeException = RuntimeException("Unsupported data $topicData for topic type $topicType")
5275
val filteredData = when (topicData) {
5376
is Map<*, *> -> topicData.filterKeys { it.toString() != topic }
5477
is Collection<*> -> topicData.filter { it.toString() != topic }
55-
else -> throw RuntimeException("Unsupported data $topicData for topic type $topicType")
78+
else -> throw runtimeException
5679
}
57-
val isEmpty = when (filteredData) {
58-
is Map<*, *> -> filteredData.isEmpty()
59-
is Collection<*> -> filteredData.isEmpty()
60-
else -> throw RuntimeException("Unsupported data $topicData for topic type $topicType")
61-
}
62-
if (isEmpty) {
80+
if (isEmpty(filteredData, runtimeException)) {
6381
node.removeProperty(topicType.key)
6482
} else {
6583
node.setProperty(topicType.key, filteredData)
6684
}
85+
it.commit()
6786
}
6887
}
6988

@@ -94,16 +113,15 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
94113
val topicTypeLabel = Label.label(it.key)
95114
val findNodes = tx.findNodes(topicTypeLabel)
96115
if (!findNodes.hasNext()) {
97-
emptySet<Any>()
116+
emptySet<String>()
98117
} else {
99118
val data = JSONUtils.readValue<Any>(findNodes.next().getProperty("data"))
100119
when (data) {
101120
is Map<*, *> -> data.keys
102121
is Collection<*> -> data.toSet()
103-
else -> emptySet()
122+
else -> emptySet<String>()
104123
}
105124
}
106-
107125
}.toSet() as Set<String>
108126
}
109127

consumer/src/main/kotlin/streams/kafka/KafkaAutoCommitEventConsumer.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ open class KafkaAutoCommitEventConsumer(private val config: KafkaSinkConfigurati
7171

7272
override fun stop() {
7373
consumer.close()
74-
// doesn't make sense to close it if you don't own it
75-
errorService.close()
7674
}
7775

7876
fun readSimple(action: (String, List<StreamsSinkEntity>) -> Unit): Map<TopicPartition, OffsetAndMetadata> {

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,21 +49,31 @@ class KafkaEventSink(private val config: Map<String, String>,
4949
"schemaRegistryUrl" to "kafka.schema.registry.url",
5050
"groupId" to "kafka.${ConsumerConfig.GROUP_ID_CONFIG}")
5151

52+
private lateinit var errorService: ErrorService
53+
5254
override fun getEventConsumerFactory(): StreamsEventConsumerFactory {
5355
return object: StreamsEventConsumerFactory() {
5456
override fun createStreamsEventConsumer(config: Map<String, String>, log: Log): StreamsEventConsumer {
5557
val kafkaConfig = KafkaSinkConfiguration.from(config)
56-
57-
val errorService = KafkaErrorService(kafkaConfig.asProperties(), ErrorService.ErrorConfig.from(kafkaConfig.streamsSinkConfiguration.errorConfig),{ s, e -> log.error(s,e as Throwable)})
5858
return if (kafkaConfig.enableAutoCommit) {
59-
KafkaAutoCommitEventConsumer(kafkaConfig, log, errorService)
59+
KafkaAutoCommitEventConsumer(kafkaConfig, log, getErrorService())
6060
} else {
61-
KafkaManualCommitEventConsumer(kafkaConfig, log, errorService)
61+
KafkaManualCommitEventConsumer(kafkaConfig, log, getErrorService())
6262
}
6363
}
6464
}
6565
}
6666

67+
private fun getErrorService(): ErrorService {
68+
if (!this::errorService.isInitialized) {
69+
val kafkaConfig = KafkaSinkConfiguration.create(config)
70+
this.errorService = KafkaErrorService(kafkaConfig.asProperties(),
71+
ErrorService.ErrorConfig.from(kafkaConfig.streamsSinkConfiguration.errorConfig),
72+
{ s, e -> log.error(s,e as Throwable) })
73+
}
74+
return this.errorService
75+
}
76+
6777
override fun start() { // TODO move to the abstract class
6878
val streamsConfig = StreamsSinkConfiguration.from(config)
6979
val topics = streamsTopicService.getTopics()
@@ -92,12 +102,17 @@ class KafkaEventSink(private val config: Map<String, String>,
92102
log.info("Kafka Sink started")
93103
}
94104

95-
override fun stop() = runBlocking { // TODO move to the abstract class
105+
override fun stop() { // TODO move to the abstract class
96106
log.info("Stopping Kafka Sink daemon Job")
97-
try {
98-
job.cancelAndJoin()
99-
log.info("Kafka Sink daemon Job stopped")
100-
} catch (e : UninitializedPropertyAccessException) { /* ignoring this one only */ }
107+
StreamsUtils.ignoreExceptions({ errorService.close() }, UninitializedPropertyAccessException::class.java)
108+
StreamsUtils.ignoreExceptions({
109+
runBlocking {
110+
if (job.isActive) {
111+
job.cancelAndJoin()
112+
}
113+
log.info("Kafka Sink daemon Job stopped")
114+
}
115+
}, UninitializedPropertyAccessException::class.java)
101116
}
102117

103118
private fun createJob(): Job {
@@ -123,9 +138,14 @@ class KafkaEventSink(private val config: Map<String, String>,
123138
delay(timeMillis)
124139
}
125140
eventConsumer.stop()
126-
} catch (e: Throwable) {
127-
val message = e.message ?: "Generic error, please check the stack trace: "
128-
log.error(message, e)
141+
} catch (e: Exception) {
142+
when (e) {
143+
is kotlinx.coroutines.CancellationException -> null
144+
else -> {
145+
val message = e.message ?: "Generic error, please check the stack trace: "
146+
log.error(message, e)
147+
}
148+
}
129149
eventConsumer.stop()
130150
}
131151
}

0 commit comments

Comments
 (0)