Skip to content

Commit 935fa92

Browse files
conker84jexp
authored andcommitted
Fixes #86: Remove unrestricted access for the procedure (#87)
1 parent 01d2cfd commit 935fa92

File tree

11 files changed

+460
-412
lines changed

11 files changed

+460
-412
lines changed

doc/asciidoc/producer/index.adoc

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,6 @@ include::patterns.adoc[]
1313
=== Procedures
1414

1515
The producer comes out with a list of procedures.
16-
Because of they use internal Neo4j API you must allow them with the following configuration paramater
17-
18-
dbms.security.procedures.unrestricted=streams.*
19-
20-
If you are using them via Docker, pass this simple environment parameter:
21-
22-
NEO4J_dbms_security_procedures_unrestricted: streams.*
23-
2416

2517
==== streams.publish
2618

@@ -53,6 +45,8 @@ Input Parameters:
5345

5446
You can send any kind of data in the payload, nodes, relationships, paths, lists, maps, scalar values and nested versions thereof.
5547

48+
In case of nodes or relationships if the topic is defined in the patterns provided by the configuration their properties will be filtered in according with the configuration.
49+
5650
=== Transaction Event Handler
5751

5852
The transaction event handler is the core of the Stream Producer and allows to stream database changes.

producer/src/main/kotlin/streams/Extensions.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,8 @@ fun Relationship.toMap(): Map<String, Any?> {
1616
"start" to RelationshipNodeChange(startNode.id.toString(), startNode.labelNames()),
1717
"end" to RelationshipNodeChange(endNode.id.toString(), endNode.labelNames()),
1818
"type" to EntityType.relationship)
19+
}
20+
21+
fun Node.labelNames() : List<String> {
22+
return this.labels.map { it.name() }
1923
}

producer/src/main/kotlin/streams/StreamsExtensionFactory.kt

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
package streams
22

3-
import org.neo4j.graphdb.Node
43
import org.neo4j.kernel.configuration.Config
54
import org.neo4j.kernel.extension.KernelExtensionFactory
6-
import org.neo4j.kernel.impl.core.EmbeddedProxySPI
7-
import org.neo4j.kernel.impl.core.GraphProperties
85
import org.neo4j.kernel.impl.logging.LogService
96
import org.neo4j.kernel.impl.spi.KernelContext
107
import org.neo4j.kernel.internal.GraphDatabaseAPI
118
import org.neo4j.kernel.lifecycle.Lifecycle
129
import org.neo4j.kernel.lifecycle.LifecycleAdapter
10+
import streams.procedures.StreamsProcedures
1311

