Skip to content

Commit cf28724

Browse files
omarlarusjexp
authored andcommitted
Fix #42 - Performance test (#72)
1 parent 0ecc4b2 commit cf28724

File tree

11 files changed

+435
-3
lines changed

11 files changed

+435
-3
lines changed

consumer/src/main/kotlin/streams/StreamsEventSinkQueryExecution.kt

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package streams
33
import org.neo4j.kernel.internal.GraphDatabaseAPI
44
import org.neo4j.logging.Log
55

6-
class StreamsEventSinkQueryExecution(private val streamsTopicService: StreamsTopicService, private val db: GraphDatabaseAPI) {
6+
class StreamsEventSinkQueryExecution(private val streamsTopicService: StreamsTopicService, private val db: GraphDatabaseAPI, val log: Log) {
77

88
private val UNWIND: String = "UNWIND {events} AS event"
99

@@ -12,6 +12,10 @@ class StreamsEventSinkQueryExecution(private val streamsTopicService: StreamsTop
1212
if (cypherQuery == null) {
1313
return
1414
}
15+
if(log.isDebugEnabled){
16+
17+
log.debug("Processing ${params.size} events from Kafka")
18+
}
1519
db.execute("$UNWIND $cypherQuery", mapOf("events" to params)).close()
1620
}
1721

consumer/src/main/kotlin/streams/kafka/KafkaEventSink.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ class KafkaEventSink: StreamsEventSink {
4646
log.info("No topic configuration found under streams.sink.topic.*, Kafka Sink will not stared")
4747
return
4848
}
49-
this.queryExecution = StreamsEventSinkQueryExecution(this.streamsTopicService!!, db)
49+
50+
this.queryExecution = StreamsEventSinkQueryExecution(this.streamsTopicService!!, db, log)
5051
createConsumer();
52+
5153
job = createJob()
5254
log.info("Kafka Sink Connector started.")
5355
}

consumer/src/test/kotlin/streams/StreamsEventSinkQueryExecutionTest.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import org.junit.Before
55
import org.junit.Test
66
import org.neo4j.graphdb.GraphDatabaseService
77
import org.neo4j.kernel.internal.GraphDatabaseAPI
8+
import org.neo4j.logging.NullLog
89
import org.neo4j.test.TestGraphDatabaseFactory
910
import streams.kafka.KafkaSinkConfiguration
1011
import kotlin.test.assertEquals
@@ -21,7 +22,7 @@ class StreamsEventSinkQueryExecutionTest {
2122
val kafkaConfig = KafkaSinkConfiguration(streamsSinkConfiguration = StreamsSinkConfiguration(topics = mapOf("shouldWriteCypherQuery" to "MERGE (n:Label {id: event.id})\n" +
2223
" ON CREATE SET n += event.properties")))
2324
val streamsTopicService = StreamsTopicService(db as GraphDatabaseAPI, kafkaConfig.streamsSinkConfiguration)
24-
streamsEventSinkQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db as GraphDatabaseAPI)
25+
streamsEventSinkQueryExecution = StreamsEventSinkQueryExecution(streamsTopicService, db as GraphDatabaseAPI, NullLog.getInstance())
2526
}
2627

2728
@After

