Skip to content

Commit 2f6087a

Browse files
committed
fixed travis
1 parent 3dd0c50 commit 2f6087a

File tree

8 files changed

+84
-37
lines changed

8 files changed

+84
-37
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
language: java
22
jdk:
3-
- openjdk8
3+
- openjdk11
44

55
sudo: required
66
services:

consumer/src/main/kotlin/streams/StreamsTopicService.kt

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,16 @@ import streams.service.STREAMS_TOPIC_KEY
77
import streams.service.TopicType
88
import streams.service.Topics
99
import streams.utils.Neo4jUtils
10+
import java.lang.IllegalArgumentException
1011

1112
class StreamsTopicService(private val db: GraphDatabaseAPI) {
1213

14+
init {
15+
if (db.databaseName() != Neo4jUtils.SYSTEM_DATABASE_NAME) {
16+
throw IllegalArgumentException("GraphDatabaseAPI must be an instance of ${Neo4jUtils.SYSTEM_DATABASE_NAME} database")
17+
}
18+
}
19+
1320
private fun isEmpty(data: Any, excOnError: Exception) = when (data) {
1421
is Map<*, *> -> data.isEmpty()
1522
is Collection<*> -> data.isEmpty()
@@ -78,9 +85,9 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
7885
else -> throw runtimeException
7986
}
8087
if (isEmpty(filteredData, runtimeException)) {
81-
node.removeProperty(topicType.key)
88+
node.removeProperty("data")
8289
} else {
83-
node.setProperty(topicType.key, filteredData)
90+
node.setProperty("data", JSONUtils.writeValueAsString(filteredData))
8491
}
8592
it.commit()
8693
}
@@ -138,7 +145,12 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
138145
if (!findNodes.hasNext()) {
139146
null
140147
} else {
141-
val data = JSONUtils.readValue<Map<String, String>>(findNodes.next().getProperty("data"))
148+
val node = findNodes.next()
149+
val data = if (node.hasProperty("data")) {
150+
JSONUtils.readValue<Map<String, String>>(node.getProperty("data"))
151+
} else {
152+
emptyMap()
153+
}
142154
data[topic]
143155
}
144156
}
@@ -147,12 +159,17 @@ class StreamsTopicService(private val db: GraphDatabaseAPI) {
147159
fun getAll() = db.beginTx().use { tx ->
148160
TopicType.values()
149161
.mapNotNull {
150-
val topicTypeLabel = Label.label(TopicType.CYPHER.key)
162+
val topicTypeLabel = Label.label(it.key)
151163
val findNodes = tx.findNodes(topicTypeLabel)
152164
if (!findNodes.hasNext()) {
153165
null
154166
} else {
155-
it to JSONUtils.readValue<Any>(findNodes.next().getProperty(it.key))
167+
val node = findNodes.next()
168+
if (node.hasProperty("data")) {
169+
it to JSONUtils.readValue<Any>(node.getProperty("data"))
170+
} else {
171+
null
172+
}
156173
}
157174
}
158175
.toMap()

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkCommit.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit
1818

1919
class KafkaEventSinkCommit : KafkaEventSinkBase() {
2020
@Test
21-
fun shouldWriteLastOffsetWithNoAutoCommit() = runBlocking {
21+
fun `should write last offset with auto commit false`() {
2222
val topic = UUID.randomUUID().toString()
2323
graphDatabaseBuilder.setConfig("streams.sink.topic.cypher.$topic", cypherQueryTemplate)
2424
graphDatabaseBuilder.setConfig("kafka.${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG}", "false")
@@ -49,7 +49,7 @@ class KafkaEventSinkCommit : KafkaEventSinkBase() {
4949
}
5050

5151
@Test
52-
fun `should fix issue 186 with auto commit false`() = runBlocking {
52+
fun `should fix issue 186 with auto commit false`() {
5353
val product = "product" to "MERGE (p:Product {id: event.id}) ON CREATE SET p.name = event.name"
5454
val customer = "customer" to "MERGE (c:Customer {id: event.id}) ON CREATE SET c.name = event.name"
5555
val bought = "bought" to """

consumer/src/test/kotlin/integrations/kafka/KafkaEventSinkNoConfigurationIT.kt

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package integrations.kafka
22

33
import io.confluent.kafka.serializers.KafkaAvroDeserializer
4+
import org.junit.After
45
import org.junit.Test
56
import org.neo4j.test.rule.ImpermanentDbmsRule
67
import org.testcontainers.containers.GenericContainer
78
import streams.extensions.execute
89
import streams.setConfig
10+
import streams.shutdownSilently
11+
import streams.start
912
import kotlin.test.assertEquals
1013

1114

@@ -23,12 +26,19 @@ class KafkaEventSinkNoConfigurationIT {
2326

2427
private val topic = "no-config"
2528

29+
private val db = ImpermanentDbmsRule()
30+
31+
@After
32+
fun tearDown() {
33+
db.shutdownSilently()
34+
}
35+
2636
@Test
2737
fun `the db should start even with no bootstrap servers provided()`() {
28-
val db = ImpermanentDbmsRule()
29-
.setConfig("kafka.bootstrap.servers", "")
30-
.setConfig("streams.sink.enabled", "true")
31-
.setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})") as ImpermanentDbmsRule
38+
db.setConfig("kafka.bootstrap.servers", "")
39+
.setConfig("streams.sink.enabled", "true")
40+
.setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})")
41+
.start()
3242
val count = db.execute("MATCH (n) RETURN COUNT(n) AS count") { it.columnAs<Long>("count").next() }
3343
assertEquals(0L, count)
3444
}
@@ -38,13 +48,13 @@ class KafkaEventSinkNoConfigurationIT {
3848
val fakeWebServer = FakeWebServer()
3949
fakeWebServer.start()
4050
val url = fakeWebServer.getUrl().replace("http://", "")
41-
val db = ImpermanentDbmsRule()
42-
.setConfig("kafka.bootstrap.servers", url)
43-
.setConfig("kafka.zookeeper.connect", url)
44-
.setConfig("streams.sink.enabled", "true")
45-
.setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})")
46-
.setConfig("kafka.key.deserializer", KafkaAvroDeserializer::class.java.name)
47-
.setConfig("kafka.value.deserializer", KafkaAvroDeserializer::class.java.name) as ImpermanentDbmsRule
51+
db.setConfig("kafka.bootstrap.servers", url)
52+
.setConfig("kafka.zookeeper.connect", url)
53+
.setConfig("streams.sink.enabled", "true")
54+
.setConfig("streams.sink.topic.cypher.$topic", "CREATE (p:Place{name: event.name, coordinates: event.coordinates, citizens: event.citizens})")
55+
.setConfig("kafka.key.deserializer", KafkaAvroDeserializer::class.java.name)
56+
.setConfig("kafka.value.deserializer", KafkaAvroDeserializer::class.java.name)
57+
.start()
4858
val count = db.execute("MATCH (n) RETURN COUNT(n) AS count") { it.columnAs<Long>("count").next() }
4959
assertEquals(0L, count)
5060
fakeWebServer.stop()

consumer/src/test/kotlin/streams/StreamsEventSinkQueryExecutionTest.kt

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,45 +5,49 @@ import org.junit.Before
55
import org.junit.Test
66
import org.neo4j.kernel.internal.GraphDatabaseAPI
77
import org.neo4j.logging.NullLog
8+
import org.neo4j.test.rule.DbmsRule
89
import org.neo4j.test.rule.ImpermanentDbmsRule
910
import streams.extensions.execute
1011
import streams.kafka.KafkaSinkConfiguration
1112
import streams.service.StreamsSinkEntity
1213
import streams.service.TopicType
1314
import streams.service.Topics
15+
import streams.utils.Neo4jUtils
1416
import kotlin.test.assertEquals
1517

1618
class StreamsEventSinkQueryExecutionTest {
17-
private lateinit var db: ImpermanentDbmsRule
19+
private lateinit var db: DbmsRule
1820
private lateinit var streamsEventSinkQueryExecution: StreamsEventSinkQueryExecution
1921

2022
@Before
2123
fun setUp() {
22-
db = ImpermanentDbmsRule()
24+
db = ImpermanentDbmsRule().start()
2325
val kafkaConfig = KafkaSinkConfiguration(streamsSinkConfiguration = StreamsSinkConfiguration(topics = Topics(cypherTopics = mapOf("shouldWriteCypherQuery" to "MERGE (n:Label {id: event.id})\n" +
2426
" ON CREATE SET n += event.properties"))))
25-
val streamsTopicService = StreamsTopicService(db as GraphDatabaseAPI)
27+
val streamsTopicService = StreamsTopicService(db.managementService.database(Neo4jUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI)
2628
streamsTopicService.set(TopicType.CYPHER, kafkaConfig.streamsSinkConfiguration.topics.cypherTopics)
2729
streamsEventSinkQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db as GraphDatabaseAPI,
2830
NullLog.getInstance(), emptyMap())
2931
}
3032

3133
@After
3234
fun tearDown() {
33-
db.shutdown()
35+
db.shutdownSilently()
3436
}
3537

3638
@Test
3739
fun shouldWriteCypherQuery() {
40+
// given
3841
val first = mapOf("id" to "1", "properties" to mapOf("a" to 1))
3942
val second = mapOf("id" to "2", "properties" to mapOf("a" to 1))
43+
44+
// when
4045
streamsEventSinkQueryExecution.writeForTopic("shouldWriteCypherQuery", listOf(StreamsSinkEntity(first, first),
4146
StreamsSinkEntity(second, second)))
4247

43-
db.execute("MATCH (n:Label) RETURN count(n) AS count")
44-
{ it.columnAs<Long>("count") }
45-
.use { assertEquals(2, it.next()) }
46-
48+
// then
49+
db.execute("MATCH (n:Label) RETURN count(n) AS count") { it.columnAs<Long>("count").next() }
50+
.let { assertEquals(2, it) }
4751
}
4852

4953
}

consumer/src/test/kotlin/streams/StreamsTopicServiceTest.kt

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import org.junit.Test
66
import org.neo4j.dbms.api.DatabaseManagementService
77
import org.neo4j.graphdb.GraphDatabaseService
88
import org.neo4j.kernel.internal.GraphDatabaseAPI
9+
import org.neo4j.test.rule.DbmsRule
910
import org.neo4j.test.rule.ImpermanentDbmsRule
1011
import streams.kafka.KafkaSinkConfiguration
1112
import streams.service.TopicType
@@ -15,13 +16,13 @@ import kotlin.test.assertEquals
1516

1617
class StreamsTopicServiceTest {
1718

18-
private lateinit var db: ImpermanentDbmsRule
19+
private lateinit var db: DbmsRule
1920
private lateinit var streamsTopicService: StreamsTopicService
2021
private lateinit var kafkaConfig: KafkaSinkConfiguration
2122

2223
@Before
2324
fun setUp() {
24-
db = ImpermanentDbmsRule()
25+
db = ImpermanentDbmsRule().start()
2526
kafkaConfig = KafkaSinkConfiguration(streamsSinkConfiguration = StreamsSinkConfiguration(topics = Topics(cypherTopics = mapOf("shouldWriteCypherQuery" to "MERGE (n:Label {id: event.id})\n" +
2627
" ON CREATE SET n += event.properties"))))
2728
streamsTopicService = StreamsTopicService(db.managementService.database(Neo4jUtils.SYSTEM_DATABASE_NAME) as GraphDatabaseAPI)
@@ -30,7 +31,7 @@ class StreamsTopicServiceTest {
3031

3132
@After
3233
fun tearDown() {
33-
db.shutdown()
34+
db.shutdownSilently()
3435
}
3536

3637
private fun assertProperty(entry: Map.Entry<String, String>) {
@@ -44,10 +45,14 @@ class StreamsTopicServiceTest {
4445

4546
@Test
4647
fun shouldStoreTopicsAndCypherTemplate() {
48+
// given
4749
val map = mapOf("topic1" to "MERGE (n:Label1 {id: event.id})",
4850
"topic2" to "MERGE (n:Label2 {id: event.id})")
51+
52+
// when
4953
streamsTopicService.set(TopicType.CYPHER, map)
5054

55+
// then
5156
val allTopics = map.plus(kafkaConfig.streamsSinkConfiguration.topics.cypherTopics)
5257
allTopics.forEach { assertProperty(it) }
5358

@@ -59,15 +64,19 @@ class StreamsTopicServiceTest {
5964

6065
@Test
6166
fun `should remove topics`() {
67+
// given
6268
val topicToRemove = "topic2"
6369
val map = mapOf("topic1" to "MERGE (n:Label1 {id: event.id})",
6470
topicToRemove to "MERGE (n:Label2 {id: event.id})")
65-
streamsTopicService.set(TopicType.CYPHER, map)
6671

72+
streamsTopicService.set(TopicType.CYPHER, map)
6773
val allTopics = map.plus(kafkaConfig.streamsSinkConfiguration.topics.cypherTopics)
6874
allTopics.forEach { assertProperty(it) }
6975

76+
// when
7077
streamsTopicService.remove(TopicType.CYPHER, topicToRemove)
78+
79+
// then
7180
val remainingTopics = allTopics.filterKeys { it != topicToRemove }
7281

7382
assertEquals(remainingTopics, streamsTopicService.getAll().getValue(TopicType.CYPHER))

producer/src/test/kotlin/streams/integrations/KafkaEventRouterNoTopicAutocreationIT.kt

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import streams.extensions.execute
1616
import streams.kafka.KafkaConfiguration
1717
import streams.kafka.KafkaTestUtils
1818
import streams.setConfig
19+
import streams.shutdownSilently
1920
import streams.start
2021
import streams.utils.StreamsUtils
2122
import kotlin.test.assertEquals
@@ -73,13 +74,14 @@ class KafkaEventRouterNoTopicAutocreationIT {
7374
val db = ImpermanentDbmsRule()
7475
.setConfig("kafka.bootstrap.servers", kafka.bootstrapServers)
7576
.setConfig("streams.source.topic.nodes.personNotDefined", "Person{*}")
77+
.start()
7678

7779
// then
7880
val count = db.execute("MATCH (n) RETURN COUNT(n) AS count") {
79-
it.columnAs<Long>("count")
80-
.next()
81+
it.columnAs<Long>("count").next()
8182
}
8283
assertEquals(0L, count)
84+
db.shutdownSilently()
8385
}
8486

8587
@Test

producer/src/test/kotlin/streams/integrations/StreamsTransactionEventHandlerIT.kt

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,26 @@ import streams.events.RelationshipPayload
1212
import streams.extensions.execute
1313
import streams.mocks.MockStreamsEventRouter
1414
import streams.setConfig
15+
import streams.shutdownSilently
16+
import streams.start
1517
import kotlin.test.assertEquals
1618
import kotlin.test.assertNotNull
1719

1820
@Suppress("DEPRECATION")
1921
class StreamsTransactionEventHandlerIT {
2022

21-
@Rule
22-
@JvmField
23-
val db: DbmsRule = ImpermanentDbmsRule().startLazily()
23+
val db: DbmsRule = ImpermanentDbmsRule()
2424
.setConfig("streams.router", "streams.mocks.MockStreamsEventRouter")
2525

2626
@Before
2727
fun setUp() {
2828
MockStreamsEventRouter.reset()
29-
db.ensureStarted()
29+
db.start()
30+
}
31+
32+
@After
33+
fun tearDown() {
34+
db.shutdownSilently()
3035
}
3136

3237
@Test fun testNodes(){

0 commit comments

Comments
 (0)