
Commit 01d2cfd

conker84 authored and jexp committed
fixes #84: Add a stream.publish(topic, payload) procedure (#85)
1 parent cf28724 commit 01d2cfd

27 files changed: +1099 −262 lines

consumer/src/main/kotlin/streams/StreamsEventSinkQueryExecution.kt

Lines changed: 0 additions & 1 deletion

@@ -13,7 +13,6 @@ class StreamsEventSinkQueryExecution(private val streamsTopicService: StreamsTop
         return
     }
     if(log.isDebugEnabled){
-
         log.debug("Processing ${params.size} events from Kafka")
     }
     db.execute("$UNWIND $cypherQuery", mapOf("events" to params)).close()

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 2 additions & 2 deletions

@@ -27,7 +27,7 @@ class KafkaEventSink: StreamsEventSink {

     private lateinit var job: Job
     private lateinit var queryExecution: StreamsEventSinkQueryExecution
-    private lateinit var kafkaConsumer: KafkaConsumer<Long, ByteArray>
+    private lateinit var kafkaConsumer: KafkaConsumer<String, ByteArray>

     override var streamsTopicService: StreamsTopicService? = null

@@ -84,7 +84,7 @@ class KafkaEventSink: StreamsEventSink {
         }
     }

-    private fun consume(records: ConsumerRecords<Long, ByteArray>) {
+    private fun consume(records: ConsumerRecords<String, ByteArray>) {
         streamsTopicService!!.getTopics().forEach {
             if (log.isDebugEnabled) {
                 log.debug("Reading data from topic $it")

consumer/src/main/kotlin/streams/kafka/KafkaSinkConfiguration.kt

Lines changed: 2 additions & 1 deletion

@@ -3,6 +3,7 @@ package streams.kafka
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.serialization.LongDeserializer
+import org.apache.kafka.common.serialization.StringDeserializer
 import org.codehaus.jackson.map.ObjectMapper
 import org.neo4j.kernel.configuration.Config
 import streams.StreamsSinkConfiguration
@@ -67,7 +68,7 @@ data class KafkaSinkConfiguration(val zookeeperConnect: String = "localhost:2181

     private fun addDeserializers() : Properties {
         val props = Properties()
-        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = LongDeserializer::class.java
+        props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
         props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java
         return props
     }
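For context, a consumer built from these properties looks roughly like the sketch below — a minimal, hedged example assuming kafka-clients 2.x and a broker at `localhost:9092` (both assumptions, not part of the commit):

```kotlin
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

fun main() {
    val props = Properties()
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092" // assumed broker address
    props[ConsumerConfig.GROUP_ID_CONFIG] = "neo4j"
    // The deserializer pair this commit configures: String keys, raw byte-array values
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ByteArrayDeserializer::class.java

    KafkaConsumer<String, ByteArray>(props).use { consumer ->
        consumer.subscribe(listOf("my-topic"))
        val records = consumer.poll(Duration.ofSeconds(5))
        records.forEach { println("key=${it.key()} value=${String(it.value())}") }
    }
}
```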

consumer/src/test/kotlin/integrations/KafkaEventSinkIT.kt

Lines changed: 7 additions & 5 deletions

@@ -10,7 +10,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.serialization.ByteArraySerializer
-import org.apache.kafka.common.serialization.LongSerializer
+import org.apache.kafka.common.serialization.StringSerializer
 import org.codehaus.jackson.map.ObjectMapper
 import org.junit.*
 import org.junit.rules.TestName
@@ -59,7 +59,7 @@ class KafkaEventSinkIT {

     private val kafkaProperties = Properties()

-    private lateinit var kafkaProducer: KafkaProducer<Long, ByteArray>
+    private lateinit var kafkaProducer: KafkaProducer<String, ByteArray>

     // Test data
     private val dataProperties = mapOf("prop1" to "foo", "bar" to 1)
@@ -77,7 +77,7 @@ class KafkaEventSinkIT {

         kafkaProperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers
         kafkaProperties["group.id"] = "neo4j"
-        kafkaProperties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = LongSerializer::class.java
+        kafkaProperties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
         kafkaProperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java

         AdminClient.create(kafkaProperties).use { client -> client.createTopics(
@@ -94,7 +94,8 @@ class KafkaEventSinkIT {
     fun shouldWriteDataFromSink() = runBlocking {
         val job = GlobalScope.launch {
             var partition = ThreadLocalRandom.current().nextInt(1)
-            var producerRecord = ProducerRecord(topics[0], partition, System.currentTimeMillis(), 1L,
+            var producerRecord = ProducerRecord(topics[0], partition, System.currentTimeMillis(),
+                    UUID.randomUUID().toString(),
                     objectMapper.writeValueAsBytes(data))
             kafkaProducer.send(producerRecord).get()
             delay(5000)
@@ -126,7 +127,8 @@ class KafkaEventSinkIT {
     fun shouldNotWriteDataFromSinkWithNoTopicLoaded() = runBlocking {
         val job = GlobalScope.launch {
             var partition = ThreadLocalRandom.current().nextInt(1)
-            var producerRecord = ProducerRecord(topics[0], partition, System.currentTimeMillis(), 1L,
+            var producerRecord = ProducerRecord(topics[0], partition, System.currentTimeMillis(),
+                    UUID.randomUUID().toString(),
                     objectMapper.writeValueAsBytes(data))
             kafkaProducer.send(producerRecord).get()
             delay(5000)
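The tests now key records with a random UUID string rather than the literal `1L`. Below is a standalone sketch of the same production path — it assumes kafka-clients, the same Jackson artifact the test imports, and a broker at `localhost:9092`; it is illustrative, not verbatim test code:

```kotlin
import java.util.Properties
import java.util.UUID
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer
import org.apache.kafka.common.serialization.StringSerializer
import org.codehaus.jackson.map.ObjectMapper

fun main() {
    val props = Properties()
    props[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092" // assumed broker address
    props[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    props[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java

    // Hypothetical event body, shaped like the test's `data` map
    val data = mapOf("prop1" to "foo", "bar" to 1)

    KafkaProducer<String, ByteArray>(props).use { producer ->
        // String key (a UUID) plus JSON bytes as the value, mirroring the patched tests
        val record = ProducerRecord("my-topic", UUID.randomUUID().toString(),
                ObjectMapper().writeValueAsBytes(data))
        producer.send(record).get() // block until the broker acknowledges
    }
}
```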

doc/asciidoc/producer/index.adoc

Lines changed: 63 additions & 16 deletions

@@ -10,15 +10,62 @@ include::configuration.adoc[]

 include::patterns.adoc[]

-=== Events
+=== Procedures
+
+The producer comes with a set of procedures.
+Because they use internal Neo4j APIs, you must allow them with the following configuration parameter:
+
+dbms.security.procedures.unrestricted=streams.*
+
+If you are using them via Docker, pass the equivalent environment parameter:
+
+NEO4J_dbms_security_procedures_unrestricted: streams.*
+
+==== streams.publish
+
+This procedure allows custom message streaming from Neo4j to the configured environment.
+
+Usage:
+
+`CALL streams.publish('my-topic', 'Hello World from Neo4j!')`
+
+The message retrieved from the Consumer is the following:
+
+`{"payload":"Hello World from Neo4j!"}`
+
+Input Parameters:
+
+[cols="3*",options="header"]
+|===
+|Variable Name
+|Type
+|Description
+|`topic`
+|String
+|The topic where you want to publish the data
+
+|`payload`
+|Object
+|The data that you want to stream
+
+|===
+
+You can send any kind of data in the payload: nodes, relationships, paths, lists, maps, scalar values, and nested versions thereof.
+
+=== Transaction Event Handler
+
+The transaction event handler is the core of the Stream Producer and allows you to stream database changes.
+
+==== Events

 The Producer streams three kinds of events:

 * *created*: when a node/relation/property is created
 * *updated*: when a node/relation/property is updated
 * *deleted*: when a node/relation/property is deleted

-==== Created
+===== Created

 Following is an example of the node creation event:

@@ -30,7 +77,7 @@ include::data/node.created.json[]
 include::data/relationship.created.json[]
 ```

-==== Updated
+===== Updated

 Following is an example of the node update event:

@@ -42,7 +89,7 @@ include::data/node.updated.json[]
 include::data/relationship.updated.json[]
 ```

-==== Deleted
+===== Deleted

 Following is an example of the node deletion event:

@@ -54,7 +101,7 @@ include::data/node.deleted.json[]
 include::data/relationship.deleted.json[]
 ```

-=== Meta
+==== Meta

 The *meta* field contains the metadata related to the transaction event:

@@ -93,7 +140,7 @@ The *meta* field contains the metadata related to the transaction event:
 |Contains the information about the source
 |===

-==== Source
+===== Source

 [cols="3*",options="header"]
 |===
@@ -107,7 +154,7 @@ The *meta* field contains the metadata related to the transaction event:
 |===

-=== Payload
+==== Payload

 The *payload* field contains the information about the data related to the event

@@ -134,11 +181,11 @@ The *payload* field contains the information about the data related to the event
 |The data after the transaction event
 |===

-==== Payload: before and after
+===== Payload: before and after

 We must distinguish two cases:

-===== Nodes
+====== Nodes

 [cols="3*",options="header"]
 |===
@@ -155,7 +202,7 @@ We must distinguish two cases:
 |List of properties attached to the node, the key is the property name
 |===

-===== Relationships
+====== Relationships

 [cols="3*",options="header"]
 |===
@@ -198,7 +245,7 @@ We must distinguish two cases:
 |===

-=== Schema
+==== Schema

 [cols="3*",options="header"]
 |===
@@ -215,7 +262,7 @@ We must distinguish two cases:
 |The schema after the transaction event
 |===

-==== Schema: before and after
+===== Schema: before and after

 [cols="3*",options="header"]
 |===
@@ -234,7 +281,7 @@ We must distinguish two cases:

 We must distinguish two cases:

-===== Nodes
+====== Nodes

 [cols="3*",options="header"]
 |===
@@ -253,8 +300,9 @@ We must distinguish two cases:

 ====== Constraints

-A node can have a list of constraints attached to it:
+Nodes and relationships can have a list of constraints attached to them:

+.Node Constraints
 [cols="3*",options="header"]
 |===
 |Field
@@ -274,8 +322,7 @@ A node can have a list of constraints attached to it:
 |List of node properties involved in the constraint
 |===

-===== Relationships
-
+.Relationship Constraints
 [cols="3*",options="header"]
 |===
 |Field
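Since `streams.publish` accepts an arbitrary payload, it can be invoked from any Bolt client. Below is a hedged sketch of calling it from Kotlin with the Neo4j Java driver — the driver version (1.x), URI, and credentials are assumptions for illustration, not part of this commit:

```kotlin
import org.neo4j.driver.v1.AuthTokens
import org.neo4j.driver.v1.GraphDatabase

fun main() {
    // Placeholder connection details -- adjust for your environment
    GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "password")).use { driver ->
        driver.session().use { session ->
            // Scalar payload, exactly as in the docs above
            session.run("CALL streams.publish('my-topic', 'Hello World from Neo4j!')")
            // Maps (and nodes, relationships, paths, lists...) work as payloads too
            session.run("CALL streams.publish(\$topic, \$payload)",
                    mapOf("topic" to "my-topic", "payload" to mapOf("name" to "Alice")))
        }
    }
}
```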

performance/README.md

Lines changed: 5 additions & 5 deletions

@@ -45,7 +45,7 @@ The creation query used by the tester:
 The acquisition query:

 ```properties
-streams.sink.topic.cypher.neo4j=WITH event.value.payload AS payload, event.value.meta AS meta CALL apoc.do.case( [\
+streams.sink.topic.cypher.neo4j=WITH event.payload AS payload, event.meta AS meta CALL apoc.do.case( [\

 payload.type = 'node' AND meta.operation = 'created', \

@@ -100,10 +100,10 @@ nodes = 1000
 To check the configuration or to get a fast result you can run the command

 ```shell
-python3.6 neo4j-streams-pt.py --start
+python3 neo4j-streams-pt.py --start
 ```

-The output is a window that shows the distribution of the test. See the results section to better understand the values. You can use the option `--plot-out file.png` to save the result to a file instead of displaying it. The details of the execution are dumped to standard output as CSV, or you can redirect them to a file using the option `--csv-file file.csv`
+The output is a window that shows the distribution of the test. See the results section to better understand the values. You can use the option `--plot-out file.png` to save the result to a file instead of displaying it. The details of the execution are dumped to standard output as CSV, or you can redirect them to a file using the option `--csv-out file.csv`

 ### baseline

@@ -114,13 +114,13 @@ ES: 3 = 3 * 1000 if 1000 is the value configured as *nodes* in the *[unit]* part
 To get the same result as *start*:

 ```shell
-python3.6 neo4j-streams-pt.py --baseline 1
+python3 neo4j-streams-pt.py --baseline 1
 ```

 To get more distributions (real case):

 ```shell
-python3.6 neo4j-streams-pt.py --baseline 1 10 100 1000 --plot-out results.png
+python3 neo4j-streams-pt.py --baseline 1 10 100 1000 --plot-out results.png
 ```

 This means the tester executes each series 5 times (if *repeat = 5*). A series is composed of 1k, 10k, 100k, 1000k nodes (if *nodes = 1000*)

performance/docker-compose.yml

Lines changed: 5 additions & 5 deletions

@@ -12,8 +12,8 @@ services:
       - "8474:7474"
       - "8687:7687"
     volumes:
-      - $HOME/app/neo4-streams/neo4j-community-3.4.7-producer/data:/data
-      - $HOME/app/neo4-streams/neo4j-community-3.4.7-producer/plugins:/plugins
+      - $HOME/app/neo4j-streams/neo4j-community-3.4.7-producer/data:/data
+      - $HOME/app/neo4j-streams/neo4j-community-3.4.7-producer/plugins:/plugins
     environment:
       NEO4J_kafka_zookeeper_connect: zookeeper:2181
       NEO4J_kafka_bootstrap_servers: broker:9092
@@ -32,15 +32,15 @@ services:
       - "7474:7474"
       - "7687:7687"
     volumes:
-      - $HOME/app/neo4-streams/neo4j-community-3.4.7-consumer/data:/data
-      - $HOME/app/neo4-streams/neo4j-community-3.4.7-consumer/plugins:/plugins
+      - $HOME/app/neo4j-streams/neo4j-community-3.4.7-consumer/data:/data
+      - $HOME/app/neo4j-streams/neo4j-community-3.4.7-consumer/plugins:/plugins
     environment:
       NEO4J_kafka_zookeeper_connect: zookeeper:2181
       NEO4J_kafka_bootstrap_servers: broker:9092
       NEO4J_AUTH: neo4j/consumer
       NEO4J_dbms_memory_heap_max__size: 2G
       NEO4J_kafka_max_poll_records: 16384
-      NEO4J_streams_sink_topic_cypher_neo4j: "WITH event.value.payload AS payload, event.value.meta AS meta CALL apoc.do.case( [
+      NEO4J_streams_sink_topic_cypher_neo4j: "WITH event.payload AS payload, event.meta AS meta CALL apoc.do.case( [
         payload.type = 'node' AND meta.operation = 'created', \
         'CREATE (x:Performance {received_time: apoc.date.currentTimestamp()}) SET x+=props RETURN count(x)']
         ,'RETURN 0',
Lines changed: 19 additions & 0 deletions (new file)

@@ -0,0 +1,19 @@
+package streams
+
+import org.neo4j.graphdb.Node
+import org.neo4j.graphdb.Relationship
+import streams.events.EntityType
+import streams.events.RelationshipNodeChange
+
+fun Map<String,String>.getInt(name:String, defaultValue: Int) = this.get(name)?.toInt() ?: defaultValue
+
+fun Node.toMap(): Map<String, Any?> {
+    return mapOf("id" to id.toString(), "properties" to allProperties, "labels" to labelNames(), "type" to EntityType.node)
+}
+
+fun Relationship.toMap(): Map<String, Any?> {
+    return mapOf("id" to id.toString(), "properties" to allProperties, "label" to type,
+            "start" to RelationshipNodeChange(startNode.id.toString(), startNode.labelNames()),
+            "end" to RelationshipNodeChange(endNode.id.toString(), endNode.labelNames()),
+            "type" to EntityType.relationship)
+}
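For quick intuition, here is a tiny, self-contained sketch of the new `Map<String, String>.getInt` extension in action (the extension is re-declared so the snippet runs standalone; the config keys are made up for illustration). The `Node.toMap()`/`Relationship.toMap()` helpers need live graph entities, so they are not exercised here:

```kotlin
// Re-declaration of the extension added above, so this sketch compiles on its own
fun Map<String, String>.getInt(name: String, defaultValue: Int) =
        this[name]?.toInt() ?: defaultValue

fun main() {
    // Hypothetical configuration map -- keys chosen only for illustration
    val config = mapOf("kafka.max.poll.records" to "16384")

    println(config.getInt("kafka.max.poll.records", 500)) // 16384: key present, parsed
    println(config.getInt("kafka.fetch.min.bytes", 1))    // 1: key absent, default wins
}
```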