performance/README.md

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
# neo4j-streams performance test
2+
3+
This part of the project aims to give a standard way to understand the performance of the platform and highlight the configuration results.
4+
5+
Currently, only the neo4j-to-neo4j configuration test is available; it works with remote instances too.
6+
7+
## Setup
8+
9+
The basic unit of work used to test the performances needs a specific cypher part in order to get the timestamp of creation and receiving of the event. The elapsed time is calculated using these timestamps through cypher queries using:
10+
$$
11+
Elapsed = max(tc) - min(tp)
12+
$$
13+
where *tc* is the instant of insertion in the consumer and *tp* the instant of insertion in the producer.
14+
15+
### Kafka
16+
17+
Default Kafka configuration, running as a single instance on the same machine as the other components
18+
19+
- Kafka 2.11-0.10.1.1
20+
21+
### Producer
22+
23+
- neo4j 3.4.7 community edition
24+
- APOC 3.4.0.3
25+
- 8 GB heap memory
26+
- CREATE INDEX ON :Performance(group)
27+
- default neo4j-streams configuration (without filtering)
28+
- kafka.batch.size=16384
29+
30+
The creation query used by the tester:
31+
32+
UNWIND range(1,{msg}) as ran
33+
CREATE (n:Performance)
34+
SET n.group = {uuid}, n.creation_time = apoc.date.currentTimestamp()
35+
RETURN min(n.creation_time) as creation_time
36+
### Consumer
37+
38+
- neo4j 3.4.7 community edition
39+
- APOC 3.4.0.3
40+
- 2 GB heap memory
41+
- CREATE INDEX ON :Performance(group)
42+
- default neo4j-streams configuration
43+
- kafka.max.poll.records=16384
44+
45+
The acquisition query:
46+
47+
```properties
48+
streams.sink.topic.cypher.neo4j=WITH event.value.payload AS payload, event.value.meta AS meta CALL apoc.do.case( [\
49+
50+
payload.type = 'node' AND meta.operation = 'created', \
51+
52+
'CREATE (x:Performance {received_time: apoc.date.currentTimestamp()}) SET x+=props RETURN count(x)'] \
53+
54+
,'RETURN 0', \
55+
56+
{props: payload.after.properties}) \
57+
58+
YIELD value RETURN count(value)
59+
```
60+
61+
62+
63+
## Run the tests
64+
65+
The tester is written in Python. Check your system for:
66+
67+
- Python 3.6
68+
- py2neo
69+
- matplotlib
70+
71+
### config.ini
72+
73+
To run the test you need to edit the *config.ini* file in order to setup the connection data for the *producer* and *consumer* neo4j instances.
74+
75+
The *unit* part is used as default values to run the tests.
76+
77+
- *repeat* is the number of times each test is executed
78+
- *node* is the number of nodes created for each test
79+
80+
```ini
81+
[producer]
82+
url = bolt://localhost:8687
83+
username = neo4j
84+
password = producer
85+
86+
[consumer]
87+
url = bolt://localhost:7687
88+
username = neo4j
89+
password = consumer
90+
91+
[unit]
92+
repeat = 5
93+
nodes = 1000
94+
```
95+
96+
97+
98+
### start
99+
100+
To check the configuration or to get a fast result you can run the command
101+
102+
```shell
103+
python3.6 neo4j-streams-pt.py --start
104+
```
105+
106+
The output is a window that shows you the distribution of the test. See the Results section to better understand the values. You can use the option `--plot-out file.png` to skip displaying the result and save it to a file instead. The details of the execution are dumped on standard output as CSV, or you can redirect them to a file using the option `--csv-file file.csv`
107+
108+
### baseline
109+
110+
This is the main command you use to test the system. Via command-line arguments you can specify the series to test. Each argument is the number of node units to use.
111+
112+
E.g.: 3 = 3 * 1000 if 1000 is the value configured as *nodes* in the *[unit]* part.
113+
114+
To get the same result of *start*:
115+
116+
```shell
117+
python3.6 neo4j-streams-pt.py --baseline 1
118+
```
119+
120+
To get more distributions (a real case):
121+
122+
```shell
123+
python3.6 neo4j-streams-pt.py --baseline 1 10 100 1000 --plot-out results.png
124+
```
125+
126+
This means the tester executes each series 5 times (if *repeat = 5*). A series is composed of 1k, 10k, 100k, 1000k nodes (if *nodes = 1000*)
127+
128+
### Results
129+
130+
There are lots of variables for tuning the whole process and lots of scenarios to optimize (such as high or low volumes of events per transaction). We introduce two cases here, both on a local machine, one with Docker and one without. The computer is a MacBook Pro with
131+
132+
- CPU 2,2 GHz Intel Core i7
133+
- 16 GB DDR3, 1600 MHz
134+
- SSD
135+
136+
### With Docker
137+
138+
Here's the results with the setup as described above with
139+
140+
- Docker Community Edition 18.06.1-ce-mac73
141+
- docker-compose.yml
142+
143+
![Result on standalone mac](https://github.com/neo4j-contrib/neo4j-streams/blob/master/performance/docker.png)
144+
145+
| Nodes | Executions | Min | Max | Avg | Median | St. Dev |
146+
| ------- | ---------- | -------- | -------- | -------- | -------- | ------------------- |
147+
| 1000 | 5 | 0.325 | 2.351 | 0.8526 | 0.405 | 0.8568589732272167 |
148+
| 10000 | 5 | 0.1352 | 0.2704 | 0.1824 | 0.1521 | 0.06006492320814203 |
149+
| 100000 | 5 | 0.14022 | 0.25575 | 0.200146 | 0.21773 | 0.05124010665484605 |
150+
| 1000000 | 5 | 0.162903 | 0.208513 | 0.185129 | 0.184043 | 0.01629424984465379 |
151+
152+
As you can see, there's not a single number to describe the scenario, but 0.20 ms per node (for the simple CREATE query) could be a reference time.
153+
154+
### Without Docker
155+
156+
Here's the results with the setup as described above with
157+
158+
- neo4j producer, neo4j consumer, Kafka and Zookeeper run on the same machine, the tester too. The clock is the same, so it's synchronized.
159+
- Kafka and Zookeeper run with default configuration
160+
161+
![Result on standalone mac](https://github.com/neo4j-contrib/neo4j-streams/blob/master/performance/local.png)
162+
163+
| Nodes | Executions | Min | Max | Avg | Median | St. Dev |
164+
| ------- | ---------- | -------- | -------- | -------- | -------- | --------------------- |
165+
| 1000 | 5 | 0.202 | 0.262 | 0.2274 | 0.227 | 0.025510782034269354 |
166+
| 10000 | 5 | 0.1209 | 0.1821 | 0.14404 | 0.1332 | 0.026679261608972618 |
167+
| 100000 | 5 | 0.10559 | 0.12636 | 0.115132 | 0.1099 | 0.01016512518368564 |
168+
| 1000000 | 5 | 0.113919 | 0.128142 | 0.119806 | 0.118307 | 0.0054176914825412505 |
169+
170+
As you can see, there's not a single number to describe the scenario, but 0.13 ms per node (for the simple CREATE query) could be a reference time.
171+
172+
173+

performance/config.ini

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[producer]
2+
url = bolt://localhost:8687
3+
username = neo4j
4+
password = producer
5+
6+
[consumer]
7+
url = bolt://localhost:7687
8+
username = neo4j
9+
password = consumer
10+
11+
[unit]
12+
repeat = 5
13+
nodes = 1000

performance/docker-compose.yml

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
---
2+
version: '2'
3+
services:
4+
neo4j-producer:
5+
image: neo4j:3.4
6+
hostname: neo4j-producer
7+
container_name: neo4j-producer
8+
depends_on:
9+
- zookeeper
10+
- broker
11+
ports:
12+
- "8474:7474"
13+
- "8687:7687"
14+
volumes:
15+
- $HOME/app/neo4-streams/neo4j-community-3.4.7-producer/data:/data
16+
- $HOME/app/neo4-streams/neo4j-community-3.4.7-producer/plugins:/plugins
17+
environment:
18+
NEO4J_kafka_zookeeper_connect: zookeeper:2181
19+
NEO4J_kafka_bootstrap_servers: broker:9092
20+
NEO4J_AUTH: neo4j/producer
21+
NEO4J_dbms_memory_heap_max__size: 8G
22+
# NEO4J_dbms_logs_debug_level: DEBUG
23+
NEO4J_kafka_batch_size: 16384
24+
25+
neo4j-consumer:
26+
image: neo4j:3.4
27+
hostname: neo4j-consumer
28+
container_name: neo4j-consumer
29+
depends_on:
30+
- neo4j-producer
31+
ports:
32+
- "7474:7474"
33+
- "7687:7687"
34+
volumes:
35+
- $HOME/app/neo4-streams/neo4j-community-3.4.7-consumer/data:/data
36+
- $HOME/app/neo4-streams/neo4j-community-3.4.7-consumer/plugins:/plugins
37+
environment:
38+
NEO4J_kafka_zookeeper_connect: zookeeper:2181
39+
NEO4J_kafka_bootstrap_servers: broker:9092
40+
NEO4J_AUTH: neo4j/consumer
41+
NEO4J_dbms_memory_heap_max__size: 2G
42+
NEO4J_kafka_max_poll_records: 16384
43+
NEO4J_streams_sink_topic_cypher_neo4j: "WITH event.value.payload AS payload, event.value.meta AS meta CALL apoc.do.case( [
44+
payload.type = 'node' AND meta.operation = 'created', \
45+
'CREATE (x:Performance {received_time: apoc.date.currentTimestamp()}) SET x+=props RETURN count(x)']
46+
,'RETURN 0',
47+
{props: payload.after.properties})
48+
YIELD value RETURN count(value)"
49+
# NEO4J_dbms_logs_debug_level: DEBUG
50+
51+
zookeeper:
52+
image: confluentinc/cp-zookeeper:5.0.0
53+
hostname: zookeeper
54+
container_name: zookeeper
55+
ports:
56+
- "2181:2181"
57+
environment:
58+
ZOOKEEPER_CLIENT_PORT: 2181
59+
ZOOKEEPER_TICK_TIME: 2000
60+
61+
broker:
62+
image: confluentinc/cp-enterprise-kafka:5.0.0
63+
hostname: broker
64+
container_name: broker
65+
depends_on:
66+
- zookeeper
67+
ports:
68+
- "9092:9092"
69+
- "29092:29092"
70+
environment:
71+
KAFKA_BROKER_ID: 1
72+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
73+
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
74+
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
75+
KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
76+
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
77+
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
78+
CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9092
79+
CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
80+
CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
81+
CONFLUENT_METRICS_ENABLE: 'true'
82+
CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
83+
#volumes:
84+
# mi2: {}

