Skip to content

Commit 0e8f778

Browse files
authored
feat: switch to caniuse and add cypher version selection (#725)
1 parent a743693 commit 0e8f778

28 files changed

+1371
-848
lines changed

.github/workflows/ci.yml

+13-8
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,30 @@
1-
name: Java CI with Maven
1+
name: build
22

33
on:
44
push:
5-
branches: [ '**' ]
5+
branches: [ '5.0' ]
66
pull_request:
7-
branches: [ '**' ]
7+
branches: [ '5.0' ]
88

99
jobs:
1010
build:
11-
1211
runs-on: ubuntu-latest
13-
12+
strategy:
13+
fail-fast: false
14+
matrix:
15+
neo4j-image: [ "neo4j:4.4", "neo4j:4.4-enterprise", "neo4j:5", "neo4j:5-enterprise", "neo4j:2025", "neo4j:2025-enterprise" ]
16+
name: Build and test with ${{ matrix.neo4j-image }}
1417
steps:
1518
- uses: actions/checkout@v4
1619

17-
- name: Set up JDK 11
20+
- name: Set up Java
1821
uses: actions/setup-java@v4
1922
with:
2023
distribution: temurin
2124
java-version: 11
2225
cache: 'maven'
2326

24-
- name: Build with Maven
25-
run: mvn -B clean test --file pom.xml --no-transfer-progress
27+
- name: Build and run tests
28+
env:
29+
NEO4J_IMAGE: ${{ matrix.neo4j-image }}
30+
run: mvn -B clean verify --file pom.xml --no-transfer-progress

common/pom.xml

+9
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@
2929
<groupId>org.jetbrains.kotlinx</groupId>
3030
<artifactId>kotlinx-coroutines-core</artifactId>
3131
</dependency>
32+
<dependency>
33+
<groupId>org.neo4j</groupId>
34+
<artifactId>caniuse-core</artifactId>
35+
</dependency>
3236
<dependency>
3337
<groupId>org.apache.kafka</groupId>
3438
<artifactId>kafka-clients</artifactId>
@@ -54,6 +58,11 @@
5458
<artifactId>kotlin-test-junit</artifactId>
5559
<scope>test</scope>
5660
</dependency>
61+
<dependency>
62+
<groupId>org.junit.jupiter</groupId>
63+
<artifactId>junit-jupiter</artifactId>
64+
<scope>test</scope>
65+
</dependency>
5766
<dependency>
5867
<groupId>org.mockito</groupId>
5968
<artifactId>mockito-core</artifactId>

common/src/main/kotlin/streams/service/sink/strategy/CUDIngestionStrategy.kt

+14-10
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package streams.service.sink.strategy
22

3+
import org.neo4j.caniuse.CanIUse.canIUse
4+
import org.neo4j.caniuse.Cypher
5+
import org.neo4j.caniuse.Neo4j
36
import streams.events.EntityType
47
import streams.extensions.quote
58
import streams.utils.JSONUtils
@@ -25,7 +28,7 @@ data class CUDNode(override val op: CUDOperations,
2528
val detach: Boolean = true,
2629
val labels: List<String> = emptyList()): CUD() {
2730
override val type = EntityType.node
28-
31+
2932
fun toMap(): Map<String, Any> {
3033
return when (op) {
3134
CUDOperations.delete -> mapOf("ids" to ids)
@@ -59,7 +62,8 @@ data class CUDRelationship(override val op: CUDOperations,
5962
}
6063

6164

62-
class CUDIngestionStrategy: IngestionStrategy {
65+
class CUDIngestionStrategy(private val neo4j: Neo4j): IngestionStrategy {
66+
private val cypherPrefix = if (canIUse(Cypher.explicitCypher5Selection()).withNeo4j(neo4j)) "CYPHER 5 " else ""
6367

6468
companion object {
6569
@JvmStatic val ID_KEY = "ids"
@@ -87,14 +91,14 @@ class CUDIngestionStrategy: IngestionStrategy {
8791
}
8892

8993
private fun buildNodeCreateStatement(labels: List<String>): String = """
90-
|${StreamsUtils.UNWIND}
94+
|${cypherPrefix}${StreamsUtils.UNWIND}
9195
|CREATE (n${getLabelsAsString(labels)})
9296
|SET n = event.properties
9397
""".trimMargin()
9498

9599
private fun buildRelCreateStatement(from: NodeRelMetadata, to: NodeRelMetadata,
96100
rel_type: String): String = """
97-
|${StreamsUtils.UNWIND}
101+
|${cypherPrefix}${StreamsUtils.UNWIND}
98102
|${buildNodeLookupByIds(keyword = from.getOperation(), ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
99103
|${StreamsUtils.WITH_EVENT_FROM}
100104
|${buildNodeLookupByIds(keyword = to.getOperation(), ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
@@ -103,14 +107,14 @@ class CUDIngestionStrategy: IngestionStrategy {
103107
""".trimMargin()
104108

105109
private fun buildNodeMergeStatement(labels: List<String>, ids: Set<String>): String = """
106-
|${StreamsUtils.UNWIND}
110+
|${cypherPrefix}${StreamsUtils.UNWIND}
107111
|${buildNodeLookupByIds(keyword = "MERGE", ids = ids, labels = labels)}
108112
|SET n += event.properties
109113
""".trimMargin()
110114

111115
private fun buildRelMergeStatement(from: NodeRelMetadata, to: NodeRelMetadata,
112116
rel_type: String): String = """
113-
|${StreamsUtils.UNWIND}
117+
|${cypherPrefix}${StreamsUtils.UNWIND}
114118
|${buildNodeLookupByIds(keyword = from.getOperation(), ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
115119
|${StreamsUtils.WITH_EVENT_FROM}
116120
|${buildNodeLookupByIds(keyword = to.getOperation(), ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
@@ -119,29 +123,29 @@ class CUDIngestionStrategy: IngestionStrategy {
119123
""".trimMargin()
120124

121125
private fun buildNodeUpdateStatement(labels: List<String>, ids: Set<String>): String = """
122-
|${StreamsUtils.UNWIND}
126+
|${cypherPrefix}${StreamsUtils.UNWIND}
123127
|${buildNodeLookupByIds(ids = ids, labels = labels)}
124128
|SET n += event.properties
125129
""".trimMargin()
126130

127131
private fun buildRelUpdateStatement(from: NodeRelMetadata, to: NodeRelMetadata,
128132
rel_type: String): String = """
129-
|${StreamsUtils.UNWIND}
133+
|${cypherPrefix}${StreamsUtils.UNWIND}
130134
|${buildNodeLookupByIds(ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
131135
|${buildNodeLookupByIds(ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
132136
|MATCH ($FROM_KEY)-[r:${rel_type.quote()}]->($TO_KEY)
133137
|SET r += event.properties
134138
""".trimMargin()
135139

136140
private fun buildDeleteStatement(labels: List<String>, ids: Set<String>, detach: Boolean): String = """
137-
|${StreamsUtils.UNWIND}
141+
|${cypherPrefix}${StreamsUtils.UNWIND}
138142
|${buildNodeLookupByIds(ids = ids, labels = labels)}
139143
|${if (detach) "DETACH " else ""}DELETE n
140144
""".trimMargin()
141145

142146
private fun buildRelDeleteStatement(from: NodeRelMetadata, to: NodeRelMetadata,
143147
rel_type: String): String = """
144-
|${StreamsUtils.UNWIND}
148+
|${cypherPrefix}${StreamsUtils.UNWIND}
145149
|${buildNodeLookupByIds(ids = from.ids, labels = from.labels, identifier = FROM_KEY, field = FROM_KEY)}
146150
|${buildNodeLookupByIds(ids = to.ids, labels = to.labels, identifier = TO_KEY, field = TO_KEY)}
147151
|MATCH ($FROM_KEY)-[r:${rel_type.quote()}]->($TO_KEY)

common/src/main/kotlin/streams/service/sink/strategy/CypherTemplateStrategy.kt

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package streams.service.sink.strategy
22

3+
import org.neo4j.caniuse.CanIUse.canIUse
4+
import org.neo4j.caniuse.Cypher
5+
import org.neo4j.caniuse.Neo4j
36
import streams.service.StreamsSinkEntity
47
import streams.utils.StreamsUtils
58

6-
class CypherTemplateStrategy(query: String): IngestionStrategy {
7-
private val fullQuery = "${StreamsUtils.UNWIND} $query"
9+
class CypherTemplateStrategy(neo4j: Neo4j, query: String) : IngestionStrategy {
10+
private val cypherPrefix = if (canIUse(Cypher.explicitCypher5Selection()).withNeo4j(neo4j)) "CYPHER 5 " else ""
11+
private val fullQuery = "${cypherPrefix}${StreamsUtils.UNWIND} $query"
12+
813
override fun mergeNodeEvents(events: Collection<StreamsSinkEntity>): List<QueryEvents> {
914
return listOf(QueryEvents(fullQuery, events.mapNotNull { it.value as? Map<String, Any> }))
1015
}

common/src/main/kotlin/streams/service/sink/strategy/NodePatternIngestionStrategy.kt

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package streams.service.sink.strategy
22

3+
import org.neo4j.caniuse.CanIUse.canIUse
4+
import org.neo4j.caniuse.Cypher
5+
import org.neo4j.caniuse.Neo4j
36
import streams.extensions.flatten
47
import streams.utils.JSONUtils
58
import streams.service.StreamsSinkEntity
@@ -8,10 +11,11 @@ import streams.utils.IngestionUtils.getLabelsAsString
811
import streams.utils.IngestionUtils.getNodeMergeKeys
912
import streams.utils.StreamsUtils
1013

11-
class NodePatternIngestionStrategy(private val nodePatternConfiguration: NodePatternConfiguration): IngestionStrategy {
14+
class NodePatternIngestionStrategy(neo4j: Neo4j, private val nodePatternConfiguration: NodePatternConfiguration): IngestionStrategy {
15+
private val cypherPrefix = if (canIUse(Cypher.explicitCypher5Selection()).withNeo4j(neo4j)) "CYPHER 5 " else ""
1216

1317
private val mergeNodeTemplate: String = """
14-
|${StreamsUtils.UNWIND}
18+
|${cypherPrefix}${StreamsUtils.UNWIND}
1519
|MERGE (n${getLabelsAsString(nodePatternConfiguration.labels)}{${
1620
getNodeMergeKeys("keys", nodePatternConfiguration.keys)
1721
}})
@@ -20,7 +24,7 @@ class NodePatternIngestionStrategy(private val nodePatternConfiguration: NodePat
2024
""".trimMargin()
2125

2226
private val deleteNodeTemplate: String = """
23-
|${StreamsUtils.UNWIND}
27+
|${cypherPrefix}${StreamsUtils.UNWIND}
2428
|MATCH (n${getLabelsAsString(nodePatternConfiguration.labels)}{${
2529
getNodeMergeKeys("keys", nodePatternConfiguration.keys)
2630
}})

common/src/main/kotlin/streams/service/sink/strategy/RelationshipPatternIngestionStrategy.kt

+7-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package streams.service.sink.strategy
22

3+
import org.neo4j.caniuse.CanIUse.canIUse
4+
import org.neo4j.caniuse.Cypher
5+
import org.neo4j.caniuse.Neo4j
36
import streams.extensions.flatten
47
import streams.utils.JSONUtils
58
import streams.service.StreamsSinkEntity
@@ -8,10 +11,11 @@ import streams.utils.IngestionUtils.getLabelsAsString
811
import streams.utils.IngestionUtils.getNodeMergeKeys
912
import streams.utils.StreamsUtils
1013

11-
class RelationshipPatternIngestionStrategy(private val relationshipPatternConfiguration: RelationshipPatternConfiguration): IngestionStrategy {
14+
class RelationshipPatternIngestionStrategy(neo4j: Neo4j, private val relationshipPatternConfiguration: RelationshipPatternConfiguration): IngestionStrategy {
15+
private val cypherPrefix = if (canIUse(Cypher.explicitCypher5Selection()).withNeo4j(neo4j)) "CYPHER 5 " else ""
1216

1317
private val mergeRelationshipTemplate: String = """
14-
|${StreamsUtils.UNWIND}
18+
|${cypherPrefix}${StreamsUtils.UNWIND}
1519
|MERGE (start${getLabelsAsString(relationshipPatternConfiguration.start.labels)}{${
1620
getNodeMergeKeys("start.keys", relationshipPatternConfiguration.start.keys)
1721
}})
@@ -27,7 +31,7 @@ class RelationshipPatternIngestionStrategy(private val relationshipPatternConfig
2731
""".trimMargin()
2832

2933
private val deleteRelationshipTemplate: String = """
30-
|${StreamsUtils.UNWIND}
34+
|${cypherPrefix}${StreamsUtils.UNWIND}
3135
|MATCH (start${getLabelsAsString(relationshipPatternConfiguration.start.labels)}{${
3236
getNodeMergeKeys("start.keys", relationshipPatternConfiguration.start.keys)
3337
}})

common/src/main/kotlin/streams/service/sink/strategy/SchemaIngestionStrategy.kt

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package streams.service.sink.strategy
22

3+
import org.neo4j.caniuse.CanIUse.canIUse
4+
import org.neo4j.caniuse.Cypher
5+
import org.neo4j.caniuse.Neo4j
36
import streams.events.*
47
import streams.extensions.quote
58
import streams.service.StreamsSinkEntity
@@ -10,7 +13,8 @@ import streams.utils.SchemaUtils.toStreamsTransactionEvent
1013
import streams.utils.StreamsUtils
1114

1215

13-
class SchemaIngestionStrategy: IngestionStrategy {
16+
class SchemaIngestionStrategy(neo4j: Neo4j): IngestionStrategy {
17+
private val cypherPrefix = if (canIUse(Cypher.explicitCypher5Selection()).withNeo4j(neo4j)) "CYPHER 5 " else ""
1418

1519
private fun prepareRelationshipEvents(events: List<StreamsTransactionEvent>, withProperties: Boolean = true): Map<RelationshipSchemaMetadata, List<Map<String, Any>>> = events
1620
.mapNotNull {
@@ -77,7 +81,7 @@ class SchemaIngestionStrategy: IngestionStrategy {
7781
.map {
7882
val label = it.key.label.quote()
7983
val query = """
80-
|${StreamsUtils.UNWIND}
84+
|${cypherPrefix}${StreamsUtils.UNWIND}
8185
|MERGE (start${getLabelsAsString(it.key.startLabels)}{${getNodeKeysAsString("start", it.key.startKeys)}})
8286
|MERGE (end${getLabelsAsString(it.key.endLabels)}{${getNodeKeysAsString("end", it.key.endKeys)}})
8387
|MERGE (start)-[r:$label]->(end)
@@ -94,7 +98,7 @@ class SchemaIngestionStrategy: IngestionStrategy {
9498
.map {
9599
val label = it.key.label.quote()
96100
val query = """
97-
|${StreamsUtils.UNWIND}
101+
|${cypherPrefix}${StreamsUtils.UNWIND}
98102
|MATCH (start${getLabelsAsString(it.key.startLabels)}{${getNodeKeysAsString("start", it.key.startKeys)}})
99103
|MATCH (end${getLabelsAsString(it.key.endLabels)}{${getNodeKeysAsString("end", it.key.endKeys)}})
100104
|MATCH (start)-[r:$label]->(end)
@@ -121,7 +125,7 @@ class SchemaIngestionStrategy: IngestionStrategy {
121125
val labels = it.key.mapNotNull { it.label }
122126
val nodeKeys = it.key.flatMap { it.properties }.toSet()
123127
val query = """
124-
|${StreamsUtils.UNWIND}
128+
|${cypherPrefix}${StreamsUtils.UNWIND}
125129
|MATCH (n${getLabelsAsString(labels)}{${getNodeKeysAsString(keys = nodeKeys)}})
126130
|DETACH DELETE n
127131
""".trimMargin()
@@ -165,7 +169,7 @@ class SchemaIngestionStrategy: IngestionStrategy {
165169
.groupBy({ it.first }, { it.second })
166170
.map { map ->
167171
var query = """
168-
|${StreamsUtils.UNWIND}
172+
|${cypherPrefix}${StreamsUtils.UNWIND}
169173
|MERGE (n${getLabelsAsString(map.key.constraints.mapNotNull { it.label })}{${getNodeKeysAsString(keys = map.key.keys)}})
170174
|SET n = event.properties
171175
""".trimMargin()

0 commit comments

Comments
 (0)