[#neo4j_confluent]
== Examples with Confluent Platform and Kafka Connect Datagen

=== Confluent and Neo4j in binary format

In this example Neo4j and Confluent will be downloaded in binary format and the Neo4j Streams plugin
will be set up in SINK mode.
The data consumed by Neo4j will be generated by the *Kafka Connect Datagen*. Please note that this connector
should be used for test purposes only and is not suitable for production scenarios.

==== Download and Install Confluent Platform

* Download link:https://www.confluent.io/download/[Confluent Platform] and then choose the desired format, `.tar.gz` or `.zip`.
* Decompress the file in your desired folder.
* Add the install location of the Confluent `bin` directory to your PATH environment variable.

[source, bash]
----
export PATH=<CONFLUENT_HOME_DIR>/bin:$PATH
----

* Run Confluent Platform using the following command:

[source, bash]
----
confluent local start
----

The output should be something like this:

[source, bash]
----
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]
----
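
The same CLI can be used to inspect or stop the services at any time. A quick sketch — command names vary across Confluent CLI versions, and recent ones namespace these under `confluent local services`:

[source, bash]
----
# show which services are up
confluent local status

# stop all services (data is kept until 'confluent local destroy')
confluent local stop
----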

==== Download Neo4j

* Download the latest version of Neo4j at the following link: https://neo4j.com/download-center/

* Decompress it in your desired folder.

* Install the Neo4j Streams plugin by copying the jar into the `plugins` folder.

* Add the following properties to `neo4j.conf` in order to enable the Sink functionality:

.neo4j.conf
[source, properties]
----
kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

#********************************************************************
# Kafka Consumer
#********************************************************************
streams.sink.enabled=true
streams.sink.topic.cypher.pageviews=MERGE (n:User {id: event.payload.userid}) MERGE (p:PageView { id: event.payload.pageid }) MERGE (n)-[:VIEWED]->(p)
----

Configure the deserializer according to the chosen data format:

* _org.apache.kafka.common.serialization.ByteArrayDeserializer_ in case of *JSON* format
* _io.confluent.kafka.serializers.KafkaAvroDeserializer_ in case of *AVRO* format

If you choose AVRO, a schema registry configuration is also needed:

[source, properties]
----
kafka.schema.registry.url=localhost:8081
----

where 8081 is the default port for the Confluent Schema Registry.

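You can verify that the Schema Registry is reachable with a request like the following; it returns the list of registered subjects (an empty list, `[]`, on a fresh install):

[source, bash]
----
curl http://localhost:8081/subjects
----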

[NOTE]
If you started Neo4j before adding the above properties, you also need to restart the Neo4j server.

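The Cypher statement configured above reads fields from `event.payload` because the `JsonConverter` used later by the Datagen connector wraps each record, when schemas are enabled (the default), in a schema/payload envelope. A `pageviews` message then looks roughly like this (schema abbreviated, values illustrative):

[source, json]
----
{
  "schema": {
    "type": "struct",
    "name": "ksql.pageviews",
    "fields": [
      {"type": "int64", "optional": false, "field": "viewtime"},
      {"type": "string", "optional": false, "field": "userid"},
      {"type": "string", "optional": false, "field": "pageid"}
    ]
  },
  "payload": {
    "viewtime": 1,
    "userid": "User_1",
    "pageid": "Page_1"
  }
}
----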

==== Install Kafka Connect Datagen

Install the link:https://www.confluent.io/hub/confluentinc/kafka-connect-datagen[Kafka Connect Datagen] using the Confluent Hub client.

[source,bash]
----
<CONFLUENT_HOME_DIR>/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
----

The output should be something like this:

[source, bash]
----
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache License 2.0
https://www.apache.org/licenses/LICENSE-2.0
Downloading component Kafka Connect Datagen 0.1.5, provided by Confluent, Inc. from Confluent Hub and installing into /Applications/Development/confluent-5.3.1/share/confluent-hub-components
...
Completed
----
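
The Connect worker only scans its plugin path at startup, so it has to be restarted to pick up the newly installed connector. With the local CLI, something like the following should work (exact subcommands vary slightly across Confluent CLI versions):

[source, bash]
----
confluent local stop connect
confluent local start connect
----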

[[view_results_example]]
==== View results

Now you can access the Confluent Control Center at http://localhost:9021, create Kafka topics, and generate some sample data.
Follow link:https://docs.confluent.io/current/quickstart/ce-quickstart.html#step-2-create-ak-topics[step 2] and
link:https://docs.confluent.io/current/quickstart/ce-quickstart.html#step-3-install-a-ak-connector-and-generate-sample-data[step 3]
of the official link:https://docs.confluent.io/current/quickstart/ce-quickstart.html[Confluent documentation].

When configuring the data generator connectors, also specify the `Value converter class` property with the following value:

[source, properties]
----
org.apache.kafka.connect.json.JsonConverter
----
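
If you prefer the command line over the Control Center UI, the same Datagen connector can be created through the Kafka Connect REST API. A minimal sketch, assuming Connect listens on the default port 8083; the connector name `datagen-pageviews` is arbitrary, and the configuration keys follow the official quickstart:

[source, bash]
----
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "datagen-pageviews",
    "config": {
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic": "pageviews",
      "quickstart": "pageviews",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "true",
      "max.interval": 100,
      "iterations": 10000000,
      "tasks.max": "1"
    }
  }'
----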

Accessing the Neo4j Browser at http://localhost:7474 you can see that the Kafka messages generated by the *Kafka Connect Datagen* were
consumed and converted to nodes and relationships according to the Cypher statement specified in the property `streams.sink.topic.cypher.pageviews`.
Just execute the following Cypher query:

[source, cypher]
----
MATCH p=()-->() RETURN p LIMIT 25
----

The output should be something like:

image::../../images/sink_ouput.png[title="SINK Output", align="center"]

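To check, for example, how many distinct pages each user has viewed, a simple aggregation over the created graph can be used:

[source, cypher]
----
MATCH (u:User)-[:VIEWED]->(p:PageView)
RETURN u.id AS user, count(p) AS pages_viewed
ORDER BY pages_viewed DESC
LIMIT 10
----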

[#confluent_docker_example]
=== Confluent with Docker, Neo4j in binary format

In this example Neo4j will be installed locally and Confluent Platform will run in a Docker environment.

==== Neo4j

Neo4j is installed and configured in the same way as in the example above.

==== Confluent with Docker

In order to have a ready-to-use Confluent Platform with Docker, please use the following docker-compose file (**please note
that in the configuration of the `connect` service you have to substitute the `<version>` of the kafka-connect-plugin you're going to install**):
| 158 | + |
| 159 | +.docker-compose.yml |
| 160 | +[source,yaml] |
| 161 | +---- |
| 162 | +version: '2' |
| 163 | +services: |
| 164 | +
|
| 165 | + zookeeper: |
| 166 | + image: confluentinc/cp-zookeeper |
| 167 | + hostname: zookeeper |
| 168 | + container_name: zookeeper |
| 169 | + ports: |
| 170 | + - "2181:2181" |
| 171 | + environment: |
| 172 | + ZOOKEEPER_CLIENT_PORT: 2181 |
| 173 | + ZOOKEEPER_TICK_TIME: 2000 |
| 174 | +
|
| 175 | + broker: |
| 176 | + image: confluentinc/cp-enterprise-kafka |
| 177 | + hostname: broker |
| 178 | + container_name: broker |
| 179 | + depends_on: |
| 180 | + - zookeeper |
| 181 | + ports: |
| 182 | + - "9092:9092" |
| 183 | + expose: |
| 184 | + - "9093" |
| 185 | + environment: |
| 186 | + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092 |
| 187 | + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT |
| 188 | + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092 |
| 189 | + CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093 |
| 190 | +
|
| 191 | + # workaround if we change to a custom name the schema_registry fails to start |
| 192 | + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT |
| 193 | +
|
| 194 | + KAFKA_BROKER_ID: 1 |
| 195 | + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' |
| 196 | + KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter |
| 197 | + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 |
| 198 | + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 |
| 199 | + CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181 |
| 200 | + CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 |
| 201 | + CONFLUENT_METRICS_ENABLE: 'true' |
| 202 | + CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous' |
| 203 | +
|
| 204 | + schema_registry: |
| 205 | + image: confluentinc/cp-schema-registry |
| 206 | + hostname: schema_registry |
| 207 | + container_name: schema_registry |
| 208 | + depends_on: |
| 209 | + - zookeeper |
| 210 | + - broker |
| 211 | + ports: |
| 212 | + - "8081:8081" |
| 213 | + environment: |
| 214 | + SCHEMA_REGISTRY_HOST_NAME: schema_registry |
| 215 | + SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181' |
| 216 | +
|
| 217 | + connect: |
| 218 | + image: confluentinc/kafka-connect-datagen:latest |
| 219 | + hostname: connect |
| 220 | + container_name: connect |
| 221 | + depends_on: |
| 222 | + - zookeeper |
| 223 | + - broker |
| 224 | + - schema_registry |
| 225 | + ports: |
| 226 | + - "8083:8083" |
| 227 | + environment: |
| 228 | + CONNECT_BOOTSTRAP_SERVERS: 'broker:9093' |
| 229 | + CONNECT_REST_ADVERTISED_HOST_NAME: connect |
| 230 | + CONNECT_REST_PORT: 8083 |
| 231 | + CONNECT_GROUP_ID: compose-connect-group |
| 232 | + CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs |
| 233 | + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 |
| 234 | + CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000 |
| 235 | + CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets |
| 236 | + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 |
| 237 | + CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status |
| 238 | + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 |
| 239 | + CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter |
| 240 | + CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter |
| 241 | + CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter |
| 242 | + CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter |
| 243 | + CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181' |
| 244 | + CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components |
| 245 | + CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR |
| 246 | + command: |
| 247 | + - bash |
| 248 | + - -c |
| 249 | + - | |
| 250 | + confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:<version> && \ |
| 251 | + confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest |
| 252 | + /etc/confluent/docker/run |
| 253 | +
|
| 254 | + control-center: |
| 255 | + image: confluentinc/cp-enterprise-control-center |
| 256 | + hostname: control-center |
| 257 | + container_name: control-center |
| 258 | + depends_on: |
| 259 | + - zookeeper |
| 260 | + - broker |
| 261 | + - schema_registry |
| 262 | + - connect |
| 263 | + ports: |
| 264 | + - "9021:9021" |
| 265 | + environment: |
| 266 | + CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093' |
| 267 | + CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181' |
| 268 | + CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083' |
| 269 | + CONTROL_CENTER_REPLICATION_FACTOR: 1 |
| 270 | + CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 |
| 271 | + CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 |
| 272 | + CONFLUENT_METRICS_TOPIC_REPLICATION: 1 |
| 273 | + PORT: 9021 |
| 274 | +
|
| 275 | +---- |

[NOTE]
====
You must allocate a minimum of 8 GB of memory to Docker in order to avoid *Exit Code 137 (Out Of Memory Error)* on the `connect` container.

image::../../images/docker_memory_setting.png[align="center"]
====
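
You can then start the whole stack in the background and, once the `connect` container is up, verify that both connector plugins were installed by querying the Kafka Connect REST endpoint (ports as mapped in the compose file above):

[source, bash]
----
docker-compose up -d

# list the connector plugins available on the Connect worker
curl -s http://localhost:8083/connector-plugins
----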

To see the results, follow the instructions in the <<view_results_example, View results>> section above.