performance/docker.csv

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Nodes,avg,min,max,median,stdev,0,1,2,3,4
2+
1000,0.8526,0.325,2.351,0.405,0.8568589732272167,2.351,0.786,0.405,0.396,0.325
3+
10000,0.1824,0.1352,0.2704,0.1521,0.06006492320814203,0.2704,0.2189,0.1521,0.1354,0.1352
4+
100000,0.200146,0.14022,0.25575,0.21773,0.05124010665484605,0.25575,0.21773,0.15215,0.14022,0.23488
5+
1000000,0.18512900000000002,0.162903,0.208513,0.184043,0.01629424984465379,0.181929,0.162903,0.184043,0.188257,0.208513

performance/docker.png

15.9 KB
Loading

performance/local.csv

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Nodes,avg,min,max,median,stdev,0,1,2,3,4
2+
1000,0.2274,0.202,0.262,0.227,0.025510782034269354,0.242,0.227,0.262,0.204,0.202
3+
10000,0.14404,0.1209,0.1821,0.1332,0.026679261608972618,0.1821,0.1332,0.1228,0.1612,0.1209
4+
100000,0.115132,0.10559,0.12636,0.1099,0.01016512518368564,0.12592,0.12636,0.10789,0.10559,0.1099
5+
1000000,0.119806,0.113919,0.128142,0.118307,0.0054176914825412505,0.113919,0.117033,0.118307,0.121629,0.128142

performance/local.png

20.1 KB
Loading

0 commit comments

Comments
 (0)