14-
class StreamsExtensionFactory : KernelExtensionFactory<StreamsExtensionFactory.Dependencies>("StreamsProcedures") {
12+
class StreamsExtensionFactory : KernelExtensionFactory<StreamsExtensionFactory.Dependencies>("Streams.Producer") {
1513
override fun newInstance(context: KernelContext, dependencies: Dependencies): Lifecycle {
1614
val db = dependencies.graphdatabaseAPI()
1715
val log = dependencies.log()
@@ -36,6 +34,8 @@ class StreamsEventRouterLifecycle(val db: GraphDatabaseAPI, val streamHandler: S
3634

3735
override fun start() {
3836
try {
37+
StreamsProcedures.registerEventRouter(eventRouter = streamHandler)
38+
StreamsProcedures.registerEventRouterConfiguration(eventRouterConfiguration = streamsEventRouterConfiguration)
3939
streamHandler.start()
4040
registerTransactionEventHandler()
4141
} catch (e: Exception) {
@@ -58,7 +58,3 @@ class StreamsEventRouterLifecycle(val db: GraphDatabaseAPI, val streamHandler: S
5858
streamHandler.stop()
5959
}
6060
}
61-
62-
fun Node.labelNames() : List<String> {
63-
return this.labels.map { it.name() }
64-
}

producer/src/main/kotlin/streams/StreamsTransactionEventHandler.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class StreamsTransactionEventHandler(val router : StreamsEventRouter, val config
3333
.build()
3434
val schema = SchemaBuilder().build()
3535

36-
val builder = StreamsEventBuilder()
36+
val builder = StreamsTransactionEventBuilder()
3737
.withMeta(meta)
3838
.withPayload(payload)
3939
.withSchema(schema)

producer/src/main/kotlin/streams/events/StreamsEventBuilder.kt

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
package streams.events
22

3+
import org.neo4j.graphdb.Node
4+
import org.neo4j.graphdb.Path
5+
import org.neo4j.graphdb.Relationship
6+
import streams.NodeRoutingConfiguration
7+
import streams.RelationshipRoutingConfiguration
8+
import streams.toMap
9+
310

411
class StreamsEventMetaBuilder(){
512

@@ -168,28 +175,98 @@ class SchemaBuilder() {
168175
}
169176
}
170177

171-
class StreamsEventBuilder(){
178+
class StreamsTransactionEventBuilder {
172179

173180
private var meta: Meta? = null
174181
private var payload: Payload? = null
175182
private var schema: Schema? = null
176183

177-
fun withMeta(meta : Meta): StreamsEventBuilder{
184+
fun withMeta(meta : Meta): StreamsTransactionEventBuilder {
178185
this.meta = meta
179186
return this
180187
}
181188

182-
fun withPayload(payload : Payload) : StreamsEventBuilder{
189+
fun withPayload(payload : Payload): StreamsTransactionEventBuilder {
183190
this.payload = payload
184191
return this
185192
}
186193

187-
fun withSchema(schema : Schema) : StreamsEventBuilder{
194+
fun withSchema(schema : Schema): StreamsTransactionEventBuilder {
188195
this.schema = schema
189196
return this
190197
}
191198

192-
fun build() : StreamsTransactionEvent{
199+
fun build(): StreamsTransactionEvent {
193200
return StreamsTransactionEvent(meta!!, payload!!, schema!!)
194201
}
202+
}
203+
204+
class StreamsEventBuilder {
205+
206+
private lateinit var payload: Any
207+
private lateinit var topic: String
208+
private var nodeRoutingConfiguration: NodeRoutingConfiguration? = null
209+
private var relationshipRoutingConfiguration: RelationshipRoutingConfiguration? = null
210+
211+
fun withPayload(payload: Any): StreamsEventBuilder {
212+
this.payload = payload
213+
return this
214+
}
215+
216+
fun withTopic(topic: String): StreamsEventBuilder {
217+
this.topic = topic
218+
return this
219+
}
220+
221+
fun withNodeRoutingConfiguration(nodeRoutingConfiguration: NodeRoutingConfiguration?): StreamsEventBuilder {
222+
this.nodeRoutingConfiguration = nodeRoutingConfiguration
223+
return this
224+
}
225+
226+
fun withRelationshipRoutingConfiguration(relationshipRoutingConfiguration: RelationshipRoutingConfiguration?): StreamsEventBuilder {
227+
this.relationshipRoutingConfiguration = relationshipRoutingConfiguration
228+
return this
229+
}
230+
231+
private fun buildPayload(topic: String, payload: Any?): Any? {
232+
if (payload == null) {
233+
return null
234+
}
235+
return when (payload) {
236+
is Node -> {
237+
if (nodeRoutingConfiguration != null) {
238+
nodeRoutingConfiguration!!.filter(payload)
239+
} else {
240+
payload.toMap()
241+
}
242+
}
243+
is Relationship -> {
244+
if (relationshipRoutingConfiguration != null) {
245+
relationshipRoutingConfiguration!!.filter(payload)
246+
} else {
247+
payload.toMap()
248+
}
249+
}
250+
is Path -> {
251+
val length = payload.length()
252+
val rels = payload.relationships().map { buildPayload(topic, it) }
253+
val nodes = payload.nodes().map { buildPayload(topic, it) }
254+
mapOf("length" to length, "rels" to rels, "nodes" to nodes)
255+
}
256+
is Map<*, *> -> {
257+
payload.mapValues { buildPayload(topic, it.value) }
258+
}
259+
is List<*> -> {
260+
payload.map { buildPayload(topic, it) }
261+
}
262+
else -> {
263+
payload
264+
}
265+
}
266+
}
267+
268+
fun build(): StreamsEvent {
269+
return StreamsEvent(buildPayload(topic, payload)!!)
270+
}
271+
195272
}
Lines changed: 38 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,71 +1,55 @@
11
package streams.procedures
22

3-
import org.neo4j.graphdb.Node
4-
import org.neo4j.graphdb.Path
5-
import org.neo4j.graphdb.Relationship
6-
import org.neo4j.kernel.internal.GraphDatabaseAPI
3+
import org.apache.commons.lang3.StringUtils
4+
import org.neo4j.logging.Log
75
import org.neo4j.procedure.*
8-
import streams.StreamsEventRouterLifecycle
9-
import streams.events.StreamsEvent
10-
import streams.toMap
6+
import streams.StreamsEventRouter
7+
import streams.StreamsEventRouterConfiguration
8+
import streams.events.StreamsEventBuilder
119

1210
class StreamsProcedures {
1311

14-
@JvmField @Context var db: GraphDatabaseAPI? = null
15-
12+
@JvmField @Context var log: Log? = null
1613

1714
@Procedure(mode = Mode.SCHEMA, name = "streams.publish")
1815
@Description("streams.publish(topic, config) - Allows custom streaming from Neo4j to the configured stream environment")
19-
fun publish(@Name("topic") topic: String, @Name("payload") payload: Any,
16+
fun publish(@Name("topic") topic: String?, @Name("payload") payload: Any?,
2017
@Name(value = "config", defaultValue = "{}") config: Map<String, Any>?) {
21-
val newPayload = buildPayload(topic, payload)
22-
val streamsEvent = StreamsEvent(newPayload!!)
23-
getStreamsEventRouterLifecycle()?.streamHandler?.sendEvents(topic, listOf(streamsEvent))
18+
19+
if (topic == null || topic == StringUtils.EMPTY) {
20+
log?.info("Topic empty, no message sent")
21+
return
22+
}
23+
if (payload == null) {
24+
log?.info("Payload empty, no message sent")
25+
return
26+
}
27+
val streamsEvent = StreamsEventBuilder()
28+
.withPayload(payload)
29+
.withNodeRoutingConfiguration(StreamsProcedures.eventRouterConfiguration
30+
.nodeRouting
31+
.filter { it.topic == topic }
32+
.firstOrNull())
33+
.withRelationshipRoutingConfiguration(StreamsProcedures.eventRouterConfiguration
34+
.relRouting
35+
.filter { it.topic == topic }
36+
.firstOrNull())
37+
.withTopic(topic)
38+
.build()
39+
StreamsProcedures.eventRouter.sendEvents(topic, listOf(streamsEvent))
2440
}
2541

26-
private fun getStreamsEventRouterLifecycle() =
27-
db?.dependencyResolver?.resolveDependency(StreamsEventRouterLifecycle::class.java)
42+
companion object {
43+
private lateinit var eventRouter: StreamsEventRouter
44+
private lateinit var eventRouterConfiguration: StreamsEventRouterConfiguration
2845

29-
fun buildPayload(topic: String, payload: Any?): Any? {
30-
if (payload == null) {
31-
return null
46+
fun registerEventRouter(eventRouter: StreamsEventRouter) {
47+
this.eventRouter = eventRouter
3248
}
33-
return when (payload) {
34-
is Node -> {
35-
val routingConfiguration = getStreamsEventRouterLifecycle()?.streamsEventRouterConfiguration?.nodeRouting
36-
?.filter { it.topic == topic}
37-
?.firstOrNull()
38-
if (routingConfiguration != null) {
39-
routingConfiguration.filter(payload)
40-
} else {
41-
payload.toMap()
42-
}
43-
}
44-
is Relationship -> {
45-
val routingConfiguration = getStreamsEventRouterLifecycle()?.streamsEventRouterConfiguration?.relRouting
46-
?.filter { it.topic == topic}
47-
?.firstOrNull()
48-
if (routingConfiguration != null) {
49-
routingConfiguration.filter(payload)
50-
} else {
51-
payload.toMap()
52-
}
53-
}
54-
is Path -> {
55-
val length = payload.length()
56-
val rels = payload.relationships().map { buildPayload(topic, it) }
57-
val nodes = payload.nodes().map { buildPayload(topic, it) }
58-
mapOf("length" to length, "rels" to rels, "nodes" to nodes)
59-
}
60-
is Map<*, *> -> {
61-
payload.mapValues { buildPayload(topic, it.value) }
62-
}
63-
is List<*> -> {
64-
payload.map { buildPayload(topic, it) }
65-
}
66-
else -> {
67-
payload
68-
}
49+
50+
fun registerEventRouterConfiguration(eventRouterConfiguration: StreamsEventRouterConfiguration) {
51+
this.eventRouterConfiguration = eventRouterConfiguration
6952
}
7053
}
54+
7155
}

producer/src/test/kotlin/streams/RoutingConfigurationTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ class RoutingConfigurationTest {
129129
fun shouldFilterAndRouteNodeEvents() {
130130
// TODO add more tests like a Label removed
131131
// Given
132-
val streamsEvent = StreamsEventBuilder()
132+
val streamsEvent = StreamsTransactionEventBuilder()
133133
.withMeta(StreamsEventMetaBuilder()
134134
.withOperation(OperationType.created)
135135
.withTimestamp(System.currentTimeMillis())
@@ -191,7 +191,7 @@ class RoutingConfigurationTest {
191191
@Test
192192
fun shouldFilterAndRouteRelationshipEvents() {
193193
// Given
194-
val streamsEvent = StreamsEventBuilder()
194+
val streamsEvent = StreamsTransactionEventBuilder()
195195
.withMeta(StreamsEventMetaBuilder()
196196
.withOperation(OperationType.created)
197197
.withTimestamp(System.currentTimeMillis())

0 commit comments

Comments
 (0)